mz_adapter/config/
backend.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use mz_sql::session::user::SYSTEM_USER;
13use tracing::{error, info};
14use uuid::Uuid;
15
16use crate::config::SynchronizedParameters;
17use crate::session::SessionConfig;
18use crate::{AdapterError, Client, SessionClient};
19
20/// A backend client for pushing and pulling [SynchronizedParameters].
21///
22/// Pulling is required in order to catch concurrent changes before pushing
23/// modified values in the [crate::config::system_parameter_sync].
24pub struct SystemParameterBackend {
25    session_client: SessionClient,
26}
27
28impl SystemParameterBackend {
29    pub async fn new(client: Client) -> Result<Self, AdapterError> {
30        let conn_id = client.new_conn_id()?;
31        let session = client.new_session(SessionConfig {
32            conn_id,
33            uuid: Uuid::new_v4(),
34            user: SYSTEM_USER.name.clone(),
35            client_ip: None,
36            external_metadata_rx: None,
37            internal_user_metadata: None,
38            helm_chart_version: None,
39        });
40        let session_client = client.startup(session).await?;
41        Ok(Self { session_client })
42    }
43
44    /// Push all current values from the given [SynchronizedParameters] that are
45    /// marked as modified to the [SystemParameterBackend] and reset their
46    /// modified status.
47    pub async fn push(&mut self, params: &mut SynchronizedParameters) {
48        for param in params.modified() {
49            let mut vars = BTreeMap::new();
50            info!(name = param.name, value = param.value, "updating parameter");
51            vars.insert(param.name.clone(), param.value.clone());
52            match self.session_client.set_system_vars(vars).await {
53                Ok(()) => {
54                    info!(name = param.name, value = param.value, "update success");
55                }
56                Err(error) => match error {
57                    AdapterError::ReadOnly => {
58                        info!(
59                            name = param.name,
60                            value = param.value,
61                            "cannot update system variable in read-only mode",
62                        );
63                    }
64                    error => {
65                        error!(
66                            name = param.name,
67                            value = param.value,
68                            "cannot update system variable: {}",
69                            error
70                        );
71                    }
72                },
73            }
74        }
75    }
76
77    /// Pull the current values for all [SynchronizedParameters] from the
78    /// [SystemParameterBackend].
79    pub async fn pull(&self, params: &mut SynchronizedParameters) {
80        let vars = self.session_client.get_system_vars().await;
81        for var in vars.iter() {
82            params.modify(var.name(), &var.value());
83        }
84    }
85}