mz_adapter/config/
frontend.rs1use std::collections::BTreeMap;
11use std::fs;
12use std::path::PathBuf;
13use std::sync::Arc;
14use std::time::Duration;
15
16use derivative::Derivative;
17use hyper_tls::HttpsConnector;
18use launchdarkly_server_sdk as ld;
19use mz_build_info::BuildInfo;
20use mz_cloud_provider::CloudProvider;
21use mz_ore::now::NowFn;
22use mz_sql::catalog::EnvironmentId;
23use serde_json::Value as JsonValue;
24use tokio::time;
25use tracing::warn;
26
27use crate::config::{
28 Metrics, SynchronizedParameters, SystemParameterSyncClientConfig, SystemParameterSyncConfig,
29};
30
31#[derive(Derivative)]
33#[derivative(Debug)]
34pub struct SystemParameterFrontend {
35 client: SystemParameterFrontendClient,
37 key_map: BTreeMap<String, String>,
41 metrics: Metrics,
43}
44
45#[derive(Derivative)]
46#[derivative(Debug)]
47pub enum SystemParameterFrontendClient {
48 File {
49 path: PathBuf,
50 },
51 LaunchDarkly {
52 #[derivative(Debug = "ignore")]
54 client: ld::Client,
55 ctx: ld::Context,
58 },
59}
60
61impl SystemParameterFrontendClient {}
62
63impl SystemParameterFrontend {
64 pub async fn from(sync_config: &SystemParameterSyncConfig) -> Result<Self, anyhow::Error> {
70 match &sync_config.backend_config {
71 super::SystemParameterSyncClientConfig::File { path } => Ok(Self {
72 client: SystemParameterFrontendClient::File { path: path.clone() },
73 key_map: sync_config.key_map.clone(),
74 metrics: sync_config.metrics.clone(),
75 }),
76 SystemParameterSyncClientConfig::LaunchDarkly { sdk_key, now_fn } => Ok(Self {
77 client: SystemParameterFrontendClient::LaunchDarkly {
78 client: ld_client(sdk_key, &sync_config.metrics, now_fn).await?,
79 ctx: ld_ctx(&sync_config.env_id, sync_config.build_info)?,
80 },
81 metrics: sync_config.metrics.clone(),
82 key_map: sync_config.key_map.clone(),
83 }),
84 }
85 }
86
87 pub fn pull(&self, params: &mut SynchronizedParameters) -> bool {
91 let mut changed = false;
92 for param_name in params.synchronized().into_iter() {
93 let flag_name = self
94 .key_map
95 .get(param_name)
96 .map(|flag_name| flag_name.as_str())
97 .unwrap_or(param_name);
98
99 let flag_str = match self.client {
100 SystemParameterFrontendClient::LaunchDarkly {
101 ref client,
102 ref ctx,
103 } => {
104 let flag_var = client.variation(ctx, flag_name, params.get(param_name));
105 match flag_var {
106 ld::FlagValue::Bool(v) => v.to_string(),
107 ld::FlagValue::Str(v) => v,
108 ld::FlagValue::Number(v) => v.to_string(),
109 ld::FlagValue::Json(v) => v.to_string(),
110 }
111 }
112 SystemParameterFrontendClient::File { ref path } => {
113 let file_contents = fs::read_to_string(path)
114 .inspect_err(|e| warn!("Could not open system paraemter sync file {}", e))
115 .unwrap_or_default();
116 let values: BTreeMap<String, JsonValue> = serde_json::from_str(&file_contents)
117 .inspect_err(|e| warn!("Could not open system paraemter sync file {:?}", e))
118 .unwrap_or_default();
119 values
120 .get(flag_name)
121 .and_then(|o| match o {
122 serde_json::Value::String(v) => Some(v.to_string()),
123 serde_json::Value::Number(v) => Some(v.to_string()),
124 serde_json::Value::Bool(v) => Some(v.to_string()),
125 serde_json::Value::Object(_) => Some(o.to_string()),
126 serde_json::Value::Array(_) => Some(o.to_string()),
127 serde_json::Value::Null => None,
128 })
129 .unwrap_or_else(|| params.get(param_name))
130 }
131 };
132
133 let old = params.get(param_name);
134 let change = params.modify(param_name, flag_str.as_str());
135 if change {
136 tracing::debug!(
137 %param_name, %old, new = %flag_str,
138 "updating system param",
139 );
140 }
141 self.metrics.params_changed.inc_by(u64::from(change));
142 changed |= change;
143 }
144
145 changed
146 }
147}
148
149fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config {
150 ld::ConfigBuilder::new(api_key)
151 .event_processor(
152 ld::EventProcessorBuilder::new()
153 .https_connector(HttpsConnector::new())
154 .on_success({
155 let last_cse_time_seconds = metrics.last_cse_time_seconds.clone();
156 Arc::new(move |result| {
157 if let Ok(ts) = u64::try_from(result.time_from_server / 1000) {
158 last_cse_time_seconds.set(ts);
159 } else {
160 tracing::warn!(
161 "Cannot convert time_from_server / 1000 from u128 to u64"
162 );
163 }
164 })
165 }),
166 )
167 .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()))
168 .build()
169 .expect("valid config")
170}
171
172async fn ld_client(
173 api_key: &str,
174 metrics: &Metrics,
175 now_fn: &NowFn,
176) -> Result<ld::Client, anyhow::Error> {
177 let ld_client = ld::Client::build(ld_config(api_key, metrics))?;
178 tracing::info!("waiting for SystemParameterFrontend to initialize");
179 ld_client.start_with_default_executor_and_callback({
183 let last_sse_time_seconds = metrics.last_sse_time_seconds.clone();
184 let now_fn = now_fn.clone();
185 Arc::new(move |_ev| {
186 let ts = now_fn() / 1000;
187 last_sse_time_seconds.set(ts);
188 })
189 });
190
191 let max_backoff = Duration::from_secs(60);
192 let mut backoff = Duration::from_secs(5);
193 let timeout = Duration::from_secs(10);
194
195 loop {
197 match ld_client.wait_for_initialization(timeout).await {
198 Some(true) => break,
199 Some(false) => tracing::warn!("SystemParameterFrontend failed to initialize"),
200 None => tracing::warn!("SystemParameterFrontend initialization timed out"),
201 }
202
203 time::sleep(backoff).await;
204 backoff = (backoff * 2).min(max_backoff);
205 }
206
207 tracing::info!("successfully initialized SystemParameterFrontend");
208
209 Ok(ld_client)
210}
211
212fn ld_ctx(
213 env_id: &EnvironmentId,
214 build_info: &'static BuildInfo,
215) -> Result<ld::Context, anyhow::Error> {
216 let mut ctx_builder = ld::MultiContextBuilder::new();
223
224 if env_id.cloud_provider() != &CloudProvider::Local {
225 ctx_builder.add_context(
226 ld::ContextBuilder::new(env_id.to_string())
227 .kind("environment")
228 .set_string("cloud_provider", env_id.cloud_provider().to_string())
229 .set_string("cloud_provider_region", env_id.cloud_provider_region())
230 .set_string("organization_id", env_id.organization_id().to_string())
231 .set_string("ordinal", env_id.ordinal().to_string())
232 .build()
233 .map_err(|e| anyhow::anyhow!(e))?,
234 );
235 ctx_builder.add_context(
236 ld::ContextBuilder::new(env_id.organization_id().to_string())
237 .kind("organization")
238 .build()
239 .map_err(|e| anyhow::anyhow!(e))?,
240 );
241 } else {
242 ctx_builder.add_context(
247 ld::ContextBuilder::new("anonymous-dev@materialize.com")
248 .anonymous(true) .kind("environment")
250 .set_string("cloud_provider", env_id.cloud_provider().to_string())
251 .set_string("cloud_provider_region", env_id.cloud_provider_region())
252 .set_string("organization_id", uuid::Uuid::nil().to_string())
253 .set_string("ordinal", env_id.ordinal().to_string())
254 .build()
255 .map_err(|e| anyhow::anyhow!(e))?,
256 );
257 ctx_builder.add_context(
258 ld::ContextBuilder::new(uuid::Uuid::nil().to_string())
259 .anonymous(true) .kind("organization")
261 .build()
262 .map_err(|e| anyhow::anyhow!(e))?,
263 );
264 };
265
266 ctx_builder.add_context(
267 ld::ContextBuilder::new(build_info.sha)
268 .kind("build")
269 .set_string("semver_version", build_info.semver_version().to_string())
270 .build()
271 .map_err(|e| anyhow::anyhow!(e))?,
272 );
273
274 ctx_builder.build().map_err(|e| anyhow::anyhow!(e))
275}