mz_adapter/config/
frontend.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::fs;
12use std::path::PathBuf;
13use std::sync::Arc;
14use std::time::Duration;
15
16use derivative::Derivative;
17use hyper_tls::HttpsConnector;
18use launchdarkly_server_sdk as ld;
19use mz_build_info::BuildInfo;
20use mz_cloud_provider::CloudProvider;
21use mz_ore::now::NowFn;
22use mz_sql::catalog::EnvironmentId;
23use serde_json::Value as JsonValue;
24use tokio::time;
25use tracing::warn;
26
27use crate::config::{
28    Metrics, SynchronizedParameters, SystemParameterSyncClientConfig, SystemParameterSyncConfig,
29};
30
31/// A frontend client for pulling [SynchronizedParameters] from LaunchDarkly.
32#[derive(Derivative)]
33#[derivative(Debug)]
34pub struct SystemParameterFrontend {
35    /// An SDK client to mediate interactions with the LaunchDarkly and json config file clients.
36    client: SystemParameterFrontendClient,
37    /// A map from parameter names to LaunchDarkly feature keys
38    /// to use when populating the [SynchronizedParameters]
39    /// instance in [SystemParameterFrontend::pull].
40    key_map: BTreeMap<String, String>,
41    /// Frontend metrics.
42    metrics: Metrics,
43}
44
45#[derive(Derivative)]
46#[derivative(Debug)]
47pub enum SystemParameterFrontendClient {
48    File {
49        path: PathBuf,
50    },
51    LaunchDarkly {
52        /// An SDK client to mediate interactions with the LaunchDarkly client.
53        #[derivative(Debug = "ignore")]
54        client: ld::Client,
55        /// The context to use when querying LaunchDarkly using the SDK.
56        /// This scopes down queries to a specific key.
57        ctx: ld::Context,
58    },
59}
60
61impl SystemParameterFrontendClient {}
62
63impl SystemParameterFrontend {
64    /// Create a new [SystemParameterFrontend] initialize.
65    ///
66    /// This will create and initialize an [ld::Client] instance. The
67    /// [ld::Client::initialized_async] call will be attempted in a loop with an
68    /// exponential backoff with power `2s` and max duration `60s`.
69    pub async fn from(sync_config: &SystemParameterSyncConfig) -> Result<Self, anyhow::Error> {
70        match &sync_config.backend_config {
71            super::SystemParameterSyncClientConfig::File { path } => Ok(Self {
72                client: SystemParameterFrontendClient::File { path: path.clone() },
73                key_map: sync_config.key_map.clone(),
74                metrics: sync_config.metrics.clone(),
75            }),
76            SystemParameterSyncClientConfig::LaunchDarkly { sdk_key, now_fn } => Ok(Self {
77                client: SystemParameterFrontendClient::LaunchDarkly {
78                    client: ld_client(sdk_key, &sync_config.metrics, now_fn).await?,
79                    ctx: ld_ctx(&sync_config.env_id, sync_config.build_info)?,
80                },
81                metrics: sync_config.metrics.clone(),
82                key_map: sync_config.key_map.clone(),
83            }),
84        }
85    }
86
87    /// Pull the current values for all [SynchronizedParameters] from the
88    /// [SystemParameterFrontend] and return `true` iff at least one parameter
89    /// value was modified.
90    pub fn pull(&self, params: &mut SynchronizedParameters) -> bool {
91        let mut changed = false;
92        for param_name in params.synchronized().into_iter() {
93            let flag_name = self
94                .key_map
95                .get(param_name)
96                .map(|flag_name| flag_name.as_str())
97                .unwrap_or(param_name);
98
99            let flag_str = match self.client {
100                SystemParameterFrontendClient::LaunchDarkly {
101                    ref client,
102                    ref ctx,
103                } => {
104                    let flag_var = client.variation(ctx, flag_name, params.get(param_name));
105                    match flag_var {
106                        ld::FlagValue::Bool(v) => v.to_string(),
107                        ld::FlagValue::Str(v) => v,
108                        ld::FlagValue::Number(v) => v.to_string(),
109                        ld::FlagValue::Json(v) => v.to_string(),
110                    }
111                }
112                SystemParameterFrontendClient::File { ref path } => {
113                    let file_contents = fs::read_to_string(path)
114                        .inspect_err(|e| warn!("Could not open system paraemter sync file {}", e))
115                        .unwrap_or_default();
116                    let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
117                        .inspect_err(|e| warn!("Could not open system paraemter sync file {:?}", e))
118                        .unwrap_or_default();
119                    values
120                        .get(flag_name)
121                        .and_then(|o| match o {
122                            serde_json::Value::String(v) => Some(v.to_string()),
123                            serde_json::Value::Number(v) => Some(v.to_string()),
124                            serde_json::Value::Bool(v) => Some(v.to_string()),
125                            serde_json::Value::Object(_) => Some(o.to_string()),
126                            serde_json::Value::Array(_) => Some(o.to_string()),
127                            serde_json::Value::Null => None,
128                        })
129                        .unwrap_or_else(|| params.get(param_name))
130                }
131            };
132
133            let old = params.get(param_name);
134            let change = params.modify(param_name, flag_str.as_str());
135            if change {
136                tracing::debug!(
137                    %param_name, %old, new = %flag_str,
138                    "updating system param",
139                );
140            }
141            self.metrics.params_changed.inc_by(u64::from(change));
142            changed |= change;
143        }
144
145        changed
146    }
147}
148
149fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config {
150    ld::ConfigBuilder::new(api_key)
151        .event_processor(
152            ld::EventProcessorBuilder::new()
153                .https_connector(HttpsConnector::new())
154                .on_success({
155                    let last_cse_time_seconds = metrics.last_cse_time_seconds.clone();
156                    Arc::new(move |result| {
157                        if let Ok(ts) = u64::try_from(result.time_from_server / 1000) {
158                            last_cse_time_seconds.set(ts);
159                        } else {
160                            tracing::warn!(
161                                "Cannot convert time_from_server / 1000 from u128 to u64"
162                            );
163                        }
164                    })
165                }),
166        )
167        .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()))
168        .build()
169        .expect("valid config")
170}
171
172async fn ld_client(
173    api_key: &str,
174    metrics: &Metrics,
175    now_fn: &NowFn,
176) -> Result<ld::Client, anyhow::Error> {
177    let ld_client = ld::Client::build(ld_config(api_key, metrics))?;
178    tracing::info!("waiting for SystemParameterFrontend to initialize");
179    // Start and initialize LD client for the frontend. The callback passed
180    // will export the last time when an SSE event from the LD server was
181    // received in a Prometheus metric.
182    ld_client.start_with_default_executor_and_callback({
183        let last_sse_time_seconds = metrics.last_sse_time_seconds.clone();
184        let now_fn = now_fn.clone();
185        Arc::new(move |_ev| {
186            let ts = now_fn() / 1000;
187            last_sse_time_seconds.set(ts);
188        })
189    });
190
191    let max_backoff = Duration::from_secs(60);
192    let mut backoff = Duration::from_secs(5);
193    let timeout = Duration::from_secs(10);
194
195    // TODO(materialize#32030): fix retry logic
196    loop {
197        match ld_client.wait_for_initialization(timeout).await {
198            Some(true) => break,
199            Some(false) => tracing::warn!("SystemParameterFrontend failed to initialize"),
200            None => tracing::warn!("SystemParameterFrontend initialization timed out"),
201        }
202
203        time::sleep(backoff).await;
204        backoff = (backoff * 2).min(max_backoff);
205    }
206
207    tracing::info!("successfully initialized SystemParameterFrontend");
208
209    Ok(ld_client)
210}
211
212fn ld_ctx(
213    env_id: &EnvironmentId,
214    build_info: &'static BuildInfo,
215) -> Result<ld::Context, anyhow::Error> {
216    // Register multiple contexts for this client.
217    //
218    // Unfortunately, it seems that the order in which conflicting targeting
219    // rules are applied depends on the definition order of feature flag
220    // variations rather than on the order in which context are registered with
221    // the multi-context builder.
222    let mut ctx_builder = ld::MultiContextBuilder::new();
223
224    if env_id.cloud_provider() != &CloudProvider::Local {
225        ctx_builder.add_context(
226            ld::ContextBuilder::new(env_id.to_string())
227                .kind("environment")
228                .set_string("cloud_provider", env_id.cloud_provider().to_string())
229                .set_string("cloud_provider_region", env_id.cloud_provider_region())
230                .set_string("organization_id", env_id.organization_id().to_string())
231                .set_string("ordinal", env_id.ordinal().to_string())
232                .build()
233                .map_err(|e| anyhow::anyhow!(e))?,
234        );
235        ctx_builder.add_context(
236            ld::ContextBuilder::new(env_id.organization_id().to_string())
237                .kind("organization")
238                .build()
239                .map_err(|e| anyhow::anyhow!(e))?,
240        );
241    } else {
242        // If cloud_provider is 'local', use anonymous `environment` and
243        // `organization` contexts with fixed keys, as otherwise we will create
244        // a lot of additional contexts (which are the billable entity for
245        // LaunchDarkly).
246        ctx_builder.add_context(
247            ld::ContextBuilder::new("anonymous-dev@materialize.com")
248                .anonymous(true) // exclude this user from the dashboard
249                .kind("environment")
250                .set_string("cloud_provider", env_id.cloud_provider().to_string())
251                .set_string("cloud_provider_region", env_id.cloud_provider_region())
252                .set_string("organization_id", uuid::Uuid::nil().to_string())
253                .set_string("ordinal", env_id.ordinal().to_string())
254                .build()
255                .map_err(|e| anyhow::anyhow!(e))?,
256        );
257        ctx_builder.add_context(
258            ld::ContextBuilder::new(uuid::Uuid::nil().to_string())
259                .anonymous(true) // exclude this user from the dashboard
260                .kind("organization")
261                .build()
262                .map_err(|e| anyhow::anyhow!(e))?,
263        );
264    };
265
266    ctx_builder.add_context(
267        ld::ContextBuilder::new(build_info.sha)
268            .kind("build")
269            .set_string("semver_version", build_info.semver_version().to_string())
270            .build()
271            .map_err(|e| anyhow::anyhow!(e))?,
272    );
273
274    ctx_builder.build().map_err(|e| anyhow::anyhow!(e))
275}