Skip to main content

mz_adapter/config/
backend.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use mz_auth::{Authenticated, AuthenticatorKind};
13use mz_sql::session::user::SYSTEM_USER;
14use tracing::{error, info};
15use uuid::Uuid;
16
17use crate::config::SynchronizedParameters;
18use crate::session::SessionConfig;
19use crate::{AdapterError, Client, SessionClient};
20
21/// A backend client for pushing and pulling [SynchronizedParameters].
22///
23/// Pulling is required in order to catch concurrent changes before pushing
24/// modified values in the [crate::config::system_parameter_sync].
25pub struct SystemParameterBackend {
26    session_client: SessionClient,
27}
28
29impl SystemParameterBackend {
30    pub async fn new(client: Client) -> Result<Self, AdapterError> {
31        let conn_id = client.new_conn_id()?;
32        let session = client.new_session(
33            SessionConfig {
34                conn_id,
35                uuid: Uuid::new_v4(),
36                user: SYSTEM_USER.name.clone(),
37                client_ip: None,
38                external_metadata_rx: None,
39                helm_chart_version: None,
40                authenticator_kind: AuthenticatorKind::None,
41                groups: None,
42            },
43            Authenticated,
44        );
45        let session_client = client.startup(session).await?;
46        Ok(Self { session_client })
47    }
48
49    /// Push all current values from the given [SynchronizedParameters] that are
50    /// marked as modified to the [SystemParameterBackend] and reset their
51    /// modified status.
52    pub async fn push(&mut self, params: &mut SynchronizedParameters) {
53        for param in params.modified() {
54            let mut vars = BTreeMap::new();
55            info!(name = param.name, value = param.value, "updating parameter");
56            vars.insert(param.name.clone(), param.value.clone());
57            match self.session_client.set_system_vars(vars).await {
58                Ok(()) => {
59                    info!(name = param.name, value = param.value, "update success");
60                }
61                Err(error) => match error {
62                    AdapterError::ReadOnly => {
63                        info!(
64                            name = param.name,
65                            value = param.value,
66                            "cannot update system variable in read-only mode",
67                        );
68                    }
69                    error => {
70                        error!(
71                            name = param.name,
72                            value = param.value,
73                            "cannot update system variable: {}",
74                            error
75                        );
76                    }
77                },
78            }
79        }
80    }
81
82    /// Pull the current values for all [SynchronizedParameters] from the
83    /// [SystemParameterBackend].
84    pub async fn pull(&self, params: &mut SynchronizedParameters) {
85        let vars = self.session_client.get_system_vars().await;
86        for var in vars.iter() {
87            params.modify(var.name(), &var.value());
88        }
89    }
90}