mz_dyncfg_launchdarkly/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A dyncfg::ConfigSet backed by LaunchDarkly.
11
12use std::time::Duration;
13
14use hyper_tls::HttpsConnector;
15use launchdarkly_server_sdk as ld;
16use mz_build_info::BuildInfo;
17use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
18use mz_ore::cast::CastLossy;
19use mz_ore::task;
20use tokio::time;
21
22/// Start a background task that syncs to a ConfigSet from LaunchDarkly. `ctx_builder` can be used
23/// to add additional LD contexts. A `build` context is added automatically. Returns `Ok` after the
24/// LD client has been initialized and an initial sync completed. If the initialization takes longer
25/// than `config_sync_timeout`, an error is returned.
26///
27/// A successful initialization can take up to `config_sync_timeout`, preventing the calling service
28/// from starting for possibly up to that duration. Its value should be chosen based on the needs of
29/// the service in the case that LaunchDarkly is down.
30///
31/// If the caller chooses to continue if this function returns an error, the ConfigSet will retain
32/// its default values. Those should be chosen with this risk in mind.
33pub async fn sync_launchdarkly_to_configset<F>(
34    set: ConfigSet,
35    build_info: &'static BuildInfo,
36    ctx_builder: F,
37    // Use an option so that local dev where this is disabled still uses the same validation logic
38    // for the ConfigSet.
39    launchdarkly_sdk_key: Option<&str>,
40    config_sync_timeout: Duration,
41    config_sync_loop_interval: Option<Duration>,
42    on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
43) -> Result<(), anyhow::Error>
44where
45    F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
46{
47    // Ensure that all the ConfigVals in the set support FlagValue conversion, even if LD is
48    // disabled (preventing error skew in local vs prod settings).
49    for entry in set.entries() {
50        let _ = dyn_into_flag(entry.val())?;
51    }
52    let ld_client = if let Some(key) = launchdarkly_sdk_key {
53        let config = ld::ConfigBuilder::new(key)
54            .event_processor(
55                ld::EventProcessorBuilder::new().https_connector(HttpsConnector::new()),
56            )
57            .data_source(
58                ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()),
59            )
60            .build()
61            .expect("valid config");
62        let client = ld::Client::build(config)?;
63        client.start_with_default_executor();
64        let init = async {
65            let max_backoff = Duration::from_secs(60);
66            let mut backoff = Duration::from_secs(5);
67
68            // TODO(materialize#32030): fix retry logic
69            loop {
70                match client.wait_for_initialization(config_sync_timeout).await {
71                    Some(true) => break,
72                    Some(false) => tracing::warn!("SyncedConfigSet failed to initialize"),
73                    None => {}
74                }
75
76                tokio::time::sleep(backoff).await;
77                backoff = (backoff * 2).min(max_backoff);
78            }
79        };
80        if tokio::time::timeout(config_sync_timeout, init)
81            .await
82            .is_err()
83        {
84            tracing::info!("SyncedConfigSet initialize on boot: initialize has timed out");
85        }
86        Some(client)
87    } else {
88        None
89    };
90
91    let synced = SyncedConfigSet {
92        set,
93        ld_client,
94        ld_ctx: ld_ctx(build_info, ctx_builder)?,
95        on_update,
96    };
97    synced.sync()?;
98    task::spawn(
99        || "SyncedConfigSet sync_loop",
100        synced.sync_loop(config_sync_loop_interval),
101    );
102    Ok(())
103}
104
105fn ld_ctx<F>(build_info: &'static BuildInfo, ctx_builder: F) -> Result<ld::Context, anyhow::Error>
106where
107    F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
108{
109    // Register multiple contexts for this client.
110    //
111    // Unfortunately, it seems that the order in which conflicting targeting
112    // rules are applied depends on the definition order of feature flag
113    // variations rather than on the order in which context are registered with
114    // the multi-context builder.
115    let mut builder = ld::MultiContextBuilder::new();
116
117    builder.add_context(
118        ld::ContextBuilder::new(build_info.sha)
119            .kind("build")
120            .set_string("semver_version", build_info.semver_version().to_string())
121            .build()
122            .map_err(|e| anyhow::anyhow!(e))?,
123    );
124
125    ctx_builder(&mut builder)?;
126
127    builder.build().map_err(|e| anyhow::anyhow!(e))
128}
129
130struct SyncedConfigSet<F>
131where
132    F: Fn(&ConfigUpdates, &ConfigSet) + Send,
133{
134    set: ConfigSet,
135    ld_client: Option<ld::Client>,
136    ld_ctx: ld::Context,
137    on_update: F,
138}
139
140impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
141    /// Returns a future that periodically polls LaunchDarkly and updates the ConfigSet.
142    async fn sync_loop(self, tick_interval: Option<Duration>) {
143        let Some(tick_interval) = tick_interval else {
144            tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
145            return;
146        };
147
148        let mut interval = time::interval(tick_interval);
149        interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
150
151        tracing::info!(
152            "synchronizing SyncedConfigSet values every {} seconds",
153            tick_interval.as_secs()
154        );
155
156        loop {
157            interval.tick().await;
158
159            if let Err(err) = self.sync() {
160                tracing::info!("SyncedConfigSet: {err}");
161            }
162        }
163    }
164
165    /// Reads current values from LaunchDarkly and updates the ConfigSet.
166    fn sync(&self) -> Result<(), anyhow::Error> {
167        let mut updates = ConfigUpdates::default();
168        let Some(ld_client) = &self.ld_client else {
169            (self.on_update)(&updates, &self.set);
170            return Ok(());
171        };
172        for entry in self.set.entries() {
173            let val = dyn_into_flag(entry.val()).expect("new() verifies all configs can convert");
174            let flag_var = ld_client.variation(&self.ld_ctx, entry.name(), val);
175            let update = match (entry.val(), flag_var) {
176                (ConfigVal::Bool(_), ld::FlagValue::Bool(flag)) => ConfigVal::Bool(flag),
177                (ConfigVal::U32(_), ld::FlagValue::Number(flag)) => {
178                    ConfigVal::U32(u32::cast_lossy(flag))
179                }
180                (ConfigVal::Usize(_), ld::FlagValue::Number(flag)) => {
181                    ConfigVal::Usize(usize::cast_lossy(flag))
182                }
183                (ConfigVal::F64(_), ld::FlagValue::Number(flag)) => ConfigVal::F64(flag),
184                (ConfigVal::String(_), ld::FlagValue::Str(flag)) => ConfigVal::String(flag),
185                (ConfigVal::Duration(_), ld::FlagValue::Str(flag)) => {
186                    ConfigVal::Duration(humantime::parse_duration(&flag)?)
187                }
188                (ConfigVal::Json(_), ld::FlagValue::Json(flag)) => ConfigVal::Json(flag),
189
190                // Hardcode all others so that if ConfigVal gets new types this match block will
191                // compile error.
192                (ConfigVal::Bool(_), _)
193                | (ConfigVal::U32(_), _)
194                | (ConfigVal::Usize(_), _)
195                | (ConfigVal::F64(_), _)
196                | (ConfigVal::Duration(_), _)
197                | (ConfigVal::Json(_), _)
198                | (ConfigVal::OptUsize(_), _)
199                | (ConfigVal::String(_), _) => anyhow::bail!(
200                    "LD flag cannot be cast to the ConfigVal for {}",
201                    entry.name()
202                ),
203            };
204            updates.add_dynamic(entry.name(), update);
205        }
206        updates.apply(&self.set);
207        (self.on_update)(&updates, &self.set);
208        Ok(())
209    }
210}
211
212/// Converts a dyncfg ConfigVal into a LaunchDarkly FlagValue. Returns an error if the ConfigVal
213/// type isn't supported by the FlagValue format.
214fn dyn_into_flag(val: ConfigVal) -> Result<ld::FlagValue, anyhow::Error> {
215    // Note that errors must only (and always) occur when the ConfigVal type isn't fully supported.
216    // That is, don't error only if the current value isn't supported (like None in an Opt type):
217    // error always for an Opt value because it might be None.
218    Ok(match val {
219        ConfigVal::Bool(v) => ld::FlagValue::Bool(v),
220        ConfigVal::U32(v) => ld::FlagValue::Number(v.into()),
221        ConfigVal::Usize(v) => ld::FlagValue::Number(f64::cast_lossy(v)),
222        ConfigVal::OptUsize(_) => anyhow::bail!("OptUsize None cannot be converted to a FlagValue"),
223        ConfigVal::F64(v) => ld::FlagValue::Number(v),
224        ConfigVal::String(v) => ld::FlagValue::Str(v),
225        ConfigVal::Duration(v) => ld::FlagValue::Str(humantime::format_duration(v).to_string()),
226        ConfigVal::Json(v) => ld::FlagValue::Json(v),
227    })
228}