mz_dyncfg_file/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A dyncfg::ConfigSet backed by a local JSON file.
11
12use std::collections::BTreeMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::time::Duration;
16
17use anyhow::Context;
18use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
19use mz_ore::task;
20use serde_json::Value as JsonValue;
21use tokio::time;
22
23/// Start a background task that syncs a ConfigSet with a local JSON file.
24/// Returns `Ok` after the initial sync is completed. If the initialization takes longer
25/// than `config_sync_timeout`, an error is returned.
26///
27/// The file format is a simple JSON object mapping config names to their values.
28pub async fn sync_file_to_configset(
29    set: ConfigSet,
30    config_file: impl AsRef<Path>,
31    config_sync_timeout: Duration,
32    config_sync_loop_interval: Option<Duration>,
33    on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
34) -> Result<(), anyhow::Error> {
35    let config_file = config_file.as_ref().to_owned();
36
37    // Create initial file if it doesn't exist
38    if !config_file.exists() {
39        tracing::warn!("sync config file {:?} does not exist", config_file);
40        return Ok(());
41    }
42
43    let synced = SyncedConfigSet {
44        set,
45        config_file,
46        on_update,
47    };
48
49    // Do initial sync
50    if let Err(err) = tokio::time::timeout(config_sync_timeout, async {
51        synced.sync()?;
52        Ok::<_, anyhow::Error>(())
53    })
54    .await
55    {
56        tracing::warn!("error while initializing file-backed config set: {}", err);
57        return Err(err.into());
58    }
59
60    // Start background sync task if interval is specified
61    task::spawn(
62        || "SyncedConfigSet sync_loop",
63        synced.sync_loop(config_sync_loop_interval),
64    );
65
66    Ok(())
67}
68
69struct SyncedConfigSet<F>
70where
71    F: Fn(&ConfigUpdates, &ConfigSet) + Send,
72{
73    set: ConfigSet,
74    config_file: PathBuf,
75    on_update: F,
76}
77
78impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
79    /// Returns a future that periodically reads the config file and updates the ConfigSet.
80    async fn sync_loop(self, tick_interval: Option<Duration>) {
81        let Some(tick_interval) = tick_interval else {
82            tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
83            return;
84        };
85
86        let mut interval = time::interval(tick_interval);
87        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
88
89        tracing::info!(
90            "synchronizing SyncedConfigSet values every {} seconds",
91            tick_interval.as_secs()
92        );
93
94        loop {
95            interval.tick().await;
96
97            if let Err(err) = self.sync() {
98                tracing::warn!("SyncedConfigSet sync error: {}", err);
99            }
100        }
101    }
102
103    /// Reads current values from the config file and updates the ConfigSet.
104    pub fn sync(&self) -> Result<(), anyhow::Error> {
105        let file_contents = fs::read_to_string(&self.config_file)
106            .with_context(|| format!("failed to read config file: {:?}", self.config_file))?;
107
108        let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
109            .with_context(|| format!("failed to parse config file: {:?}", self.config_file))?;
110
111        let mut updates = ConfigUpdates::default();
112        for entry in self.set.entries() {
113            if let Some(val) = values.get(entry.name()) {
114                match json_to_config_val(val, &entry.val()) {
115                    Ok(new_val) => {
116                        // Only update if the value is different
117                        if new_val != entry.val() {
118                            tracing::debug!(
119                                "updating config value {} from {:?} to {:?}",
120                                &entry.name(),
121                                &entry.val(),
122                                new_val
123                            );
124                            updates.add_dynamic(entry.name(), new_val);
125                        }
126                    }
127                    Err(err) => {
128                        tracing::warn!(
129                            "failed to convert JSON value for {}: {}",
130                            entry.name(),
131                            err
132                        );
133                    }
134                }
135            }
136        }
137        updates.apply(&self.set);
138        (self.on_update)(&updates, &self.set);
139        Ok(())
140    }
141}
142
143/// Convert a JSON value to a ConfigVal, using the existing value as a template for the type
144fn json_to_config_val(json: &JsonValue, template: &ConfigVal) -> Result<ConfigVal, anyhow::Error> {
145    match (template, json) {
146        (ConfigVal::Bool(_), JsonValue::Bool(v)) => Ok(ConfigVal::Bool(*v)),
147        (ConfigVal::U32(_), JsonValue::Number(v)) => Ok(ConfigVal::U32(
148            v.as_u64()
149                .and_then(|v| v.try_into().ok())
150                .ok_or_else(|| anyhow::anyhow!("not a u32"))?,
151        )),
152        (ConfigVal::Usize(_), JsonValue::Number(v)) => Ok(ConfigVal::Usize(
153            v.as_u64()
154                .and_then(|v| v.try_into().ok())
155                .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
156        )),
157        (ConfigVal::OptUsize(_), JsonValue::Null) => Ok(ConfigVal::OptUsize(None)),
158        (ConfigVal::OptUsize(_), JsonValue::Number(v)) => Ok(ConfigVal::OptUsize(Some(
159            v.as_u64()
160                .and_then(|v| v.try_into().ok())
161                .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
162        ))),
163        (ConfigVal::F64(_), JsonValue::Number(v)) => Ok(ConfigVal::F64(
164            v.as_f64().ok_or_else(|| anyhow::anyhow!("not an f64"))?,
165        )),
166        (ConfigVal::String(_), JsonValue::String(v)) => Ok(ConfigVal::String(v.clone())),
167        (ConfigVal::Duration(_), JsonValue::String(v)) => {
168            Ok(ConfigVal::Duration(humantime::parse_duration(v)?))
169        }
170        (ConfigVal::Json(_), v) => Ok(ConfigVal::Json(v.clone())),
171        _ => Err(anyhow::anyhow!("type mismatch")),
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    use super::*;
178    use mz_dyncfg::Config;
179    use std::io::Write;
180    use std::sync::Arc;
181    use std::sync::atomic::AtomicBool;
182
183    #[mz_ore::test(tokio::test)]
184    async fn test_file_sync() {
185        let mut config_file = tempfile::NamedTempFile::new().unwrap();
186        const BOOL_CONFIG: Config<bool> = Config::new("test_bool", true, "A test boolean config");
187        const STRING_CONFIG: Config<&str> =
188            Config::new("test_string", "default", "A test string config");
189        let set = ConfigSet::default().add(&BOOL_CONFIG).add(&STRING_CONFIG);
190
191        // Start sync with empty file (should create it)
192        sync_file_to_configset(
193            set.clone(),
194            &config_file.path(),
195            Duration::from_secs(1),
196            None,
197            |_, _| {},
198        )
199        .await
200        .unwrap();
201        assert_eq!(BOOL_CONFIG.get(&set), true);
202        assert_eq!(STRING_CONFIG.get(&set), "default");
203
204        // update the data
205        config_file
206            .write_all(
207                String::from("{\"test_bool\": false, \"test_string\": \"modified\"}").as_bytes(),
208            )
209            .unwrap();
210
211        // Start new sync to read modified values
212        let updates_received = Arc::new(AtomicBool::new(false));
213        let updates_received_clone = Arc::clone(&updates_received);
214        sync_file_to_configset(
215            set.clone(),
216            &config_file,
217            Duration::from_secs(1),
218            None,
219            move |updates, _| {
220                assert_eq!(updates.updates.len(), 2);
221                updates_received_clone.store(true, std::sync::atomic::Ordering::SeqCst);
222            },
223        )
224        .await
225        .unwrap();
226
227        assert!(updates_received.load(std::sync::atomic::Ordering::SeqCst));
228        assert_eq!(BOOL_CONFIG.get(&set), false);
229        assert_eq!(STRING_CONFIG.get(&set), "modified");
230    }
231}