mz_adapter/coord/cluster_scheduling.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use itertools::Itertools;
11use mz_audit_log::SchedulingDecisionsWithReasonsV2;
12use mz_catalog::memory::objects::{CatalogItem, ClusterVariant, ClusterVariantManaged};
13use mz_controller_types::ClusterId;
14use mz_ore::collections::CollectionExt;
15use mz_ore::{soft_assert_or_log, soft_panic_or_log};
16use mz_repr::adt::interval::Interval;
17use mz_repr::{GlobalId, TimestampManipulation};
18use mz_sql::catalog::CatalogCluster;
19use mz_sql::plan::{AlterClusterPlanStrategy, ClusterSchedule};
20use std::time::{Duration, Instant};
21use tracing::{debug, warn};
22
23use crate::AdapterError;
24use crate::coord::{Coordinator, Message};
25
26const POLICIES: &[&str] = &[REFRESH_POLICY_NAME];
27
28const REFRESH_POLICY_NAME: &str = "refresh";
29
30/// A policy's decision for whether it wants a certain cluster to be On, along with its reason.
31/// (Among the reasons there can be settings of the policy as well as other information about the
32/// state of the system.)
33#[derive(Clone, Debug)]
34pub enum SchedulingDecision {
35    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
36    Refresh(RefreshDecision),
37}
38
39impl SchedulingDecision {
40    /// Extract the On/Off decision from the policy-specific structs.
41    pub fn cluster_on(&self) -> bool {
42        match &self {
43            SchedulingDecision::Refresh(RefreshDecision { cluster_on, .. }) => cluster_on.clone(),
44        }
45    }
46}
47
48#[derive(Clone, Debug)]
49pub struct RefreshDecision {
50    /// Whether the ON REFRESH policy wants a certain cluster to be On.
51    cluster_on: bool,
52    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
53    /// time estimate), and therefore should keep the cluster On.
54    objects_needing_refresh: Vec<GlobalId>,
55    /// Objects for which we estimate that they currently need Persist compaction, and therefore
56    /// should keep the cluster On.
57    objects_needing_compaction: Vec<GlobalId>,
58    /// The HYDRATION TIME ESTIMATE setting of the cluster.
59    hydration_time_estimate: Duration,
60}
61
62impl SchedulingDecision {
63    pub fn reasons_to_audit_log_reasons<'a, I>(reasons: I) -> SchedulingDecisionsWithReasonsV2
64    where
65        I: IntoIterator<Item = &'a SchedulingDecision>,
66    {
67        SchedulingDecisionsWithReasonsV2 {
68            on_refresh: reasons
69                .into_iter()
70                .filter_map(|r| match r {
71                    SchedulingDecision::Refresh(RefreshDecision {
72                        cluster_on,
73                        objects_needing_refresh,
74                        objects_needing_compaction,
75                        hydration_time_estimate,
76                    }) => {
77                        soft_assert_or_log!(
78                            !cluster_on
79                                || !objects_needing_refresh.is_empty()
80                                || !objects_needing_compaction.is_empty(),
81                            "`cluster_on = true` should have an explanation"
82                        );
83                        let mut hydration_time_estimate_str = String::new();
84                        mz_repr::strconv::format_interval(
85                            &mut hydration_time_estimate_str,
86                            Interval::from_duration(hydration_time_estimate).expect(
87                                "planning ensured that this is convertible back to Interval",
88                            ),
89                        );
90                        Some(mz_audit_log::RefreshDecisionWithReasonV2 {
91                            decision: (*cluster_on).into(),
92                            objects_needing_refresh: objects_needing_refresh
93                                .iter()
94                                .map(|id| id.to_string())
95                                .collect(),
96                            objects_needing_compaction: objects_needing_compaction
97                                .iter()
98                                .map(|id| id.to_string())
99                                .collect(),
100                            hydration_time_estimate: hydration_time_estimate_str,
101                        })
102                    }
103                })
104                .into_element(), // Each policy should have exactly one opinion on each cluster.
105        }
106    }
107}
108
109impl Coordinator {
110    #[mz_ore::instrument(level = "debug")]
111    /// Call each scheduling policy.
112    pub(crate) async fn check_scheduling_policies(&mut self) {
113        // (So far, we have only this one policy.)
114        self.check_refresh_policy();
115    }
116
117    /// Runs the `SCHEDULE = ON REFRESH` cluster scheduling policy, which makes cluster On/Off
118    /// decisions based on REFRESH materialized view write frontiers and the current time (the local
119    /// oracle read ts), and sends `Message::SchedulingDecisions` with these decisions.
120    /// (Queries the timestamp oracle on a background task.)
121    fn check_refresh_policy(&self) {
122        let start_time = Instant::now();
123
124        // Collect information about REFRESH MVs:
125        // - cluster
126        // - hydration_time_estimate of the cluster
127        // - MV's id
128        // - MV's write frontier
129        // - MV's refresh schedule
130        let mut refresh_mv_infos = Vec::new();
131        for cluster in self.catalog().clusters() {
132            if let ClusterVariant::Managed(ref config) = cluster.config.variant {
133                match config.schedule {
134                    ClusterSchedule::Manual => {
135                        // Nothing to do, user manages this cluster manually.
136                    }
137                    ClusterSchedule::Refresh {
138                        hydration_time_estimate,
139                    } => {
140                        let mvs = cluster
141                            .bound_objects()
142                            .iter()
143                            .filter_map(|id| {
144                                if let CatalogItem::MaterializedView(mv) =
145                                    self.catalog().get_entry(id).item()
146                                {
147                                    mv.refresh_schedule.clone().map(|refresh_schedule| {
148                                        let (_since, write_frontier) = self
149                                            .controller
150                                            .storage
151                                            .collection_frontiers(mv.global_id())
152                                            .expect("the storage controller should know about MVs that exist in the catalog");
153                                        (mv.global_id(), write_frontier, refresh_schedule)
154                                    })
155                                } else {
156                                    None
157                                }
158                            })
159                            .collect_vec();
160                        debug!(%cluster.id, ?refresh_mv_infos, "check_refresh_policy");
161                        refresh_mv_infos.push((cluster.id, hydration_time_estimate, mvs));
162                    }
163                }
164            }
165        }
166
167        // Spawn a background task that queries the timestamp oracle for the current read timestamp,
168        // compares this ts with the REFRESH MV write frontiers, thus making On/Off decisions per
169        // cluster, and sends a `Message::SchedulingDecisions` with these decisions.
170        let ts_oracle = self.get_local_timestamp_oracle();
171        let internal_cmd_tx = self.internal_cmd_tx.clone();
172        let check_scheduling_policies_seconds_cloned =
173            self.metrics.check_scheduling_policies_seconds.clone();
174        let compaction_estimate = self
175            .catalog()
176            .system_config()
177            .cluster_refresh_mv_compaction_estimate()
178            .try_into()
179            .expect("should be configured to a reasonable value");
180        mz_ore::task::spawn(|| "refresh policy get ts and make decisions", async move {
181            let task_start_time = Instant::now();
182            let local_read_ts = ts_oracle.read_ts().await;
183            debug!(%local_read_ts, ?refresh_mv_infos, "check_refresh_policy background task");
184            let decisions = refresh_mv_infos
185                .into_iter()
186                .map(|(cluster_id, hydration_time_estimate, refresh_mv_info)| {
187                    // 1. check that
188                    // write_frontier < local_read_ts + hydration_time_estimate
189                    let hydration_estimate = &hydration_time_estimate
190                        .try_into()
191                        .expect("checked during planning");
192                    let local_read_ts_adjusted = local_read_ts.step_forward_by(hydration_estimate);
193                    let mvs_needing_refresh = refresh_mv_info
194                        .iter()
195                        .cloned()
196                        .filter_map(|(id, frontier, _refresh_schedule)| {
197                            if frontier.less_than(&local_read_ts_adjusted) {
198                                Some(id)
199                            } else {
200                                None
201                            }
202                        })
203                        .collect_vec();
204
205                    // 2. check that
206                    // prev_refresh + compaction_estimate > local_read_ts
207                    let mvs_needing_compaction = refresh_mv_info
208                        .into_iter()
209                        .filter_map(|(id, frontier, refresh_schedule)| {
210                            let frontier = frontier.as_option();
211                            // `prev_refresh` will be None in two cases:
212                            // 1. When there is no previous refresh, because we haven't yet had
213                            // the first refresh. In this case, there is no need to schedule
214                            // time now for compaction.
215                            // 2. In the niche case where a `REFRESH EVERY` MV's write frontier
216                            // is empty. In this case, it's not impossible that there would be a
217                            // need for compaction. But I can't see any easy way to correctly
218                            // handle this case, because we don't have any info handy about when
219                            // the last refresh happened in wall clock time, because the
220                            // frontiers have no relation to wall clock time. So, we'll not
221                            // schedule any compaction time.
222                            // (Note that `REFRESH AT` MVs with empty frontiers, which is a more
223                            // common case, are fine, because `last_refresh` will return
224                            // Some(...) for them.)
225                            let prev_refresh = match frontier {
226                                Some(frontier) => frontier.round_down_minus_1(&refresh_schedule),
227                                None => refresh_schedule.last_refresh(),
228                            };
229                            prev_refresh
230                                .map(|prev_refresh| {
231                                    if prev_refresh.step_forward_by(&compaction_estimate)
232                                        > local_read_ts
233                                    {
234                                        Some(id)
235                                    } else {
236                                        None
237                                    }
238                                })
239                                .flatten()
240                        })
241                        .collect_vec();
242
243                    let cluster_on =
244                        !mvs_needing_refresh.is_empty() || !mvs_needing_compaction.is_empty();
245                    (
246                        cluster_id,
247                        SchedulingDecision::Refresh(RefreshDecision {
248                            cluster_on,
249                            objects_needing_refresh: mvs_needing_refresh,
250                            objects_needing_compaction: mvs_needing_compaction,
251                            hydration_time_estimate,
252                        }),
253                    )
254                })
255                .collect();
256            if let Err(e) = internal_cmd_tx.send(Message::SchedulingDecisions(vec![(
257                REFRESH_POLICY_NAME,
258                decisions,
259            )])) {
260                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
261                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
262            }
263            check_scheduling_policies_seconds_cloned
264                .with_label_values(&[REFRESH_POLICY_NAME, "background"])
265                .observe((Instant::now() - task_start_time).as_secs_f64());
266        });
267
268        self.metrics
269            .check_scheduling_policies_seconds
270            .with_label_values(&[REFRESH_POLICY_NAME, "main"])
271            .observe((Instant::now() - start_time).as_secs_f64());
272    }
273
274    /// Handles `SchedulingDecisions`:
275    /// 1. Adds the newly made decisions to `cluster_scheduling_decisions`.
276    /// 2. Cleans up old decisions that are for clusters no longer in scope of automated scheduling
277    ///   decisions.
278    /// 3. For each cluster, it sums up `cluster_scheduling_decisions`, checks the summed up decision
279    ///   against the cluster state, and turns cluster On/Off if needed.
280    #[mz_ore::instrument(level = "debug")]
281    pub(crate) async fn handle_scheduling_decisions(
282        &mut self,
283        decisions: Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>,
284    ) {
285        let start_time = Instant::now();
286
287        // 1. Add the received decisions to `cluster_scheduling_decisions`.
288        for (policy_name, decisions) in decisions.iter() {
289            for (cluster_id, decision) in decisions {
290                self.cluster_scheduling_decisions
291                    .entry(*cluster_id)
292                    .or_insert_with(Default::default)
293                    .insert(policy_name, decision.clone());
294            }
295        }
296
297        // 2. Clean up those clusters from `scheduling_decisions` that
298        // - have been dropped, or
299        // - were switched to unmanaged, or
300        // - were switched to `SCHEDULE = MANUAL`.
301        for cluster_id in self
302            .cluster_scheduling_decisions
303            .keys()
304            .cloned()
305            .collect_vec()
306        {
307            match self.get_managed_cluster_config(cluster_id) {
308                None => {
309                    // Cluster have been dropped or switched to unmanaged.
310                    debug!(
311                        "handle_scheduling_decisions: \
312                        Removing cluster {} from cluster_scheduling_decisions, \
313                        because get_managed_cluster_config returned None",
314                        cluster_id
315                    );
316                    self.cluster_scheduling_decisions.remove(&cluster_id);
317                }
318                Some(managed_config) => {
319                    if matches!(managed_config.schedule, ClusterSchedule::Manual) {
320                        debug!(
321                            "handle_scheduling_decisions: \
322                            Removing cluster {} from cluster_scheduling_decisions, \
323                            because schedule is Manual",
324                            cluster_id
325                        );
326                        self.cluster_scheduling_decisions.remove(&cluster_id);
327                    }
328                }
329            }
330        }
331
332        // 3. Act on `scheduling_decisions` where needed.
333        let mut altered_a_cluster = false;
334        for (cluster_id, decisions) in self.cluster_scheduling_decisions.clone() {
335            // We touch a cluster only when all policies have made a decision about it. This is
336            // to ensure that after an envd restart all policies have a chance to run at least once
337            // before we turn off a cluster, to avoid spuriously turning off a cluster and possibly
338            // losing a hydrated state.
339            if POLICIES.iter().all(|policy| decisions.contains_key(policy)) {
340                // Check whether the cluster's state matches the needed state.
341                // If any policy says On, then we need a replica.
342                let needs_replica = decisions
343                    .values()
344                    .map(|decision| decision.cluster_on())
345                    .contains(&true);
346                let cluster_config = self.catalog().get_cluster(cluster_id).config.clone();
347                let mut new_config = cluster_config.clone();
348                let ClusterVariant::Managed(managed_config) = &mut new_config.variant else {
349                    panic!("cleaned up unmanaged clusters above");
350                };
351                let has_replica = managed_config.replication_factor > 0; // Is it On?
352                if needs_replica != has_replica {
353                    // Turn the cluster On or Off.
354                    altered_a_cluster = true;
355                    managed_config.replication_factor = if needs_replica { 1 } else { 0 };
356                    if let Err(e) = self
357                        .sequence_alter_cluster_managed_to_managed(
358                            None,
359                            cluster_id,
360                            new_config.clone(),
361                            crate::catalog::ReplicaCreateDropReason::ClusterScheduling(
362                                decisions.values().cloned().collect(),
363                            ),
364                            AlterClusterPlanStrategy::None,
365                        )
366                        .await
367                    {
368                        if let AdapterError::AlterClusterWhilePendingReplicas = e {
369                            debug!(
370                                "handle_scheduling_decisions tried to alter a cluster that is undergoing a graceful reconfiguration"
371                            );
372                        } else {
373                            soft_panic_or_log!(
374                                "handle_scheduling_decisions couldn't alter cluster {}. \
375                                 Old config: {:?}, \
376                                 New config: {:?}, \
377                                 Error: {}",
378                                cluster_id,
379                                cluster_config,
380                                new_config,
381                                e
382                            );
383                        }
384                    }
385                }
386            } else {
387                debug!(
388                    "handle_scheduling_decisions: \
389                    Not all policies have made a decision about cluster {}. decisions: {:?}",
390                    cluster_id, decisions,
391                );
392            }
393        }
394
395        self.metrics
396            .handle_scheduling_decisions_seconds
397            .with_label_values(&[altered_a_cluster.to_string().as_str()])
398            .observe((Instant::now() - start_time).as_secs_f64());
399    }
400
401    /// Returns the managed config for a cluster. Returns None if the cluster doesn't exist or if
402    /// it's an unmanaged cluster.
403    fn get_managed_cluster_config(&self, cluster_id: ClusterId) -> Option<ClusterVariantManaged> {
404        let cluster = self.catalog().try_get_cluster(cluster_id)?;
405        if let ClusterVariant::Managed(managed_config) = cluster.config.variant.clone() {
406            Some(managed_config)
407        } else {
408            None
409        }
410    }
411}