Skip to main content

mz_dyncfg_file/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A dyncfg::ConfigSet backed by a local JSON file.
11
12use std::collections::BTreeMap;
13use std::fs;
14use std::path::{Path, PathBuf};
15use std::time::Duration;
16
17use anyhow::Context;
18use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
19use mz_ore::task;
20use serde_json::Value as JsonValue;
21use tokio::time;
22
23/// Start a background task that syncs a ConfigSet with a local JSON file.
24/// Returns `Ok` after the initial sync is completed. If the initialization takes longer
25/// than `config_sync_timeout`, an error is returned.
26///
27/// The file format is a simple JSON object mapping config names to their values.
28pub async fn sync_file_to_configset(
29    set: ConfigSet,
30    config_file: impl AsRef<Path>,
31    config_sync_timeout: Duration,
32    config_sync_loop_interval: Option<Duration>,
33    on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
34) -> Result<(), anyhow::Error> {
35    let config_file = config_file.as_ref().to_owned();
36
37    // Create initial file if it doesn't exist
38    if !config_file.exists() {
39        tracing::warn!("sync config file {:?} does not exist", config_file);
40        return Ok(());
41    }
42
43    let synced = SyncedConfigSet {
44        set,
45        config_file,
46        on_update,
47    };
48
49    // Do initial sync
50    match tokio::time::timeout(config_sync_timeout, async {
51        synced.sync()?;
52        Ok::<_, anyhow::Error>(())
53    })
54    .await
55    {
56        Ok(Ok(())) => {}
57        Ok(Err(err)) => {
58            tracing::warn!("error while initializing file-backed config set: {}", err);
59            return Err(err);
60        }
61        Err(err) => {
62            tracing::warn!("timeout while initializing file-backed config set: {}", err);
63            return Err(err.into());
64        }
65    }
66
67    // Start background sync task if interval is specified
68    task::spawn(
69        || "SyncedConfigSet sync_loop",
70        synced.sync_loop(config_sync_loop_interval),
71    );
72
73    Ok(())
74}
75
76struct SyncedConfigSet<F>
77where
78    F: Fn(&ConfigUpdates, &ConfigSet) + Send,
79{
80    set: ConfigSet,
81    config_file: PathBuf,
82    on_update: F,
83}
84
85impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
86    /// Returns a future that periodically reads the config file and updates the ConfigSet.
87    async fn sync_loop(self, tick_interval: Option<Duration>) {
88        let Some(tick_interval) = tick_interval else {
89            tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
90            return;
91        };
92
93        let mut interval = time::interval(tick_interval);
94        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
95
96        tracing::info!(
97            "synchronizing SyncedConfigSet values every {} seconds",
98            tick_interval.as_secs()
99        );
100
101        loop {
102            interval.tick().await;
103
104            if let Err(err) = self.sync() {
105                tracing::warn!("SyncedConfigSet sync error: {}", err);
106            }
107        }
108    }
109
110    /// Reads current values from the config file and updates the ConfigSet.
111    pub fn sync(&self) -> Result<(), anyhow::Error> {
112        let file_contents = fs::read_to_string(&self.config_file)
113            .with_context(|| format!("failed to read config file: {:?}", self.config_file))?;
114
115        // Treat an empty file as an empty config (no overrides).
116        if file_contents.trim().is_empty() {
117            return Ok(());
118        }
119
120        let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
121            .with_context(|| format!("failed to parse config file: {:?}", self.config_file))?;
122
123        let mut updates = ConfigUpdates::default();
124        for entry in self.set.entries() {
125            if let Some(val) = values.get(entry.name()) {
126                match json_to_config_val(val, &entry.val()) {
127                    Ok(new_val) => {
128                        // Only update if the value is different
129                        if new_val != entry.val() {
130                            tracing::debug!(
131                                "updating config value {} from {:?} to {:?}",
132                                &entry.name(),
133                                &entry.val(),
134                                new_val
135                            );
136                            updates.add_dynamic(entry.name(), new_val);
137                        }
138                    }
139                    Err(err) => {
140                        tracing::warn!(
141                            "failed to convert JSON value for {}: {}",
142                            entry.name(),
143                            err
144                        );
145                    }
146                }
147            }
148        }
149        updates.apply(&self.set);
150        (self.on_update)(&updates, &self.set);
151        Ok(())
152    }
153}
154
155/// Convert a JSON value to a ConfigVal, using the existing value as a template for the type
156fn json_to_config_val(json: &JsonValue, template: &ConfigVal) -> Result<ConfigVal, anyhow::Error> {
157    match (template, json) {
158        (ConfigVal::Bool(_), JsonValue::Bool(v)) => Ok(ConfigVal::Bool(*v)),
159        (ConfigVal::U32(_), JsonValue::Number(v)) => Ok(ConfigVal::U32(
160            v.as_u64()
161                .and_then(|v| v.try_into().ok())
162                .ok_or_else(|| anyhow::anyhow!("not a u32"))?,
163        )),
164        (ConfigVal::Usize(_), JsonValue::Number(v)) => Ok(ConfigVal::Usize(
165            v.as_u64()
166                .and_then(|v| v.try_into().ok())
167                .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
168        )),
169        (ConfigVal::OptUsize(_), JsonValue::Null) => Ok(ConfigVal::OptUsize(None)),
170        (ConfigVal::OptUsize(_), JsonValue::Number(v)) => Ok(ConfigVal::OptUsize(Some(
171            v.as_u64()
172                .and_then(|v| v.try_into().ok())
173                .ok_or_else(|| anyhow::anyhow!("not a usize"))?,
174        ))),
175        (ConfigVal::F64(_), JsonValue::Number(v)) => Ok(ConfigVal::F64(
176            v.as_f64().ok_or_else(|| anyhow::anyhow!("not an f64"))?,
177        )),
178        (ConfigVal::String(_), JsonValue::String(v)) => Ok(ConfigVal::String(v.clone())),
179        (ConfigVal::Duration(_), JsonValue::String(v)) => {
180            Ok(ConfigVal::Duration(humantime::parse_duration(v)?))
181        }
182        (ConfigVal::Json(_), v) => Ok(ConfigVal::Json(v.clone())),
183        _ => Err(anyhow::anyhow!("type mismatch")),
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190    use mz_dyncfg::Config;
191    use std::io::Write;
192    use std::sync::Arc;
193    use std::sync::atomic::AtomicBool;
194
195    #[mz_ore::test(tokio::test)]
196    async fn test_file_sync() {
197        let mut config_file = tempfile::NamedTempFile::new().unwrap();
198        const BOOL_CONFIG: Config<bool> = Config::new("test_bool", true, "A test boolean config");
199        const STRING_CONFIG: Config<&str> =
200            Config::new("test_string", "default", "A test string config");
201        let set = ConfigSet::default().add(&BOOL_CONFIG).add(&STRING_CONFIG);
202
203        // Start sync with empty file (should create it)
204        sync_file_to_configset(
205            set.clone(),
206            &config_file.path(),
207            Duration::from_secs(1),
208            None,
209            |_, _| {},
210        )
211        .await
212        .unwrap();
213        assert_eq!(BOOL_CONFIG.get(&set), true);
214        assert_eq!(STRING_CONFIG.get(&set), "default");
215
216        // update the data
217        config_file
218            .write_all(
219                String::from("{\"test_bool\": false, \"test_string\": \"modified\"}").as_bytes(),
220            )
221            .unwrap();
222
223        // Start new sync to read modified values
224        let updates_received = Arc::new(AtomicBool::new(false));
225        let updates_received_clone = Arc::clone(&updates_received);
226        sync_file_to_configset(
227            set.clone(),
228            &config_file,
229            Duration::from_secs(1),
230            None,
231            move |updates, _| {
232                assert_eq!(updates.updates.len(), 2);
233                updates_received_clone.store(true, std::sync::atomic::Ordering::SeqCst);
234            },
235        )
236        .await
237        .unwrap();
238
239        assert!(updates_received.load(std::sync::atomic::Ordering::SeqCst));
240        assert_eq!(BOOL_CONFIG.get(&set), false);
241        assert_eq!(STRING_CONFIG.get(&set), "modified");
242    }
243}