mz_adapter/coord/cluster_scheduling.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

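//! Cluster scheduling: policies decide whether each in-scope managed cluster should be On or Off,
//! and the coordinator turns clusters on and off by setting their replication factor to 1 or 0.
//! (Currently, the only policy is `SCHEDULE = ON REFRESH`.)
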
use crate::coord::{Coordinator, Message};
use itertools::Itertools;
use mz_audit_log::SchedulingDecisionsWithReasonsV2;
use mz_catalog::memory::objects::{CatalogItem, ClusterVariant, ClusterVariantManaged};
use mz_controller_types::ClusterId;
use mz_ore::collections::CollectionExt;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_repr::adt::interval::Interval;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{AlterClusterPlanStrategy, ClusterSchedule};
use std::time::{Duration, Instant};
use tracing::{debug, warn};

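/// All cluster scheduling policies. (A cluster is acted on only once every policy listed here has
/// made a decision about it; see `handle_scheduling_decisions`.)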
const POLICIES: &[&str] = &[REFRESH_POLICY_NAME];

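/// The name of the `SCHEDULE = ON REFRESH` policy. Used as the policy name in
/// `Message::SchedulingDecisions` and as a metrics label.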
const REFRESH_POLICY_NAME: &str = "refresh";

/// A policy's decision about whether it wants a certain cluster to be On, along with the reason.
/// (The reason can refer to settings of the policy as well as other information about the state
/// of the system.)
#[derive(Clone, Debug)]
pub enum SchedulingDecision {
    /// The refresh policy's decision and reasons for wanting to turn a cluster On or Off.
    Refresh(RefreshDecision),
}

impl SchedulingDecision {
    /// Extract the On/Off decision from the policy-specific structs.
    pub fn cluster_on(&self) -> bool {
        match &self {
            SchedulingDecision::Refresh(RefreshDecision { cluster_on, .. }) => *cluster_on,
        }
    }
}

#[derive(Clone, Debug)]
pub struct RefreshDecision {
    /// Whether the ON REFRESH policy wants a certain cluster to be On.
    cluster_on: bool,
    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
    /// time estimate), and therefore should keep the cluster On.
    objects_needing_refresh: Vec<GlobalId>,
    /// Objects that we estimate currently need Persist compaction, and therefore should keep the
    /// cluster On.
    objects_needing_compaction: Vec<GlobalId>,
    /// The HYDRATION TIME ESTIMATE setting of the cluster.
    hydration_time_estimate: Duration,
}

impl SchedulingDecision {
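    /// Converts the per-policy [`SchedulingDecision`]s for a cluster into the audit log
    /// representation, formatting the hydration time estimate as an SQL interval string.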
    pub fn reasons_to_audit_log_reasons<'a, I>(reasons: I) -> SchedulingDecisionsWithReasonsV2
    where
        I: IntoIterator<Item = &'a SchedulingDecision>,
    {
        SchedulingDecisionsWithReasonsV2 {
            on_refresh: reasons
                .into_iter()
                .filter_map(|r| match r {
                    SchedulingDecision::Refresh(RefreshDecision {
                        cluster_on,
                        objects_needing_refresh,
                        objects_needing_compaction,
                        hydration_time_estimate,
                    }) => {
                        soft_assert_or_log!(
                            !cluster_on
                                || !objects_needing_refresh.is_empty()
                                || !objects_needing_compaction.is_empty(),
                            "`cluster_on = true` should have an explanation"
                        );
                        let mut hydration_time_estimate_str = String::new();
                        mz_repr::strconv::format_interval(
                            &mut hydration_time_estimate_str,
                            Interval::from_duration(hydration_time_estimate).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        );
                        Some(mz_audit_log::RefreshDecisionWithReasonV2 {
                            decision: (*cluster_on).into(),
                            objects_needing_refresh: objects_needing_refresh
                                .iter()
                                .map(|id| id.to_string())
                                .collect(),
                            objects_needing_compaction: objects_needing_compaction
                                .iter()
                                .map(|id| id.to_string())
                                .collect(),
                            hydration_time_estimate: hydration_time_estimate_str,
                        })
                    }
                })
                .into_element(), // Each policy should have exactly one opinion on each cluster.
        }
    }
}

impl Coordinator {
    #[mz_ore::instrument(level = "debug")]
    /// Call each scheduling policy.
    pub(crate) async fn check_scheduling_policies(&mut self) {
        // (So far, we have only this one policy.)
        self.check_refresh_policy();
    }

    /// Runs the `SCHEDULE = ON REFRESH` cluster scheduling policy, which makes cluster On/Off
    /// decisions based on REFRESH materialized view write frontiers and the current time (the local
    /// oracle read ts), and sends `Message::SchedulingDecisions` with these decisions.
    /// (Queries the timestamp oracle on a background task.)
    fn check_refresh_policy(&self) {
        let start_time = Instant::now();

        // Collect information about REFRESH MVs:
        // - cluster
        // - hydration_time_estimate of the cluster
        // - MV's id
        // - MV's write frontier
        // - MV's refresh schedule
        let mut refresh_mv_infos = Vec::new();
        for cluster in self.catalog().clusters() {
            if let ClusterVariant::Managed(ref config) = cluster.config.variant {
                match config.schedule {
                    ClusterSchedule::Manual => {
                        // Nothing to do, user manages this cluster manually.
                    }
                    ClusterSchedule::Refresh {
                        hydration_time_estimate,
                    } => {
                        let mvs = cluster
                            .bound_objects()
                            .iter()
                            .filter_map(|id| {
                                if let CatalogItem::MaterializedView(mv) =
                                    self.catalog().get_entry(id).item()
                                {
                                    mv.refresh_schedule.clone().map(|refresh_schedule| {
                                        let (_since, write_frontier) = self
                                            .controller
                                            .storage
                                            .collection_frontiers(mv.global_id())
                                            .expect("the storage controller should know about MVs that exist in the catalog");
                                        (mv.global_id(), write_frontier, refresh_schedule)
                                    })
                                } else {
                                    None
                                }
                            })
                            .collect_vec();
                        debug!(%cluster.id, ?refresh_mv_infos, "check_refresh_policy");
                        refresh_mv_infos.push((cluster.id, hydration_time_estimate, mvs));
                    }
                }
            }
        }

        // Spawn a background task that queries the timestamp oracle for the current read timestamp,
        // compares this ts with the REFRESH MV write frontiers, thus making On/Off decisions per
        // cluster, and sends a `Message::SchedulingDecisions` with these decisions.
        let ts_oracle = self.get_local_timestamp_oracle();
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let check_scheduling_policies_seconds_cloned =
            self.metrics.check_scheduling_policies_seconds.clone();
        let compaction_estimate = self
            .catalog()
            .system_config()
            .cluster_refresh_mv_compaction_estimate()
            .try_into()
            .expect("should be configured to a reasonable value");
        mz_ore::task::spawn(|| "refresh policy get ts and make decisions", async move {
            let task_start_time = Instant::now();
            let local_read_ts = ts_oracle.read_ts().await;
            debug!(%local_read_ts, ?refresh_mv_infos, "check_refresh_policy background task");
            let decisions = refresh_mv_infos
                .into_iter()
                .map(|(cluster_id, hydration_time_estimate, refresh_mv_info)| {
                    // 1. check that
                    // write_frontier < local_read_ts + hydration_time_estimate
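                    // (Adding the hydration time estimate means we turn the cluster On slightly
                    // before a refresh is due, so the replica has time to rehydrate before it
                    // needs to write the refresh.)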
                    let hydration_estimate = &hydration_time_estimate
                        .try_into()
                        .expect("checked during planning");
                    let local_read_ts_adjusted = local_read_ts.step_forward_by(hydration_estimate);
                    let mvs_needing_refresh = refresh_mv_info
                        .iter()
                        .cloned()
                        .filter_map(|(id, frontier, _refresh_schedule)| {
                            if frontier.less_than(&local_read_ts_adjusted) {
                                Some(id)
                            } else {
                                None
                            }
                        })
                        .collect_vec();

                    // 2. check that
                    // prev_refresh + compaction_estimate > local_read_ts
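                    // (Keeping the cluster On for a while after a refresh lets Persist compaction
                    // of the freshly refreshed MVs happen; see `objects_needing_compaction`.)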
                    let mvs_needing_compaction = refresh_mv_info
                        .into_iter()
                        .filter_map(|(id, frontier, refresh_schedule)| {
                            let frontier = frontier.as_option();
                            // `prev_refresh` will be None in two cases:
                            // 1. When there is no previous refresh, because we haven't yet had
                            //    the first refresh. In this case, there is no need to schedule
                            //    time now for compaction.
                            // 2. In the niche case where a `REFRESH EVERY` MV's write frontier
                            //    is empty. In this case, it's not impossible that there would be a
                            //    need for compaction. But I can't see any easy way to correctly
                            //    handle this case, because we don't have any info handy about when
                            //    the last refresh happened in wall clock time, because the
                            //    frontiers have no relation to wall clock time. So, we'll not
                            //    schedule any compaction time.
                            //    (Note that `REFRESH AT` MVs with empty frontiers, which is a more
                            //    common case, are fine, because `last_refresh` will return
                            //    Some(...) for them.)
                            let prev_refresh = match frontier {
                                Some(frontier) => frontier.round_down_minus_1(&refresh_schedule),
                                None => refresh_schedule.last_refresh(),
                            };
                            prev_refresh
                                .map(|prev_refresh| {
                                    if prev_refresh.step_forward_by(&compaction_estimate)
                                        > local_read_ts
                                    {
                                        Some(id)
                                    } else {
                                        None
                                    }
                                })
                                .flatten()
                        })
                        .collect_vec();

                    let cluster_on =
                        !mvs_needing_refresh.is_empty() || !mvs_needing_compaction.is_empty();
                    (
                        cluster_id,
                        SchedulingDecision::Refresh(RefreshDecision {
                            cluster_on,
                            objects_needing_refresh: mvs_needing_refresh,
                            objects_needing_compaction: mvs_needing_compaction,
                            hydration_time_estimate,
                        }),
                    )
                })
                .collect();
            if let Err(e) = internal_cmd_tx.send(Message::SchedulingDecisions(vec![(
                REFRESH_POLICY_NAME,
                decisions,
            )])) {
                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
            check_scheduling_policies_seconds_cloned
                .with_label_values(&[REFRESH_POLICY_NAME, "background"])
                .observe((Instant::now() - task_start_time).as_secs_f64());
        });

        self.metrics
            .check_scheduling_policies_seconds
            .with_label_values(&[REFRESH_POLICY_NAME, "main"])
            .observe((Instant::now() - start_time).as_secs_f64());
    }

    /// Handles `SchedulingDecisions`:
    /// 1. Adds the newly made decisions to `cluster_scheduling_decisions`.
    /// 2. Cleans up old decisions that are for clusters no longer in scope of automated scheduling
    ///    decisions.
    /// 3. For each cluster, sums up `cluster_scheduling_decisions`, checks the summed-up decision
    ///    against the cluster state, and turns the cluster On/Off if needed.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn handle_scheduling_decisions(
        &mut self,
        decisions: Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>,
    ) {
        let start_time = Instant::now();

        // 1. Add the received decisions to `cluster_scheduling_decisions`.
        for (policy_name, decisions) in decisions.iter() {
            for (cluster_id, decision) in decisions {
                self.cluster_scheduling_decisions
                    .entry(*cluster_id)
                    .or_insert_with(Default::default)
                    .insert(policy_name, decision.clone());
            }
        }

        // 2. Clean up those clusters from `cluster_scheduling_decisions` that
        // - have been dropped, or
        // - were switched to unmanaged, or
        // - were switched to `SCHEDULE = MANUAL`.
        for cluster_id in self
            .cluster_scheduling_decisions
            .keys()
            .cloned()
            .collect_vec()
        {
            match self.get_managed_cluster_config(cluster_id) {
                None => {
                    // The cluster has been dropped or switched to unmanaged.
                    debug!(
                        "handle_scheduling_decisions: \
                        Removing cluster {} from cluster_scheduling_decisions, \
                        because get_managed_cluster_config returned None",
                        cluster_id
                    );
                    self.cluster_scheduling_decisions.remove(&cluster_id);
                }
                Some(managed_config) => {
                    if matches!(managed_config.schedule, ClusterSchedule::Manual) {
                        debug!(
                            "handle_scheduling_decisions: \
                            Removing cluster {} from cluster_scheduling_decisions, \
                            because schedule is Manual",
                            cluster_id
                        );
                        self.cluster_scheduling_decisions.remove(&cluster_id);
                    }
                }
            }
        }

        // 3. Act on `cluster_scheduling_decisions` where needed.
        let mut altered_a_cluster = false;
        for (cluster_id, decisions) in self.cluster_scheduling_decisions.clone() {
            // We touch a cluster only when all policies have made a decision about it. This is
            // to ensure that after an envd restart all policies have a chance to run at least once
            // before we turn off a cluster, to avoid spuriously turning off a cluster and possibly
            // losing hydrated state.
            if POLICIES.iter().all(|policy| decisions.contains_key(policy)) {
                // Check whether the cluster's state matches the needed state.
                // If any policy says On, then we need a replica.
                let needs_replica = decisions
                    .values()
                    .map(|decision| decision.cluster_on())
                    .contains(&true);
                let cluster_config = self.catalog().get_cluster(cluster_id).config.clone();
                let mut new_config = cluster_config.clone();
                let ClusterVariant::Managed(managed_config) = &mut new_config.variant else {
                    panic!("cleaned up unmanaged clusters above");
                };
                let has_replica = managed_config.replication_factor > 0; // Is it On?
                if needs_replica != has_replica {
                    // Turn the cluster On or Off.
                    altered_a_cluster = true;
                    managed_config.replication_factor = if needs_replica { 1 } else { 0 };
                    if let Err(e) = self
                        .sequence_alter_cluster_managed_to_managed(
                            None,
                            cluster_id,
                            new_config.clone(),
                            crate::catalog::ReplicaCreateDropReason::ClusterScheduling(
                                decisions.values().cloned().collect(),
                            ),
                            AlterClusterPlanStrategy::None,
                        )
                        .await
                    {
                        soft_panic_or_log!(
                            "handle_scheduling_decisions couldn't alter cluster {}. \
                            Old config: {:?}, \
                            New config: {:?}, \
                            Error: {}",
                            cluster_id,
                            cluster_config,
                            new_config,
                            e
                        );
                    }
                }
            } else {
                debug!(
                    "handle_scheduling_decisions: \
                    Not all policies have made a decision about cluster {}. decisions: {:?}",
                    cluster_id, decisions,
                );
            }
        }

        self.metrics
            .handle_scheduling_decisions_seconds
            .with_label_values(&[altered_a_cluster.to_string().as_str()])
            .observe((Instant::now() - start_time).as_secs_f64());
    }

    /// Returns the managed config for a cluster. Returns `None` if the cluster doesn't exist or
    /// if it's an unmanaged cluster.
    fn get_managed_cluster_config(&self, cluster_id: ClusterId) -> Option<ClusterVariantManaged> {
        let cluster = self.catalog().try_get_cluster(cluster_id)?;
        if let ClusterVariant::Managed(managed_config) = cluster.config.variant.clone() {
            Some(managed_config)
        } else {
            None
        }
    }
}
403}