1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;

use mz_sql::session::vars::{SystemVars, Value, Var, VarInput, ENABLE_LAUNCHDARKLY};

/// A struct that defines the system parameters that should be synchronized,
/// and that tracks which of them have been modified in memory since the last
/// call to [SynchronizedParameters::modified].
pub struct SynchronizedParameters {
    /// The backing `SystemVars` instance. Synchronized parameters are exactly
    /// those that are returned by [SystemVars::iter_synced].
    system_vars: SystemVars,
    /// A set of names identifying the synchronized variables from the above
    /// `system_vars`.
    ///
    /// Derived from the above at construction time with the assumption that this
    /// set cannot change during the lifecycle of the [SystemVars] instance.
    synchronized: BTreeSet<&'static str>,
    /// A set of names that identifies the synchronized parameters that have been
    /// modified by the frontend and need to be pushed to the backend.
    ///
    /// Names are added by [SynchronizedParameters::modify] and the set is
    /// emptied by every [SynchronizedParameters::modified] call.
    modified: BTreeSet<&'static str>,
}

impl Default for SynchronizedParameters {
    /// Build an instance on top of a default-constructed [SystemVars].
    fn default() -> Self {
        let system_vars = SystemVars::default();
        Self::new(system_vars)
    }
}

impl SynchronizedParameters {
    /// Create a new instance backed by the given `system_vars`.
    ///
    /// The set of synchronized names is computed once, here, from
    /// [SystemVars::iter_synced]. The set of modified names starts out empty.
    pub fn new(system_vars: SystemVars) -> Self {
        let synchronized = system_vars
            .iter_synced()
            .map(|v| v.name())
            .collect::<BTreeSet<_>>();
        Self {
            system_vars,
            synchronized,
            modified: BTreeSet::new(),
        }
    }

    /// Return `true` iff `name` is in the set of synchronized parameter names
    /// computed at construction time.
    pub fn is_synchronized(&self, name: &str) -> bool {
        self.synchronized.contains(name)
    }

    /// Return a clone of the set of names of synchronized values.
    ///
    /// Mostly useful when we need to iterate over each value, while still
    /// maintaining a mutable reference of the surrounding
    /// [SynchronizedParameters] instance.
    pub fn synchronized(&self) -> BTreeSet<&'static str> {
        self.synchronized.clone()
    }

    /// Return a vector of [ModifiedParameter] instances that need to be pushed
    /// to the backend and reset this set to the empty set for future calls.
    ///
    /// The set will start growing again as soon as we modify a parameter from
    /// the `synchronized` set with a [SynchronizedParameters::modify] call.
    pub fn modified(&mut self) -> Vec<ModifiedParameter> {
        // Move the pending names out, leaving an empty set behind.
        let modified = std::mem::take(&mut self.modified);
        self.system_vars
            .iter_synced()
            .filter(move |var| modified.contains(var.name()))
            .map(|var| {
                let name = var.name().to_string();
                let value = var.value();
                let is_default = self
                    .system_vars
                    .is_default(&name, VarInput::Flat(&value))
                    .expect("This will never panic because both the name and the value come from a `Var` instance");
                ModifiedParameter {
                    name,
                    value,
                    is_default,
                }
            })
            .collect()
    }

    /// Get the current in-memory value for the parameter identified by the
    /// given `name`.
    ///
    /// # Panics
    ///
    /// The method will panic if the name does not refer to a valid parameter.
    pub fn get(&self, name: &str) -> String {
        self.system_vars
            .get(name)
            .expect("valid system parameter name")
            .value()
    }

    /// Try to modify the in-memory entry for `name` in the SystemVars backing
    /// this [SynchronizedParameters] instance.
    ///
    /// This will call `SystemVars::reset` iff `value` is the default for this
    /// `name` and `SystemVars::set` otherwise.
    ///
    /// As a side effect, the modified set will be changed to contain `name` iff
    /// the in-memory entry for `name` was modified **and** `name` is in the
    /// `synchronized` set.
    ///
    /// Errors from the backing `SystemVars` are logged and reported as "not
    /// modified"; they are not propagated to the caller.
    ///
    /// Return `true` iff the backing in-memory value for this `name` has
    /// changed.
    pub fn modify(&mut self, name: &str, value: &str) -> bool {
        // It's OK to call `unwrap_or(false)` here because for fixed `name`
        // and `value` an error in `self.system_vars.is_default(name, value)`
        // implies the same error in `self.system_vars.set(name, value)`.
        let value = VarInput::Flat(value);
        let modified = if self.system_vars.is_default(name, value).unwrap_or(false) {
            self.system_vars.reset(name)
        } else {
            self.system_vars.set(name, value)
        };

        match modified {
            Ok(true) => {
                // Track modified parameters from the "synchronized" set. The
                // lookup hands back the `&'static str` stored in that set,
                // which is what the `modified` set needs to hold.
                if let Some(name) = self.synchronized.get(name).copied() {
                    self.modified.insert(name);
                }
                true
            }
            Ok(false) => {
                // The value was the same as the current one.
                false
            }
            Err(e) => {
                tracing::error!("cannot modify system parameter {}: {}", name, e);
                false
            }
        }
    }

    /// Return the current in-memory value of the `ENABLE_LAUNCHDARKLY`
    /// parameter, parsed as a `bool`.
    ///
    /// # Panics
    ///
    /// The method will panic if the parameter cannot be looked up by name or
    /// if its current value does not parse as a `bool`.
    pub fn enable_launchdarkly(&self) -> bool {
        let value = self.get(ENABLE_LAUNCHDARKLY.name());
        let var_input = VarInput::Flat(&value);
        bool::parse(var_input).expect("This is known to be a bool")
    }
}

/// A snapshot of a synchronized parameter whose in-memory value has been
/// modified, as produced by [SynchronizedParameters::modified].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ModifiedParameter {
    /// The name of the parameter.
    pub name: String,
    /// The in-memory value of the parameter at the time the snapshot was taken.
    pub value: String,
    /// Whether `value` is the default value for the parameter `name`.
    pub is_default: bool,
}

#[cfg(test)]
mod tests {
    use mz_sql::session::vars::SystemVars;

    use super::SynchronizedParameters;

    /// Setting the parameter to a non-empty value and then to the empty string
    /// must both be reported as modifications, and `get` must reflect each one.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_github_18189() {
        const PARAM: &str = "allowed_cluster_replica_sizes";
        let mut params = SynchronizedParameters::new(SystemVars::default());

        assert!(params.modify(PARAM, "1,2"));
        assert_eq!(params.get(PARAM), r#""1", "2""#);

        assert!(params.modify(PARAM, ""));
        assert_eq!(params.get(PARAM), "");
    }

    /// A default `SystemVars` must expose at least one synchronized parameter.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    fn test_vars_are_synced() {
        let params = SynchronizedParameters::new(SystemVars::default());
        let synced_names = params.synchronized();

        assert!(!synced_names.is_empty());
    }
}