Skip to main content

mz_adapter/
config.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::path::PathBuf;
12
13use mz_build_info::BuildInfo;
14use mz_cluster_client::ReplicaId;
15use mz_controller_types::ClusterId;
16use mz_ore::metric;
17use mz_ore::metrics::{MetricsRegistry, UIntGauge};
18use mz_ore::now::NowFn;
19use mz_sql::catalog::EnvironmentId;
20use prometheus::IntCounter;
21
22mod backend;
23mod frontend;
24mod params;
25mod sync;
26
27pub use backend::SystemParameterBackend;
28pub use frontend::SystemParameterFrontend;
29pub use params::{ModifiedParameter, SynchronizedParameters};
30pub use sync::system_parameter_sync;
31
32/// Scoped (per-cluster and per-replica) system-parameter overrides, keyed by
33/// object id. Each value is the raw (unparsed) string for a parameter whose
34/// scoped value differs from the environment-wide value; an absent entry means
35/// no override. Empty maps mean no scoped overrides at all.
36///
37/// This is the in-memory mirror of the durable `cluster_system_configurations`
38/// and `replica_system_configurations` catalog collections.
39#[derive(Clone, Debug, Default, PartialEq, Eq)]
40pub struct ScopedParameters {
41    /// Cluster-coherent overrides, keyed by cluster id.
42    pub cluster: BTreeMap<ClusterId, BTreeMap<String, String>>,
43    /// Replica-local overrides, keyed by replica id.
44    pub replica: BTreeMap<ReplicaId, BTreeMap<String, String>>,
45}
46
47impl ScopedParameters {
48    /// Returns `true` if there are no cluster or replica overrides.
49    pub fn is_empty(&self) -> bool {
50        self.cluster.is_empty() && self.replica.is_empty()
51    }
52
53    /// Returns a copy of `self` with `other`'s entries merged in, replacing any
54    /// existing entry for the same object. Expresses no removals.
55    pub fn merge(&self, other: &ScopedParameters) -> ScopedParameters {
56        let mut merged = self.clone();
57        merged
58            .cluster
59            .extend(other.cluster.iter().map(|(id, v)| (*id, v.clone())));
60        merged
61            .replica
62            .extend(other.replica.iter().map(|(id, v)| (*id, v.clone())));
63        merged
64    }
65}
66
67/// A factory for [SystemParameterFrontend] instances.
68#[derive(Clone, Debug)]
69pub struct SystemParameterSyncConfig {
70    /// The environment ID that should identify connected clients.
71    env_id: EnvironmentId,
72    /// Build info for the environment running this.
73    build_info: &'static BuildInfo,
74    /// Parameter sync metrics.
75    metrics: Metrics,
76    ///  /// A map from parameter names to LaunchDarkly feature keys
77    /// to use when populating the [SynchronizedParameters]
78    /// instance in [SystemParameterFrontend::pull].
79    key_map: BTreeMap<String, String>,
80    /// Configuration for the parameter backend that we're syncing with.
81    backend_config: SystemParameterSyncClientConfig,
82}
83
84#[derive(Clone, Debug)]
85pub enum SystemParameterSyncClientConfig {
86    File {
87        // Path to a JSON config file that contains system parameters.
88        path: PathBuf,
89    },
90    LaunchDarkly {
91        /// The LaunchDarkly SDK key
92        sdk_key: String,
93        /// Function to return the current time.
94        now_fn: NowFn,
95    },
96}
97
98impl SystemParameterSyncClientConfig {
99    fn is_launch_darkly(&self) -> bool {
100        match &self {
101            Self::LaunchDarkly { .. } => true,
102            Self::File { .. } => false,
103        }
104    }
105}
106
107impl SystemParameterSyncConfig {
108    /// Construct a new [SystemParameterFrontend] instance.
109    pub fn new(
110        env_id: EnvironmentId,
111        build_info: &'static BuildInfo,
112        registry: &MetricsRegistry,
113        key_map: BTreeMap<String, String>,
114        backend_config: SystemParameterSyncClientConfig,
115    ) -> Self {
116        Self {
117            env_id,
118            build_info,
119            metrics: Metrics::register_into(registry),
120            key_map,
121            backend_config,
122        }
123    }
124}
125
126#[derive(Debug, Clone)]
127pub(super) struct Metrics {
128    pub last_cse_time_seconds: UIntGauge,
129    pub last_sse_time_seconds: UIntGauge,
130    pub params_changed: IntCounter,
131}
132
133impl Metrics {
134    pub(super) fn register_into(registry: &MetricsRegistry) -> Self {
135        Self {
136            last_cse_time_seconds: registry.register(metric!(
137                name: "mz_parameter_frontend_last_cse_time_seconds",
138                help: "The last known time when the LaunchDarkly client sent an event to the LaunchDarkly server (as unix timestamp).",
139            )),
140            last_sse_time_seconds: registry.register(metric!(
141                name: "mz_parameter_frontend_last_sse_time_seconds",
142                help: "The last known time when the LaunchDarkly client received an event from the LaunchDarkly server (as unix timestamp).",
143            )),
144            params_changed: registry.register(metric!(
145                name: "mz_parameter_frontend_params_changed",
146                help: "The number of parameter changes pulled from the LaunchDarkly frontend.",
147            )),
148        }
149    }
150}
151
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;

    use super::ScopedParameters;

    /// Builds an override map containing the single pair `name` -> `value`.
    fn cfg(name: &str, value: &str) -> BTreeMap<String, String> {
        let mut overrides = BTreeMap::new();
        overrides.insert(name.to_owned(), value.to_owned());
        overrides
    }

    #[mz_ore::test]
    fn test_scoped_parameters_is_empty() {
        // Nothing at either scope.
        assert!(ScopedParameters::default().is_empty());

        // A cluster override on its own is enough to be non-empty.
        let cluster_only = ScopedParameters {
            cluster: BTreeMap::from([(ClusterId::User(1), cfg("f", "true"))]),
            ..Default::default()
        };
        assert!(!cluster_only.is_empty());

        // So is a replica override on its own.
        let replica_only = ScopedParameters {
            replica: BTreeMap::from([(ReplicaId::User(1), cfg("f", "true"))]),
            ..Default::default()
        };
        assert!(!replica_only.is_empty());
    }

    #[mz_ore::test]
    fn test_scoped_parameters_merge() {
        let base = ScopedParameters {
            cluster: BTreeMap::from([
                (ClusterId::User(1), cfg("f", "old")),
                (ClusterId::User(2), cfg("f", "keep")),
            ]),
            replica: BTreeMap::from([(ReplicaId::User(1), cfg("g", "old"))]),
        };

        // Replaces the entry for cluster 1 and introduces replica 2; every
        // other object is absent from the incoming side.
        let incoming = ScopedParameters {
            cluster: BTreeMap::from([(ClusterId::User(1), cfg("f", "new"))]),
            replica: BTreeMap::from([(ReplicaId::User(2), cfg("g", "new"))]),
        };

        let merged = base.merge(&incoming);

        // Entry present on both sides: the incoming one wins.
        assert_eq!(
            merged.cluster.get(&ClusterId::User(1)),
            Some(&cfg("f", "new"))
        );
        // Entry only in the base: retained, since merge expresses no removals.
        assert_eq!(
            merged.cluster.get(&ClusterId::User(2)),
            Some(&cfg("f", "keep"))
        );
        // Existing replica kept, new replica added.
        assert_eq!(
            merged.replica.get(&ReplicaId::User(1)),
            Some(&cfg("g", "old"))
        );
        assert_eq!(
            merged.replica.get(&ReplicaId::User(2)),
            Some(&cfg("g", "new"))
        );

        // `merge` returns a copy, so the base still has its old value.
        assert_eq!(
            base.cluster.get(&ClusterId::User(1)),
            Some(&cfg("f", "old"))
        );
    }
}