mz_adapter/config/
sync.rs1use std::time::Duration;
11
12use tokio::time;
13
14use crate::Client;
15use crate::config::{
16 SynchronizedParameters, SystemParameterBackend, SystemParameterFrontend,
17 SystemParameterSyncConfig,
18};
19
20pub async fn system_parameter_sync(
24 sync_config: SystemParameterSyncConfig,
25 adapter_client: Client,
26 tick_interval: Option<Duration>,
27) -> Result<(), anyhow::Error> {
28 let Some(tick_interval) = tick_interval else {
29 tracing::info!("skipping system parameter sync as tick_interval = None");
30 return Ok(());
31 };
32
33 let mut frontend = Option::<SystemParameterFrontend>::None; let mut backend = SystemParameterBackend::new(adapter_client).await?;
36
37 let mut interval = time::interval(tick_interval);
39 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
40
41 tracing::info!(
43 "synchronizing system parameter values every {} seconds",
44 tick_interval.as_secs()
45 );
46
47 let mut params = SynchronizedParameters::default();
48 loop {
49 interval.tick().await;
51
52 backend.pull(&mut params).await;
54
55 if !params.enable_launchdarkly() && sync_config.backend_config.is_launch_darkly() {
56 if frontend.is_some() {
57 tracing::info!("stopping system parameter frontend");
58 frontend = None;
59 } else {
60 tracing::info!("system parameter sync is disabled; not syncing")
61 }
62
63 continue;
65 }
66
67 if frontend.is_none() {
68 tracing::info!("initializing system parameter frontend");
69 frontend = Some(SystemParameterFrontend::from(&sync_config).await?);
70 }
71
72 let frontend = frontend.as_ref().expect("frontend exists");
74 if frontend.pull(&mut params) {
75 backend.push(&mut params).await;
76 }
77 }
78}