mz_adapter/config/
sync.rs1use std::collections::BTreeSet;
11use std::sync::Arc;
12use std::time::Duration;
13
14use mz_controller::clusters::ReplicaLocation;
15use mz_controller_types::{ClusterId, ReplicaId};
16use mz_dyncfg::ParameterScope;
17use tokio::time;
18
19use crate::Client;
20use crate::catalog::Catalog;
21use crate::config::{
22 ClusterEvalContext, ClusterScopeContext, ReplicaEvalContext, ReplicaScopeContext,
23 ScopedParameters, ScopedParametersScope, SynchronizedParameters, SystemParameterBackend,
24 SystemParameterFrontend, SystemParameterSyncConfig,
25};
26
27pub async fn system_parameter_sync(
31 sync_config: SystemParameterSyncConfig,
32 adapter_client: Client,
33 tick_interval: Option<Duration>,
34) -> Result<(), anyhow::Error> {
35 let Some(tick_interval) = tick_interval else {
36 tracing::info!("skipping system parameter sync as tick_interval = None");
37 return Ok(());
38 };
39
40 let scoped_client = adapter_client.clone();
43
44 let mut frontend = Option::<Arc<SystemParameterFrontend>>::None; let mut backend = SystemParameterBackend::new(adapter_client).await?;
49
50 let mut interval = time::interval(tick_interval);
52 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
53
54 tracing::info!(
56 "synchronizing system parameter values every {} seconds",
57 tick_interval.as_secs()
58 );
59
60 let mut scoped_overrides_maybe_dirty = true;
66
67 let mut params = SynchronizedParameters::default();
68 loop {
69 interval.tick().await;
71
72 backend.pull(&mut params).await;
74
75 if !params.enable_launchdarkly() && sync_config.backend_config.is_launch_darkly() {
76 if frontend.is_some() {
77 tracing::info!("stopping system parameter frontend");
78 frontend = None;
79 } else {
80 tracing::info!("system parameter sync is disabled; not syncing")
81 }
82
83 continue;
85 }
86
87 if frontend.is_none() {
88 tracing::info!("initializing system parameter frontend");
89 let new_frontend = Arc::new(SystemParameterFrontend::from(&sync_config).await?);
90 scoped_client.install_scoped_system_parameter_frontend(Arc::clone(&new_frontend));
94 frontend = Some(new_frontend);
95 }
96
97 let frontend = frontend.as_ref().expect("frontend exists");
99 if frontend.pull(&mut params) {
100 backend.push(&mut params).await;
101 }
102
103 sync_scoped_params(
107 &scoped_client,
108 frontend,
109 ¶ms,
110 &mut scoped_overrides_maybe_dirty,
111 )
112 .await;
113 }
114}
115
116async fn sync_scoped_params(
120 client: &Client,
121 frontend: &SystemParameterFrontend,
122 params: &SynchronizedParameters,
123 maybe_dirty: &mut bool,
124) {
125 if !params.enable_scoped_system_parameters() {
136 if *maybe_dirty {
137 client
140 .update_scoped_system_parameters(ScopedParameters::default(), None)
141 .await;
142 *maybe_dirty = false;
143 }
144 return;
145 }
146 *maybe_dirty = true;
147
148 let catalog = client.catalog_snapshot().await;
149
150 let prune_scope = ScopedParametersScope {
157 clusters: catalog.clusters().map(|cluster| cluster.id).collect(),
158 replicas: catalog
159 .clusters()
160 .flat_map(|cluster| cluster.replicas().map(|replica| replica.replica_id))
161 .collect(),
162 };
163 let scoped = evaluate_scoped_parameters(frontend, params, &catalog, None, None);
164 client
165 .update_scoped_system_parameters(scoped, Some(prune_scope))
166 .await;
167}
168
169pub(crate) fn evaluate_scoped_parameters(
178 frontend: &SystemParameterFrontend,
179 params: &SynchronizedParameters,
180 catalog: &Catalog,
181 cluster_filter: Option<&BTreeSet<ClusterId>>,
182 replica_filter: Option<&BTreeSet<ReplicaId>>,
183) -> ScopedParameters {
184 let system_config = catalog.system_config();
185
186 let replica_param_names: Vec<&'static str> = system_config
190 .iter_synced()
191 .filter(|var| var.scope() == ParameterScope::Replica)
192 .map(|var| var.name())
193 .collect();
194 let cluster_param_names: Vec<&'static str> = system_config
195 .iter_synced()
196 .filter(|var| var.scope() == ParameterScope::Cluster)
197 .map(|var| var.name())
198 .collect();
199
200 let replica = if replica_param_names.is_empty() {
201 Default::default()
202 } else {
203 let replicas = build_replica_eval_contexts(catalog, replica_filter);
204 frontend.pull_replica_overrides(params, &replica_param_names, &replicas)
205 };
206 let cluster = if cluster_param_names.is_empty() {
207 Default::default()
208 } else {
209 let clusters = build_cluster_eval_contexts(catalog, cluster_filter);
210 frontend.pull_cluster_overrides(params, &cluster_param_names, &clusters)
211 };
212
213 ScopedParameters { cluster, replica }
214}
215
216fn build_cluster_eval_contexts(
219 catalog: &Catalog,
220 filter: Option<&BTreeSet<ClusterId>>,
221) -> Vec<ClusterEvalContext> {
222 catalog
223 .clusters()
224 .filter(|cluster| filter.is_none_or(|f| f.contains(&cluster.id)))
225 .map(|cluster| ClusterEvalContext {
226 cluster_id: cluster.id,
227 cluster: ClusterScopeContext {
228 id: cluster.id.to_string(),
229 name: cluster.name.clone(),
230 is_builtin: cluster.id.is_system(),
231 },
232 })
233 .collect()
234}
235
236fn build_replica_eval_contexts(
239 catalog: &Catalog,
240 filter: Option<&BTreeSet<ReplicaId>>,
241) -> Vec<ReplicaEvalContext> {
242 if filter.is_some_and(|f| f.is_empty()) {
246 return Vec::new();
247 }
248
249 let mut contexts = Vec::new();
250 for cluster in catalog.clusters() {
251 let is_builtin = cluster.id.is_system();
252 let cluster_ctx = ClusterScopeContext {
253 id: cluster.id.to_string(),
254 name: cluster.name.clone(),
255 is_builtin,
256 };
257 for replica in cluster.replicas() {
258 if filter.is_some_and(|f| !f.contains(&replica.replica_id)) {
259 continue;
260 }
261 let ReplicaLocation::Managed(location) = &replica.config.location else {
263 continue;
264 };
265 let replica_ctx = ReplicaScopeContext {
266 id: replica.replica_id.to_string(),
267 name: replica.name.clone(),
268 is_builtin,
269 size: location.size.clone(),
270 size_family: location.allocation.family().to_string(),
271 cluster_id: cluster.id.to_string(),
272 cluster_name: cluster.name.clone(),
273 };
274 contexts.push(ReplicaEvalContext {
275 cluster_id: cluster.id,
276 replica_id: replica.replica_id,
277 cluster: cluster_ctx.clone(),
278 replica: replica_ctx,
279 });
280 }
281 }
282 contexts
283}