1use std::collections::BTreeMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::time::Duration;
16
17use anyhow::Context;
18use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
19use mz_ore::task;
20use serde_json::Value as JsonValue;
21use tokio::time;
22
23pub async fn sync_file_to_configset(
29 set: ConfigSet,
30 config_file: impl AsRef<Path>,
31 config_sync_timeout: Duration,
32 config_sync_loop_interval: Option<Duration>,
33 on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
34) -> Result<(), anyhow::Error> {
35 let config_file = config_file.as_ref().to_owned();
36
37 if !config_file.exists() {
39 tracing::warn!("sync config file {:?} does not exist", config_file);
40 return Ok(());
41 }
42
43 let synced = SyncedConfigSet {
44 set,
45 config_file,
46 on_update,
47 };
48
49 if let Err(err) = tokio::time::timeout(config_sync_timeout, async {
51 synced.sync()?;
52 Ok::<_, anyhow::Error>(())
53 })
54 .await
55 {
56 tracing::warn!("error while initializing file-backed config set: {}", err);
57 return Err(err.into());
58 }
59
60 task::spawn(
62 || "SyncedConfigSet sync_loop",
63 synced.sync_loop(config_sync_loop_interval),
64 );
65
66 Ok(())
67}
68
69struct SyncedConfigSet<F>
70where
71 F: Fn(&ConfigUpdates, &ConfigSet) + Send,
72{
73 set: ConfigSet,
74 config_file: PathBuf,
75 on_update: F,
76}
77
78impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
79 async fn sync_loop(self, tick_interval: Option<Duration>) {
81 let Some(tick_interval) = tick_interval else {
82 tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
83 return;
84 };
85
86 let mut interval = time::interval(tick_interval);
87 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
88
89 tracing::info!(
90 "synchronizing SyncedConfigSet values every {} seconds",
91 tick_interval.as_secs()
92 );
93
94 loop {
95 interval.tick().await;
96
97 if let Err(err) = self.sync() {
98 tracing::warn!("SyncedConfigSet sync error: {}", err);
99 }
100 }
101 }
102
103 pub fn sync(&self) -> Result<(), anyhow::Error> {
105 let file_contents = fs::read_to_string(&self.config_file)
106 .with_context(|| format!("failed to read config file: {:?}", self.config_file))?;
107
108 let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
109 .with_context(|| format!("failed to parse config file: {:?}", self.config_file))?;
110
111 let mut updates = ConfigUpdates::default();
112 for entry in self.set.entries() {
113 if let Some(val) = values.get(entry.name()) {
114 match json_to_config_val(val, &entry.val()) {
115 Ok(new_val) => {
116 if new_val != entry.val() {
118 tracing::debug!(
119 "updating config value {} from {:?} to {:?}",
120 &entry.name(),
121 &entry.val(),
122 new_val
123 );
124 updates.add_dynamic(entry.name(), new_val);
125 }
126 }
127 Err(err) => {
128 tracing::warn!(
129 "failed to convert JSON value for {}: {}",
130 entry.name(),
131 err
132 );
133 }
134 }
135 }
136 }
137 updates.apply(&self.set);
138 (self.on_update)(&updates, &self.set);
139 Ok(())
140 }
141}
142
143fn json_to_config_val(json: &JsonValue, template: &ConfigVal) -> Result<ConfigVal, anyhow::Error> {
145 match (template, json) {
146 (ConfigVal::Bool(_), JsonValue::Bool(v)) => Ok(ConfigVal::Bool(*v)),
147 (ConfigVal::U32(_), JsonValue::Number(v)) => Ok(ConfigVal::U32(
148 v.as_u64()
149 .and_then(|v| v.try_into().ok())
150 .ok_or_else(|| anyhow::anyhow!("not a u32"))?,
151 )),
152 (ConfigVal::Usize(_), JsonValue::Number(v)) => Ok(ConfigVal::Usize(
153 v.as_u64()
154 .and_then(|v| v.try_into().ok())
155 .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
156 )),
157 (ConfigVal::OptUsize(_), JsonValue::Null) => Ok(ConfigVal::OptUsize(None)),
158 (ConfigVal::OptUsize(_), JsonValue::Number(v)) => Ok(ConfigVal::OptUsize(Some(
159 v.as_u64()
160 .and_then(|v| v.try_into().ok())
161 .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
162 ))),
163 (ConfigVal::F64(_), JsonValue::Number(v)) => Ok(ConfigVal::F64(
164 v.as_f64().ok_or_else(|| anyhow::anyhow!("not an f64"))?,
165 )),
166 (ConfigVal::String(_), JsonValue::String(v)) => Ok(ConfigVal::String(v.clone())),
167 (ConfigVal::Duration(_), JsonValue::String(v)) => {
168 Ok(ConfigVal::Duration(humantime::parse_duration(v)?))
169 }
170 (ConfigVal::Json(_), v) => Ok(ConfigVal::Json(v.clone())),
171 _ => Err(anyhow::anyhow!("type mismatch")),
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use super::*;
178 use mz_dyncfg::Config;
179 use std::io::Write;
180 use std::sync::Arc;
181 use std::sync::atomic::AtomicBool;
182
183 #[mz_ore::test(tokio::test)]
184 async fn test_file_sync() {
185 let mut config_file = tempfile::NamedTempFile::new().unwrap();
186 const BOOL_CONFIG: Config<bool> = Config::new("test_bool", true, "A test boolean config");
187 const STRING_CONFIG: Config<&str> =
188 Config::new("test_string", "default", "A test string config");
189 let set = ConfigSet::default().add(&BOOL_CONFIG).add(&STRING_CONFIG);
190
191 sync_file_to_configset(
193 set.clone(),
194 &config_file.path(),
195 Duration::from_secs(1),
196 None,
197 |_, _| {},
198 )
199 .await
200 .unwrap();
201 assert_eq!(BOOL_CONFIG.get(&set), true);
202 assert_eq!(STRING_CONFIG.get(&set), "default");
203
204 config_file
206 .write_all(
207 String::from("{\"test_bool\": false, \"test_string\": \"modified\"}").as_bytes(),
208 )
209 .unwrap();
210
211 let updates_received = Arc::new(AtomicBool::new(false));
213 let updates_received_clone = Arc::clone(&updates_received);
214 sync_file_to_configset(
215 set.clone(),
216 &config_file,
217 Duration::from_secs(1),
218 None,
219 move |updates, _| {
220 assert_eq!(updates.updates.len(), 2);
221 updates_received_clone.store(true, std::sync::atomic::Ordering::SeqCst);
222 },
223 )
224 .await
225 .unwrap();
226
227 assert!(updates_received.load(std::sync::atomic::Ordering::SeqCst));
228 assert_eq!(BOOL_CONFIG.get(&set), false);
229 assert_eq!(STRING_CONFIG.get(&set), "modified");
230 }
231}