1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::collections::BTreeMap;
use mz_sql::session::user::SYSTEM_USER;
use tracing::{error, info};
use uuid::Uuid;
use crate::config::SynchronizedParameters;
use crate::session::SessionConfig;
use crate::{AdapterError, Client, SessionClient};
/// A backend client for pushing and pulling [SynchronizedParameters].
///
/// Pulling is required in order to catch concurrent changes before pushing
/// modified values in the [crate::config::system_parameter_sync].
///
/// All reads and writes of system variables go through the single
/// [SessionClient] owned by this struct.
pub struct SystemParameterBackend {
/// Session used by `push` (via `set_system_vars`) and `pull` (via
/// `get_system_vars`); it is opened for the system user in `new`.
session_client: SessionClient,
}
impl SystemParameterBackend {
pub async fn new(client: Client) -> Result<Self, AdapterError> {
let conn_id = client.new_conn_id()?;
let session = client.new_session(SessionConfig {
conn_id,
uuid: Uuid::new_v4(),
user: SYSTEM_USER.name.clone(),
client_ip: None,
external_metadata_rx: None,
helm_chart_version: None,
});
let session_client = client.startup(session).await?;
Ok(Self { session_client })
}
/// Push all current values from the given [SynchronizedParameters] that are
/// marked as modified to the [SystemParameterBackend] and reset their
/// modified status.
pub async fn push(&mut self, params: &mut SynchronizedParameters) {
for param in params.modified() {
let mut vars = BTreeMap::new();
info!(name = param.name, value = param.value, "updating parameter");
vars.insert(param.name.clone(), param.value.clone());
match self.session_client.set_system_vars(vars).await {
Ok(()) => {
info!(name = param.name, value = param.value, "update success");
}
Err(error) => match error {
AdapterError::ReadOnly => {
info!(
name = param.name,
value = param.value,
"cannot update system variable in read-only mode",
);
}
error => {
error!(
name = param.name,
value = param.value,
"cannot update system variable: {}",
error
);
}
},
}
}
}
/// Pull the current values for all [SynchronizedParameters] from the
/// [SystemParameterBackend].
pub async fn pull(&mut self, params: &mut SynchronizedParameters) {
match self.session_client.get_system_vars().await {
Ok(vars) => {
for (name, value) in vars {
params.modify(&name, &value);
}
}
Err(error) => {
error!("cannot execute `SHOW ALL` query: {}", error)
}
}
}
}