1use std::collections::BTreeMap;
11use std::fs;
12use std::path::PathBuf;
13use std::time::Duration;
14
15use bytes::Bytes;
16use derivative::Derivative;
17use futures::TryStreamExt;
18use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture};
19use launchdarkly_server_sdk as ld;
20use mz_build_info::BuildInfo;
21use mz_cloud_provider::CloudProvider;
22use mz_ore::metrics::UIntGauge;
23use mz_ore::now::NowFn;
24use mz_sql::catalog::EnvironmentId;
25use serde_json::Value as JsonValue;
26use tokio::time;
27use tracing::warn;
28
29use crate::config::{
30 Metrics, SynchronizedParameters, SystemParameterSyncClientConfig, SystemParameterSyncConfig,
31};
32
33#[derive(Derivative)]
35#[derivative(Debug)]
36pub struct SystemParameterFrontend {
37 client: SystemParameterFrontendClient,
39 key_map: BTreeMap<String, String>,
43 metrics: Metrics,
45}
46
47#[derive(Derivative)]
48#[derivative(Debug)]
49pub enum SystemParameterFrontendClient {
50 File {
51 path: PathBuf,
52 },
53 LaunchDarkly {
54 #[derivative(Debug = "ignore")]
56 client: ld::Client,
57 ctx: ld::Context,
60 },
61}
62
63impl SystemParameterFrontendClient {}
64
65impl SystemParameterFrontend {
66 pub async fn from(sync_config: &SystemParameterSyncConfig) -> Result<Self, anyhow::Error> {
72 match &sync_config.backend_config {
73 super::SystemParameterSyncClientConfig::File { path } => Ok(Self {
74 client: SystemParameterFrontendClient::File { path: path.clone() },
75 key_map: sync_config.key_map.clone(),
76 metrics: sync_config.metrics.clone(),
77 }),
78 SystemParameterSyncClientConfig::LaunchDarkly { sdk_key, now_fn } => Ok(Self {
79 client: SystemParameterFrontendClient::LaunchDarkly {
80 client: ld_client(sdk_key, &sync_config.metrics, now_fn).await?,
81 ctx: ld_ctx(&sync_config.env_id, sync_config.build_info)?,
82 },
83 metrics: sync_config.metrics.clone(),
84 key_map: sync_config.key_map.clone(),
85 }),
86 }
87 }
88
89 pub fn pull(&self, params: &mut SynchronizedParameters) -> bool {
93 let mut changed = false;
94 for param_name in params.synchronized().into_iter() {
95 let flag_name = self
96 .key_map
97 .get(param_name)
98 .map(|flag_name| flag_name.as_str())
99 .unwrap_or(param_name);
100
101 let flag_str = match self.client {
102 SystemParameterFrontendClient::LaunchDarkly {
103 ref client,
104 ref ctx,
105 } => {
106 let flag_var = client.variation(ctx, flag_name, params.get(param_name));
107 match flag_var {
108 ld::FlagValue::Bool(v) => v.to_string(),
109 ld::FlagValue::Str(v) => v,
110 ld::FlagValue::Number(v) => v.to_string(),
111 ld::FlagValue::Json(v) => v.to_string(),
112 }
113 }
114 SystemParameterFrontendClient::File { ref path } => {
115 let file_contents = fs::read_to_string(path)
116 .inspect_err(|e| warn!("Could not open system paraemter sync file {}", e))
117 .unwrap_or_default();
118 let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
119 .inspect_err(|e| warn!("Could not open system paraemter sync file {:?}", e))
120 .unwrap_or_default();
121 values
122 .get(flag_name)
123 .and_then(|o| match o {
124 serde_json::Value::String(v) => Some(v.to_string()),
125 serde_json::Value::Number(v) => Some(v.to_string()),
126 serde_json::Value::Bool(v) => Some(v.to_string()),
127 serde_json::Value::Object(_) => Some(o.to_string()),
128 serde_json::Value::Array(_) => Some(o.to_string()),
129 serde_json::Value::Null => None,
130 })
131 .unwrap_or_else(|| params.get(param_name))
132 }
133 };
134
135 let old = params.get(param_name);
136 let change = params.modify(param_name, flag_str.as_str());
137 if change {
138 tracing::debug!(
139 %param_name, %old, new = %flag_str,
140 "updating system param",
141 );
142 }
143 self.metrics.params_changed.inc_by(u64::from(change));
144 changed |= change;
145 }
146
147 changed
148 }
149}
150
151#[derive(Clone)]
159struct MetricsTransport<T> {
160 inner: T,
161 last_success_gauge: UIntGauge,
162 now_fn: NowFn,
163}
164
165impl<T: HttpTransport> HttpTransport for MetricsTransport<T> {
166 fn request(&self, request: http::Request<Option<Bytes>>) -> ResponseFuture {
167 let inner_fut = self.inner.request(request);
168 let gauge = self.last_success_gauge.clone();
169 let now_fn = self.now_fn.clone();
170 Box::pin(async move {
171 let resp = inner_fut.await?;
172 if resp.status().is_success() {
173 gauge.set(now_fn() / 1000);
174 let (parts, body) = resp.into_parts();
175 let wrapped: ByteStream = Box::pin(body.inspect_ok(move |_| {
176 gauge.set(now_fn() / 1000);
177 }));
178 Ok(http::Response::from_parts(parts, wrapped))
179 } else {
180 Ok(resp)
181 }
182 })
183 }
184}
185
186fn ld_config(api_key: &str, metrics: &Metrics, now_fn: &NowFn) -> ld::Config {
187 let transport = launchdarkly_sdk_transport::HyperTransport::builder()
188 .connect_timeout(Duration::from_secs(10))
189 .read_timeout(Duration::from_secs(300))
190 .build_https()
191 .expect("failed to create HTTPS transport");
192
193 let cse_transport = MetricsTransport {
194 inner: transport.clone(),
195 last_success_gauge: metrics.last_cse_time_seconds.clone(),
196 now_fn: now_fn.clone(),
197 };
198 let data_source_transport = MetricsTransport {
199 inner: transport,
200 last_success_gauge: metrics.last_sse_time_seconds.clone(),
201 now_fn: now_fn.clone(),
202 };
203
204 let mut event_processor = ld::EventProcessorBuilder::new();
205 event_processor.transport(cse_transport);
206
207 let mut data_source = ld::StreamingDataSourceBuilder::new();
208 data_source.transport(data_source_transport);
209
210 ld::ConfigBuilder::new(api_key)
211 .event_processor(&event_processor)
212 .data_source(&data_source)
213 .build()
214 .expect("valid config")
215}
216
217async fn ld_client(
218 api_key: &str,
219 metrics: &Metrics,
220 now_fn: &NowFn,
221) -> Result<ld::Client, anyhow::Error> {
222 let ld_client = ld::Client::build(ld_config(api_key, metrics, now_fn))?;
223 tracing::info!("waiting for SystemParameterFrontend to initialize");
224 ld_client.start_with_default_executor();
225
226 let max_backoff = Duration::from_secs(60);
227 let mut backoff = Duration::from_secs(5);
228 let timeout = Duration::from_secs(10);
229
230 loop {
232 match ld_client.wait_for_initialization(timeout).await {
233 Some(true) => break,
234 Some(false) => tracing::warn!("SystemParameterFrontend failed to initialize"),
235 None => tracing::warn!("SystemParameterFrontend initialization timed out"),
236 }
237
238 time::sleep(backoff).await;
239 backoff = (backoff * 2).min(max_backoff);
240 }
241
242 tracing::info!("successfully initialized SystemParameterFrontend");
243
244 Ok(ld_client)
245}
246
247fn ld_ctx(
248 env_id: &EnvironmentId,
249 build_info: &'static BuildInfo,
250) -> Result<ld::Context, anyhow::Error> {
251 let mut ctx_builder = ld::MultiContextBuilder::new();
258
259 if env_id.cloud_provider() != &CloudProvider::Local {
260 ctx_builder.add_context(
261 ld::ContextBuilder::new(env_id.to_string())
262 .kind("environment")
263 .set_string("cloud_provider", env_id.cloud_provider().to_string())
264 .set_string("cloud_provider_region", env_id.cloud_provider_region())
265 .set_string("organization_id", env_id.organization_id().to_string())
266 .set_string("ordinal", env_id.ordinal().to_string())
267 .build()
268 .map_err(|e| anyhow::anyhow!(e))?,
269 );
270 ctx_builder.add_context(
271 ld::ContextBuilder::new(env_id.organization_id().to_string())
272 .kind("organization")
273 .build()
274 .map_err(|e| anyhow::anyhow!(e))?,
275 );
276 } else {
277 ctx_builder.add_context(
282 ld::ContextBuilder::new("anonymous-dev@materialize.com")
283 .anonymous(true) .kind("environment")
285 .set_string("cloud_provider", env_id.cloud_provider().to_string())
286 .set_string("cloud_provider_region", env_id.cloud_provider_region())
287 .set_string("organization_id", uuid::Uuid::nil().to_string())
288 .set_string("ordinal", env_id.ordinal().to_string())
289 .build()
290 .map_err(|e| anyhow::anyhow!(e))?,
291 );
292 ctx_builder.add_context(
293 ld::ContextBuilder::new(uuid::Uuid::nil().to_string())
294 .anonymous(true) .kind("organization")
296 .build()
297 .map_err(|e| anyhow::anyhow!(e))?,
298 );
299 };
300
301 ctx_builder.add_context(
302 ld::ContextBuilder::new(build_info.sha)
303 .kind("build")
304 .set_string("semver_version", build_info.semver_version().to_string())
305 .build()
306 .map_err(|e| anyhow::anyhow!(e))?,
307 );
308
309 ctx_builder.build().map_err(|e| anyhow::anyhow!(e))
310}
311
#[cfg(test)]
mod tests {
    //! Tests for how `MetricsTransport` updates its gauge.

