1use std::collections::BTreeMap;
11use std::path::PathBuf;
12
13use mz_build_info::BuildInfo;
14use mz_cluster_client::ReplicaId;
15use mz_controller_types::ClusterId;
16use mz_ore::metric;
17use mz_ore::metrics::{MetricsRegistry, UIntGauge};
18use mz_ore::now::NowFn;
19use mz_sql::catalog::EnvironmentId;
20use prometheus::IntCounter;
21
22mod backend;
23mod frontend;
24mod params;
25mod sync;
26
27pub use backend::SystemParameterBackend;
28pub use frontend::SystemParameterFrontend;
29pub use params::{ModifiedParameter, SynchronizedParameters};
30pub use sync::system_parameter_sync;
31
32#[derive(Clone, Debug, Default, PartialEq, Eq)]
40pub struct ScopedParameters {
41 pub cluster: BTreeMap<ClusterId, BTreeMap<String, String>>,
43 pub replica: BTreeMap<ReplicaId, BTreeMap<String, String>>,
45}
46
47impl ScopedParameters {
48 pub fn is_empty(&self) -> bool {
50 self.cluster.is_empty() && self.replica.is_empty()
51 }
52
53 pub fn merge(&self, other: &ScopedParameters) -> ScopedParameters {
56 let mut merged = self.clone();
57 merged
58 .cluster
59 .extend(other.cluster.iter().map(|(id, v)| (*id, v.clone())));
60 merged
61 .replica
62 .extend(other.replica.iter().map(|(id, v)| (*id, v.clone())));
63 merged
64 }
65}
66
67#[derive(Clone, Debug)]
69pub struct SystemParameterSyncConfig {
70 env_id: EnvironmentId,
72 build_info: &'static BuildInfo,
74 metrics: Metrics,
76 key_map: BTreeMap<String, String>,
80 backend_config: SystemParameterSyncClientConfig,
82}
83
84#[derive(Clone, Debug)]
85pub enum SystemParameterSyncClientConfig {
86 File {
87 path: PathBuf,
89 },
90 LaunchDarkly {
91 sdk_key: String,
93 now_fn: NowFn,
95 },
96}
97
98impl SystemParameterSyncClientConfig {
99 fn is_launch_darkly(&self) -> bool {
100 match &self {
101 Self::LaunchDarkly { .. } => true,
102 Self::File { .. } => false,
103 }
104 }
105}
106
107impl SystemParameterSyncConfig {
108 pub fn new(
110 env_id: EnvironmentId,
111 build_info: &'static BuildInfo,
112 registry: &MetricsRegistry,
113 key_map: BTreeMap<String, String>,
114 backend_config: SystemParameterSyncClientConfig,
115 ) -> Self {
116 Self {
117 env_id,
118 build_info,
119 metrics: Metrics::register_into(registry),
120 key_map,
121 backend_config,
122 }
123 }
124}
125
126#[derive(Debug, Clone)]
127pub(super) struct Metrics {
128 pub last_cse_time_seconds: UIntGauge,
129 pub last_sse_time_seconds: UIntGauge,
130 pub params_changed: IntCounter,
131}
132
133impl Metrics {
134 pub(super) fn register_into(registry: &MetricsRegistry) -> Self {
135 Self {
136 last_cse_time_seconds: registry.register(metric!(
137 name: "mz_parameter_frontend_last_cse_time_seconds",
138 help: "The last known time when the LaunchDarkly client sent an event to the LaunchDarkly server (as unix timestamp).",
139 )),
140 last_sse_time_seconds: registry.register(metric!(
141 name: "mz_parameter_frontend_last_sse_time_seconds",
142 help: "The last known time when the LaunchDarkly client received an event from the LaunchDarkly server (as unix timestamp).",
143 )),
144 params_changed: registry.register(metric!(
145 name: "mz_parameter_frontend_params_changed",
146 help: "The number of parameter changes pulled from the LaunchDarkly frontend.",
147 )),
148 }
149 }
150}
151
152#[cfg(test)]
153mod tests {
154 use std::collections::BTreeMap;
155
156 use mz_cluster_client::ReplicaId;
157 use mz_controller_types::ClusterId;
158
159 use super::ScopedParameters;
160
161 fn cfg(name: &str, value: &str) -> BTreeMap<String, String> {
162 BTreeMap::from([(name.to_string(), value.to_string())])
163 }
164
165 #[mz_ore::test]
166 fn test_scoped_parameters_is_empty() {
167 assert!(ScopedParameters::default().is_empty());
168
169 let mut params = ScopedParameters::default();
170 params.cluster.insert(ClusterId::User(1), cfg("f", "true"));
171 assert!(!params.is_empty());
172
173 let mut params = ScopedParameters::default();
174 params.replica.insert(ReplicaId::User(1), cfg("f", "true"));
175 assert!(!params.is_empty());
176 }
177
178 #[mz_ore::test]
179 fn test_scoped_parameters_merge() {
180 let mut base = ScopedParameters::default();
181 base.cluster.insert(ClusterId::User(1), cfg("f", "old"));
182 base.cluster.insert(ClusterId::User(2), cfg("f", "keep"));
183 base.replica.insert(ReplicaId::User(1), cfg("g", "old"));
184
185 let mut incoming = ScopedParameters::default();
186 incoming.cluster.insert(ClusterId::User(1), cfg("f", "new"));
188 incoming.replica.insert(ReplicaId::User(2), cfg("g", "new"));
190
191 let merged = base.merge(&incoming);
192
193 assert_eq!(merged.cluster[&ClusterId::User(1)], cfg("f", "new"));
195 assert_eq!(merged.cluster[&ClusterId::User(2)], cfg("f", "keep"));
197 assert_eq!(merged.replica[&ReplicaId::User(1)], cfg("g", "old"));
199 assert_eq!(merged.replica[&ReplicaId::User(2)], cfg("g", "new"));
200
201 assert_eq!(base.cluster[&ClusterId::User(1)], cfg("f", "old"));
203 }
204}