mz_adapter/config/
backend.rs1use std::collections::BTreeMap;
11
12use mz_auth::{Authenticated, AuthenticatorKind};
13use mz_sql::session::user::SYSTEM_USER;
14use tracing::{error, info};
15use uuid::Uuid;
16
17use crate::config::SynchronizedParameters;
18use crate::session::SessionConfig;
19use crate::{AdapterError, Client, SessionClient};
20
21pub struct SystemParameterBackend {
26 session_client: SessionClient,
27}
28
29impl SystemParameterBackend {
30 pub async fn new(client: Client) -> Result<Self, AdapterError> {
31 let conn_id = client.new_conn_id()?;
32 let session = client.new_session(
33 SessionConfig {
34 conn_id,
35 uuid: Uuid::new_v4(),
36 user: SYSTEM_USER.name.clone(),
37 client_ip: None,
38 external_metadata_rx: None,
39 helm_chart_version: None,
40 authenticator_kind: AuthenticatorKind::None,
41 groups: None,
42 },
43 Authenticated,
44 );
45 let session_client = client.startup(session).await?;
46 Ok(Self { session_client })
47 }
48
49 pub async fn push(&mut self, params: &mut SynchronizedParameters) {
53 for param in params.modified() {
54 let mut vars = BTreeMap::new();
55 info!(name = param.name, value = param.value, "updating parameter");
56 vars.insert(param.name.clone(), param.value.clone());
57 match self.session_client.set_system_vars(vars).await {
58 Ok(()) => {
59 info!(name = param.name, value = param.value, "update success");
60 }
61 Err(error) => match error {
62 AdapterError::ReadOnly => {
63 info!(
64 name = param.name,
65 value = param.value,
66 "cannot update system variable in read-only mode",
67 );
68 }
69 error => {
70 error!(
71 name = param.name,
72 value = param.value,
73 "cannot update system variable: {}",
74 error
75 );
76 }
77 },
78 }
79 }
80 }
81
82 pub async fn pull(&self, params: &mut SynchronizedParameters) {
85 let vars = self.session_client.get_system_vars().await;
86 for var in vars.iter() {
87 params.modify(var.name(), &var.value());
88 }
89 }
90}