Skip to main content

mz_adapter/config/
params.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_adapter_types::dyncfgs::ENABLE_SCOPED_SYSTEM_PARAMETERS;
13use mz_sql::session::vars::{ENABLE_LAUNCHDARKLY, SystemVars, Value, Var, VarInput};
14
/// A struct that defines the system parameters that should be synchronized,
/// and tracks which of them have pending modifications.
///
/// Per the field documentation below, modifications are made by a frontend
/// and subsequently need to be pushed to a backend.
pub struct SynchronizedParameters {
    /// The backing `SystemVars` instance. Synchronized parameters are exactly
    /// those that are returned by [SystemVars::iter_synced].
    system_vars: SystemVars,
    /// A set of names identifying the synchronized variables from the above
    /// `system_vars`.
    ///
    /// Derived from the above at construction time with the assumption that this
    /// set cannot change during the lifecycle of the [SystemVars] instance.
    synchronized: BTreeSet<&'static str>,
    /// A set of names that identifies the synchronized parameters that have been
    /// modified by the frontend and need to be pushed to backend.
    ///
    /// Always a subset of `synchronized`: names are only ever inserted after
    /// being looked up in that set.
    modified: BTreeSet<&'static str>,
}
30
31impl Default for SynchronizedParameters {
32    fn default() -> Self {
33        Self::new(SystemVars::default())
34    }
35}
36
37impl SynchronizedParameters {
38    pub fn new(system_vars: SystemVars) -> Self {
39        let synchronized = system_vars
40            .iter_synced()
41            .map(|v| v.name())
42            .collect::<BTreeSet<_>>();
43        Self {
44            system_vars,
45            synchronized,
46            modified: BTreeSet::new(),
47        }
48    }
49
50    pub fn is_synchronized(&self, name: &str) -> bool {
51        self.synchronized.contains(name)
52    }
53
54    /// Return a clone of the set of names of synchronized values.
55    ///
56    /// Mostly useful when we need to iterate over each value, while still
57    /// maintaining a mutable reference of the surrounding
58    /// [SynchronizedParameters] instance.
59    pub fn synchronized(&self) -> BTreeSet<&'static str> {
60        self.synchronized.clone()
61    }
62
63    /// Return a vector of [ModifiedParameter] instances that need to be pushed
64    /// to the backend and reset this set to the empty set for future calls.
65    ///
66    /// The set will start growing again as soon as we modify a parameter from
67    /// the `synchronized` set with a [SynchronizedParameters::modify] call.
68    pub fn modified(&mut self) -> Vec<ModifiedParameter> {
69        let mut modified = BTreeSet::new();
70        std::mem::swap(&mut self.modified, &mut modified);
71        self.system_vars
72            .iter_synced()
73            .filter(move |var| modified.contains(var.name()))
74            .map(|var| {
75                let name = var.name().to_string();
76                let value = var.value();
77                let is_default = self.system_vars.is_default(&name, VarInput::Flat(&value)).expect("This will never panic because both the name and the value come from a `Var` instance");
78                ModifiedParameter {
79                    name,
80                    value,
81                    is_default,
82                }
83            })
84            .collect()
85    }
86
87    /// Get the current in-memory value for the parameter identified by the
88    /// given `name`.
89    ///
90    /// # Panics
91    ///
92    /// The method will panic if the name does not refer to a valid parameter.
93    pub fn get(&self, name: &str) -> String {
94        self.system_vars
95            .get(name)
96            .expect("valid system parameter name")
97            .value()
98    }
99
100    /// Canonicalize a raw `value` for the parameter `name` to the same formatted
101    /// form [`SynchronizedParameters::get`] returns, by parsing it through the
102    /// system var and re-formatting.
103    ///
104    /// This lets values that are equal but differently encoded compare equal.
105    /// For example LaunchDarkly serves a boolean as `"true"`, while the canonical
106    /// formatting of a `bool` system var is `"on"`. Returns `None` if `name` is
107    /// not a valid parameter or `value` does not parse for it.
108    pub fn canonicalize(&self, name: &str, value: &str) -> Option<String> {
109        self.system_vars
110            .parse(name, VarInput::Flat(value))
111            .ok()
112            .map(|value| value.format())
113    }
114
115    /// Try to modify the in-memory entry for `name` in the SystemVars backing
116    /// this [SynchronizedParameters] instance.
117    ///
118    /// This will call `SystemVars::reset` iff `value` is the default for this
119    /// `name` and `SystemVars::set` otherwise.
120    ///
121    /// As a side effect, the modified set will be changed to contain `name` iff
122    /// the in-memory entry for `name` was modified **and** `name` is in the
123    /// `synchronized` set.
124    ///
125    /// Return `true` iff the backing in-memory value for this `name` has
126    /// changed.
127    pub fn modify(&mut self, name: &str, value: &str) -> bool {
128        // It's OK to call `unwrap_or(false)` here because for fixed `name`
129        // and `value` an error in `self.is_default(name, value)` implies
130        // the same error in `self.system_vars.set(name, value)`.
131        let value = VarInput::Flat(value);
132        let modified = if self.system_vars.is_default(name, value).unwrap_or(false) {
133            self.system_vars.reset(name)
134        } else {
135            self.system_vars.set(name, value)
136        };
137
138        match modified {
139            Ok(true) => {
140                // Track modified parameters from the "synchronized" set.
141                if let Some(name) = self.synchronized.get(name) {
142                    self.modified.insert(name);
143                }
144                true
145            }
146            Ok(false) => {
147                // The value was the same as the current one.
148                false
149            }
150            Err(e) => {
151                tracing::error!("cannot modify system parameter {}: {}", name, e);
152                false
153            }
154        }
155    }
156
157    pub fn enable_launchdarkly(&self) -> bool {
158        let var_name = self.get(ENABLE_LAUNCHDARKLY.name());
159        let var_input = VarInput::Flat(&var_name);
160        bool::parse(var_input).expect("This is known to be a bool")
161    }
162
163    /// Whether scoped (per-cluster and per-replica) system parameters are
164    /// evaluated. Read from this working copy so the sync loop can gate the
165    /// scoped reconcile without taking a catalog snapshot.
166    pub fn enable_scoped_system_parameters(&self) -> bool {
167        let var_name = self.get(ENABLE_SCOPED_SYSTEM_PARAMETERS.name());
168        let var_input = VarInput::Flat(&var_name);
169        bool::parse(var_input).expect("This is known to be a bool")
170    }
171}
172
/// A snapshot of one synchronized parameter that was modified, as produced
/// by [SynchronizedParameters::modified].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ModifiedParameter {
    /// The name of the parameter.
    pub name: String,
    /// The parameter's in-memory value, in formatted string form, at the
    /// time the snapshot was taken.
    pub value: String,
    /// Whether `value` is the default value for the parameter `name`.
    pub is_default: bool,
}
178
179#[cfg(test)]
180mod tests {
181    use mz_sql::session::vars::SystemVars;
182
183    use super::SynchronizedParameters;
184
185    #[mz_ore::test]
186    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
187    fn test_github_18189() {
188        let vars = SystemVars::default();
189        let mut sync = SynchronizedParameters::new(vars);
190        assert!(sync.modify("allowed_cluster_replica_sizes", "1,2"));
191        assert_eq!(sync.get("allowed_cluster_replica_sizes"), r#""1", "2""#);
192        assert!(sync.modify("allowed_cluster_replica_sizes", ""));
193        assert_eq!(sync.get("allowed_cluster_replica_sizes"), "");
194    }
195
196    #[mz_ore::test]
197    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
198    fn test_canonicalize_bridges_bool_encodings() {
199        let vars = SystemVars::default();
200        let sync = SynchronizedParameters::new(vars);
201
202        // A `bool` system var formats canonically as "on"/"off", while
203        // LaunchDarkly serves booleans as "true"/"false". The scoped
204        // differs-from-env test compares a raw LD value against `get()`, so
205        // canonicalization must bridge the two spellings, otherwise every
206        // boolean flag would register as differing, even on a FALLTHROUGH that
207        // serves the env-wide value. (`enable_eager_delta_joins` is a scoped
208        // `bool` feature flag, default off.)
209        let name = "enable_eager_delta_joins";
210        let off = sync.get(name);
211        assert_eq!(off, "off");
212
213        // The LD spellings canonicalize to the same form as the var's own.
214        assert_eq!(sync.canonicalize(name, "false").as_deref(), Some("off"));
215        assert_eq!(sync.canonicalize(name, "true").as_deref(), Some("on"));
216        assert_eq!(
217            sync.canonicalize(name, "off"),
218            sync.canonicalize(name, "false")
219        );
220        assert_eq!(
221            sync.canonicalize(name, "on"),
222            sync.canonicalize(name, "true")
223        );
224
225        // The crux: a scoped "false" must match the env-wide "off" baseline, so
226        // it is dropped rather than recorded as a spurious override.
227        assert_eq!(
228            sync.canonicalize(name, "false").as_deref(),
229            Some(off.as_str())
230        );
231
232        // An unparseable value yields `None`. The scoped recording path relies
233        // on this to *skip* such values rather than store them: a stored
234        // unparseable bool would later panic the optimizer's decode on every
235        // plan. See `SystemParameterFrontend::evaluate_scoped_overrides`.
236        assert_eq!(sync.canonicalize(name, "not-a-bool"), None);
237    }
238
239    #[mz_ore::test]
240    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
241    fn test_vars_are_synced() {
242        let vars = SystemVars::default();
243        let sync = SynchronizedParameters::new(vars);
244
245        assert!(!sync.synchronized().is_empty());
246    }
247}