1use std::collections::{BTreeMap, BTreeSet};
11use std::ops::DerefMut;
12use std::sync::Arc;
13
14use mz_ore::now::SYSTEM_TIME;
15use mz_repr::RelationDesc;
16use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
17use mz_sql_server_util::SqlServerError;
18use mz_sql_server_util::desc::SqlServerTableRaw;
19use mz_storage_types::sources::load_generator::{
20 LOAD_GENERATOR_DATABASE_NAME, LoadGenerator, LoadGeneratorOutput,
21};
22use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};
23
24use crate::names::{FullItemName, RawDatabaseSpecifier};
25use crate::plan::{PlanError, SourceReference, SourceReferences};
26
27use super::{RequestedSourceExport, error::PgSourcePurificationError};
28
29pub(super) enum SourceReferenceClient<'a> {
32 Postgres {
33 client: &'a mz_postgres_util::Client,
34 publication: &'a str,
35 database: &'a str,
36 },
37 MySql {
38 conn: &'a mut mz_mysql_util::MySqlConn,
39 include_system_schemas: bool,
42 },
43 SqlServer {
44 client: &'a mut mz_sql_server_util::Client,
45 database: Arc<str>,
46 },
47 Kafka {
48 topic: &'a str,
49 },
50 LoadGenerator {
51 generator: &'a LoadGenerator,
52 },
53}
54
55#[derive(Clone, Debug)]
57pub(super) enum ReferenceMetadata {
58 Postgres {
59 table: mz_postgres_util::desc::PostgresTableDesc,
60 database: String,
61 },
62 MySql(mz_mysql_util::MySqlTableSchema),
63 SqlServer {
64 table: mz_sql_server_util::desc::SqlServerTableDesc,
65 database: Arc<str>,
66 capture_instance: Arc<str>,
67 },
68 Kafka(String),
69 LoadGenerator {
70 name: String,
71 desc: Option<RelationDesc>,
72 namespace: String,
73 output: LoadGeneratorOutput,
74 },
75}
76
77impl ReferenceMetadata {
78 fn namespace(&self) -> Option<&str> {
79 match self {
80 ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
81 ReferenceMetadata::MySql(table) => Some(&table.schema_name),
82 ReferenceMetadata::SqlServer { table, .. } => Some(table.schema_name.as_ref()),
83 ReferenceMetadata::Kafka(_) => None,
84 ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
85 }
86 }
87
88 fn name(&self) -> &str {
89 match self {
90 ReferenceMetadata::Postgres { table, .. } => &table.name,
91 ReferenceMetadata::MySql(table) => &table.name,
92 ReferenceMetadata::SqlServer { table, .. } => table.name.as_ref(),
93 ReferenceMetadata::Kafka(topic) => topic,
94 ReferenceMetadata::LoadGenerator { name, .. } => name,
95 }
96 }
97
98 pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
99 match self {
100 ReferenceMetadata::Postgres { table, .. } => Some(table),
101 _ => None,
102 }
103 }
104
105 pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
106 match self {
107 ReferenceMetadata::MySql(table) => Some(table),
108 _ => None,
109 }
110 }
111
112 pub(super) fn sql_server_table(&self) -> Option<&mz_sql_server_util::desc::SqlServerTableDesc> {
113 match self {
114 ReferenceMetadata::SqlServer { table, .. } => Some(table),
115 _ => None,
116 }
117 }
118
119 pub(super) fn sql_server_capture_instance(&self) -> Option<&Arc<str>> {
120 match self {
121 ReferenceMetadata::SqlServer {
122 capture_instance, ..
123 } => Some(capture_instance),
124 _ => None,
125 }
126 }
127
128 pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
129 match self {
130 ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
131 _ => None,
132 }
133 }
134
135 pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
136 match self {
137 ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
138 _ => None,
139 }
140 }
141
142 pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
146 match self {
147 ReferenceMetadata::Postgres { table, database } => {
148 Ok(UnresolvedItemName::qualified(&[
149 Ident::new(database)?,
150 Ident::new(&table.namespace)?,
151 Ident::new(&table.name)?,
152 ]))
153 }
154 ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
155 Ident::new(&table.schema_name)?,
156 Ident::new(&table.name)?,
157 ])),
158 ReferenceMetadata::SqlServer {
159 table,
160 database,
161 capture_instance: _,
162 } => Ok(UnresolvedItemName::qualified(&[
163 Ident::new(database.as_ref())?,
164 Ident::new(table.schema_name.as_ref())?,
165 Ident::new(table.name.as_ref())?,
166 ])),
167 ReferenceMetadata::Kafka(topic) => {
168 Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
169 }
170 ReferenceMetadata::LoadGenerator {
171 name, namespace, ..
172 } => {
173 let name = FullItemName {
174 database: RawDatabaseSpecifier::Name(LOAD_GENERATOR_DATABASE_NAME.to_owned()),
175 schema: namespace.to_string(),
176 item: name.to_string(),
177 };
178 Ok(UnresolvedItemName::from(name))
179 }
180 }
181 }
182}
183
184#[derive(Clone, Debug)]
186pub(super) struct RetrievedSourceReferences {
187 updated_at: u64,
188 references: Vec<ReferenceMetadata>,
189 resolver: SourceReferenceResolver,
190}
191
/// Placeholder database name handed to the reference resolver for MySQL and
/// Kafka sources, for which the client carries no database name.
pub(crate) static DATABASE_FAKE_NAME: &str = "database";
205
206impl<'a> SourceReferenceClient<'a> {
207 pub(super) async fn get_source_references(
212 mut self,
213 ) -> Result<RetrievedSourceReferences, PlanError> {
214 let references = match self {
215 SourceReferenceClient::Postgres {
216 client,
217 publication,
218 database,
219 } => {
220 let tables = mz_postgres_util::publication_info(client, publication, None).await?;
221
222 if tables.is_empty() {
223 Err(PgSourcePurificationError::EmptyPublication(
224 publication.to_string(),
225 ))?;
226 }
227
228 tables
229 .into_iter()
230 .map(|(_oid, desc)| ReferenceMetadata::Postgres {
231 table: desc,
232 database: database.to_string(),
233 })
234 .collect()
235 }
236 SourceReferenceClient::MySql {
237 ref mut conn,
238 include_system_schemas,
239 } => {
240 let request = if include_system_schemas {
241 mz_mysql_util::SchemaRequest::AllWithSystemSchemas
242 } else {
243 mz_mysql_util::SchemaRequest::All
244 };
245 let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;
248
249 tables.into_iter().map(ReferenceMetadata::MySql).collect()
250 }
251 SourceReferenceClient::SqlServer {
252 ref mut client,
253 ref database,
254 } => {
255 let tables = mz_sql_server_util::inspect::get_tables(client).await?;
256
257 let mut unique_tables: BTreeMap<(Arc<str>, Arc<str>), SqlServerTableRaw> =
258 BTreeMap::default();
259 for table in tables {
260 let key = (Arc::clone(&table.schema_name), Arc::clone(&table.name));
261
262 unique_tables
263 .entry(key)
264 .and_modify(|chosen_table: &mut SqlServerTableRaw| {
265 if chosen_table.capture_instance.create_date
270 < table.capture_instance.create_date
271 || (chosen_table.capture_instance.create_date
272 == table.capture_instance.create_date
273 && chosen_table.capture_instance.name
274 < table.capture_instance.name)
275 {
276 *chosen_table = table.clone();
277 }
278 })
279 .or_insert(table);
280 }
281
282 let mut constraints_by_table =
283 mz_sql_server_util::inspect::get_constraints_for_tables(
284 client,
285 unique_tables.keys(),
286 )
287 .await?;
288 unique_tables
289 .into_iter()
290 .map(|(qualified_table_name, raw_table)| {
291 let constraints = constraints_by_table
292 .remove(&qualified_table_name)
293 .unwrap_or_default();
294 let capture_instance = Arc::clone(&raw_table.capture_instance.name);
295 let database = Arc::clone(database);
296 let table = mz_sql_server_util::desc::SqlServerTableDesc::new(
297 raw_table,
298 constraints,
299 )?;
300 Ok(ReferenceMetadata::SqlServer {
301 table,
302 database,
303 capture_instance,
304 })
305 })
306 .collect::<Result<_, SqlServerError>>()?
307 }
308 SourceReferenceClient::Kafka { topic } => {
309 vec![ReferenceMetadata::Kafka(topic.to_string())]
310 }
311 SourceReferenceClient::LoadGenerator { generator } => {
312 let mut references = generator
313 .views()
314 .into_iter()
315 .map(
316 |(view, relation, output)| ReferenceMetadata::LoadGenerator {
317 name: view.to_string(),
318 desc: Some(relation),
319 namespace: generator.schema_name().to_string(),
320 output,
321 },
322 )
323 .collect::<Vec<_>>();
324
325 if references.is_empty() {
326 references.push(ReferenceMetadata::LoadGenerator {
329 name: generator.schema_name().to_string(),
330 desc: None,
331 namespace: generator.schema_name().to_string(),
332 output: LoadGeneratorOutput::Default,
333 });
334 }
335 references
336 }
337 };
338
339 let reference_names: Vec<(&str, &str)> = references
340 .iter()
341 .map(|reference| {
342 (
343 reference.namespace().unwrap_or_else(|| reference.name()),
344 reference.name(),
345 )
346 })
347 .collect();
348 let resolver = match self {
357 SourceReferenceClient::Postgres { database, .. } => {
358 SourceReferenceResolver::new(database, &reference_names)
359 }
360 SourceReferenceClient::SqlServer { database, .. } => {
361 SourceReferenceResolver::new(&database, &reference_names)
362 }
363 SourceReferenceClient::LoadGenerator { .. } => {
364 SourceReferenceResolver::new(LOAD_GENERATOR_DATABASE_NAME, &reference_names)
365 }
366 SourceReferenceClient::MySql { .. } | SourceReferenceClient::Kafka { .. } => {
367 SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names)
368 }
369 }?;
370
371 Ok(RetrievedSourceReferences {
372 updated_at: SYSTEM_TIME(),
373 references,
374 resolver,
375 })
376 }
377}
378
379impl RetrievedSourceReferences {
380 pub(super) fn available_source_references(self) -> SourceReferences {
383 SourceReferences {
384 updated_at: self.updated_at,
385 references: self
386 .references
387 .into_iter()
388 .map(|reference| match reference {
389 ReferenceMetadata::Postgres { table, .. } => SourceReference {
390 name: table.name,
391 namespace: Some(table.namespace),
392 columns: table.columns.into_iter().map(|c| c.name).collect(),
393 },
394 ReferenceMetadata::MySql(table) => SourceReference {
395 name: table.name,
396 namespace: Some(table.schema_name),
397 columns: table
398 .columns
399 .into_iter()
400 .map(|column| column.name())
401 .collect(),
402 },
403 ReferenceMetadata::SqlServer { table, .. } => SourceReference {
404 name: table.name.to_string(),
405 namespace: Some(table.schema_name.to_string()),
406 columns: table
407 .columns
408 .into_iter()
409 .map(|c| c.name.to_string())
410 .collect(),
411 },
412 ReferenceMetadata::Kafka(topic) => SourceReference {
413 name: topic,
414 namespace: None,
415 columns: vec![],
416 },
417 ReferenceMetadata::LoadGenerator {
418 name,
419 desc,
420 namespace,
421 ..
422 } => SourceReference {
423 name,
424 namespace: Some(namespace),
425 columns: desc
426 .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
427 .unwrap_or_default(),
428 },
429 })
430 .collect(),
431 }
432 }
433
434 pub(super) fn requested_source_exports<'a>(
436 &'a self,
437 requested: Option<&ExternalReferences>,
438 source_name: &UnresolvedItemName,
439 ) -> Result<Vec<RequestedSourceExport<&'a ReferenceMetadata>>, PlanError> {
440 let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
444 Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
445 Some(ExternalReferences::SubsetSchemas(schemas)) => {
446 let available_schemas: BTreeSet<&str> = self
447 .references
448 .iter()
449 .filter_map(|r| r.namespace())
450 .collect();
451 let requested_schemas: BTreeSet<&str> =
452 schemas.iter().map(|s| s.as_str()).collect();
453
454 let missing_schemas: Vec<_> = requested_schemas
455 .difference(&available_schemas)
456 .map(|s| s.to_string())
457 .collect();
458
459 if !missing_schemas.is_empty() {
460 Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
461 }
462
463 self.references
464 .iter()
465 .filter_map(|reference| {
466 reference
467 .namespace()
468 .map(|namespace| {
469 requested_schemas
470 .contains(namespace)
471 .then_some((reference, None))
472 })
473 .flatten()
474 })
475 .collect()
476 }
477 Some(ExternalReferences::SubsetTables(requested_tables)) => {
478 requested_tables
481 .into_iter()
482 .map(|requested_table| {
483 let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
484 Ok((&self.references[idx], requested_table.alias.as_ref()))
485 })
486 .collect::<Result<Vec<_>, PlanError>>()?
487 }
488 None => {
489 if self.references.len() != 1 {
492 Err(ExternalReferenceResolutionError::Ambiguous {
493 name: "".to_string(),
494 })?
495 }
496 vec![(&self.references[0], None)]
497 }
498 };
499
500 filtered
502 .into_iter()
503 .map(|(reference, alias)| {
504 let name = match alias {
505 Some(alias_name) => {
506 let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
507 match partial.schema {
508 Some(_) => alias_name.clone(),
509 None => super::source_export_name_gen(source_name, &partial.item)?,
512 }
513 }
514 None => {
515 super::source_export_name_gen(source_name, reference.name())?
519 }
520 };
521
522 Ok(RequestedSourceExport {
523 external_reference: reference.external_reference()?,
524 name,
525 meta: reference,
526 })
527 })
528 .collect::<Result<Vec<_>, PlanError>>()
529 }
530
531 pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
532 let idx = self.resolver.resolve_idx(name)?;
533 Ok(&self.references[idx])
534 }
535
536 pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
537 &self.references
538 }
539}