mz_adapter/config/
frontend.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use derivative::Derivative;
15use hyper_tls::HttpsConnector;
16use launchdarkly_server_sdk as ld;
17use mz_build_info::BuildInfo;
18use mz_cloud_provider::CloudProvider;
19use mz_ore::now::NowFn;
20use mz_sql::catalog::EnvironmentId;
21use tokio::time;
22
23use crate::config::{Metrics, SynchronizedParameters, SystemParameterSyncConfig};
24
/// A frontend client for pulling [SynchronizedParameters] from LaunchDarkly.
///
/// Instances are created with [SystemParameterFrontend::from] and queried
/// with [SystemParameterFrontend::pull].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct SystemParameterFrontend {
    /// An SDK client to mediate interactions with the LaunchDarkly client.
    ///
    /// This field is excluded from the derived `Debug` output.
    #[derivative(Debug = "ignore")]
    ld_client: ld::Client,
    /// The context to use when querying LaunchDarkly using the SDK.
    /// This scopes down queries to a specific key.
    ld_ctx: ld::Context,
    /// A map from parameter names to LaunchDarkly feature keys
    /// to use when populating the [SynchronizedParameters]
    /// instance in [SystemParameterFrontend::pull]. Parameters without an
    /// entry in this map are looked up under their own name.
    ld_key_map: BTreeMap<String, String>,
    /// Frontend metrics.
    ld_metrics: Metrics,
    /// Function to return the current time.
    now_fn: NowFn,
}
44
45impl SystemParameterFrontend {
46    /// Create a new [SystemParameterFrontend] initialize.
47    ///
48    /// This will create and initialize an [ld::Client] instance. The
49    /// [ld::Client::initialized_async] call will be attempted in a loop with an
50    /// exponential backoff with power `2s` and max duration `60s`.
51    pub async fn from(sync_config: &SystemParameterSyncConfig) -> Result<Self, anyhow::Error> {
52        Ok(Self {
53            ld_client: ld_client(sync_config).await?,
54            ld_ctx: ld_ctx(&sync_config.env_id, sync_config.build_info)?,
55            ld_key_map: sync_config.ld_key_map.clone(),
56            ld_metrics: sync_config.metrics.clone(),
57            now_fn: sync_config.now_fn.clone(),
58        })
59    }
60
61    /// Pull the current values for all [SynchronizedParameters] from the
62    /// [SystemParameterFrontend] and return `true` iff at least one parameter
63    /// value was modified.
64    pub fn pull(&self, params: &mut SynchronizedParameters) -> bool {
65        let mut changed = false;
66
67        for param_name in params.synchronized().into_iter() {
68            let flag_name = self
69                .ld_key_map
70                .get(param_name)
71                .map(|flag_name| flag_name.as_str())
72                .unwrap_or(param_name);
73
74            let flag_var =
75                self.ld_client
76                    .variation(&self.ld_ctx, flag_name, params.get(param_name));
77
78            let flag_str = match flag_var {
79                ld::FlagValue::Bool(v) => v.to_string(),
80                ld::FlagValue::Str(v) => v,
81                ld::FlagValue::Number(v) => v.to_string(),
82                ld::FlagValue::Json(v) => v.to_string(),
83            };
84
85            let change = params.modify(param_name, flag_str.as_str());
86            self.ld_metrics.params_changed.inc_by(u64::from(change));
87            changed |= change;
88        }
89
90        changed
91    }
92}
93
94fn ld_config(sync_config: &SystemParameterSyncConfig) -> ld::Config {
95    ld::ConfigBuilder::new(&sync_config.ld_sdk_key)
96        .event_processor(
97            ld::EventProcessorBuilder::new()
98                .https_connector(HttpsConnector::new())
99                .on_success({
100                    let last_cse_time_seconds = sync_config.metrics.last_cse_time_seconds.clone();
101                    Arc::new(move |result| {
102                        if let Ok(ts) = u64::try_from(result.time_from_server / 1000) {
103                            last_cse_time_seconds.set(ts);
104                        } else {
105                            tracing::warn!(
106                                "Cannot convert time_from_server / 1000 from u128 to u64"
107                            );
108                        }
109                    })
110                }),
111        )
112        .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()))
113        .build()
114        .expect("valid config")
115}
116
117async fn ld_client(sync_config: &SystemParameterSyncConfig) -> Result<ld::Client, anyhow::Error> {
118    let ld_client = ld::Client::build(ld_config(sync_config))?;
119
120    tracing::info!("waiting for SystemParameterFrontend to initialize");
121
122    // Start and initialize LD client for the frontend. The callback passed
123    // will export the last time when an SSE event from the LD server was
124    // received in a Prometheus metric.
125    ld_client.start_with_default_executor_and_callback({
126        let last_sse_time_seconds = sync_config.metrics.last_sse_time_seconds.clone();
127        let now_fn = sync_config.now_fn.clone();
128        Arc::new(move |_ev| {
129            let ts = now_fn() / 1000;
130            last_sse_time_seconds.set(ts);
131        })
132    });
133
134    let max_backoff = Duration::from_secs(60);
135    let mut backoff = Duration::from_secs(5);
136    let timeout = Duration::from_secs(10);
137
138    // TODO(materialize#32030): fix retry logic
139    loop {
140        match ld_client.wait_for_initialization(timeout).await {
141            Some(true) => break,
142            Some(false) => tracing::warn!("SystemParameterFrontend failed to initialize"),
143            None => tracing::warn!("SystemParameterFrontend initialization timed out"),
144        }
145
146        time::sleep(backoff).await;
147        backoff = (backoff * 2).min(max_backoff);
148    }
149
150    tracing::info!("successfully initialized SystemParameterFrontend");
151
152    Ok(ld_client)
153}
154
155fn ld_ctx(
156    env_id: &EnvironmentId,
157    build_info: &'static BuildInfo,
158) -> Result<ld::Context, anyhow::Error> {
159    // Register multiple contexts for this client.
160    //
161    // Unfortunately, it seems that the order in which conflicting targeting
162    // rules are applied depends on the definition order of feature flag
163    // variations rather than on the order in which context are registered with
164    // the multi-context builder.
165    let mut ctx_builder = ld::MultiContextBuilder::new();
166
167    if env_id.cloud_provider() != &CloudProvider::Local {
168        ctx_builder.add_context(
169            ld::ContextBuilder::new(env_id.to_string())
170                .kind("environment")
171                .set_string("cloud_provider", env_id.cloud_provider().to_string())
172                .set_string("cloud_provider_region", env_id.cloud_provider_region())
173                .set_string("organization_id", env_id.organization_id().to_string())
174                .set_string("ordinal", env_id.ordinal().to_string())
175                .build()
176                .map_err(|e| anyhow::anyhow!(e))?,
177        );
178        ctx_builder.add_context(
179            ld::ContextBuilder::new(env_id.organization_id().to_string())
180                .kind("organization")
181                .build()
182                .map_err(|e| anyhow::anyhow!(e))?,
183        );
184    } else {
185        // If cloud_provider is 'local', use anonymous `environment` and
186        // `organization` contexts with fixed keys, as otherwise we will create
187        // a lot of additional contexts (which are the billable entity for
188        // LaunchDarkly).
189        ctx_builder.add_context(
190            ld::ContextBuilder::new("anonymous-dev@materialize.com")
191                .anonymous(true) // exclude this user from the dashboard
192                .kind("environment")
193                .set_string("cloud_provider", env_id.cloud_provider().to_string())
194                .set_string("cloud_provider_region", env_id.cloud_provider_region())
195                .set_string("organization_id", uuid::Uuid::nil().to_string())
196                .set_string("ordinal", env_id.ordinal().to_string())
197                .build()
198                .map_err(|e| anyhow::anyhow!(e))?,
199        );
200        ctx_builder.add_context(
201            ld::ContextBuilder::new(uuid::Uuid::nil().to_string())
202                .anonymous(true) // exclude this user from the dashboard
203                .kind("organization")
204                .build()
205                .map_err(|e| anyhow::anyhow!(e))?,
206        );
207    };
208
209    ctx_builder.add_context(
210        ld::ContextBuilder::new(build_info.sha)
211            .kind("build")
212            .set_string("semver_version", build_info.semver_version().to_string())
213            .build()
214            .map_err(|e| anyhow::anyhow!(e))?,
215    );
216
217    ctx_builder.build().map_err(|e| anyhow::anyhow!(e))
218}