1use std::collections::{BTreeMap, BTreeSet};
11use std::ops::DerefMut;
12use std::sync::Arc;
13
14use mz_ore::now::SYSTEM_TIME;
15use mz_repr::RelationDesc;
16use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
17use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorOutput};
18use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};
19
20use crate::names::{FullItemName, RawDatabaseSpecifier};
21use crate::plan::{PlanError, SourceReference, SourceReferences};
22
23use super::{RequestedSourceExport, error::PgSourcePurificationError};
24
/// A client capable of enumerating the external references (tables, topics,
/// load generator views, ...) available from an upstream data source during
/// source purification.
///
/// Each variant holds the connection handle and the scoping parameters needed
/// to list references for that kind of source.
pub(super) enum SourceReferenceClient<'a> {
    Postgres {
        client: &'a mz_postgres_util::Client,
        /// The publication whose tables should be enumerated.
        publication: &'a str,
        /// The upstream database name, used to fully qualify references.
        database: &'a str,
    },
    MySql {
        conn: &'a mut mz_mysql_util::MySqlConn,
        /// Whether tables in MySQL system schemas should also be returned.
        include_system_schemas: bool,
    },
    SqlServer {
        client: &'a mut mz_sql_server_util::Client,
        /// The upstream database name, used to fully qualify references.
        database: Arc<str>,
    },
    Kafka {
        /// The single topic this source reads from.
        topic: &'a str,
    },
    LoadGenerator {
        generator: &'a LoadGenerator,
    },
}
50
/// Metadata about a single external reference retrieved from an upstream
/// system, kept in that system's native description format.
#[derive(Clone, Debug)]
pub(super) enum ReferenceMetadata {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
        /// The upstream database the table lives in.
        database: String,
    },
    MySql(mz_mysql_util::MySqlTableSchema),
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        /// The upstream database the table lives in.
        database: Arc<str>,
        /// The CDC capture instance associated with the table.
        capture_instance: Arc<str>,
    },
    /// A Kafka topic name.
    Kafka(String),
    LoadGenerator {
        name: String,
        /// `None` when no relation description is available up front (see the
        /// single-output fallback in `get_source_references`).
        desc: Option<RelationDesc>,
        namespace: String,
        output: LoadGeneratorOutput,
    },
}
72
73impl ReferenceMetadata {
74 fn namespace(&self) -> Option<&str> {
75 match self {
76 ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
77 ReferenceMetadata::MySql(table) => Some(&table.schema_name),
78 ReferenceMetadata::SqlServer { table, .. } => Some(table.schema_name.as_ref()),
79 ReferenceMetadata::Kafka(_) => None,
80 ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
81 }
82 }
83
84 fn name(&self) -> &str {
85 match self {
86 ReferenceMetadata::Postgres { table, .. } => &table.name,
87 ReferenceMetadata::MySql(table) => &table.name,
88 ReferenceMetadata::SqlServer { table, .. } => table.name.as_ref(),
89 ReferenceMetadata::Kafka(topic) => topic,
90 ReferenceMetadata::LoadGenerator { name, .. } => name,
91 }
92 }
93
94 pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
95 match self {
96 ReferenceMetadata::Postgres { table, .. } => Some(table),
97 _ => None,
98 }
99 }
100
101 pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
102 match self {
103 ReferenceMetadata::MySql(table) => Some(table),
104 _ => None,
105 }
106 }
107
108 pub(super) fn sql_server_table(&self) -> Option<&mz_sql_server_util::desc::SqlServerTableDesc> {
109 match self {
110 ReferenceMetadata::SqlServer { table, .. } => Some(table),
111 _ => None,
112 }
113 }
114
115 pub(super) fn sql_server_capture_instance(&self) -> Option<&Arc<str>> {
116 match self {
117 ReferenceMetadata::SqlServer {
118 capture_instance, ..
119 } => Some(capture_instance),
120 _ => None,
121 }
122 }
123
124 pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
125 match self {
126 ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
127 _ => None,
128 }
129 }
130
131 pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
132 match self {
133 ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
134 _ => None,
135 }
136 }
137
138 pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
142 match self {
143 ReferenceMetadata::Postgres { table, database } => {
144 Ok(UnresolvedItemName::qualified(&[
145 Ident::new(database)?,
146 Ident::new(&table.namespace)?,
147 Ident::new(&table.name)?,
148 ]))
149 }
150 ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
151 Ident::new(&table.schema_name)?,
152 Ident::new(&table.name)?,
153 ])),
154 ReferenceMetadata::SqlServer {
155 table,
156 database,
157 capture_instance: _,
158 } => Ok(UnresolvedItemName::qualified(&[
159 Ident::new(database.as_ref())?,
160 Ident::new(table.schema_name.as_ref())?,
161 Ident::new(table.name.as_ref())?,
162 ])),
163 ReferenceMetadata::Kafka(topic) => {
164 Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
165 }
166 ReferenceMetadata::LoadGenerator {
167 name, namespace, ..
168 } => {
169 let name = FullItemName {
170 database: RawDatabaseSpecifier::Name(
171 mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
172 .to_owned(),
173 ),
174 schema: namespace.to_string(),
175 item: name.to_string(),
176 };
177 Ok(UnresolvedItemName::from(name))
178 }
179 }
180 }
181}
182
/// The full set of references retrieved from an upstream system, together
/// with a resolver for looking them up by (possibly partially qualified)
/// name.
#[derive(Clone, Debug)]
pub(super) struct RetrievedSourceReferences {
    // Wall-clock timestamp (from `SYSTEM_TIME`) at which the references were
    // fetched.
    updated_at: u64,
    references: Vec<ReferenceMetadata>,
    resolver: SourceReferenceResolver,
}
190
/// Placeholder database name used when constructing a
/// [`SourceReferenceResolver`] for upstream systems that do not qualify their
/// references with a database (every source type except Postgres — see
/// `get_source_references`).
pub(crate) static DATABASE_FAKE_NAME: &str = "database";
197
impl<'a> SourceReferenceClient<'a> {
    /// Enumerates all external references available from the upstream system
    /// and packages them — together with a name resolver and a retrieval
    /// timestamp — into a [`RetrievedSourceReferences`].
    ///
    /// # Errors
    ///
    /// Returns a [`PlanError`] if the upstream system cannot be queried, if a
    /// Postgres publication contains no tables, or if the
    /// [`SourceReferenceResolver`] cannot be built from the discovered names.
    pub(super) async fn get_source_references(
        mut self,
    ) -> Result<RetrievedSourceReferences, PlanError> {
        // NOTE: the arms below bind connection handles with `ref mut`/`ref`
        // and everything else by `Copy` reference, so `self` is only borrowed
        // here and can be matched again below to pick the resolver's database
        // name.
        let references = match self {
            SourceReferenceClient::Postgres {
                client,
                publication,
                database,
            } => {
                let tables = mz_postgres_util::publication_info(client, publication, None).await?;

                // A publication with no tables is a user error, not an empty
                // result.
                if tables.is_empty() {
                    Err(PgSourcePurificationError::EmptyPublication(
                        publication.to_string(),
                    ))?;
                }

                tables
                    .into_iter()
                    .map(|(_oid, desc)| ReferenceMetadata::Postgres {
                        table: desc,
                        database: database.to_string(),
                    })
                    .collect()
            }
            SourceReferenceClient::MySql {
                ref mut conn,
                include_system_schemas,
            } => {
                let request = if include_system_schemas {
                    mz_mysql_util::SchemaRequest::AllWithSystemSchemas
                } else {
                    mz_mysql_util::SchemaRequest::All
                };
                let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;

                tables.into_iter().map(ReferenceMetadata::MySql).collect()
            }
            SourceReferenceClient::SqlServer {
                ref mut client,
                ref database,
            } => {
                let tables = mz_sql_server_util::inspect::get_tables(client).await?;

                // Keep only the first occurrence of each (schema, name) pair;
                // additional instances of the same table are dropped with a
                // warning. BTreeMap also gives a deterministic order below.
                let mut unique_tables = BTreeMap::default();
                for table in tables {
                    let key = (Arc::clone(&table.schema_name), Arc::clone(&table.name));
                    if unique_tables.contains_key(&key) {
                        tracing::warn!(?table, "filtering out other instance of table");
                    } else {
                        unique_tables.insert(key, table);
                    }
                }

                unique_tables
                    .into_values()
                    .map(|raw| {
                        // Pull out the capture instance before `raw` is
                        // consumed by the table-description constructor.
                        let capture_instance = Arc::clone(&raw.capture_instance);
                        let database = Arc::clone(database);
                        let table = mz_sql_server_util::desc::SqlServerTableDesc::new(raw);
                        ReferenceMetadata::SqlServer {
                            table,
                            database,
                            capture_instance,
                        }
                    })
                    .collect()
            }
            SourceReferenceClient::Kafka { topic } => {
                // A Kafka source exposes exactly one reference: its topic.
                vec![ReferenceMetadata::Kafka(topic.to_string())]
            }
            SourceReferenceClient::LoadGenerator { generator } => {
                let mut references = generator
                    .views()
                    .into_iter()
                    .map(
                        |(view, relation, output)| ReferenceMetadata::LoadGenerator {
                            name: view.to_string(),
                            desc: Some(relation),
                            namespace: generator.schema_name().to_string(),
                            output,
                        },
                    )
                    .collect::<Vec<_>>();

                // A generator with no views exposes a single default output
                // named after its schema, with no up-front relation desc.
                if references.is_empty() {
                    references.push(ReferenceMetadata::LoadGenerator {
                        name: generator.schema_name().to_string(),
                        desc: None,
                        namespace: generator.schema_name().to_string(),
                        output: LoadGeneratorOutput::Default,
                    });
                }
                references
            }
        };

        // Build the (namespace, name) pairs the resolver indexes. References
        // without a namespace (Kafka) reuse the name as their namespace.
        let reference_names: Vec<(&str, &str)> = references
            .iter()
            .map(|reference| {
                (
                    reference.namespace().unwrap_or_else(|| reference.name()),
                    reference.name(),
                )
            })
            .collect();
        // Only Postgres has a real database-level qualifier; every other
        // source type resolves under the placeholder DATABASE_FAKE_NAME.
        let resolver = match self {
            SourceReferenceClient::Postgres { database, .. } => {
                SourceReferenceResolver::new(database, &reference_names)
            }
            _ => SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names),
        }?;

