Skip to main content

mz_adapter/config/
frontend.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::fs;
12use std::path::PathBuf;
13use std::time::Duration;
14
15use bytes::Bytes;
16use derivative::Derivative;
17use futures::TryStreamExt;
18use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture};
19use launchdarkly_server_sdk as ld;
20use mz_build_info::BuildInfo;
21use mz_cloud_provider::CloudProvider;
22use mz_ore::metrics::UIntGauge;
23use mz_ore::now::NowFn;
24use mz_sql::catalog::EnvironmentId;
25use serde_json::Value as JsonValue;
26use tokio::time;
27use tracing::warn;
28
29use crate::config::{
30    Metrics, SynchronizedParameters, SystemParameterSyncClientConfig, SystemParameterSyncConfig,
31};
32
33/// A frontend client for pulling [SynchronizedParameters] from LaunchDarkly.
34#[derive(Derivative)]
35#[derivative(Debug)]
36pub struct SystemParameterFrontend {
37    /// An SDK client to mediate interactions with the LaunchDarkly and json config file clients.
38    client: SystemParameterFrontendClient,
39    /// A map from parameter names to LaunchDarkly feature keys
40    /// to use when populating the [SynchronizedParameters]
41    /// instance in [SystemParameterFrontend::pull].
42    key_map: BTreeMap<String, String>,
43    /// Frontend metrics.
44    metrics: Metrics,
45}
46
47#[derive(Derivative)]
48#[derivative(Debug)]
49pub enum SystemParameterFrontendClient {
50    File {
51        path: PathBuf,
52    },
53    LaunchDarkly {
54        /// An SDK client to mediate interactions with the LaunchDarkly client.
55        #[derivative(Debug = "ignore")]
56        client: ld::Client,
57        /// The context to use when querying LaunchDarkly using the SDK.
58        /// This scopes down queries to a specific key.
59        ctx: ld::Context,
60    },
61}
62
63impl SystemParameterFrontendClient {}
64
65impl SystemParameterFrontend {
66    /// Create a new [SystemParameterFrontend] initialize.
67    ///
68    /// This will create and initialize an [ld::Client] instance. The
69    /// [ld::Client::wait_for_initialization] call will be attempted in a loop with an
70    /// exponential backoff with power `2s` and max duration `60s`.
71    pub async fn from(sync_config: &SystemParameterSyncConfig) -> Result<Self, anyhow::Error> {
72        match &sync_config.backend_config {
73            super::SystemParameterSyncClientConfig::File { path } => Ok(Self {
74                client: SystemParameterFrontendClient::File { path: path.clone() },
75                key_map: sync_config.key_map.clone(),
76                metrics: sync_config.metrics.clone(),
77            }),
78            SystemParameterSyncClientConfig::LaunchDarkly { sdk_key, now_fn } => Ok(Self {
79                client: SystemParameterFrontendClient::LaunchDarkly {
80                    client: ld_client(sdk_key, &sync_config.metrics, now_fn).await?,
81                    ctx: ld_ctx(&sync_config.env_id, sync_config.build_info)?,
82                },
83                metrics: sync_config.metrics.clone(),
84                key_map: sync_config.key_map.clone(),
85            }),
86        }
87    }
88
89    /// Pull the current values for all [SynchronizedParameters] from the
90    /// [SystemParameterFrontend] and return `true` iff at least one parameter
91    /// value was modified.
92    pub fn pull(&self, params: &mut SynchronizedParameters) -> bool {
93        let mut changed = false;
94        for param_name in params.synchronized().into_iter() {
95            let flag_name = self
96                .key_map
97                .get(param_name)
98                .map(|flag_name| flag_name.as_str())
99                .unwrap_or(param_name);
100
101            let flag_str = match self.client {
102                SystemParameterFrontendClient::LaunchDarkly {
103                    ref client,
104                    ref ctx,
105                } => {
106                    let flag_var = client.variation(ctx, flag_name, params.get(param_name));
107                    match flag_var {
108                        ld::FlagValue::Bool(v) => v.to_string(),
109                        ld::FlagValue::Str(v) => v,
110                        ld::FlagValue::Number(v) => v.to_string(),
111                        ld::FlagValue::Json(v) => v.to_string(),
112                    }
113                }
114                SystemParameterFrontendClient::File { ref path } => {
115                    let file_contents = fs::read_to_string(path)
116                        .inspect_err(|e| warn!("Could not open system paraemter sync file {}", e))
117                        .unwrap_or_default();
118                    let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
119                        .inspect_err(|e| warn!("Could not open system paraemter sync file {:?}", e))
120                        .unwrap_or_default();
121                    values
122                        .get(flag_name)
123                        .and_then(|o| match o {
124                            serde_json::Value::String(v) => Some(v.to_string()),
125                            serde_json::Value::Number(v) => Some(v.to_string()),
126                            serde_json::Value::Bool(v) => Some(v.to_string()),
127                            serde_json::Value::Object(_) => Some(o.to_string()),
128                            serde_json::Value::Array(_) => Some(o.to_string()),
129                            serde_json::Value::Null => None,
130                        })
131                        .unwrap_or_else(|| params.get(param_name))
132                }
133            };
134
135            let old = params.get(param_name);
136            let change = params.modify(param_name, flag_str.as_str());
137            if change {
138                tracing::debug!(
139                    %param_name, %old, new = %flag_str,
140                    "updating system param",
141                );
142            }
143            self.metrics.params_changed.inc_by(u64::from(change));
144            changed |= change;
145        }
146
147        changed
148    }
149}
150
151/// An [`HttpTransport`] wrapper that records timestamps on successful HTTP
152/// responses. Used to populate Prometheus metrics that track LaunchDarkly
153/// connectivity health.
154///
155/// Two instances are created — one for the event processor (CSE metric, tracks
156/// outbound event sends) and one for the streaming data source (SSE metric,
157/// tracks inbound SSE events).
158#[derive(Clone)]
159struct MetricsTransport<T> {
160    inner: T,
161    last_success_gauge: UIntGauge,
162    now_fn: NowFn,
163}
164
165impl<T: HttpTransport> HttpTransport for MetricsTransport<T> {
166    fn request(&self, request: http::Request<Option<Bytes>>) -> ResponseFuture {
167        let inner_fut = self.inner.request(request);
168        let gauge = self.last_success_gauge.clone();
169        let now_fn = self.now_fn.clone();
170        Box::pin(async move {
171            let resp = inner_fut.await?;
172            if resp.status().is_success() {
173                gauge.set(now_fn() / 1000);
174                let (parts, body) = resp.into_parts();
175                let wrapped: ByteStream = Box::pin(body.inspect_ok(move |_| {
176                    gauge.set(now_fn() / 1000);
177                }));
178                Ok(http::Response::from_parts(parts, wrapped))
179            } else {
180                Ok(resp)
181            }
182        })
183    }
184}
185
186fn ld_config(api_key: &str, metrics: &Metrics, now_fn: &NowFn) -> ld::Config {
187    let transport = launchdarkly_sdk_transport::HyperTransport::builder()
188        .connect_timeout(Duration::from_secs(10))
189        .read_timeout(Duration::from_secs(300))
190        .build_https()
191        .expect("failed to create HTTPS transport");
192
193    let cse_transport = MetricsTransport {
194        inner: transport.clone(),
195        last_success_gauge: metrics.last_cse_time_seconds.clone(),
196        now_fn: now_fn.clone(),
197    };
198    let data_source_transport = MetricsTransport {
199        inner: transport,
200        last_success_gauge: metrics.last_sse_time_seconds.clone(),
201        now_fn: now_fn.clone(),
202    };
203
204    let mut event_processor = ld::EventProcessorBuilder::new();
205    event_processor.transport(cse_transport);
206
207    let mut data_source = ld::StreamingDataSourceBuilder::new();
208    data_source.transport(data_source_transport);
209
210    ld::ConfigBuilder::new(api_key)
211        .event_processor(&event_processor)
212        .data_source(&data_source)
213        .build()
214        .expect("valid config")
215}
216
217async fn ld_client(
218    api_key: &str,
219    metrics: &Metrics,
220    now_fn: &NowFn,
221) -> Result<ld::Client, anyhow::Error> {
222    let ld_client = ld::Client::build(ld_config(api_key, metrics, now_fn))?;
223    tracing::info!("waiting for SystemParameterFrontend to initialize");
224    ld_client.start_with_default_executor();
225
226    let max_backoff = Duration::from_secs(60);
227    let mut backoff = Duration::from_secs(5);
228    let timeout = Duration::from_secs(10);
229
230    // TODO(materialize#32030): fix retry logic
231    loop {
232        match ld_client.wait_for_initialization(timeout).await {
233            Some(true) => break,
234            Some(false) => tracing::warn!("SystemParameterFrontend failed to initialize"),
235            None => tracing::warn!("SystemParameterFrontend initialization timed out"),
236        }
237
238        time::sleep(backoff).await;
239        backoff = (backoff * 2).min(max_backoff);
240    }
241
242    tracing::info!("successfully initialized SystemParameterFrontend");
243
244    Ok(ld_client)
245}
246
247fn ld_ctx(
248    env_id: &EnvironmentId,
249    build_info: &'static BuildInfo,
250) -> Result<ld::Context, anyhow::Error> {
251    // Register multiple contexts for this client.
252    //
253    // Unfortunately, it seems that the order in which conflicting targeting
254    // rules are applied depends on the definition order of feature flag
255    // variations rather than on the order in which context are registered with
256    // the multi-context builder.
257    let mut ctx_builder = ld::MultiContextBuilder::new();
258
259    if env_id.cloud_provider() != &CloudProvider::Local {
260        ctx_builder.add_context(
261            ld::ContextBuilder::new(env_id.to_string())
262                .kind("environment")
263                .set_string("cloud_provider", env_id.cloud_provider().to_string())
264                .set_string("cloud_provider_region", env_id.cloud_provider_region())
265                .set_string("organization_id", env_id.organization_id().to_string())
266                .set_string("ordinal", env_id.ordinal().to_string())
267                .build()
268                .map_err(|e| anyhow::anyhow!(e))?,
269        );
270        ctx_builder.add_context(
271            ld::ContextBuilder::new(env_id.organization_id().to_string())
272                .kind("organization")
273                .build()
274                .map_err(|e| anyhow::anyhow!(e))?,
275        );
276    } else {
277        // If cloud_provider is 'local', use anonymous `environment` and
278        // `organization` contexts with fixed keys, as otherwise we will create
279        // a lot of additional contexts (which are the billable entity for
280        // LaunchDarkly).
281        ctx_builder.add_context(
282            ld::ContextBuilder::new("anonymous-dev@materialize.com")
283                .anonymous(true) // exclude this user from the dashboard
284                .kind("environment")
285                .set_string("cloud_provider", env_id.cloud_provider().to_string())
286                .set_string("cloud_provider_region", env_id.cloud_provider_region())
287                .set_string("organization_id", uuid::Uuid::nil().to_string())
288                .set_string("ordinal", env_id.ordinal().to_string())
289                .build()
290                .map_err(|e| anyhow::anyhow!(e))?,
291        );
292        ctx_builder.add_context(
293            ld::ContextBuilder::new(uuid::Uuid::nil().to_string())
294                .anonymous(true) // exclude this user from the dashboard
295                .kind("organization")
296                .build()
297                .map_err(|e| anyhow::anyhow!(e))?,
298        );
299    };
300
301    ctx_builder.add_context(
302        ld::ContextBuilder::new(build_info.sha)
303            .kind("build")
304            .set_string("semver_version", build_info.semver_version().to_string())
305            .build()
306            .map_err(|e| anyhow::anyhow!(e))?,
307    );
308
309    ctx_builder.build().map_err(|e| anyhow::anyhow!(e))
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use launchdarkly_sdk_transport::{ByteStream, TransportError};
    use mz_ore::metrics::MetricsRegistry;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Stand-in for a long-lived SSE streaming connection: answers every
    /// request with 200 OK right away and then yields three SSE events as
    /// separate body chunks (exactly how LaunchDarkly's streaming data source
    /// works).
    #[derive(Clone)]
    struct FakeSseTransport;

    impl HttpTransport for FakeSseTransport {
        fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture {
            let events = vec![
                Ok(Bytes::from("event: put\ndata: {\"flags\":{}}\n\n")),
                Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag1\"}\n\n")),
                Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag2\"}\n\n")),
            ];
            let body: ByteStream = Box::pin(futures::stream::iter(events));
            Box::pin(async move {
                http::Response::builder()
                    .status(200)
                    .body(body)
                    .map_err(|e| TransportError::new(std::io::Error::other(e)))
            })
        }
    }

    /// Stand-in for a connection that cannot be established: every request
    /// resolves to a transport error.
    #[derive(Clone)]
    struct FailingTransport;

    impl HttpTransport for FailingTransport {
        fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture {
            Box::pin(async move {
                let refused = std::io::Error::new(
                    std::io::ErrorKind::ConnectionRefused,
                    "connection refused",
                );
                Err(TransportError::new(refused))
            })
        }
    }

    /// Registers a throwaway gauge called `name` in `registry`.
    fn test_gauge(registry: &MetricsRegistry, name: &str) -> UIntGauge {
        registry.register(mz_ore::metric!(
            name: name,
            help: "test gauge",
        ))
    }

    /// Returns a manually adjustable clock: the handle sets the value that
    /// the accompanying [`NowFn`] reports.
    fn manual_clock(start: u64) -> (Arc<AtomicU64>, NowFn) {
        let handle = Arc::new(AtomicU64::new(start));
        let reader = Arc::clone(&handle);
        let now_fn = NowFn::from(move || reader.load(Ordering::SeqCst));
        (handle, now_fn)
    }

    /// Verifies that MetricsTransport updates the gauge on each body chunk,
    /// not just on the initial HTTP 200 response head. This matters for
    /// long-lived streaming connections where SSE events arrive as body chunks.
    #[mz_ore::test(tokio::test)]
    async fn test_metric_updated_on_body_chunks() -> Result<(), anyhow::Error> {
        let (clock, now_fn) = manual_clock(1_000_000);

        let registry = MetricsRegistry::new();
        let gauge = test_gauge(&registry, "test_sse_gauge");

        let transport = MetricsTransport {
            inner: FakeSseTransport,
            last_success_gauge: gauge.clone(),
            now_fn,
        };

        // Nothing has been recorded before the first request.
        assert_eq!(gauge.get(), 0);

        let request = http::Request::builder()
            .uri("https://stream.launchdarkly.com/all")
            .body(None)?;
        let response = transport.request(request).await?;

        // The response head alone already counts as a success.
        assert_eq!(gauge.get(), 1000);

        // Advance the clock, then drain the body: every chunk must refresh
        // the gauge with the new time.
        clock.store(2_800_000, Ordering::SeqCst);

        let mut chunks = response.into_body();
        let mut received = 0;
        while let Some(Ok(_chunk)) = chunks.next().await {
            received += 1;
        }
        assert_eq!(received, 3);

        assert_eq!(gauge.get(), 2800);
        Ok(())
    }

    /// Verifies that each successful request stamps the gauge with the time
    /// at which that request completed.
    #[mz_ore::test(tokio::test)]
    async fn test_cse_metric_updates_correctly_per_request() -> Result<(), anyhow::Error> {
        let (clock, now_fn) = manual_clock(1_000_000);

        let registry = MetricsRegistry::new();
        let gauge = test_gauge(&registry, "test_cse_gauge");

        let transport = MetricsTransport {
            inner: FakeSseTransport,
            last_success_gauge: gauge.clone(),
            now_fn,
        };

        let bulk_request = || -> Result<http::Request<Option<Bytes>>, http::Error> {
            http::Request::builder()
                .uri("https://events.launchdarkly.com/bulk")
                .body(None)
        };

        let _ = transport.request(bulk_request()?).await?;
        assert_eq!(gauge.get(), 1000);

        clock.store(2_000_000, Ordering::SeqCst);
        let _ = transport.request(bulk_request()?).await?;
        assert_eq!(gauge.get(), 2000);

        clock.store(3_000_000, Ordering::SeqCst);
        let _ = transport.request(bulk_request()?).await?;
        assert_eq!(gauge.get(), 3000);
        Ok(())
    }

    /// Verifies that a transport-level failure leaves the gauge untouched.
    #[mz_ore::test(tokio::test)]
    async fn test_metric_not_updated_on_failed_request() -> Result<(), anyhow::Error> {
        let registry = MetricsRegistry::new();
        let gauge = test_gauge(&registry, "test_fail_gauge");

        let transport = MetricsTransport {
            inner: FailingTransport,
            last_success_gauge: gauge.clone(),
            now_fn: NowFn::from(|| 5_000_000u64),
        };

        let request = http::Request::builder()
            .uri("https://stream.launchdarkly.com/all")
            .body(None)?;
        let outcome = transport.request(request).await;
        assert!(outcome.is_err());
        assert_eq!(gauge.get(), 0, "gauge must not update on transport error");
        Ok(())
    }
}