1use std::collections::BTreeMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::time::Duration;
16
17use anyhow::Context;
18use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
19use mz_ore::task;
20use serde_json::Value as JsonValue;
21use tokio::time;
22
23pub async fn sync_file_to_configset(
29 set: ConfigSet,
30 config_file: impl AsRef<Path>,
31 config_sync_timeout: Duration,
32 config_sync_loop_interval: Option<Duration>,
33 on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
34) -> Result<(), anyhow::Error> {
35 let config_file = config_file.as_ref().to_owned();
36
37 if !config_file.exists() {
39 tracing::warn!("sync config file {:?} does not exist", config_file);
40 return Ok(());
41 }
42
43 let synced = SyncedConfigSet {
44 set,
45 config_file,
46 on_update,
47 };
48
49 match tokio::time::timeout(config_sync_timeout, async {
51 synced.sync()?;
52 Ok::<_, anyhow::Error>(())
53 })
54 .await
55 {
56 Ok(Ok(())) => {}
57 Ok(Err(err)) => {
58 tracing::warn!("error while initializing file-backed config set: {}", err);
59 return Err(err);
60 }
61 Err(err) => {
62 tracing::warn!("timeout while initializing file-backed config set: {}", err);
63 return Err(err.into());
64 }
65 }
66
67 task::spawn(
69 || "SyncedConfigSet sync_loop",
70 synced.sync_loop(config_sync_loop_interval),
71 );
72
73 Ok(())
74}
75
76struct SyncedConfigSet<F>
77where
78 F: Fn(&ConfigUpdates, &ConfigSet) + Send,
79{
80 set: ConfigSet,
81 config_file: PathBuf,
82 on_update: F,
83}
84
85impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
86 async fn sync_loop(self, tick_interval: Option<Duration>) {
88 let Some(tick_interval) = tick_interval else {
89 tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
90 return;
91 };
92
93 let mut interval = time::interval(tick_interval);
94 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
95
96 tracing::info!(
97 "synchronizing SyncedConfigSet values every {} seconds",
98 tick_interval.as_secs()
99 );
100
101 loop {
102 interval.tick().await;
103
104 if let Err(err) = self.sync() {
105 tracing::warn!("SyncedConfigSet sync error: {}", err);
106 }
107 }
108 }
109
110 pub fn sync(&self) -> Result<(), anyhow::Error> {
112 let file_contents = fs::read_to_string(&self.config_file)
113 .with_context(|| format!("failed to read config file: {:?}", self.config_file))?;
114
115 if file_contents.trim().is_empty() {
117 return Ok(());
118 }
119
120 let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
121 .with_context(|| format!("failed to parse config file: {:?}", self.config_file))?;
122
123 let mut updates = ConfigUpdates::default();
124 for entry in self.set.entries() {
125 if let Some(val) = values.get(entry.name()) {
126 match json_to_config_val(val, &entry.val()) {
127 Ok(new_val) => {
128 if new_val != entry.val() {
130 tracing::debug!(
131 "updating config value {} from {:?} to {:?}",
132 &entry.name(),
133 &entry.val(),
134 new_val
135 );
136 updates.add_dynamic(entry.name(), new_val);
137 }
138 }
139 Err(err) => {
140 tracing::warn!(
141 "failed to convert JSON value for {}: {}",
142 entry.name(),
143 err
144 );
145 }
146 }
147 }
148 }
149 updates.apply(&self.set);
150 (self.on_update)(&updates, &self.set);
151 Ok(())
152 }
153}
154
155fn json_to_config_val(json: &JsonValue, template: &ConfigVal) -> Result<ConfigVal, anyhow::Error> {
157 match (template, json) {
158 (ConfigVal::Bool(_), JsonValue::Bool(v)) => Ok(ConfigVal::Bool(*v)),
159 (ConfigVal::U32(_), JsonValue::Number(v)) => Ok(ConfigVal::U32(
160 v.as_u64()
161 .and_then(|v| v.try_into().ok())
162 .ok_or_else(|| anyhow::anyhow!("not a u32"))?,
163 )),
164 (ConfigVal::Usize(_), JsonValue::Number(v)) => Ok(ConfigVal::Usize(
165 v.as_u64()
166 .and_then(|v| v.try_into().ok())
167 .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
168 )),
169 (ConfigVal::OptUsize(_), JsonValue::Null) => Ok(ConfigVal::OptUsize(None)),
170 (ConfigVal::OptUsize(_), JsonValue::Number(v)) => Ok(ConfigVal::OptUsize(Some(
171 v.as_u64()
172 .and_then(|v| v.try_into().ok())
173 .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
174 ))),
175 (ConfigVal::F64(_), JsonValue::Number(v)) => Ok(ConfigVal::F64(
176 v.as_f64().ok_or_else(|| anyhow::anyhow!("not an f64"))?,
177 )),
178 (ConfigVal::String(_), JsonValue::String(v)) => Ok(ConfigVal::String(v.clone())),
179 (ConfigVal::OptString(_), JsonValue::Null) => Ok(ConfigVal::OptString(None)),
180 (ConfigVal::OptString(_), JsonValue::String(v)) => {
181 Ok(ConfigVal::OptString(Some(v.clone())))
182 }
183 (ConfigVal::Duration(_), JsonValue::String(v)) => {
184 Ok(ConfigVal::Duration(humantime::parse_duration(v)?))
185 }
186 (ConfigVal::Json(_), v) => Ok(ConfigVal::Json(v.clone())),
187 _ => Err(anyhow::anyhow!("type mismatch")),
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use mz_dyncfg::Config;
    use std::io::Write;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Runs one sync of `file` into `set` with a no-op callback, a one second
    /// timeout and the periodic re-sync disabled (`None` interval).
    async fn sync_once(set: &ConfigSet, file: impl AsRef<std::path::Path>) {
        sync_file_to_configset(set.clone(), file, Duration::from_secs(1), None, |_, _| {})
            .await
            .unwrap();
    }

    #[mz_ore::test(tokio::test)]
    async fn test_file_sync() {
        const BOOL_CONFIG: Config<bool> = Config::new("test_bool", true, "A test boolean config");
        const STRING_CONFIG: Config<&str> =
            Config::new("test_string", "default", "A test string config");
        let set = ConfigSet::default().add(&BOOL_CONFIG).add(&STRING_CONFIG);
        let mut config_file = tempfile::NamedTempFile::new().unwrap();

        // Syncing from a still-empty file must leave the defaults untouched.
        sync_once(&set, config_file.path()).await;
        assert!(BOOL_CONFIG.get(&set));
        assert_eq!(STRING_CONFIG.get(&set), "default");

        // Write overrides for both configs into the file.
        config_file
            .write_all(br#"{"test_bool": false, "test_string": "modified"}"#)
            .unwrap();

        // The callback must run and must see exactly the two changed values.
        let callback_ran = Arc::new(AtomicBool::new(false));
        let callback_flag = Arc::clone(&callback_ran);
        sync_file_to_configset(
            set.clone(),
            &config_file,
            Duration::from_secs(1),
            None,
            move |updates, _| {
                assert_eq!(updates.updates.len(), 2);
                callback_flag.store(true, Ordering::SeqCst);
            },
        )
        .await
        .unwrap();

        assert!(callback_ran.load(Ordering::SeqCst));
        assert!(!BOOL_CONFIG.get(&set));
        assert_eq!(STRING_CONFIG.get(&set), "modified");
    }

    #[mz_ore::test(tokio::test)]
    async fn test_file_sync_opt_string() {
        const OPT_STRING_CONFIG: Config<Option<&str>> =
            Config::new("test_opt_string", None, "A test optional string config");
        let set = ConfigSet::default().add(&OPT_STRING_CONFIG);

        // A JSON string sets the optional config to `Some`.
        let mut with_value = tempfile::NamedTempFile::new().unwrap();
        with_value
            .write_all(br#"{"test_opt_string": "hello"}"#)
            .unwrap();
        sync_once(&set, &with_value).await;
        assert_eq!(OPT_STRING_CONFIG.get(&set), Some("hello".to_string()));

        // A JSON `null` from a second file resets it back to `None`.
        let mut with_null = tempfile::NamedTempFile::new().unwrap();
        with_null
            .write_all(br#"{"test_opt_string": null}"#)
            .unwrap();
        sync_once(&set, &with_null).await;
        assert_eq!(OPT_STRING_CONFIG.get(&set), None);
    }
}