Skip to main content

mz_adapter/config/
sync.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::sync::Arc;
12use std::time::Duration;
13
14use mz_controller::clusters::ReplicaLocation;
15use mz_controller_types::{ClusterId, ReplicaId};
16use mz_dyncfg::ParameterScope;
17use tokio::time;
18
19use crate::Client;
20use crate::catalog::Catalog;
21use crate::config::{
22    ClusterEvalContext, ClusterScopeContext, ReplicaEvalContext, ReplicaScopeContext,
23    ScopedParameters, ScopedParametersScope, SynchronizedParameters, SystemParameterBackend,
24    SystemParameterFrontend, SystemParameterSyncConfig,
25};
26
27/// Run a loop that periodically pulls system parameters defined in the
28/// LaunchDarkly-backed [SystemParameterFrontend] and pushes modified values to the
29/// `ALTER SYSTEM`-backed [SystemParameterBackend].
30pub async fn system_parameter_sync(
31    sync_config: SystemParameterSyncConfig,
32    adapter_client: Client,
33    tick_interval: Option<Duration>,
34) -> Result<(), anyhow::Error> {
35    let Some(tick_interval) = tick_interval else {
36        tracing::info!("skipping system parameter sync as tick_interval = None");
37        return Ok(());
38    };
39
40    // Keep a client handle for catalog snapshots and the per-replica scoped
41    // config push, since the backend consumes its own clone.
42    let scoped_client = adapter_client.clone();
43
44    // Ensure the frontend client is initialized. Wrapped in `Arc` so a clone can
45    // be shared with the coordinator for synchronous create-time scoped
46    // resolution.
47    let mut frontend = Option::<Arc<SystemParameterFrontend>>::None; // lazy initialize the frontend below
48    let mut backend = SystemParameterBackend::new(adapter_client).await?;
49
50    // Tick every `tick_duration` ms, skipping missed ticks.
51    let mut interval = time::interval(tick_interval);
52    interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
53
54    // Run the synchronization loop.
55    tracing::info!(
56        "synchronizing system parameter values every {} seconds",
57        tick_interval.as_secs()
58    );
59
60    // Whether the scoped overrides may carry state that needs clearing once the
61    // feature is disabled. True at startup, since a prior run (this process or
62    // an earlier one) may have left overrides behind. While the feature stays
63    // disabled this lets the scoped reconcile do no per-tick work after the
64    // single clearing push.
65    let mut scoped_overrides_maybe_dirty = true;
66
67    let mut params = SynchronizedParameters::default();
68    loop {
69        // Wait for the next sync period
70        interval.tick().await;
71
72        // Fetch current parameter values from the backend
73        backend.pull(&mut params).await;
74
75        if !params.enable_launchdarkly() && sync_config.backend_config.is_launch_darkly() {
76            if frontend.is_some() {
77                tracing::info!("stopping system parameter frontend");
78                frontend = None;
79            } else {
80                tracing::info!("system parameter sync is disabled; not syncing")
81            }
82
83            // Don't do anything until the next loop.
84            continue;
85        }
86
87        if frontend.is_none() {
88            tracing::info!("initializing system parameter frontend");
89            let new_frontend = Arc::new(SystemParameterFrontend::from(&sync_config).await?);
90            // Share the frontend with the coordinator so the create-cluster /
91            // create-replica paths can resolve a new object's scoped overrides
92            // synchronously, instead of waiting for the next tick.
93            scoped_client.install_scoped_system_parameter_frontend(Arc::clone(&new_frontend));
94            frontend = Some(new_frontend);
95        }
96
97        // Pull latest state from frontend and push changes to backend.
98        let frontend = frontend.as_ref().expect("frontend exists");
99        if frontend.pull(&mut params) {
100            backend.push(&mut params).await;
101        }
102
103        // Reconcile the scoped (per-cluster and per-replica) parameters. We do
104        // this every tick (independent of whether the environment-wide values
105        // changed) so the overrides track the current set of live objects.
106        sync_scoped_params(
107            &scoped_client,
108            frontend,
109            &params,
110            &mut scoped_overrides_maybe_dirty,
111        )
112        .await;
113    }
114}
115
116/// Evaluate the scoped parameters (cluster-coherent and replica-local) for the
117/// currently live clusters and replicas and push the resulting overrides to the
118/// coordinator's working copy.
119async fn sync_scoped_params(
120    client: &Client,
121    frontend: &SystemParameterFrontend,
122    params: &SynchronizedParameters,
123    maybe_dirty: &mut bool,
124) {
125    // Read the feature gate from the working copy rather than a catalog
126    // snapshot. The gate is an environment-wide synced parameter, so `params`
127    // already carries its current value. This keeps a disabled environment, the
128    // default, from paying a coordinator round-trip every tick.
129    //
130    // Scoped (per-cluster and per-replica) overrides are off by default,
131    // leaving the environment-wide behavior unchanged. While disabled we
132    // evaluate no scoped contexts. If a prior run left overrides behind we clear
133    // them once so resolution falls back to the environment-wide value
134    // everywhere, then do no per-tick work until the feature is enabled again.
135    if !params.enable_scoped_system_parameters() {
136        if *maybe_dirty {
137            // Full replace with an empty desired state clears every override.
138            // No create-time write races here: it is gated off too.
139            client
140                .update_scoped_system_parameters(ScopedParameters::default(), None)
141                .await;
142            *maybe_dirty = false;
143        }
144        return;
145    }
146    *maybe_dirty = true;
147
148    let catalog = client.catalog_snapshot().await;
149
150    // Push the desired state to the coordinator, which holds the working copy
151    // and resolves each layer at its boundary: the controller's per-replica
152    // dyncfg push for `replica`, plan-time `OptimizerFeatureOverrides` for
153    // `cluster`. Scope removals to the live objects in this snapshot, so an
154    // object created between this snapshot and the apply (folding its override
155    // into its own create transaction) is not wiped by this reconcile.
156    let prune_scope = ScopedParametersScope {
157        clusters: catalog.clusters().map(|cluster| cluster.id).collect(),
158        replicas: catalog
159            .clusters()
160            .flat_map(|cluster| cluster.replicas().map(|replica| replica.replica_id))
161            .collect(),
162    };
163    let scoped = evaluate_scoped_parameters(frontend, params, &catalog, None, None);
164    client
165        .update_scoped_system_parameters(scoped, Some(prune_scope))
166        .await;
167}
168
169/// Evaluate the scoped parameters (cluster-coherent and replica-local) for the
170/// live objects, optionally restricted to a subset of cluster or replica ids.
171///
172/// The full pass (both filters `None`) is the sync loop's per-tick reconcile.
173/// The create path passes `Some(..)` to resolve just the newly-created objects
174/// synchronously, so they observe their overrides in their first controller
175/// configuration or first plan rather than after the next tick. Returns the
176/// sparse desired overrides for the evaluated objects.
177pub(crate) fn evaluate_scoped_parameters(
178    frontend: &SystemParameterFrontend,
179    params: &SynchronizedParameters,
180    catalog: &Catalog,
181    cluster_filter: Option<&BTreeSet<ClusterId>>,
182    replica_filter: Option<&BTreeSet<ReplicaId>>,
183) -> ScopedParameters {
184    let system_config = catalog.system_config();
185
186    // The synced parameters, partitioned by scope class. The scope declaration
187    // bounds evaluation to exactly the flags in use: an environment with no
188    // scoped flags evaluates neither pass.
189    let replica_param_names: Vec<&'static str> = system_config
190        .iter_synced()
191        .filter(|var| var.scope() == ParameterScope::Replica)
192        .map(|var| var.name())
193        .collect();
194    let cluster_param_names: Vec<&'static str> = system_config
195        .iter_synced()
196        .filter(|var| var.scope() == ParameterScope::Cluster)
197        .map(|var| var.name())
198        .collect();
199
200    let replica = if replica_param_names.is_empty() {
201        Default::default()
202    } else {
203        let replicas = build_replica_eval_contexts(catalog, replica_filter);
204        frontend.pull_replica_overrides(params, &replica_param_names, &replicas)
205    };
206    let cluster = if cluster_param_names.is_empty() {
207        Default::default()
208    } else {
209        let clusters = build_cluster_eval_contexts(catalog, cluster_filter);
210        frontend.pull_cluster_overrides(params, &cluster_param_names, &clusters)
211    };
212
213    ScopedParameters { cluster, replica }
214}
215
216/// Build a [`ClusterEvalContext`] for each live cluster in the catalog, skipping
217/// clusters absent from `filter` when one is given.
218fn build_cluster_eval_contexts(
219    catalog: &Catalog,
220    filter: Option<&BTreeSet<ClusterId>>,
221) -> Vec<ClusterEvalContext> {
222    catalog
223        .clusters()
224        .filter(|cluster| filter.is_none_or(|f| f.contains(&cluster.id)))
225        .map(|cluster| ClusterEvalContext {
226            cluster_id: cluster.id,
227            cluster: ClusterScopeContext {
228                id: cluster.id.to_string(),
229                name: cluster.name.clone(),
230                is_builtin: cluster.id.is_system(),
231            },
232        })
233        .collect()
234}
235
236/// Build a [`ReplicaEvalContext`] for each live managed replica in the catalog,
237/// skipping replicas absent from `filter` when one is given.
238fn build_replica_eval_contexts(
239    catalog: &Catalog,
240    filter: Option<&BTreeSet<ReplicaId>>,
241) -> Vec<ReplicaEvalContext> {
242    // An empty filter cannot match any replica, so skip the per-cluster scan.
243    // The create-cluster path passes an empty set to resolve only the cluster
244    // scope.
245    if filter.is_some_and(|f| f.is_empty()) {
246        return Vec::new();
247    }
248
249    let mut contexts = Vec::new();
250    for cluster in catalog.clusters() {
251        let is_builtin = cluster.id.is_system();
252        let cluster_ctx = ClusterScopeContext {
253            id: cluster.id.to_string(),
254            name: cluster.name.clone(),
255            is_builtin,
256        };
257        for replica in cluster.replicas() {
258            if filter.is_some_and(|f| !f.contains(&replica.replica_id)) {
259                continue;
260            }
261            // Only managed replicas have a size (and therefore a size family).
262            let ReplicaLocation::Managed(location) = &replica.config.location else {
263                continue;
264            };
265            let replica_ctx = ReplicaScopeContext {
266                id: replica.replica_id.to_string(),
267                name: replica.name.clone(),
268                is_builtin,
269                size: location.size.clone(),
270                size_family: location.allocation.family().to_string(),
271                cluster_id: cluster.id.to_string(),
272                cluster_name: cluster.name.clone(),
273            };
274            contexts.push(ReplicaEvalContext {
275                cluster_id: cluster.id,
276                replica_id: replica.replica_id,
277                cluster: cluster_ctx.clone(),
278                replica: replica_ctx,
279            });
280        }
281    }
282    contexts
283}