    use super::*;
    use futures::StreamExt;
    use launchdarkly_sdk_transport::{ByteStream, TransportError};
    use mz_ore::metrics::MetricsRegistry;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Transport that answers every request with a 200 response whose body is
    /// a fixed stream of three SSE-formatted chunks.
    #[derive(Clone)]
    struct FakeSseTransport;

    impl HttpTransport for FakeSseTransport {
        fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture {
            let body: ByteStream = Box::pin(futures::stream::iter(vec![
                Ok(Bytes::from("event: put\ndata: {\"flags\":{}}\n\n")),
                Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag1\"}\n\n")),
                Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag2\"}\n\n")),
            ]));
            Box::pin(async move {
                http::Response::builder()
                    .status(200)
                    .body(body)
                    .map_err(|e| TransportError::new(std::io::Error::other(e)))
            })
        }
    }

    /// Transport that fails every request with a connection-refused error.
    #[derive(Clone)]
    struct FailingTransport;

    impl HttpTransport for FailingTransport {
        fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture {
            Box::pin(async move {
                Err(TransportError::new(std::io::Error::new(
                    std::io::ErrorKind::ConnectionRefused,
                    "connection refused",
                )))
            })
        }
    }

    /// Registers a gauge called `name` in `registry` and returns it.
    fn test_gauge(registry: &MetricsRegistry, name: &str) -> UIntGauge {
        registry.register(mz_ore::metric!(
            name: name,
            help: "test gauge",
        ))
    }

    /// The gauge is set when the response arrives and again as body chunks
    /// are consumed.
    #[mz_ore::test(tokio::test)]
    async fn test_metric_updated_on_body_chunks() -> Result<(), anyhow::Error> {
        // Controllable clock: the test advances `time` between steps.
        let time = Arc::new(AtomicU64::new(1_000_000));
        let time_clone = Arc::clone(&time);
        let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst));

        let registry = MetricsRegistry::new();
        let gauge = test_gauge(&registry, "test_sse_gauge");

        let transport = MetricsTransport {
            inner: FakeSseTransport,
            last_success_gauge: gauge.clone(),
            now_fn,
        };

        assert_eq!(gauge.get(), 0);

        let request = http::Request::builder()
            .uri("https://stream.launchdarkly.com/all")
            .body(None)?;
        let response = transport.request(request).await?;

        // Set once on the successful response: 1_000_000 / 1000.
        assert_eq!(gauge.get(), 1000);

        time.store(2_800_000, Ordering::SeqCst);

        let mut body = response.into_body();
        let mut event_count = 0;
        while let Some(chunk) = body.next().await {
            // Surface a stream error directly instead of silently ending the
            // loop and failing later on the count assertion.
            chunk?;
            event_count += 1;
        }
        assert_eq!(event_count, 3);

        // Reading chunks refreshed the gauge with the advanced clock.
        assert_eq!(gauge.get(), 2800);
        Ok(())
    }

    /// Each successful request sets the gauge to the clock value at that time.
    #[mz_ore::test(tokio::test)]
    async fn test_cse_metric_updates_correctly_per_request() -> Result<(), anyhow::Error> {
        let time = Arc::new(AtomicU64::new(1_000_000));
        let time_clone = Arc::clone(&time);
        let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst));

        let registry = MetricsRegistry::new();
        let gauge = test_gauge(&registry, "test_cse_gauge");

        let transport = MetricsTransport {
            inner: FakeSseTransport,
            last_success_gauge: gauge.clone(),
            now_fn,
        };

        let req = || -> Result<http::Request<Option<Bytes>>, http::Error> {
            http::Request::builder()
                .uri("https://events.launchdarkly.com/bulk")
                .body(None)
        };

        let _ = transport.request(req()?).await?;
        assert_eq!(gauge.get(), 1000);

        time.store(2_000_000, Ordering::SeqCst);
        let _ = transport.request(req()?).await?;
        assert_eq!(gauge.get(), 2000);

        time.store(3_000_000, Ordering::SeqCst);
        let _ = transport.request(req()?).await?;
        assert_eq!(gauge.get(), 3000);
        Ok(())
    }

    /// A transport-level failure must leave the gauge untouched.
    #[mz_ore::test(tokio::test)]
    async fn test_metric_not_updated_on_failed_request() -> Result<(), anyhow::Error> {
        let now_fn = NowFn::from(|| 5_000_000u64);

        let registry = MetricsRegistry::new();
        let gauge = test_gauge(&registry, "test_fail_gauge");

        let transport = MetricsTransport {
            inner: FailingTransport,
            last_success_gauge: gauge.clone(),
            now_fn,
        };

        let request = http::Request::builder()
            .uri("https://stream.launchdarkly.com/all")
            .body(None)?;
        let result = transport.request(request).await;
        assert!(result.is_err());
        assert_eq!(gauge.get(), 0, "gauge must not update on transport error");
        Ok(())
    }
}