        Ok(RetrievedSourceReferences {
            updated_at: SYSTEM_TIME(),
            references,
            resolver,
        })
    }
}
329
impl RetrievedSourceReferences {
    /// Consumes `self` and converts it into the plan-level [`SourceReferences`]
    /// representation, keeping only each reference's name, optional namespace,
    /// and column names.
    pub(super) fn available_source_references(self) -> SourceReferences {
        SourceReferences {
            updated_at: self.updated_at,
            references: self
                .references
                .into_iter()
                .map(|reference| match reference {
                    ReferenceMetadata::Postgres { table, .. } => SourceReference {
                        name: table.name,
                        namespace: Some(table.namespace),
                        columns: table.columns.into_iter().map(|c| c.name).collect(),
                    },
                    ReferenceMetadata::MySql(table) => SourceReference {
                        name: table.name,
                        namespace: Some(table.schema_name),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|column| column.name())
                            .collect(),
                    },
                    ReferenceMetadata::SqlServer { table, .. } => SourceReference {
                        name: table.name.to_string(),
                        namespace: Some(table.schema_name.to_string()),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|c| c.name.to_string())
                            .collect(),
                    },
                    // Kafka topics have neither a namespace nor columns.
                    ReferenceMetadata::Kafka(topic) => SourceReference {
                        name: topic,
                        namespace: None,
                        columns: vec![],
                    },
                    ReferenceMetadata::LoadGenerator {
                        name,
                        desc,
                        namespace,
                        ..
                    } => SourceReference {
                        name,
                        namespace: Some(namespace),
                        // No desc means no known columns, not an error.
                        columns: desc
                            .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
                            .unwrap_or_default(),
                    },
                })
                .collect(),
        }
    }

    /// Determines which of the retrieved references the user requested and
    /// pairs each with the Materialize name its export should be created
    /// under (derived from `source_name`, or from the user-provided alias).
    ///
    /// Request semantics:
    /// - `Some(All)`: every retrieved reference, no aliases.
    /// - `Some(SubsetSchemas(..))`: all references whose namespace is in the
    ///   requested set; errors if any requested schema matched nothing.
    /// - `Some(SubsetTables(..))`: each named table, resolved through the
    ///   resolver, carrying its optional alias.
    /// - `None`: requires exactly one retrieved reference; otherwise errors
    ///   as ambiguous.
    ///
    /// # Errors
    ///
    /// Returns a [`PlanError`] for unknown schemas/tables, ambiguous implicit
    /// selection, or invalid generated/aliased names.
    pub(super) fn requested_source_exports<'a>(
        &'a self,
        requested: Option<&ExternalReferences>,
        source_name: &UnresolvedItemName,
    ) -> Result<Vec<RequestedSourceExport<&'a ReferenceMetadata>>, PlanError> {
        // First pass: select the references, remembering any user alias.
        let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
            Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
            Some(ExternalReferences::SubsetSchemas(schemas)) => {
                let available_schemas: BTreeSet<&str> = self
                    .references
                    .iter()
                    .filter_map(|r| r.namespace())
                    .collect();
                let requested_schemas: BTreeSet<&str> =
                    schemas.iter().map(|s| s.as_str()).collect();

                let missing_schemas: Vec<_> = requested_schemas
                    .difference(&available_schemas)
                    .map(|s| s.to_string())
                    .collect();

                if !missing_schemas.is_empty() {
                    Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
                }

                self.references
                    .iter()
                    .filter_map(|reference| {
                        reference
                            .namespace()
                            .map(|namespace| {
                                requested_schemas
                                    .contains(namespace)
                                    .then_some((reference, None))
                            })
                            .flatten()
                    })
                    .collect()
            }
            Some(ExternalReferences::SubsetTables(requested_tables)) => {
                requested_tables
                    .into_iter()
                    .map(|requested_table| {
                        // The resolver maps the user's (partial) name to an
                        // index into `self.references`.
                        let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
                        Ok((&self.references[idx], requested_table.alias.as_ref()))
                    })
                    .collect::<Result<Vec<_>, PlanError>>()?
            }
            None => {
                if self.references.len() != 1 {
                    Err(ExternalReferenceResolutionError::Ambiguous {
                        name: "".to_string(),
                    })?
                }
                vec![(&self.references[0], None)]
            }
        };

        // Second pass: derive each export's Materialize name. A fully
        // qualified alias is used verbatim; a bare alias or no alias is
        // qualified relative to `source_name`.
        filtered
            .into_iter()
            .map(|(reference, alias)| {
                let name = match alias {
                    Some(alias_name) => {
                        let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
                        match partial.schema {
                            Some(_) => alias_name.clone(),
                            None => super::source_export_name_gen(source_name, &partial.item)?,
                        }
                    }
                    None => {
                        super::source_export_name_gen(source_name, reference.name())?
                    }
                };

                Ok(RequestedSourceExport {
                    external_reference: reference.external_reference()?,
                    name,
                    meta: reference,
                })
            })
            .collect::<Result<Vec<_>, PlanError>>()
    }

    /// Resolves a (possibly partially qualified) name to one of the retrieved
    /// references, or errors if it does not match exactly one.
    pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
        let idx = self.resolver.resolve_idx(name)?;
        Ok(&self.references[idx])
    }

    /// All retrieved references, in retrieval order.
    pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
        &self.references
    }
}