1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! A dyncfg::ConfigSet backed by LaunchDarkly.
1112use std::time::Duration;
1314use hyper_tls::HttpsConnector;
15use launchdarkly_server_sdk as ld;
16use mz_build_info::BuildInfo;
17use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
18use mz_ore::cast::CastLossy;
19use mz_ore::task;
20use tokio::time;
2122/// Start a background task that syncs to a ConfigSet from LaunchDarkly. `ctx_builder` can be used
23/// to add additional LD contexts. A `build` context is added automatically. Returns `Ok` after the
24/// LD client has been initialized and an initial sync completed. If the initialization takes longer
25/// than `config_sync_timeout`, an error is returned.
26///
27/// A successful initialization can take up to `config_sync_timeout`, preventing the calling service
28/// from starting for possibly up to that duration. Its value should be chosen based on the needs of
29/// the service in the case that LaunchDarkly is down.
30///
31/// If the caller chooses to continue if this function returns an error, the ConfigSet will retain
32/// its default values. Those should be chosen with this risk in mind.
33pub async fn sync_launchdarkly_to_configset<F>(
34 set: ConfigSet,
35 build_info: &'static BuildInfo,
36 ctx_builder: F,
37// Use an option so that local dev where this is disabled still uses the same validation logic
38 // for the ConfigSet.
39launchdarkly_sdk_key: Option<&str>,
40 config_sync_timeout: Duration,
41 config_sync_loop_interval: Option<Duration>,
42 on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
43) -> Result<(), anyhow::Error>
44where
45F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
46{
47// Ensure that all the ConfigVals in the set support FlagValue conversion, even if LD is
48 // disabled (preventing error skew in local vs prod settings).
49for entry in set.entries() {
50let _ = dyn_into_flag(entry.val())?;
51 }
52let ld_client = if let Some(key) = launchdarkly_sdk_key {
53let config = ld::ConfigBuilder::new(key)
54 .event_processor(
55 ld::EventProcessorBuilder::new().https_connector(HttpsConnector::new()),
56 )
57 .data_source(
58 ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()),
59 )
60 .build()
61 .expect("valid config");
62let client = ld::Client::build(config)?;
63 client.start_with_default_executor();
64let init = async {
65let max_backoff = Duration::from_secs(60);
66let mut backoff = Duration::from_secs(5);
6768// TODO(materialize#32030): fix retry logic
69loop {
70match client.wait_for_initialization(config_sync_timeout).await {
71Some(true) => break,
72Some(false) => tracing::warn!("SyncedConfigSet failed to initialize"),
73None => {}
74 }
7576 tokio::time::sleep(backoff).await;
77 backoff = (backoff * 2).min(max_backoff);
78 }
79 };
80if tokio::time::timeout(config_sync_timeout, init)
81 .await
82.is_err()
83 {
84tracing::info!("SyncedConfigSet initialize on boot: initialize has timed out");
85 }
86Some(client)
87 } else {
88None
89};
9091let synced = SyncedConfigSet {
92 set,
93 ld_client,
94 ld_ctx: ld_ctx(build_info, ctx_builder)?,
95 on_update,
96 };
97 synced.sync()?;
98 task::spawn(
99 || "SyncedConfigSet sync_loop",
100 synced.sync_loop(config_sync_loop_interval),
101 );
102Ok(())
103}
104105fn ld_ctx<F>(build_info: &'static BuildInfo, ctx_builder: F) -> Result<ld::Context, anyhow::Error>
106where
107F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
108{
109// Register multiple contexts for this client.
110 //
111 // Unfortunately, it seems that the order in which conflicting targeting
112 // rules are applied depends on the definition order of feature flag
113 // variations rather than on the order in which context are registered with
114 // the multi-context builder.
115let mut builder = ld::MultiContextBuilder::new();
116117 builder.add_context(
118 ld::ContextBuilder::new(build_info.sha)
119 .kind("build")
120 .set_string("semver_version", build_info.semver_version().to_string())
121 .build()
122 .map_err(|e| anyhow::anyhow!(e))?,
123 );
124125 ctx_builder(&mut builder)?;
126127 builder.build().map_err(|e| anyhow::anyhow!(e))
128}
129130struct SyncedConfigSet<F>
131where
132F: Fn(&ConfigUpdates, &ConfigSet) + Send,
133{
134 set: ConfigSet,
135 ld_client: Option<ld::Client>,
136 ld_ctx: ld::Context,
137 on_update: F,
138}
139140impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
141/// Returns a future that periodically polls LaunchDarkly and updates the ConfigSet.
142async fn sync_loop(self, tick_interval: Option<Duration>) {
143let Some(tick_interval) = tick_interval else {
144tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
145return;
146 };
147148let mut interval = time::interval(tick_interval);
149 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
150151tracing::info!(
152"synchronizing SyncedConfigSet values every {} seconds",
153 tick_interval.as_secs()
154 );
155156loop {
157 interval.tick().await;
158159if let Err(err) = self.sync() {
160tracing::info!("SyncedConfigSet: {err}");
161 }
162 }
163 }
164165/// Reads current values from LaunchDarkly and updates the ConfigSet.
166fn sync(&self) -> Result<(), anyhow::Error> {
167let mut updates = ConfigUpdates::default();
168let Some(ld_client) = &self.ld_client else {
169 (self.on_update)(&updates, &self.set);
170return Ok(());
171 };
172for entry in self.set.entries() {
173let val = dyn_into_flag(entry.val()).expect("new() verifies all configs can convert");
174let flag_var = ld_client.variation(&self.ld_ctx, entry.name(), val);
175let update = match (entry.val(), flag_var) {
176 (ConfigVal::Bool(_), ld::FlagValue::Bool(flag)) => ConfigVal::Bool(flag),
177 (ConfigVal::U32(_), ld::FlagValue::Number(flag)) => {
178 ConfigVal::U32(u32::cast_lossy(flag))
179 }
180 (ConfigVal::Usize(_), ld::FlagValue::Number(flag)) => {
181 ConfigVal::Usize(usize::cast_lossy(flag))
182 }
183 (ConfigVal::F64(_), ld::FlagValue::Number(flag)) => ConfigVal::F64(flag),
184 (ConfigVal::String(_), ld::FlagValue::Str(flag)) => ConfigVal::String(flag),
185 (ConfigVal::Duration(_), ld::FlagValue::Str(flag)) => {
186 ConfigVal::Duration(humantime::parse_duration(&flag)?)
187 }
188 (ConfigVal::Json(_), ld::FlagValue::Json(flag)) => ConfigVal::Json(flag),
189190// Hardcode all others so that if ConfigVal gets new types this match block will
191 // compile error.
192(ConfigVal::Bool(_), _)
193 | (ConfigVal::U32(_), _)
194 | (ConfigVal::Usize(_), _)
195 | (ConfigVal::F64(_), _)
196 | (ConfigVal::Duration(_), _)
197 | (ConfigVal::Json(_), _)
198 | (ConfigVal::OptUsize(_), _)
199 | (ConfigVal::String(_), _) => anyhow::bail!(
200"LD flag cannot be cast to the ConfigVal for {}",
201 entry.name()
202 ),
203 };
204 updates.add_dynamic(entry.name(), update);
205 }
206 updates.apply(&self.set);
207 (self.on_update)(&updates, &self.set);
208Ok(())
209 }
210}
211212/// Converts a dyncfg ConfigVal into a LaunchDarkly FlagValue. Returns an error if the ConfigVal
213/// type isn't supported by the FlagValue format.
214fn dyn_into_flag(val: ConfigVal) -> Result<ld::FlagValue, anyhow::Error> {
215// Note that errors must only (and always) occur when the ConfigVal type isn't fully supported.
216 // That is, don't error only if the current value isn't supported (like None in an Opt type):
217 // error always for an Opt value because it might be None.
218Ok(match val {
219 ConfigVal::Bool(v) => ld::FlagValue::Bool(v),
220 ConfigVal::U32(v) => ld::FlagValue::Number(v.into()),
221 ConfigVal::Usize(v) => ld::FlagValue::Number(f64::cast_lossy(v)),
222 ConfigVal::OptUsize(_) => anyhow::bail!("OptUsize None cannot be converted to a FlagValue"),
223 ConfigVal::F64(v) => ld::FlagValue::Number(v),
224 ConfigVal::String(v) => ld::FlagValue::Str(v),
225 ConfigVal::Duration(v) => ld::FlagValue::Str(humantime::format_duration(v).to_string()),
226 ConfigVal::Json(v) => ld::FlagValue::Json(v),
227 })
228}