mz_adapter/config/params.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use mz_sql::session::vars::{ENABLE_LAUNCHDARKLY, SystemVars, Value, Var, VarInput};
13
14/// A struct that defines the system parameters that should be synchronized
15pub struct SynchronizedParameters {
16 /// The backing `SystemVars` instance. Synchronized parameters are exactly
17 /// those that are returned by [SystemVars::iter_synced].
18 system_vars: SystemVars,
19 /// A set of names identifying the synchronized variables from the above
20 /// `system_vars`.
21 ///
22 /// Derived from the above at construction time with the assumption that this
23 /// set cannot change during the lifecycle of the [SystemVars] instance.
24 synchronized: BTreeSet<&'static str>,
25 /// A set of names that identifies the synchronized parameters that have been
26 /// modified by the frontend and need to be pushed to backend.
27 modified: BTreeSet<&'static str>,
28}
29
30impl Default for SynchronizedParameters {
31 fn default() -> Self {
32 Self::new(SystemVars::default())
33 }
34}
35
36impl SynchronizedParameters {
37 pub fn new(system_vars: SystemVars) -> Self {
38 let synchronized = system_vars
39 .iter_synced()
40 .map(|v| v.name())
41 .collect::<BTreeSet<_>>();
42 Self {
43 system_vars,
44 synchronized,
45 modified: BTreeSet::new(),
46 }
47 }
48
49 pub fn is_synchronized(&self, name: &str) -> bool {
50 self.synchronized.contains(name)
51 }
52
53 /// Return a clone of the set of names of synchronized values.
54 ///
55 /// Mostly useful when we need to iterate over each value, while still
56 /// maintaining a mutable reference of the surrounding
57 /// [SynchronizedParameters] instance.
58 pub fn synchronized(&self) -> BTreeSet<&'static str> {
59 self.synchronized.clone()
60 }
61
62 /// Return a vector of [ModifiedParameter] instances that need to be pushed
63 /// to the backend and reset this set to the empty set for future calls.
64 ///
65 /// The set will start growing again as soon as we modify a parameter from
66 /// the `synchronized` set with a [SynchronizedParameters::modify] call.
67 pub fn modified(&mut self) -> Vec<ModifiedParameter> {
68 let mut modified = BTreeSet::new();
69 std::mem::swap(&mut self.modified, &mut modified);
70 self.system_vars
71 .iter_synced()
72 .filter(move |var| modified.contains(var.name()))
73 .map(|var| {
74 let name = var.name().to_string();
75 let value = var.value();
76 let is_default = self.system_vars.is_default(&name, VarInput::Flat(&value)).expect("This will never panic because both the name and the value come from a `Var` instance");
77 ModifiedParameter {
78 name,
79 value,
80 is_default,
81 }
82 })
83 .collect()
84 }
85
86 /// Get the current in-memory value for the parameter identified by the
87 /// given `name`.
88 ///
89 /// # Panics
90 ///
91 /// The method will panic if the name does not refer to a valid parameter.
92 pub fn get(&self, name: &str) -> String {
93 self.system_vars
94 .get(name)
95 .expect("valid system parameter name")
96 .value()
97 }
98
99 /// Try to modify the in-memory entry for `name` in the SystemVars backing
100 /// this [SynchronizedParameters] instace.
101 ///
102 /// This will call `SystemVars::reset` iff `value` is the default for this
103 /// `name` and `SystemVars::set` otherwise.
104 ///
105 /// As a side effect, the modified set will be changed to contain `name` iff
106 /// the in-memory entry for `name` was modified **and** `name` is in the
107 /// `synchronized` set.
108 ///
109 /// Return `true` iff the backing in-memory value for this `name` has
110 /// changed.
111 pub fn modify(&mut self, name: &str, value: &str) -> bool {
112 // It's OK to call `unwrap_or(false)` here because for fixed `name`
113 // and `value` an error in `self.is_default(name, value)` implies
114 // the same error in `self.system_vars.set(name, value)`.
115 let value = VarInput::Flat(value);
116 let modified = if self.system_vars.is_default(name, value).unwrap_or(false) {
117 self.system_vars.reset(name)
118 } else {
119 self.system_vars.set(name, value)
120 };
121
122 match modified {
123 Ok(true) => {
124 // Track modified parameters from the "synchronized" set.
125 if let Some(name) = self.synchronized.get(name) {
126 self.modified.insert(name);
127 }
128 true
129 }
130 Ok(false) => {
131 // The value was the same as the current one.
132 false
133 }
134 Err(e) => {
135 tracing::error!("cannot modify system parameter {}: {}", name, e);
136 false
137 }
138 }
139 }
140
141 pub fn enable_launchdarkly(&self) -> bool {
142 let var_name = self.get(ENABLE_LAUNCHDARKLY.name());
143 let var_input = VarInput::Flat(&var_name);
144 bool::parse(var_input).expect("This is known to be a bool")
145 }
146}
147
/// A snapshot of one synchronized parameter whose in-memory value changed,
/// as returned by `SynchronizedParameters::modified`.
///
/// The type is plain data, so it derives the common traits that let callers
/// log, copy and compare instances.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ModifiedParameter {
    /// The name of the parameter.
    pub name: String,
    /// The value of the parameter at the time the snapshot was taken.
    pub value: String,
    /// Whether `value` is the default value for `name`.
    pub is_default: bool,
}
153
#[cfg(test)]
mod tests {
    use mz_sql::session::vars::SystemVars;

    use super::SynchronizedParameters;

    /// Setting `allowed_cluster_replica_sizes` to a list and then to the
    /// empty string must report a change both times and be reflected by
    /// `get`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_github_18189() {
        let mut params = SynchronizedParameters::new(SystemVars::default());

        // A comma-separated input is stored as a quoted list.
        let changed = params.modify("allowed_cluster_replica_sizes", "1,2");
        assert!(changed);
        assert_eq!(params.get("allowed_cluster_replica_sizes"), r#""1", "2""#);

        // An empty input is accepted as well and yields an empty value.
        let changed = params.modify("allowed_cluster_replica_sizes", "");
        assert!(changed);
        assert_eq!(params.get("allowed_cluster_replica_sizes"), "");
    }

    /// The default `SystemVars` must expose at least one synchronized
    /// variable.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_vars_are_synced() {
        let params = SynchronizedParameters::new(SystemVars::default());
        let names = params.synchronized();

        assert!(!names.is_empty());
    }
}
179}