mz_dyncfg_launchdarkly/
lib.rs1use std::time::Duration;
13
14use hyper_tls::HttpsConnector;
15use launchdarkly_server_sdk as ld;
16use mz_build_info::BuildInfo;
17use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
18use mz_ore::cast::CastLossy;
19use mz_ore::task;
20use tokio::time;
21
22pub async fn sync_launchdarkly_to_configset<F>(
34 set: ConfigSet,
35 build_info: &'static BuildInfo,
36 ctx_builder: F,
37 launchdarkly_sdk_key: Option<&str>,
40 config_sync_timeout: Duration,
41 config_sync_loop_interval: Option<Duration>,
42 on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
43) -> Result<(), anyhow::Error>
44where
45 F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
46{
47 for entry in set.entries() {
50 let _ = dyn_into_flag(entry.val())?;
51 }
52 let ld_client = if let Some(key) = launchdarkly_sdk_key {
53 let config = ld::ConfigBuilder::new(key)
54 .event_processor(
55 ld::EventProcessorBuilder::new().https_connector(HttpsConnector::new()),
56 )
57 .data_source(
58 ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()),
59 )
60 .build()
61 .expect("valid config");
62 let client = ld::Client::build(config)?;
63 client.start_with_default_executor();
64 let init = async {
65 let max_backoff = Duration::from_secs(60);
66 let mut backoff = Duration::from_secs(5);
67
68 loop {
70 match client.wait_for_initialization(config_sync_timeout).await {
71 Some(true) => break,
72 Some(false) => tracing::warn!("SyncedConfigSet failed to initialize"),
73 None => {}
74 }
75
76 tokio::time::sleep(backoff).await;
77 backoff = (backoff * 2).min(max_backoff);
78 }
79 };
80 if tokio::time::timeout(config_sync_timeout, init)
81 .await
82 .is_err()
83 {
84 tracing::info!("SyncedConfigSet initialize on boot: initialize has timed out");
85 }
86 Some(client)
87 } else {
88 None
89 };
90
91 let synced = SyncedConfigSet {
92 set,
93 ld_client,
94 ld_ctx: ld_ctx(build_info, ctx_builder)?,
95 on_update,
96 };
97 synced.sync()?;
98 task::spawn(
99 || "SyncedConfigSet sync_loop",
100 synced.sync_loop(config_sync_loop_interval),
101 );
102 Ok(())
103}
104
105fn ld_ctx<F>(build_info: &'static BuildInfo, ctx_builder: F) -> Result<ld::Context, anyhow::Error>
106where
107 F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
108{
109 let mut builder = ld::MultiContextBuilder::new();
116
117 builder.add_context(
118 ld::ContextBuilder::new(build_info.sha)
119 .kind("build")
120 .set_string("semver_version", build_info.semver_version().to_string())
121 .build()
122 .map_err(|e| anyhow::anyhow!(e))?,
123 );
124
125 ctx_builder(&mut builder)?;
126
127 builder.build().map_err(|e| anyhow::anyhow!(e))
128}
129
130struct SyncedConfigSet<F>
131where
132 F: Fn(&ConfigUpdates, &ConfigSet) + Send,
133{
134 set: ConfigSet,
135 ld_client: Option<ld::Client>,
136 ld_ctx: ld::Context,
137 on_update: F,
138}
139
140impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
141 async fn sync_loop(self, tick_interval: Option<Duration>) {
143 let Some(tick_interval) = tick_interval else {
144 tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
145 return;
146 };
147
148 let mut interval = time::interval(tick_interval);
149 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
150
151 tracing::info!(
152 "synchronizing SyncedConfigSet values every {} seconds",
153 tick_interval.as_secs()
154 );
155
156 loop {
157 interval.tick().await;
158
159 if let Err(err) = self.sync() {
160 tracing::info!("SyncedConfigSet: {err}");
161 }
162 }
163 }
164
165 fn sync(&self) -> Result<(), anyhow::Error> {
167 let mut updates = ConfigUpdates::default();
168 let Some(ld_client) = &self.ld_client else {
169 (self.on_update)(&updates, &self.set);
170 return Ok(());
171 };
172 for entry in self.set.entries() {
173 let val = dyn_into_flag(entry.val()).expect("new() verifies all configs can convert");
174 let flag_var = ld_client.variation(&self.ld_ctx, entry.name(), val);
175 let update = match (entry.val(), flag_var) {
176 (ConfigVal::Bool(_), ld::FlagValue::Bool(flag)) => ConfigVal::Bool(flag),
177 (ConfigVal::U32(_), ld::FlagValue::Number(flag)) => {
178 ConfigVal::U32(u32::cast_lossy(flag))
179 }
180 (ConfigVal::Usize(_), ld::FlagValue::Number(flag)) => {
181 ConfigVal::Usize(usize::cast_lossy(flag))
182 }
183 (ConfigVal::F64(_), ld::FlagValue::Number(flag)) => ConfigVal::F64(flag),
184 (ConfigVal::String(_), ld::FlagValue::Str(flag)) => ConfigVal::String(flag),
185 (ConfigVal::Duration(_), ld::FlagValue::Str(flag)) => {
186 ConfigVal::Duration(humantime::parse_duration(&flag)?)
187 }
188 (ConfigVal::Json(_), ld::FlagValue::Json(flag)) => ConfigVal::Json(flag),
189
190 (ConfigVal::Bool(_), _)
193 | (ConfigVal::U32(_), _)
194 | (ConfigVal::Usize(_), _)
195 | (ConfigVal::F64(_), _)
196 | (ConfigVal::Duration(_), _)
197 | (ConfigVal::Json(_), _)
198 | (ConfigVal::OptUsize(_), _)
199 | (ConfigVal::String(_), _) => anyhow::bail!(
200 "LD flag cannot be cast to the ConfigVal for {}",
201 entry.name()
202 ),
203 };
204 tracing::debug!(
205 "updating config value {} from {:?} to {:?}",
206 &entry.name(),
207 &entry.val(),
208 update
209 );
210 updates.add_dynamic(entry.name(), update);
211 }
212 updates.apply(&self.set);
213 (self.on_update)(&updates, &self.set);
214 Ok(())
215 }
216}
217
218fn dyn_into_flag(val: ConfigVal) -> Result<ld::FlagValue, anyhow::Error> {
221 Ok(match val {
225 ConfigVal::Bool(v) => ld::FlagValue::Bool(v),
226 ConfigVal::U32(v) => ld::FlagValue::Number(v.into()),
227 ConfigVal::Usize(v) => ld::FlagValue::Number(f64::cast_lossy(v)),
228 ConfigVal::OptUsize(_) => anyhow::bail!("OptUsize None cannot be converted to a FlagValue"),
229 ConfigVal::F64(v) => ld::FlagValue::Number(v),
230 ConfigVal::String(v) => ld::FlagValue::Str(v),
231 ConfigVal::Duration(v) => ld::FlagValue::Str(humantime::format_duration(v).to_string()),
232 ConfigVal::Json(v) => ld::FlagValue::Json(v),
233 })
234}