mz_adapter/coord/cluster_scheduling.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::coord::{Coordinator, Message};
use itertools::Itertools;
use mz_audit_log::SchedulingDecisionsWithReasonsV2;
use mz_catalog::memory::objects::{CatalogItem, ClusterVariant, ClusterVariantManaged};
use mz_controller_types::ClusterId;
use mz_ore::collections::CollectionExt;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_repr::adt::interval::Interval;
use mz_repr::{GlobalId, TimestampManipulation};
use mz_sql::catalog::CatalogCluster;
use mz_sql::plan::{AlterClusterPlanStrategy, ClusterSchedule};
use std::time::{Duration, Instant};
use tracing::{debug, warn};

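/// The names of all cluster scheduling policies. `handle_scheduling_decisions` acts on a cluster
/// only once every policy listed here has made a decision about it.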
const POLICIES: &[&str] = &[REFRESH_POLICY_NAME];

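/// The name of the `SCHEDULE = ON REFRESH` policy. Used as the key under which its decisions are
/// sent and recorded, and as a label value on the scheduling metrics.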
const REFRESH_POLICY_NAME: &str = "refresh";

/// A policy's decision about whether it wants a certain cluster to be On, along with its reason.
/// (The reasons can include settings of the policy as well as other information about the state
/// of the system.)
#[derive(Clone, Debug)]
pub enum SchedulingDecision {
    /// The refresh policy's decision for turning a cluster On or Off, along with its reasons.
    Refresh(RefreshDecision),
}

impl SchedulingDecision {
    /// Extract the On/Off decision from the policy-specific structs.
    pub fn cluster_on(&self) -> bool {
        match &self {
            SchedulingDecision::Refresh(RefreshDecision { cluster_on, .. }) => *cluster_on,
        }
    }
}

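/// The `SCHEDULE = ON REFRESH` policy's decision for a single cluster, together with the
/// information that the decision was based on.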
#[derive(Clone, Debug)]
pub struct RefreshDecision {
    /// Whether the ON REFRESH policy wants a certain cluster to be On.
    cluster_on: bool,
    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
    /// time estimate), and therefore should keep the cluster On.
    objects_needing_refresh: Vec<GlobalId>,
    /// Objects for which we estimate that they currently need Persist compaction, and therefore
    /// should keep the cluster On.
    objects_needing_compaction: Vec<GlobalId>,
    /// The HYDRATION TIME ESTIMATE setting of the cluster.
    hydration_time_estimate: Duration,
}

impl SchedulingDecision {
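    /// Convert a set of per-policy `SchedulingDecision`s into their audit log representation.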
    pub fn reasons_to_audit_log_reasons<'a, I>(reasons: I) -> SchedulingDecisionsWithReasonsV2
    where
        I: IntoIterator<Item = &'a SchedulingDecision>,
    {
        SchedulingDecisionsWithReasonsV2 {
            on_refresh: reasons
                .into_iter()
                .filter_map(|r| match r {
                    SchedulingDecision::Refresh(RefreshDecision {
                        cluster_on,
                        objects_needing_refresh,
                        objects_needing_compaction,
                        hydration_time_estimate,
                    }) => {
                        soft_assert_or_log!(
                            !cluster_on
                                || !objects_needing_refresh.is_empty()
                                || !objects_needing_compaction.is_empty(),
                            "`cluster_on = true` should have an explanation"
                        );
                        let mut hydration_time_estimate_str = String::new();
                        mz_repr::strconv::format_interval(
                            &mut hydration_time_estimate_str,
                            Interval::from_duration(hydration_time_estimate).expect(
                                "planning ensured that this is convertible back to Interval",
                            ),
                        );
                        Some(mz_audit_log::RefreshDecisionWithReasonV2 {
                            decision: (*cluster_on).into(),
                            objects_needing_refresh: objects_needing_refresh
                                .iter()
                                .map(|id| id.to_string())
                                .collect(),
                            objects_needing_compaction: objects_needing_compaction
                                .iter()
                                .map(|id| id.to_string())
                                .collect(),
                            hydration_time_estimate: hydration_time_estimate_str,
                        })
                    }
                })
                .into_element(), // Each policy should have exactly one opinion on each cluster.
        }
    }
}

impl Coordinator {
    /// Call each scheduling policy.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn check_scheduling_policies(&mut self) {
        // (So far, we have only this one policy.)
        self.check_refresh_policy();
    }

    /// Runs the `SCHEDULE = ON REFRESH` cluster scheduling policy, which makes cluster On/Off
    /// decisions based on REFRESH materialized view write frontiers and the current time (the local
    /// oracle read ts), and sends `Message::SchedulingDecisions` with these decisions.
    /// (Queries the timestamp oracle on a background task.)
    fn check_refresh_policy(&self) {
        let start_time = Instant::now();

        // Collect information about REFRESH MVs:
        // - cluster
        // - hydration_time_estimate of the cluster
        // - MV's id
        // - MV's write frontier
        // - MV's refresh schedule
        let mut refresh_mv_infos = Vec::new();
        for cluster in self.catalog().clusters() {
            if let ClusterVariant::Managed(ref config) = cluster.config.variant {
                match config.schedule {
                    ClusterSchedule::Manual => {
                        // Nothing to do, user manages this cluster manually.
                    }
                    ClusterSchedule::Refresh {
                        hydration_time_estimate,
                    } => {
                        let mvs = cluster
                            .bound_objects()
                            .iter()
                            .filter_map(|id| {
                                if let CatalogItem::MaterializedView(mv) =
                                    self.catalog().get_entry(id).item()
                                {
                                    mv.refresh_schedule.clone().map(|refresh_schedule| {
                                        let (_since, write_frontier) = self
                                            .controller
                                            .storage
                                            .collection_frontiers(mv.global_id())
                                            .expect("the storage controller should know about MVs that exist in the catalog");
                                        (mv.global_id(), write_frontier, refresh_schedule)
                                    })
                                } else {
                                    None
                                }
                            })
                            .collect_vec();
                        debug!(%cluster.id, ?refresh_mv_infos, "check_refresh_policy");
                        refresh_mv_infos.push((cluster.id, hydration_time_estimate, mvs));
                    }
                }
            }
        }

        // Spawn a background task that queries the timestamp oracle for the current read timestamp,
        // compares this ts with the REFRESH MV write frontiers, thus making On/Off decisions per
        // cluster, and sends a `Message::SchedulingDecisions` with these decisions.
        let ts_oracle = self.get_local_timestamp_oracle();
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let check_scheduling_policies_seconds_cloned =
            self.metrics.check_scheduling_policies_seconds.clone();
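        // How much wall-clock time we estimate Persist compaction to need after a refresh, read
        // from the `cluster_refresh_mv_compaction_estimate` system config.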
        let compaction_estimate = self
            .catalog()
            .system_config()
            .cluster_refresh_mv_compaction_estimate()
            .try_into()
            .expect("should be configured to a reasonable value");
        mz_ore::task::spawn(|| "refresh policy get ts and make decisions", async move {
            let task_start_time = Instant::now();
            let local_read_ts = ts_oracle.read_ts().await;
            debug!(%local_read_ts, ?refresh_mv_infos, "check_refresh_policy background task");
            let decisions = refresh_mv_infos
                .into_iter()
                .map(|(cluster_id, hydration_time_estimate, refresh_mv_info)| {
                    // 1. check that
                    // write_frontier < local_read_ts + hydration_time_estimate
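                    // In other words, an MV keeps the cluster On when its next refresh (its write
                    // frontier) is less than `hydration_time_estimate` away from the oracle read
                    // ts, so the cluster can be up and hydrated by the time the refresh is due.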
                    let hydration_estimate = &hydration_time_estimate
                        .try_into()
                        .expect("checked during planning");
                    let local_read_ts_adjusted = local_read_ts.step_forward_by(hydration_estimate);
                    let mvs_needing_refresh = refresh_mv_info
                        .iter()
                        .cloned()
                        .filter_map(|(id, frontier, _refresh_schedule)| {
                            if frontier.less_than(&local_read_ts_adjusted) {
                                Some(id)
                            } else {
                                None
                            }
                        })
                        .collect_vec();

                    // 2. check that
                    // prev_refresh + compaction_estimate > local_read_ts
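                    // In other words, an MV keeps the cluster On for `compaction_estimate` after
                    // its most recent refresh, giving Persist a chance to compact the newly
                    // written data before the cluster is turned Off.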
                    let mvs_needing_compaction = refresh_mv_info
                        .into_iter()
                        .filter_map(|(id, frontier, refresh_schedule)| {
                            let frontier = frontier.as_option();
                            // `prev_refresh` will be None in two cases:
                            // 1. When there is no previous refresh, because we haven't yet had
                            // the first refresh. In this case, there is no need to schedule
                            // time now for compaction.
                            // 2. In the niche case where a `REFRESH EVERY` MV's write frontier
                            // is empty. In this case, it's not impossible that there would be a
                            // need for compaction. But I can't see any easy way to correctly
                            // handle this case, because we don't have any info handy about when
                            // the last refresh happened in wall clock time, because the
                            // frontiers have no relation to wall clock time. So, we'll not
                            // schedule any compaction time.
                            // (Note that `REFRESH AT` MVs with empty frontiers, which is a more
                            // common case, are fine, because `last_refresh` will return
                            // Some(...) for them.)
                            let prev_refresh = match frontier {
                                Some(frontier) => frontier.round_down_minus_1(&refresh_schedule),
                                None => refresh_schedule.last_refresh(),
                            };
                            prev_refresh.and_then(|prev_refresh| {
                                if prev_refresh.step_forward_by(&compaction_estimate)
                                    > local_read_ts
                                {
                                    Some(id)
                                } else {
                                    None
                                }
                            })
                        })
                        .collect_vec();

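                    // A cluster needs to be On iff at least one of the above lists is non-empty;
                    // `reasons_to_audit_log_reasons` soft-asserts that every `cluster_on = true`
                    // decision comes with such an explanation.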
                    let cluster_on =
                        !mvs_needing_refresh.is_empty() || !mvs_needing_compaction.is_empty();
                    (
                        cluster_id,
                        SchedulingDecision::Refresh(RefreshDecision {
                            cluster_on,
                            objects_needing_refresh: mvs_needing_refresh,
                            objects_needing_compaction: mvs_needing_compaction,
                            hydration_time_estimate,
                        }),
                    )
                })
                .collect();
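            // Send the decisions back to the Coordinator, which acts on them in
            // `handle_scheduling_decisions`.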
            if let Err(e) = internal_cmd_tx.send(Message::SchedulingDecisions(vec![(
                REFRESH_POLICY_NAME,
                decisions,
            )])) {
                // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                warn!("internal_cmd_rx dropped before we could send: {:?}", e);
            }
            check_scheduling_policies_seconds_cloned
                .with_label_values(&[REFRESH_POLICY_NAME, "background"])
                .observe((Instant::now() - task_start_time).as_secs_f64());
        });

        self.metrics
            .check_scheduling_policies_seconds
            .with_label_values(&[REFRESH_POLICY_NAME, "main"])
            .observe((Instant::now() - start_time).as_secs_f64());
    }

    /// Handles `SchedulingDecisions`:
    /// 1. Adds the newly made decisions to `cluster_scheduling_decisions`.
    /// 2. Cleans up old decisions that are for clusters no longer in scope of automated scheduling
    ///    decisions.
    /// 3. For each cluster, sums up `cluster_scheduling_decisions`, checks the summed-up decision
    ///    against the cluster's current state, and turns the cluster On/Off if needed.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn handle_scheduling_decisions(
        &mut self,
        decisions: Vec<(&'static str, Vec<(ClusterId, SchedulingDecision)>)>,
    ) {
        let start_time = Instant::now();

        // 1. Add the received decisions to `cluster_scheduling_decisions`.
        for (policy_name, decisions) in decisions.iter() {
            for (cluster_id, decision) in decisions {
                self.cluster_scheduling_decisions
                    .entry(*cluster_id)
                    .or_default()
                    .insert(policy_name, decision.clone());
            }
        }

        // 2. Clean up those clusters from `scheduling_decisions` that
        // - have been dropped, or
        // - were switched to unmanaged, or
        // - were switched to `SCHEDULE = MANUAL`.
        for cluster_id in self
            .cluster_scheduling_decisions
            .keys()
            .cloned()
            .collect_vec()
        {
            match self.get_managed_cluster_config(cluster_id) {
                None => {
                    // The cluster has been dropped or switched to unmanaged.
                    debug!(
                        "handle_scheduling_decisions: \
                        Removing cluster {} from cluster_scheduling_decisions, \
                        because get_managed_cluster_config returned None",
                        cluster_id
                    );
                    self.cluster_scheduling_decisions.remove(&cluster_id);
                }
                Some(managed_config) => {
                    if matches!(managed_config.schedule, ClusterSchedule::Manual) {
                        debug!(
                            "handle_scheduling_decisions: \
                            Removing cluster {} from cluster_scheduling_decisions, \
                            because schedule is Manual",
                            cluster_id
                        );
                        self.cluster_scheduling_decisions.remove(&cluster_id);
                    }
                }
            }
        }

        // 3. Act on `scheduling_decisions` where needed.
        let mut altered_a_cluster = false;
        for (cluster_id, decisions) in self.cluster_scheduling_decisions.clone() {
            // We touch a cluster only when all policies have made a decision about it. This is
            // to ensure that after an envd restart all policies have a chance to run at least once
            // before we turn off a cluster, to avoid spuriously turning off a cluster and possibly
            // losing its hydrated state.
            if POLICIES.iter().all(|policy| decisions.contains_key(policy)) {
                // Check whether the cluster's state matches the needed state.
                // If any policy says On, then we need a replica.
                let needs_replica = decisions
                    .values()
                    .any(|decision| decision.cluster_on());
                let cluster_config = self.catalog().get_cluster(cluster_id).config.clone();
                let mut new_config = cluster_config.clone();
                let ClusterVariant::Managed(managed_config) = &mut new_config.variant else {
                    panic!("cleaned up unmanaged clusters above");
                };
                let has_replica = managed_config.replication_factor > 0; // Is it On?
                if needs_replica != has_replica {
                    // Turn the cluster On or Off.
                    altered_a_cluster = true;
                    managed_config.replication_factor = if needs_replica { 1 } else { 0 };
                    if let Err(e) = self
                        .sequence_alter_cluster_managed_to_managed(
                            None,
                            cluster_id,
                            new_config.clone(),
                            crate::catalog::ReplicaCreateDropReason::ClusterScheduling(
                                decisions.values().cloned().collect(),
                            ),
                            AlterClusterPlanStrategy::None,
                        )
                        .await
                    {
                        soft_panic_or_log!(
                            "handle_scheduling_decisions couldn't alter cluster {}. \
                             Old config: {:?}, \
                             New config: {:?}, \
                             Error: {}",
                            cluster_id,
                            cluster_config,
                            new_config,
                            e
                        );
                    }
                }
            } else {
                debug!(
                    "handle_scheduling_decisions: \
                    Not all policies have made a decision about cluster {}. decisions: {:?}",
                    cluster_id, decisions,
                );
            }
        }

        self.metrics
            .handle_scheduling_decisions_seconds
            .with_label_values(&[altered_a_cluster.to_string().as_str()])
            .observe((Instant::now() - start_time).as_secs_f64());
    }

    /// Returns the managed config for a cluster. Returns None if the cluster doesn't exist or if
    /// it's an unmanaged cluster.
    fn get_managed_cluster_config(&self, cluster_id: ClusterId) -> Option<ClusterVariantManaged> {
        let cluster = self.catalog().try_get_cluster(cluster_id)?;
        if let ClusterVariant::Managed(managed_config) = cluster.config.variant.clone() {
            Some(managed_config)
        } else {
            None
        }
    }
}