mz_dyncfg_launchdarkly/
lib.rs1use std::time::Duration;
13
14use launchdarkly_server_sdk as ld;
15use mz_build_info::BuildInfo;
16use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal};
17use mz_ore::cast::CastLossy;
18use mz_ore::task;
19use tokio::time;
20
21pub async fn sync_launchdarkly_to_configset<F>(
33 set: ConfigSet,
34 build_info: &'static BuildInfo,
35 ctx_builder: F,
36 launchdarkly_sdk_key: Option<&str>,
39 config_sync_timeout: Duration,
40 config_sync_loop_interval: Option<Duration>,
41 on_update: impl Fn(&ConfigUpdates, &ConfigSet) + Send + 'static,
42) -> Result<(), anyhow::Error>
43where
44 F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
45{
46 for entry in set.entries() {
49 let _ = dyn_into_flag(entry.val())?;
50 }
51 let ld_client = if let Some(key) = launchdarkly_sdk_key {
52 let transport = launchdarkly_sdk_transport::HyperTransport::builder()
53 .connect_timeout(Duration::from_secs(10))
54 .read_timeout(Duration::from_secs(300))
55 .build_https()
56 .expect("failed to create HTTPS transport");
57
58 let mut data_source = ld::StreamingDataSourceBuilder::new();
59 data_source.transport(transport.clone());
60
61 let mut event_processor = ld::EventProcessorBuilder::new();
62 event_processor.transport(transport);
63
64 let config = ld::ConfigBuilder::new(key)
65 .data_source(&data_source)
66 .event_processor(&event_processor)
67 .build()
68 .expect("valid config");
69 let client = ld::Client::build(config)?;
70 client.start_with_default_executor();
71 let init = async {
72 let max_backoff = Duration::from_secs(60);
73 let mut backoff = Duration::from_secs(5);
74
75 loop {
77 match client.wait_for_initialization(config_sync_timeout).await {
78 Some(true) => break,
79 Some(false) => tracing::warn!("SyncedConfigSet failed to initialize"),
80 None => {}
81 }
82
83 tokio::time::sleep(backoff).await;
84 backoff = (backoff * 2).min(max_backoff);
85 }
86 };
87 if tokio::time::timeout(config_sync_timeout, init)
88 .await
89 .is_err()
90 {
91 tracing::info!("SyncedConfigSet initialize on boot: initialize has timed out");
92 }
93 Some(client)
94 } else {
95 None
96 };
97
98 let synced = SyncedConfigSet {
99 set,
100 ld_client,
101 ld_ctx: ld_ctx(build_info, ctx_builder)?,
102 on_update,
103 };
104 synced.sync()?;
105 task::spawn(
106 || "SyncedConfigSet sync_loop",
107 synced.sync_loop(config_sync_loop_interval),
108 );
109 Ok(())
110}
111
112fn ld_ctx<F>(build_info: &'static BuildInfo, ctx_builder: F) -> Result<ld::Context, anyhow::Error>
113where
114 F: FnOnce(&mut ld::MultiContextBuilder) -> Result<(), anyhow::Error>,
115{
116 let mut builder = ld::MultiContextBuilder::new();
123
124 builder.add_context(
125 ld::ContextBuilder::new(build_info.sha)
126 .kind("build")
127 .set_string("semver_version", build_info.semver_version().to_string())
128 .build()
129 .map_err(|e| anyhow::anyhow!(e))?,
130 );
131
132 ctx_builder(&mut builder)?;
133
134 builder.build().map_err(|e| anyhow::anyhow!(e))
135}
136
137struct SyncedConfigSet<F>
138where
139 F: Fn(&ConfigUpdates, &ConfigSet) + Send,
140{
141 set: ConfigSet,
142 ld_client: Option<ld::Client>,
143 ld_ctx: ld::Context,
144 on_update: F,
145}
146
147impl<F: Fn(&ConfigUpdates, &ConfigSet) + Send> SyncedConfigSet<F> {
148 async fn sync_loop(self, tick_interval: Option<Duration>) {
150 let Some(tick_interval) = tick_interval else {
151 tracing::info!("skipping SyncedConfigSet sync as tick_interval = None");
152 return;
153 };
154
155 let mut interval = time::interval(tick_interval);
156 interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
157
158 tracing::info!(
159 "synchronizing SyncedConfigSet values every {} seconds",
160 tick_interval.as_secs()
161 );
162
163 loop {
164 interval.tick().await;
165
166 if let Err(err) = self.sync() {
167 tracing::info!("SyncedConfigSet: {err}");
168 }
169 }
170 }
171
172 fn sync(&self) -> Result<(), anyhow::Error> {
174 let mut updates = ConfigUpdates::default();
175 let Some(ld_client) = &self.ld_client else {
176 (self.on_update)(&updates, &self.set);
177 return Ok(());
178 };
179 for entry in self.set.entries() {
180 let val = dyn_into_flag(entry.val()).expect("new() verifies all configs can convert");
181 let flag_var = ld_client.variation(&self.ld_ctx, entry.name(), val);
182 let update = match (entry.val(), flag_var) {
183 (ConfigVal::Bool(_), ld::FlagValue::Bool(flag)) => ConfigVal::Bool(flag),
184 (ConfigVal::U32(_), ld::FlagValue::Number(flag)) => {
185 ConfigVal::U32(u32::cast_lossy(flag))
186 }
187 (ConfigVal::Usize(_), ld::FlagValue::Number(flag)) => {
188 ConfigVal::Usize(usize::cast_lossy(flag))
189 }
190 (ConfigVal::F64(_), ld::FlagValue::Number(flag)) => ConfigVal::F64(flag),
191 (ConfigVal::String(_), ld::FlagValue::Str(flag)) => ConfigVal::String(flag),
192 (ConfigVal::Duration(_), ld::FlagValue::Str(flag)) => {
193 ConfigVal::Duration(humantime::parse_duration(&flag)?)
194 }
195 (ConfigVal::Json(_), ld::FlagValue::Json(flag)) => ConfigVal::Json(flag),
196
197 (ConfigVal::Bool(_), _)
200 | (ConfigVal::U32(_), _)
201 | (ConfigVal::Usize(_), _)
202 | (ConfigVal::F64(_), _)
203 | (ConfigVal::Duration(_), _)
204 | (ConfigVal::Json(_), _)
205 | (ConfigVal::OptUsize(_), _)
206 | (ConfigVal::String(_), _)
207 | (ConfigVal::OptString(_), _) => anyhow::bail!(
208 "LD flag cannot be cast to the ConfigVal for {}",
209 entry.name()
210 ),
211 };
212 tracing::debug!(
213 "updating config value {} from {:?} to {:?}",
214 &entry.name(),
215 &entry.val(),
216 update
217 );
218 updates.add_dynamic(entry.name(), update);
219 }
220 updates.apply(&self.set);
221 (self.on_update)(&updates, &self.set);
222 Ok(())
223 }
224}
225
226fn dyn_into_flag(val: ConfigVal) -> Result<ld::FlagValue, anyhow::Error> {
229 Ok(match val {
233 ConfigVal::Bool(v) => ld::FlagValue::Bool(v),
234 ConfigVal::U32(v) => ld::FlagValue::Number(v.into()),
235 ConfigVal::Usize(v) => ld::FlagValue::Number(f64::cast_lossy(v)),
236 ConfigVal::OptUsize(_) => anyhow::bail!("OptUsize None cannot be converted to a FlagValue"),
237 ConfigVal::F64(v) => ld::FlagValue::Number(v),
238 ConfigVal::String(v) => ld::FlagValue::Str(v),
239 ConfigVal::OptString(_) => {
240 anyhow::bail!("OptString None cannot be converted to a FlagValue")
241 }
242 ConfigVal::Duration(v) => ld::FlagValue::Str(humantime::format_duration(v).to_string()),
243 ConfigVal::Json(v) => ld::FlagValue::Json(v),
244 })
245}