Skip to main content

mz_adapter/
config.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::path::PathBuf;
12
13use mz_build_info::BuildInfo;
14use mz_cluster_client::ReplicaId;
15use mz_controller_types::ClusterId;
16use mz_ore::metric;
17use mz_ore::metrics::{MetricsRegistry, UIntGauge};
18use mz_ore::now::NowFn;
19use mz_sql::catalog::EnvironmentId;
20use prometheus::IntCounter;
21
22mod backend;
23mod frontend;
24mod params;
25mod sync;
26
27pub use backend::SystemParameterBackend;
28pub use frontend::{
29    ClusterEvalContext, ClusterScopeContext, ReplicaEvalContext, ReplicaScopeContext,
30    SystemParameterFrontend,
31};
32pub use params::{ModifiedParameter, SynchronizedParameters};
33pub(crate) use sync::evaluate_scoped_parameters;
34pub use sync::system_parameter_sync;
35
36/// Scoped (per-cluster and per-replica) system-parameter overrides, keyed by
37/// object id. Each value is the raw (unparsed) string for a parameter whose
38/// scoped value differs from the environment-wide value; an absent entry means
39/// no override. Empty maps mean no scoped overrides at all.
40///
41/// This is the in-memory mirror of the durable `cluster_system_configurations`
42/// and `replica_system_configurations` catalog collections.
43#[derive(Clone, Debug, Default, PartialEq, Eq)]
44pub struct ScopedParameters {
45    /// Cluster-coherent overrides, keyed by cluster id.
46    pub cluster: BTreeMap<ClusterId, BTreeMap<String, String>>,
47    /// Replica-local overrides, keyed by replica id.
48    pub replica: BTreeMap<ReplicaId, BTreeMap<String, String>>,
49}
50
51/// The set of objects a [`ScopedParameters`] update was evaluated for, used to
52/// bound which durable override rows the update may prune.
53///
54/// The update is authoritative only for objects in this set. The durable apply
55/// removes a row only when its owning object is in scope and the update no
56/// longer carries that override, so an object created after the update's
57/// evaluation snapshot, and the override it folded into its own create
58/// transaction, is not wiped by a concurrent full-state reconcile.
59#[derive(Clone, Debug, Default, PartialEq, Eq)]
60pub struct ScopedParametersScope {
61    /// Cluster ids whose rows the update may prune.
62    pub clusters: BTreeSet<ClusterId>,
63    /// Replica ids whose rows the update may prune.
64    pub replicas: BTreeSet<ReplicaId>,
65}
66
67impl ScopedParameters {
68    /// Returns `true` if there are no cluster or replica overrides.
69    pub fn is_empty(&self) -> bool {
70        self.cluster.is_empty() && self.replica.is_empty()
71    }
72
73    /// Returns a copy of `self` with `other`'s entries merged in, replacing any
74    /// existing entry for the same object. Expresses no removals.
75    pub fn merge(&self, other: &ScopedParameters) -> ScopedParameters {
76        let mut merged = self.clone();
77        merged
78            .cluster
79            .extend(other.cluster.iter().map(|(id, v)| (*id, v.clone())));
80        merged
81            .replica
82            .extend(other.replica.iter().map(|(id, v)| (*id, v.clone())));
83        merged
84    }
85}
86
87/// A factory for [SystemParameterFrontend] instances.
88#[derive(Clone, Debug)]
89pub struct SystemParameterSyncConfig {
90    /// The environment ID that should identify connected clients.
91    env_id: EnvironmentId,
92    /// Build info for the environment running this.
93    build_info: &'static BuildInfo,
94    /// Parameter sync metrics.
95    metrics: Metrics,
96    ///  /// A map from parameter names to LaunchDarkly feature keys
97    /// to use when populating the [SynchronizedParameters]
98    /// instance in [SystemParameterFrontend::pull].
99    key_map: BTreeMap<String, String>,
100    /// Configuration for the parameter backend that we're syncing with.
101    backend_config: SystemParameterSyncClientConfig,
102}
103
104#[derive(Clone, Debug)]
105pub enum SystemParameterSyncClientConfig {
106    File {
107        // Path to a JSON config file that contains system parameters.
108        path: PathBuf,
109    },
110    LaunchDarkly {
111        /// The LaunchDarkly SDK key
112        sdk_key: String,
113        /// Function to return the current time.
114        now_fn: NowFn,
115    },
116}
117
118impl SystemParameterSyncClientConfig {
119    fn is_launch_darkly(&self) -> bool {
120        match &self {
121            Self::LaunchDarkly { .. } => true,
122            Self::File { .. } => false,
123        }
124    }
125}
126
127impl SystemParameterSyncConfig {
128    /// Construct a new [SystemParameterFrontend] instance.
129    pub fn new(
130        env_id: EnvironmentId,
131        build_info: &'static BuildInfo,
132        registry: &MetricsRegistry,
133        key_map: BTreeMap<String, String>,
134        backend_config: SystemParameterSyncClientConfig,
135    ) -> Self {
136        Self {
137            env_id,
138            build_info,
139            metrics: Metrics::register_into(registry),
140            key_map,
141            backend_config,
142        }
143    }
144}
145
146#[derive(Debug, Clone)]
147pub(super) struct Metrics {
148    pub last_cse_time_seconds: UIntGauge,
149    pub last_sse_time_seconds: UIntGauge,
150    pub params_changed: IntCounter,
151}
152
153impl Metrics {
154    pub(super) fn register_into(registry: &MetricsRegistry) -> Self {
155        Self {
156            last_cse_time_seconds: registry.register(metric!(
157                name: "mz_parameter_frontend_last_cse_time_seconds",
158                help: "The last known time when the LaunchDarkly client sent an event to the LaunchDarkly server (as unix timestamp).",
159            )),
160            last_sse_time_seconds: registry.register(metric!(
161                name: "mz_parameter_frontend_last_sse_time_seconds",
162                help: "The last known time when the LaunchDarkly client received an event from the LaunchDarkly server (as unix timestamp).",
163            )),
164            params_changed: registry.register(metric!(
165                name: "mz_parameter_frontend_params_changed",
166                help: "The number of parameter changes pulled from the LaunchDarkly frontend.",
167            )),
168        }
169    }
170}
171
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use mz_cluster_client::ReplicaId;
    use mz_controller_types::ClusterId;

    use super::ScopedParameters;

    /// Builds a single-entry override map `{name: value}`.
    fn cfg(name: &str, value: &str) -> BTreeMap<String, String> {
        let mut overrides = BTreeMap::new();
        overrides.insert(name.to_owned(), value.to_owned());
        overrides
    }

    #[mz_ore::test]
    fn test_scoped_parameters_is_empty() {
        // Nothing set at all.
        assert!(ScopedParameters::default().is_empty());

        // A lone cluster override is enough to be non-empty.
        let cluster_only = ScopedParameters {
            cluster: BTreeMap::from([(ClusterId::User(1), cfg("f", "true"))]),
            ..Default::default()
        };
        assert!(!cluster_only.is_empty());

        // So is a lone replica override.
        let replica_only = ScopedParameters {
            replica: BTreeMap::from([(ReplicaId::User(1), cfg("f", "true"))]),
            ..Default::default()
        };
        assert!(!replica_only.is_empty());
    }

    #[mz_ore::test]
    fn test_scoped_parameters_merge() {
        let base = ScopedParameters {
            cluster: BTreeMap::from([
                (ClusterId::User(1), cfg("f", "old")),
                (ClusterId::User(2), cfg("f", "keep")),
            ]),
            replica: BTreeMap::from([(ReplicaId::User(1), cfg("g", "old"))]),
        };

        let incoming = ScopedParameters {
            // Overrides the existing entry for the same object...
            cluster: BTreeMap::from([(ClusterId::User(1), cfg("f", "new"))]),
            // ...and adds a new object, leaving others untouched.
            replica: BTreeMap::from([(ReplicaId::User(2), cfg("g", "new"))]),
        };

        let merged = base.merge(&incoming);

        // Replaced.
        assert_eq!(
            merged.cluster.get(&ClusterId::User(1)),
            Some(&cfg("f", "new"))
        );
        // Untouched object retained (merge does not express removals).
        assert_eq!(
            merged.cluster.get(&ClusterId::User(2)),
            Some(&cfg("f", "keep"))
        );
        // Pre-existing replica retained, new replica added.
        assert_eq!(
            merged.replica.get(&ReplicaId::User(1)),
            Some(&cfg("g", "old"))
        );
        assert_eq!(
            merged.replica.get(&ReplicaId::User(2)),
            Some(&cfg("g", "new"))
        );

        // The original is unchanged (merge returns a copy).
        assert_eq!(
            base.cluster.get(&ClusterId::User(1)),
            Some(&cfg("f", "old"))
        );
    }
}