1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use std::time::Duration;
1112use tokio::time;
1314use crate::Client;
15use crate::config::{
16 SynchronizedParameters, SystemParameterBackend, SystemParameterFrontend,
17 SystemParameterSyncConfig,
18};
1920/// Run a loop that periodically pulls system parameters defined in the
21/// LaunchDarkly-backed [SystemParameterFrontend] and pushes modified values to the
22/// `ALTER SYSTEM`-backed [SystemParameterBackend].
23pub async fn system_parameter_sync(
24 sync_config: SystemParameterSyncConfig,
25 adapter_client: Client,
26 tick_interval: Option<Duration>,
27) -> Result<(), anyhow::Error> {
28let Some(tick_interval) = tick_interval else {
29tracing::info!("skipping system parameter sync as tick_interval = None");
30return Ok(());
31 };
3233// Ensure the frontend client is initialized.
34let mut frontend = Option::<SystemParameterFrontend>::None; // lazy initialize the frontend below
35let mut backend = SystemParameterBackend::new(adapter_client).await?;
3637// Tick every `tick_duration` ms, skipping missed ticks.
38let mut interval = time::interval(tick_interval);
39 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
4041// Run the synchronization loop.
42tracing::info!(
43"synchronizing system parameter values every {} seconds",
44 tick_interval.as_secs()
45 );
4647let mut params = SynchronizedParameters::default();
48loop {
49// Wait for the next sync period
50interval.tick().await;
5152// Fetch current parameter values from the backend
53backend.pull(&mut params).await;
5455if !params.enable_launchdarkly() {
56if frontend.is_some() {
57tracing::info!("stopping system parameter frontend");
58 frontend = None;
59 } else {
60tracing::info!("system parameter sync is disabled; not syncing")
61 }
6263// Don't do anything until the next loop.
64continue;
65 } else {
66if frontend.is_none() {
67tracing::info!("initializing system parameter frontend");
68 frontend = Some(SystemParameterFrontend::from(&sync_config).await?);
69 }
7071// Pull latest state from frontend and push changes to backend.
72let frontend = frontend.as_ref().expect("frontend exists");
73if frontend.pull(&mut params) {
74 backend.push(&mut params).await;
75 }
76 }
77 }
78}