1use std::collections::BTreeMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::time::Duration;
16
17use anyhow::Context;
18use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
19use mz_ore::task;
20use serde_json::Value as JsonValue;
21use tokio::time;
22
23pub async fn sync_file_to_configset(
29 set: ConfigSet,
30 config_file: impl AsRef<Path>,
31 config_sync_timeout: Duration,
32 config_sync_loop_interval: Option<Duration>,
33 on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
34) -> Result<(), anyhow::Error> {
35 let config_file = config_file.as_ref().to_owned();
36
37 if !config_file.exists() {
39 tracing::warn!("sync config file {:?} does not exist", config_file);
40 return Ok(());
41 }
42
43 let synced = SyncedConfigSet {
44 set,
45 config_file,
46 on_update,
47 };
48
49 match tokio::time::timeout(config_sync_timeout, async {
51 synced.sync()?;
52 Ok::<_, anyhow::Error>(())
53 })
54 .await
55 {
56 Ok(Ok(())) => {}
57 Ok(Err(err)) => {
58 tracing::warn!("error while initializing file-backed config set: {}", err);
59 return Err(err);
60 }
61 Err(err) => {
62 tracing::warn!("timeout while initializing file-backed config set: {}", err);
63 return Err(err.into());
64 }
65 }
66
67 task::spawn(
69 || "SyncedConfigSet sync_loop",
70 synced.sync_loop(config_sync_loop_interval),
71 );
72
73 Ok(())
74}
75
76struct SyncedConfigSet<F>
77where
78 F: Fn(&ConfigUpdates, &ConfigSet) + Send,
79{
80 set: ConfigSet,
81 config_file: PathBuf,
82 on_update: F,
83}
84
85impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
86 async fn sync_loop(self, tick_interval: Option<Duration>) {
88 let Some(tick_interval) = tick_interval else {
89 tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
90 return;
91 };
92
93 let mut interval = time::interval(tick_interval);
94 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
95
96 tracing::info!(
97 "synchronizing SyncedConfigSet values every {} seconds",
98 tick_interval.as_secs()
99 );
100
101 loop {
102 interval.tick().await;
103
104 if let Err(err) = self.sync() {
105 tracing::warn!("SyncedConfigSet sync error: {}", err);
106 }
107 }
108 }
109
110 pub fn sync(&self) -> Result<(), anyhow::Error> {
112 let file_contents = fs::read_to_string(&self.config_file)
113 .with_context(|| format!("failed to read config file: {:?}", self.config_file))?;
114
115 if file_contents.trim().is_empty() {
117 return Ok(());
118 }
119
120 let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
121 .with_context(|| format!("failed to parse config file: {:?}", self.config_file))?;
122
123 let mut updates = ConfigUpdates::default();
124 for entry in self.set.entries() {
125 if let Some(val) = values.get(entry.name()) {
126 match json_to_config_val(val, &entry.val()) {
127 Ok(new_val) => {
128 if new_val != entry.val() {
130 tracing::debug!(
131 "updating config value {} from {:?} to {:?}",
132 &entry.name(),
133 &entry.val(),
134 new_val
135 );
136 updates.add_dynamic(entry.name(), new_val);
137 }
138 }
139 Err(err) => {
140 tracing::warn!(
141 "failed to convert JSON value for {}: {}",
142 entry.name(),
143 err
144 );
145 }
146 }
147 }
148 }
149 updates.apply(&self.set);
150 (self.on_update)(&updates, &self.set);
151 Ok(())
152 }
153}
154
155fn json_to_config_val(json: &JsonValue, template: &ConfigVal) -> Result<ConfigVal, anyhow::Error> {
157 match (template, json) {
158 (ConfigVal::Bool(_), JsonValue::Bool(v)) => Ok(ConfigVal::Bool(*v)),
159 (ConfigVal::U32(_), JsonValue::Number(v)) => Ok(ConfigVal::U32(
160 v.as_u64()
161 .and_then(|v| v.try_into().ok())
162 .ok_or_else(|| anyhow::anyhow!("not a u32"))?,
163 )),
164 (ConfigVal::Usize(_), JsonValue::Number(v)) => Ok(ConfigVal::Usize(
165 v.as_u64()
166 .and_then(|v| v.try_into().ok())
167 .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
168 )),
169 (ConfigVal::OptUsize(_), JsonValue::Null) => Ok(ConfigVal::OptUsize(None)),
170 (ConfigVal::OptUsize(_), JsonValue::Number(v)) => Ok(ConfigVal::OptUsize(Some(
171 v.as_u64()
172 .and_then(|v| v.try_into().ok())
173 .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
174 ))),
175 (ConfigVal::F64(_), JsonValue::Number(v)) => Ok(ConfigVal::F64(
176 v.as_f64().ok_or_else(|| anyhow::anyhow!("not an f64"))?,
177 )),
178 (ConfigVal::String(_), JsonValue::String(v)) => Ok(ConfigVal::String(v.clone())),
179 (ConfigVal::Duration(_), JsonValue::String(v)) => {
180 Ok(ConfigVal::Duration(humantime::parse_duration(v)?))
181 }
182 (ConfigVal::Json(_), v) => Ok(ConfigVal::Json(v.clone())),
183 _ => Err(anyhow::anyhow!("type mismatch")),
184 }
185}
186
#[cfg(test)]
mod tests {
    use std::io::Write;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    use mz_dyncfg::Config;

    use super::*;

    /// Syncing from an empty file leaves the defaults alone; syncing after the file was
    /// populated updates both configs and reports exactly two updates to the callback.
    #[mz_ore::test(tokio::test)]
    async fn test_file_sync() {
        const BOOL_CONFIG: Config<bool> = Config::new("test_bool", true, "A test boolean config");
        const STRING_CONFIG: Config<&str> =
            Config::new("test_string", "default", "A test string config");

        let mut backing_file = tempfile::NamedTempFile::new().unwrap();
        let set = ConfigSet::default().add(&BOOL_CONFIG).add(&STRING_CONFIG);

        // Phase 1: the file exists but is empty, so the defaults must survive.
        let first_sync = sync_file_to_configset(
            set.clone(),
            backing_file.path(),
            Duration::from_secs(1),
            None,
            |_, _| {},
        )
        .await;
        first_sync.unwrap();
        assert!(BOOL_CONFIG.get(&set));
        assert_eq!(STRING_CONFIG.get(&set), "default");

        // Phase 2: populate the file with overrides for both configs and sync again.
        backing_file
            .write_all(br#"{"test_bool": false, "test_string": "modified"}"#)
            .unwrap();

        let callback_fired = Arc::new(AtomicBool::new(false));
        let callback_flag = Arc::clone(&callback_fired);
        let second_sync = sync_file_to_configset(
            set.clone(),
            backing_file.path(),
            Duration::from_secs(1),
            None,
            move |updates, _| {
                assert_eq!(updates.updates.len(), 2);
                callback_flag.store(true, Ordering::SeqCst);
            },
        )
        .await;
        second_sync.unwrap();

        assert!(callback_fired.load(Ordering::SeqCst));
        assert!(!BOOL_CONFIG.get(&set));
        assert_eq!(STRING_CONFIG.get(&set), "modified");
    }
}