1use mz_sql_parser::ast::{
32 ConnectionOptionName, CreateConnectionType, CreateSourceConnection, Raw, RawItemName,
33 WithOptionValue,
34};
35
36#[derive(Debug, Clone)]
38pub(crate) enum Infrastructure {
39 Connection {
41 connector_type: String,
43 properties: Vec<Property>,
45 },
46 Source {
48 connector_type: String,
50 connection_ref: Option<String>,
52 properties: Vec<Property>,
54 },
55 TableFromSource {
57 source_ref: String,
59 external_reference: Option<String>,
61 },
62}
63
64#[derive(Debug, Clone)]
66pub(crate) struct Property {
67 pub key: String,
69 pub value: String,
71 pub secret_ref: Option<String>,
73 pub object_ref: Option<String>,
75}
76
77pub(crate) fn extract(stmt: &crate::project::ast::Statement) -> Option<Infrastructure> {
82 match stmt {
83 crate::project::ast::Statement::CreateConnection(s) => Some(extract_connection(s)),
84 crate::project::ast::Statement::CreateSource(s) => Some(extract_source(s)),
85 crate::project::ast::Statement::CreateTableFromSource(s) => {
86 Some(extract_table_from_source(s))
87 }
88 _ => None,
89 }
90}
91
92fn connection_type_name(ct: &CreateConnectionType) -> &'static str {
94 match ct {
95 CreateConnectionType::Postgres => "Postgres",
96 CreateConnectionType::Kafka => "Kafka",
97 CreateConnectionType::MySql => "MySQL",
98 CreateConnectionType::Ssh => "SSH Tunnel",
99 CreateConnectionType::Aws => "AWS",
100 CreateConnectionType::AwsPrivatelink => "AWS PrivateLink",
101 CreateConnectionType::Gcp => "GCP",
102 CreateConnectionType::Csr => "Confluent Schema Registry",
103 CreateConnectionType::GlueSchemaRegistry => "Glue Schema Registry",
104 CreateConnectionType::SqlServer => "SQL Server",
105 CreateConnectionType::IcebergCatalog => "Iceberg Catalog",
106 }
107}
108
109fn format_option_value(value: &WithOptionValue<Raw>) -> (String, Option<String>, Option<String>) {
111 match value {
112 WithOptionValue::Secret(name) => {
113 let name_str = raw_item_name_to_string(name);
114 (name_str.clone(), Some(name_str), None)
115 }
116 WithOptionValue::Item(name) => {
117 let name_str = raw_item_name_to_string(name);
118 (name_str.clone(), None, Some(name_str))
119 }
120 other => (format!("{}", other), None, None),
121 }
122}
123
124fn raw_item_name_to_string(name: &RawItemName) -> String {
126 match name {
127 RawItemName::Name(n) => n.to_string(),
128 RawItemName::Id(_, n, _) => n.to_string(),
129 }
130}
131
132macro_rules! options_to_properties {
137 ($options:expr) => {
138 $options
139 .iter()
140 .filter_map(|opt| {
141 let value = opt.value.as_ref()?;
142 let (display, secret_ref, object_ref) = format_option_value(value);
143 Some(Property {
144 key: format!("{}", opt.name),
145 value: display,
146 secret_ref,
147 object_ref,
148 })
149 })
150 .collect()
151 };
152}
153
154fn extract_connection(stmt: &mz_sql_parser::ast::CreateConnectionStatement<Raw>) -> Infrastructure {
156 let connector_type = connection_type_name(&stmt.connection_type).to_string();
157 let properties = stmt
158 .values
159 .iter()
160 .filter_map(|opt| {
161 if matches!(
162 opt.name,
163 ConnectionOptionName::PublicKey1 | ConnectionOptionName::PublicKey2
164 ) {
165 return None;
166 }
167 let value = opt.value.as_ref()?;
168 let (display, secret_ref, object_ref) = format_option_value(value);
169 Some(Property {
170 key: format!("{}", opt.name),
171 value: display,
172 secret_ref,
173 object_ref,
174 })
175 })
176 .collect();
177 Infrastructure::Connection {
178 connector_type,
179 properties,
180 }
181}
182
183fn extract_source(stmt: &mz_sql_parser::ast::CreateSourceStatement<Raw>) -> Infrastructure {
185 let (connector_type, connection_ref, properties) = match &stmt.connection {
186 CreateSourceConnection::Postgres {
187 connection,
188 options,
189 } => (
190 "Postgres".to_string(),
191 Some(raw_item_name_to_string(connection)),
192 options_to_properties!(options),
193 ),
194 CreateSourceConnection::Kafka {
195 connection,
196 options,
197 } => (
198 "Kafka".to_string(),
199 Some(raw_item_name_to_string(connection)),
200 options_to_properties!(options),
201 ),
202 CreateSourceConnection::MySql {
203 connection,
204 options,
205 } => (
206 "MySQL".to_string(),
207 Some(raw_item_name_to_string(connection)),
208 options_to_properties!(options),
209 ),
210 CreateSourceConnection::SqlServer {
211 connection,
212 options,
213 } => (
214 "SQL Server".to_string(),
215 Some(raw_item_name_to_string(connection)),
216 options_to_properties!(options),
217 ),
218 CreateSourceConnection::LoadGenerator { generator, options } => (
219 format!("Load Generator ({})", generator),
220 None,
221 options_to_properties!(options),
222 ),
223 };
224 Infrastructure::Source {
225 connector_type,
226 connection_ref,
227 properties,
228 }
229}
230
231fn extract_table_from_source(
233 stmt: &mz_sql_parser::ast::CreateTableFromSourceStatement<Raw>,
234) -> Infrastructure {
235 let source_ref = raw_item_name_to_string(&stmt.source);
236 let external_reference = stmt.external_reference.as_ref().map(|n| n.to_string());
237 Infrastructure::TableFromSource {
238 source_ref,
239 external_reference,
240 }
241}