mz_adapter/config/
params.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_sql::session::vars::{ENABLE_LAUNCHDARKLY, SystemVars, Value, Var, VarInput};
13
14/// A struct that defines the system parameters that should be synchronized
15pub struct SynchronizedParameters {
16    /// The backing `SystemVars` instance. Synchronized parameters are exactly
17    /// those that are returned by [SystemVars::iter_synced].
18    system_vars: SystemVars,
19    /// A set of names identifying the synchronized variables from the above
20    /// `system_vars`.
21    ///
22    /// Derived from the above at construction time with the assumption that this
23    /// set cannot change during the lifecycle of the [SystemVars] instance.
24    synchronized: BTreeSet<&'static str>,
25    /// A set of names that identifies the synchronized parameters that have been
26    /// modified by the frontend and need to be pushed to backend.
27    modified: BTreeSet<&'static str>,
28}
29
30impl Default for SynchronizedParameters {
31    fn default() -> Self {
32        Self::new(SystemVars::default())
33    }
34}
35
36impl SynchronizedParameters {
37    pub fn new(system_vars: SystemVars) -> Self {
38        let synchronized = system_vars
39            .iter_synced()
40            .map(|v| v.name())
41            .collect::<BTreeSet<_>>();
42        Self {
43            system_vars,
44            synchronized,
45            modified: BTreeSet::new(),
46        }
47    }
48
49    pub fn is_synchronized(&self, name: &str) -> bool {
50        self.synchronized.contains(name)
51    }
52
53    /// Return a clone of the set of names of synchronized values.
54    ///
55    /// Mostly useful when we need to iterate over each value, while still
56    /// maintaining a mutable reference of the surrounding
57    /// [SynchronizedParameters] instance.
58    pub fn synchronized(&self) -> BTreeSet<&'static str> {
59        self.synchronized.clone()
60    }
61
62    /// Return a vector of [ModifiedParameter] instances that need to be pushed
63    /// to the backend and reset this set to the empty set for future calls.
64    ///
65    /// The set will start growing again as soon as we modify a parameter from
66    /// the `synchronized` set with a [SynchronizedParameters::modify] call.
67    pub fn modified(&mut self) -> Vec<ModifiedParameter> {
68        let mut modified = BTreeSet::new();
69        std::mem::swap(&mut self.modified, &mut modified);
70        self.system_vars
71            .iter_synced()
72            .filter(move |var| modified.contains(var.name()))
73            .map(|var| {
74                let name = var.name().to_string();
75                let value = var.value();
76                let is_default = self.system_vars.is_default(&name, VarInput::Flat(&value)).expect("This will never panic because both the name and the value come from a `Var` instance");
77                ModifiedParameter {
78                    name,
79                    value,
80                    is_default,
81                }
82            })
83            .collect()
84    }
85
86    /// Get the current in-memory value for the parameter identified by the
87    /// given `name`.
88    ///
89    /// # Panics
90    ///
91    /// The method will panic if the name does not refer to a valid parameter.
92    pub fn get(&self, name: &str) -> String {
93        self.system_vars
94            .get(name)
95            .expect("valid system parameter name")
96            .value()
97    }
98
99    /// Try to modify the in-memory entry for `name` in the SystemVars backing
100    /// this [SynchronizedParameters] instace.
101    ///
102    /// This will call `SystemVars::reset` iff `value` is the default for this
103    /// `name` and `SystemVars::set` otherwise.
104    ///
105    /// As a side effect, the modified set will be changed to contain `name` iff
106    /// the in-memory entry for `name` was modified **and** `name` is in the
107    /// `synchronized` set.
108    ///
109    /// Return `true` iff the backing in-memory value for this `name` has
110    /// changed.
111    pub fn modify(&mut self, name: &str, value: &str) -> bool {
112        // It's OK to call `unwrap_or(false)` here because for fixed `name`
113        // and `value` an error in `self.is_default(name, value)` implies
114        // the same error in `self.system_vars.set(name, value)`.
115        let value = VarInput::Flat(value);
116        let modified = if self.system_vars.is_default(name, value).unwrap_or(false) {
117            self.system_vars.reset(name)
118        } else {
119            self.system_vars.set(name, value)
120        };
121
122        match modified {
123            Ok(true) => {
124                // Track modified parameters from the "synchronized" set.
125                if let Some(name) = self.synchronized.get(name) {
126                    self.modified.insert(name);
127                }
128                true
129            }
130            Ok(false) => {
131                // The value was the same as the current one.
132                false
133            }
134            Err(e) => {
135                tracing::error!("cannot modify system parameter {}: {}", name, e);
136                false
137            }
138        }
139    }
140
141    pub fn enable_launchdarkly(&self) -> bool {
142        let var_name = self.get(ENABLE_LAUNCHDARKLY.name());
143        let var_input = VarInput::Flat(&var_name);
144        bool::parse(var_input).expect("This is known to be a bool")
145    }
146}
147
/// A synchronized parameter whose in-memory value was modified, as reported
/// by `SynchronizedParameters::modified`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ModifiedParameter {
    /// Name of the modified parameter.
    pub name: String,
    /// Current in-memory value of the parameter, in string form.
    pub value: String,
    /// Whether `value` is the default value for `name`.
    pub is_default: bool,
}
153
#[cfg(test)]
mod tests {
    use mz_sql::session::vars::SystemVars;

    use super::SynchronizedParameters;

    /// Regression test (going by its name, for GitHub issue 18189): setting
    /// a list-valued parameter and then setting it to the empty string must
    /// both be reported as changes and be visible through `get`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_github_18189() {
        const PARAM: &str = "allowed_cluster_replica_sizes";

        let mut params = SynchronizedParameters::new(SystemVars::default());

        // A non-empty list is stored and rendered as quoted identifiers.
        assert!(params.modify(PARAM, "1,2"));
        assert_eq!(params.get(PARAM), r#""1", "2""#);

        // An empty input is accepted and yields an empty value.
        assert!(params.modify(PARAM, ""));
        assert_eq!(params.get(PARAM), "");
    }

    /// The default `SystemVars` must expose at least one synchronized
    /// variable.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_vars_are_synced() {
        let params = SynchronizedParameters::new(SystemVars::default());
        let synced_names = params.synchronized();

        assert!(!synced_names.is_empty());
    }
}