mz_adapter/config/
sync.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use tokio::time;
13
14use crate::Client;
15use crate::config::{
16    SynchronizedParameters, SystemParameterBackend, SystemParameterFrontend,
17    SystemParameterSyncConfig,
18};
19
20/// Run a loop that periodically pulls system parameters defined in the
21/// LaunchDarkly-backed [SystemParameterFrontend] and pushes modified values to the
22/// `ALTER SYSTEM`-backed [SystemParameterBackend].
23pub async fn system_parameter_sync(
24    sync_config: SystemParameterSyncConfig,
25    adapter_client: Client,
26    tick_interval: Option<Duration>,
27) -> Result<(), anyhow::Error> {
28    let Some(tick_interval) = tick_interval else {
29        tracing::info!("skipping system parameter sync as tick_interval = None");
30        return Ok(());
31    };
32
33    // Ensure the frontend client is initialized.
34    let mut frontend = Option::<SystemParameterFrontend>::None; // lazy initialize the frontend below
35    let mut backend = SystemParameterBackend::new(adapter_client).await?;
36
37    // Tick every `tick_duration` ms, skipping missed ticks.
38    let mut interval = time::interval(tick_interval);
39    interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
40
41    // Run the synchronization loop.
42    tracing::info!(
43        "synchronizing system parameter values every {} seconds",
44        tick_interval.as_secs()
45    );
46
47    let mut params = SynchronizedParameters::default();
48    loop {
49        // Wait for the next sync period
50        interval.tick().await;
51
52        // Fetch current parameter values from the backend
53        backend.pull(&mut params).await;
54
55        if !params.enable_launchdarkly() {
56            if frontend.is_some() {
57                tracing::info!("stopping system parameter frontend");
58                frontend = None;
59            } else {
60                tracing::info!("system parameter sync is disabled; not syncing")
61            }
62
63            // Don't do anything until the next loop.
64            continue;
65        } else {
66            if frontend.is_none() {
67                tracing::info!("initializing system parameter frontend");
68                frontend = Some(SystemParameterFrontend::from(&sync_config).await?);
69            }
70
71            // Pull latest state from frontend and push changes to backend.
72            let frontend = frontend.as_ref().expect("frontend exists");
73            if frontend.pull(&mut params) {
74                backend.push(&mut params).await;
75            }
76        }
77    }
78}