mz_adapter/config/
backend.rs1use std::collections::BTreeMap;
11
12use mz_sql::session::user::SYSTEM_USER;
13use tracing::{error, info};
14use uuid::Uuid;
15
16use crate::config::SynchronizedParameters;
17use crate::session::SessionConfig;
18use crate::{AdapterError, Client, SessionClient};
19
20pub struct SystemParameterBackend {
25 session_client: SessionClient,
26}
27
28impl SystemParameterBackend {
29 pub async fn new(client: Client) -> Result<Self, AdapterError> {
30 let conn_id = client.new_conn_id()?;
31 let session = client.new_session(SessionConfig {
32 conn_id,
33 uuid: Uuid::new_v4(),
34 user: SYSTEM_USER.name.clone(),
35 client_ip: None,
36 external_metadata_rx: None,
37 internal_user_metadata: None,
38 helm_chart_version: None,
39 });
40 let session_client = client.startup(session).await?;
41 Ok(Self { session_client })
42 }
43
44 pub async fn push(&mut self, params: &mut SynchronizedParameters) {
48 for param in params.modified() {
49 let mut vars = BTreeMap::new();
50 info!(name = param.name, value = param.value, "updating parameter");
51 vars.insert(param.name.clone(), param.value.clone());
52 match self.session_client.set_system_vars(vars).await {
53 Ok(()) => {
54 info!(name = param.name, value = param.value, "update success");
55 }
56 Err(error) => match error {
57 AdapterError::ReadOnly => {
58 info!(
59 name = param.name,
60 value = param.value,
61 "cannot update system variable in read-only mode",
62 );
63 }
64 error => {
65 error!(
66 name = param.name,
67 value = param.value,
68 "cannot update system variable: {}",
69 error
70 );
71 }
72 },
73 }
74 }
75 }
76
77 pub async fn pull(&self, params: &mut SynchronizedParameters) {
80 let vars = self.session_client.get_system_vars().await;
81 for var in vars.iter() {
82 params.modify(var.name(), &var.value());
83 }
84 }
85}