1use std::collections::{BTreeMap, BTreeSet};
11use std::path::PathBuf;
12
13use mz_build_info::BuildInfo;
14use mz_cluster_client::ReplicaId;
15use mz_controller_types::ClusterId;
16use mz_ore::metric;
17use mz_ore::metrics::{MetricsRegistry, UIntGauge};
18use mz_ore::now::NowFn;
19use mz_sql::catalog::EnvironmentId;
20use prometheus::IntCounter;
21
22mod backend;
23mod frontend;
24mod params;
25mod sync;
26
27pub use backend::SystemParameterBackend;
28pub use frontend::{
29 ClusterEvalContext, ClusterScopeContext, ReplicaEvalContext, ReplicaScopeContext,
30 SystemParameterFrontend,
31};
32pub use params::{ModifiedParameter, SynchronizedParameters};
33pub(crate) use sync::evaluate_scoped_parameters;
34pub use sync::system_parameter_sync;
35
/// Parameter values scoped to individual clusters and replicas.
///
/// Each object id maps to its own string-to-string map (presumably parameter
/// name to parameter value -- confirm against the code that fills these maps).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ScopedParameters {
    /// Per-cluster entries, keyed by cluster id.
    pub cluster: BTreeMap<ClusterId, BTreeMap<String, String>>,
    /// Per-replica entries, keyed by replica id.
    pub replica: BTreeMap<ReplicaId, BTreeMap<String, String>>,
}
50
/// A set of cluster ids together with a set of replica ids.
///
/// NOTE(review): presumably this names the objects for which scoped
/// parameters are evaluated; the consumers are not visible in this module --
/// confirm against the `sync` module.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ScopedParametersScope {
    /// The cluster ids in this scope.
    pub clusters: BTreeSet<ClusterId>,
    /// The replica ids in this scope.
    pub replicas: BTreeSet<ReplicaId>,
}
66
67impl ScopedParameters {
68 pub fn is_empty(&self) -> bool {
70 self.cluster.is_empty() && self.replica.is_empty()
71 }
72
73 pub fn merge(&self, other: &ScopedParameters) -> ScopedParameters {
76 let mut merged = self.clone();
77 merged
78 .cluster
79 .extend(other.cluster.iter().map(|(id, v)| (*id, v.clone())));
80 merged
81 .replica
82 .extend(other.replica.iter().map(|(id, v)| (*id, v.clone())));
83 merged
84 }
85}
86
/// Configuration bundle for system parameter synchronization.
///
/// Built with [`SystemParameterSyncConfig::new`], which also registers the
/// associated metrics.
#[derive(Clone, Debug)]
pub struct SystemParameterSyncConfig {
    /// The id of the environment this configuration belongs to.
    env_id: EnvironmentId,
    /// Build information, borrowed for the lifetime of the program.
    build_info: &'static BuildInfo,
    /// Metrics registered when this configuration was constructed.
    metrics: Metrics,
    /// A string-to-string key mapping.
    // NOTE(review): presumably maps parameter names to the keys used by the
    // backend -- confirm against the frontend code that reads it.
    key_map: BTreeMap<String, String>,
    /// Which backend client to use and its settings.
    backend_config: SystemParameterSyncClientConfig,
}
103
/// Selects the client used for system parameter synchronization, along with
/// the settings that client needs.
#[derive(Clone, Debug)]
pub enum SystemParameterSyncClientConfig {
    /// A file-based client.
    File {
        /// Path of the file used by the client.
        path: PathBuf,
    },
    /// A LaunchDarkly-based client.
    LaunchDarkly {
        /// The LaunchDarkly SDK key.
        sdk_key: String,
        /// Clock function handed to the client; how it is used is not visible
        /// in this module.
        now_fn: NowFn,
    },
}
117
118impl SystemParameterSyncClientConfig {
119 fn is_launch_darkly(&self) -> bool {
120 match &self {
121 Self::LaunchDarkly { .. } => true,
122 Self::File { .. } => false,
123 }
124 }
125}
126
127impl SystemParameterSyncConfig {
128 pub fn new(
130 env_id: EnvironmentId,
131 build_info: &'static BuildInfo,
132 registry: &MetricsRegistry,
133 key_map: BTreeMap<String, String>,
134 backend_config: SystemParameterSyncClientConfig,
135 ) -> Self {
136 Self {
137 env_id,
138 build_info,
139 metrics: Metrics::register_into(registry),
140 key_map,
141 backend_config,
142 }
143 }
144}
145
/// Metrics reported for the parameter frontend.
#[derive(Debug, Clone)]
pub(super) struct Metrics {
    /// Last known time (unix timestamp) when the LaunchDarkly client sent an
    /// event to the LaunchDarkly server.
    pub last_cse_time_seconds: UIntGauge,
    /// Last known time (unix timestamp) when the LaunchDarkly client received
    /// an event from the LaunchDarkly server.
    pub last_sse_time_seconds: UIntGauge,
    /// Number of parameter changes pulled from the LaunchDarkly frontend.
    pub params_changed: IntCounter,
}
152
153impl Metrics {
154 pub(super) fn register_into(registry: &MetricsRegistry) -> Self {
155 Self {
156 last_cse_time_seconds: registry.register(metric!(
157 name: "mz_parameter_frontend_last_cse_time_seconds",
158 help: "The last known time when the LaunchDarkly client sent an event to the LaunchDarkly server (as unix timestamp).",
159 )),
160 last_sse_time_seconds: registry.register(metric!(
161 name: "mz_parameter_frontend_last_sse_time_seconds",
162 help: "The last known time when the LaunchDarkly client received an event from the LaunchDarkly server (as unix timestamp).",
163 )),
164 params_changed: registry.register(metric!(
165 name: "mz_parameter_frontend_params_changed",
166 help: "The number of parameter changes pulled from the LaunchDarkly frontend.",
167 )),
168 }
169 }
170}
171
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;

    use super::ScopedParameters;

    /// Builds a single-entry map `{name: value}`.
    fn cfg(name: &str, value: &str) -> BTreeMap<String, String> {
        let mut single = BTreeMap::new();
        single.insert(name.to_string(), value.to_string());
        single
    }

    #[mz_ore::test]
    fn test_scoped_parameters_is_empty() {
        // No entries in either scope.
        let empty = ScopedParameters::default();
        assert!(empty.is_empty());

        // A cluster-scoped entry alone makes the value non-empty.
        let cluster_only = ScopedParameters {
            cluster: BTreeMap::from([(ClusterId::User(1), cfg("f", "true"))]),
            ..ScopedParameters::default()
        };
        assert!(!cluster_only.is_empty());

        // So does a replica-scoped entry alone.
        let replica_only = ScopedParameters {
            replica: BTreeMap::from([(ReplicaId::User(1), cfg("f", "true"))]),
            ..ScopedParameters::default()
        };
        assert!(!replica_only.is_empty());
    }

    #[mz_ore::test]
    fn test_scoped_parameters_merge() {
        let base = ScopedParameters {
            cluster: BTreeMap::from([
                (ClusterId::User(1), cfg("f", "old")),
                (ClusterId::User(2), cfg("f", "keep")),
            ]),
            replica: BTreeMap::from([(ReplicaId::User(1), cfg("g", "old"))]),
        };
        let incoming = ScopedParameters {
            cluster: BTreeMap::from([(ClusterId::User(1), cfg("f", "new"))]),
            replica: BTreeMap::from([(ReplicaId::User(2), cfg("g", "new"))]),
        };

        let merged = base.merge(&incoming);

        // An id present on both sides takes the incoming entry.
        assert_eq!(
            merged.cluster.get(&ClusterId::User(1)),
            Some(&cfg("f", "new"))
        );
        // Ids present on only one side are carried over unchanged.
        assert_eq!(
            merged.cluster.get(&ClusterId::User(2)),
            Some(&cfg("f", "keep"))
        );
        assert_eq!(
            merged.replica.get(&ReplicaId::User(1)),
            Some(&cfg("g", "old"))
        );
        assert_eq!(
            merged.replica.get(&ReplicaId::User(2)),
            Some(&cfg("g", "new"))
        );

        // The receiver of `merge` is left untouched.
        assert_eq!(
            base.cluster.get(&ClusterId::User(1)),
            Some(&cfg("f", "old"))
        );
    }
}