Skip to main content

mz_dyncfg_launchdarkly/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A dyncfg::ConfigSet backed by LaunchDarkly.
11
12use std::time::Duration;
13
14use launchdarkly_server_sdk as ld;
15use mz_build_info::BuildInfo;
16use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
17use mz_ore::cast::CastLossy;
18use mz_ore::task;
19use tokio::time;
20
21/// Start a background task that syncs to a ConfigSet from LaunchDarkly. `ctx_builder` can be used
22/// to add additional LD contexts. A `build` context is added automatically. Returns `Ok` after the
23/// LD client has been initialized and an initial sync completed. If the initialization takes longer
24/// than `config_sync_timeout`, an error is returned.
25///
26/// A successful initialization can take up to `config_sync_timeout`, preventing the calling service
27/// from starting for possibly up to that duration. Its value should be chosen based on the needs of
28/// the service in the case that LaunchDarkly is down.
29///
30/// If the caller chooses to continue if this function returns an error, the ConfigSet will retain
31/// its default values. Those should be chosen with this risk in mind.
32pub async fn sync_launchdarkly_to_configset<F>(
33    set: ConfigSet,
34    build_info: &'static BuildInfo,
35    ctx_builder: F,
36    // Use an option so that local dev where this is disabled still uses the same validation logic
37    // for the ConfigSet.
38    launchdarkly_sdk_key: Option<&str>,
39    config_sync_timeout: Duration,
40    config_sync_loop_interval: Option<Duration>,
41    on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
42) -> Result<(), anyhow::Error>
43where
44    F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
45{
46    // Ensure that all the ConfigVals in the set support FlagValue conversion, even if LD is
47    // disabled (preventing error skew in local vs prod settings).
48    for entry in set.entries() {
49        let _ = dyn_into_flag(entry.val())?;
50    }
51    let ld_client = if let Some(key) = launchdarkly_sdk_key {
52        let transport = launchdarkly_sdk_transport::HyperTransport::builder()
53            .connect_timeout(Duration::from_secs(10))
54            .read_timeout(Duration::from_secs(300))
55            .build_https()
56            .expect("failed to create HTTPS transport");
57
58        let mut data_source = ld::StreamingDataSourceBuilder::new();
59        data_source.transport(transport.clone());
60
61        let mut event_processor = ld::EventProcessorBuilder::new();
62        event_processor.transport(transport);
63
64        let config = ld::ConfigBuilder::new(key)
65            .data_source(&data_source)
66            .event_processor(&event_processor)
67            .build()
68            .expect("valid config");
69        let client = ld::Client::build(config)?;
70        client.start_with_default_executor();
71        let init = async {
72            let max_backoff = Duration::from_secs(60);
73            let mut backoff = Duration::from_secs(5);
74
75            // TODO(materialize#32030): fix retry logic
76            loop {
77                match client.wait_for_initialization(config_sync_timeout).await {
78                    Some(true) => break,
79                    Some(false) => tracing::warn!("SyncedConfigSet failed to initialize"),
80                    None => {}
81                }
82
83                tokio::time::sleep(backoff).await;
84                backoff = (backoff * 2).min(max_backoff);
85            }
86        };
87        if tokio::time::timeout(config_sync_timeout, init)
88            .await
89            .is_err()
90        {
91            tracing::info!("SyncedConfigSet initialize on boot: initialize has timed out");
92        }
93        Some(client)
94    } else {
95        None
96    };
97
98    let synced = SyncedConfigSet {
99        set,
100        ld_client,
101        ld_ctx: ld_ctx(build_info, ctx_builder)?,
102        on_update,
103    };
104    synced.sync()?;
105    task::spawn(
106        || "SyncedConfigSet sync_loop",
107        synced.sync_loop(config_sync_loop_interval),
108    );
109    Ok(())
110}
111
112fn ld_ctx<F>(build_info: &'static BuildInfo, ctx_builder: F) -> Result<ld::Context, anyhow::Error>
113where
114    F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
115{
116    // Register multiple contexts for this client.
117    //
118    // Unfortunately, it seems that the order in which conflicting targeting
119    // rules are applied depends on the definition order of feature flag
120    // variations rather than on the order in which context are registered with
121    // the multi-context builder.
122    let mut builder = ld::MultiContextBuilder::new();
123
124    builder.add_context(
125        ld::ContextBuilder::new(build_info.sha)
126            .kind("build")
127            .set_string("semver_version", build_info.semver_version().to_string())
128            .build()
129            .map_err(|e| anyhow::anyhow!(e))?,
130    );
131
132    ctx_builder(&mut builder)?;
133
134    builder.build().map_err(|e| anyhow::anyhow!(e))
135}
136
137struct SyncedConfigSet<F>
138where
139    F: Fn(&ConfigUpdates, &ConfigSet) + Send,
140{
141    set: ConfigSet,
142    ld_client: Option<ld::Client>,
143    ld_ctx: ld::Context,
144    on_update: F,
145}
146
147impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
148    /// Returns a future that periodically polls LaunchDarkly and updates the ConfigSet.
149    async fn sync_loop(self, tick_interval: Option<Duration>) {
150        let Some(tick_interval) = tick_interval else {
151            tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
152            return;
153        };
154
155        let mut interval = time::interval(tick_interval);
156        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
157
158        tracing::info!(
159            "synchronizing SyncedConfigSet values every {} seconds",
160            tick_interval.as_secs()
161        );
162
163        loop {
164            interval.tick().await;
165
166            if let Err(err) = self.sync() {
167                tracing::info!("SyncedConfigSet: {err}");
168            }
169        }
170    }
171
172    /// Reads current values from LaunchDarkly and updates the ConfigSet.
173    fn sync(&self) -> Result<(), anyhow::Error> {
174        let mut updates = ConfigUpdates::default();
175        let Some(ld_client) = &self.ld_client else {
176            (self.on_update)(&updates, &self.set);
177            return Ok(());
178        };
179        for entry in self.set.entries() {
180            let val = dyn_into_flag(entry.val()).expect("new() verifies all configs can convert");
181            let flag_var = ld_client.variation(&self.ld_ctx, entry.name(), val);
182            let update = match (entry.val(), flag_var) {
183                (ConfigVal::Bool(_), ld::FlagValue::Bool(flag)) => ConfigVal::Bool(flag),
184                (ConfigVal::U32(_), ld::FlagValue::Number(flag)) => {
185                    ConfigVal::U32(u32::cast_lossy(flag))
186                }
187                (ConfigVal::Usize(_), ld::FlagValue::Number(flag)) => {
188                    ConfigVal::Usize(usize::cast_lossy(flag))
189                }
190                (ConfigVal::F64(_), ld::FlagValue::Number(flag)) => ConfigVal::F64(flag),
191                (ConfigVal::String(_), ld::FlagValue::Str(flag)) => ConfigVal::String(flag),
192                (ConfigVal::Duration(_), ld::FlagValue::Str(flag)) => {
193                    ConfigVal::Duration(humantime::parse_duration(&flag)?)
194                }
195                (ConfigVal::Json(_), ld::FlagValue::Json(flag)) => ConfigVal::Json(flag),
196
197                // Hardcode all others so that if ConfigVal gets new types this match block will
198                // compile error.
199                (ConfigVal::Bool(_), _)
200                | (ConfigVal::U32(_), _)
201                | (ConfigVal::Usize(_), _)
202                | (ConfigVal::F64(_), _)
203                | (ConfigVal::Duration(_), _)
204                | (ConfigVal::Json(_), _)
205                | (ConfigVal::OptUsize(_), _)
206                | (ConfigVal::String(_), _)
207                | (ConfigVal::OptString(_), _) => anyhow::bail!(
208                    "LD flag cannot be cast to the ConfigVal for {}",
209                    entry.name()
210                ),
211            };
212            tracing::debug!(
213                "updating config value {} from {:?} to {:?}",
214                &entry.name(),
215                &entry.val(),
216                update
217            );
218            updates.add_dynamic(entry.name(), update);
219        }
220        updates.apply(&self.set);
221        (self.on_update)(&updates, &self.set);
222        Ok(())
223    }
224}
225
226/// Converts a dyncfg ConfigVal into a LaunchDarkly FlagValue. Returns an error if the ConfigVal
227/// type isn't supported by the FlagValue format.
228fn dyn_into_flag(val: ConfigVal) -> Result<ld::FlagValue, anyhow::Error> {
229    // Note that errors must only (and always) occur when the ConfigVal type isn't fully supported.
230    // That is, don't error only if the current value isn't supported (like None in an Opt type):
231    // error always for an Opt value because it might be None.
232    Ok(match val {
233        ConfigVal::Bool(v) => ld::FlagValue::Bool(v),
234        ConfigVal::U32(v) => ld::FlagValue::Number(v.into()),
235        ConfigVal::Usize(v) => ld::FlagValue::Number(f64::cast_lossy(v)),
236        ConfigVal::OptUsize(_) => anyhow::bail!("OptUsize None cannot be converted to a FlagValue"),
237        ConfigVal::F64(v) => ld::FlagValue::Number(v),
238        ConfigVal::String(v) => ld::FlagValue::Str(v),
239        ConfigVal::OptString(_) => {
240            anyhow::bail!("OptString None cannot be converted to a FlagValue")
241        }
242        ConfigVal::Duration(v) => ld::FlagValue::Str(humantime::format_duration(v).to_string()),
243        ConfigVal::Json(v) => ld::FlagValue::Json(v),
244    })
245}