use std::collections::{BTreeMap, BTreeSet};
use std::ops::DerefMut;
use std::sync::Arc;

use mz_ore::now::SYSTEM_TIME;
use mz_repr::RelationDesc;
use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
use mz_sql_server_util::SqlServerError;
use mz_sql_server_util::desc::SqlServerTableRaw;
use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorOutput};
use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};

use crate::names::{FullItemName, RawDatabaseSpecifier};
use crate::plan::{PlanError, SourceReference, SourceReferences};

use super::{RequestedSourceExport, error::PgSourcePurificationError};
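/// A connection handle (or equivalent identifier) for each source type, used
/// during purification to list the external references (tables, topics, or
/// load generator outputs) that the upstream system exposes.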
pub(super) enum SourceReferenceClient<'a> {
    Postgres {
        client: &'a mz_postgres_util::Client,
        publication: &'a str,
        database: &'a str,
    },
    MySql {
        conn: &'a mut mz_mysql_util::MySqlConn,
        include_system_schemas: bool,
    },
    SqlServer {
        client: &'a mut mz_sql_server_util::Client,
        database: Arc<str>,
    },
    Kafka {
        topic: &'a str,
    },
    LoadGenerator {
        generator: &'a LoadGenerator,
    },
}
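/// Metadata about a single external reference, as reported by the upstream
/// system for each source type.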
#[derive(Clone, Debug)]
pub(super) enum ReferenceMetadata {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
        database: String,
    },
    MySql(mz_mysql_util::MySqlTableSchema),
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        database: Arc<str>,
        capture_instance: Arc<str>,
    },
    Kafka(String),
    LoadGenerator {
        name: String,
        desc: Option<RelationDesc>,
        namespace: String,
        output: LoadGeneratorOutput,
    },
}
impl ReferenceMetadata {
    fn namespace(&self) -> Option<&str> {
        match self {
            ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
            ReferenceMetadata::MySql(table) => Some(&table.schema_name),
            ReferenceMetadata::SqlServer { table, .. } => Some(table.schema_name.as_ref()),
            ReferenceMetadata::Kafka(_) => None,
            ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
        }
    }

    fn name(&self) -> &str {
        match self {
            ReferenceMetadata::Postgres { table, .. } => &table.name,
            ReferenceMetadata::MySql(table) => &table.name,
            ReferenceMetadata::SqlServer { table, .. } => table.name.as_ref(),
            ReferenceMetadata::Kafka(topic) => topic,
            ReferenceMetadata::LoadGenerator { name, .. } => name,
        }
    }

    pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
        match self {
            ReferenceMetadata::Postgres { table, .. } => Some(table),
            _ => None,
        }
    }

    pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
        match self {
            ReferenceMetadata::MySql(table) => Some(table),
            _ => None,
        }
    }

    pub(super) fn sql_server_table(&self) -> Option<&mz_sql_server_util::desc::SqlServerTableDesc> {
        match self {
            ReferenceMetadata::SqlServer { table, .. } => Some(table),
            _ => None,
        }
    }

    pub(super) fn sql_server_capture_instance(&self) -> Option<&Arc<str>> {
        match self {
            ReferenceMetadata::SqlServer {
                capture_instance, ..
            } => Some(capture_instance),
            _ => None,
        }
    }

    pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
        match self {
            ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
            _ => None,
        }
    }

    pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
        match self {
            ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
            _ => None,
        }
    }
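    /// Returns the fully qualified external reference for this object as an
    /// `UnresolvedItemName` (e.g. `database.schema.table` for Postgres and SQL
    /// Server, `schema.table` for MySQL, or just the topic name for Kafka).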
    pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
        match self {
            ReferenceMetadata::Postgres { table, database } => {
                Ok(UnresolvedItemName::qualified(&[
                    Ident::new(database)?,
                    Ident::new(&table.namespace)?,
                    Ident::new(&table.name)?,
                ]))
            }
            ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
                Ident::new(&table.schema_name)?,
                Ident::new(&table.name)?,
            ])),
            ReferenceMetadata::SqlServer {
                table,
                database,
                capture_instance: _,
            } => Ok(UnresolvedItemName::qualified(&[
                Ident::new(database.as_ref())?,
                Ident::new(table.schema_name.as_ref())?,
                Ident::new(table.name.as_ref())?,
            ])),
            ReferenceMetadata::Kafka(topic) => {
                Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
            }
            ReferenceMetadata::LoadGenerator {
                name, namespace, ..
            } => {
                let name = FullItemName {
                    database: RawDatabaseSpecifier::Name(
                        mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                            .to_owned(),
                    ),
                    schema: namespace.to_string(),
                    item: name.to_string(),
                };
                Ok(UnresolvedItemName::from(name))
            }
        }
    }
}
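/// The full set of references retrieved from the upstream system, along with a
/// resolver for looking references up by name and the time they were fetched.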
#[derive(Clone, Debug)]
pub(super) struct RetrievedSourceReferences {
    updated_at: u64,
    references: Vec<ReferenceMetadata>,
    resolver: SourceReferenceResolver,
}
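/// Placeholder database name used when constructing the `SourceReferenceResolver`
/// for source types other than Postgres.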
pub(crate) static DATABASE_FAKE_NAME: &str = "database";

impl<'a> SourceReferenceClient<'a> {
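    /// Retrieves the set of available references from the upstream system and
    /// builds a `SourceReferenceResolver` over them.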
    pub(super) async fn get_source_references(
        mut self,
    ) -> Result<RetrievedSourceReferences, PlanError> {
        let references = match self {
            SourceReferenceClient::Postgres {
                client,
                publication,
                database,
            } => {
                let tables = mz_postgres_util::publication_info(client, publication, None).await?;

                if tables.is_empty() {
                    Err(PgSourcePurificationError::EmptyPublication(
                        publication.to_string(),
                    ))?;
                }

                tables
                    .into_iter()
                    .map(|(_oid, desc)| ReferenceMetadata::Postgres {
                        table: desc,
                        database: database.to_string(),
                    })
                    .collect()
            }
            SourceReferenceClient::MySql {
                ref mut conn,
                include_system_schemas,
            } => {
                let request = if include_system_schemas {
                    mz_mysql_util::SchemaRequest::AllWithSystemSchemas
                } else {
                    mz_mysql_util::SchemaRequest::All
                };
                let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;

                tables.into_iter().map(ReferenceMetadata::MySql).collect()
            }
            SourceReferenceClient::SqlServer {
                ref mut client,
                ref database,
            } => {
                let tables = mz_sql_server_util::inspect::get_tables(client).await?;
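                // A table can have multiple CDC capture instances; keep only one
                // per (schema, table) pair, preferring the most recently created
                // capture instance (ties broken by the greater instance name).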
                let mut unique_tables: BTreeMap<(Arc<str>, Arc<str>), SqlServerTableRaw> =
                    BTreeMap::default();
                for table in tables {
                    let key = (Arc::clone(&table.schema_name), Arc::clone(&table.name));

                    unique_tables
                        .entry(key)
                        .and_modify(|chosen_table: &mut SqlServerTableRaw| {
                            if chosen_table.capture_instance.create_date
                                < table.capture_instance.create_date
                                || (chosen_table.capture_instance.create_date
                                    == table.capture_instance.create_date
                                    && chosen_table.capture_instance.name
                                        < table.capture_instance.name)
                            {
                                *chosen_table = table.clone();
                            }
                        })
                        .or_insert(table);
                }

                let mut constraints_by_table =
                    mz_sql_server_util::inspect::get_constraints_for_tables(
                        client,
                        unique_tables.keys(),
                    )
                    .await?;
                unique_tables
                    .into_iter()
                    .map(|(qualified_table_name, raw_table)| {
                        let constraints = constraints_by_table
                            .remove(&qualified_table_name)
                            .unwrap_or_default();
                        let capture_instance = Arc::clone(&raw_table.capture_instance.name);
                        let database = Arc::clone(database);
                        let table = mz_sql_server_util::desc::SqlServerTableDesc::new(
                            raw_table,
                            constraints,
                        )?;
                        Ok(ReferenceMetadata::SqlServer {
                            table,
                            database,
                            capture_instance,
                        })
                    })
                    .collect::<Result<_, SqlServerError>>()?
            }
            SourceReferenceClient::Kafka { topic } => {
                vec![ReferenceMetadata::Kafka(topic.to_string())]
            }
            SourceReferenceClient::LoadGenerator { generator } => {
                let mut references = generator
                    .views()
                    .into_iter()
                    .map(
                        |(view, relation, output)| ReferenceMetadata::LoadGenerator {
                            name: view.to_string(),
                            desc: Some(relation),
                            namespace: generator.schema_name().to_string(),
                            output,
                        },
                    )
                    .collect::<Vec<_>>();

                if references.is_empty() {
                    references.push(ReferenceMetadata::LoadGenerator {
                        name: generator.schema_name().to_string(),
                        desc: None,
                        namespace: generator.schema_name().to_string(),
                        output: LoadGeneratorOutput::Default,
                    });
                }
                references
            }
        };
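        // Build a resolver over the retrieved references. Only Postgres references
        // carry a real database name; all other source types use a placeholder.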
        let reference_names: Vec<(&str, &str)> = references
            .iter()
            .map(|reference| {
                (
                    reference.namespace().unwrap_or_else(|| reference.name()),
                    reference.name(),
                )
            })
            .collect();
        let resolver = match self {
            SourceReferenceClient::Postgres { database, .. } => {
                SourceReferenceResolver::new(database, &reference_names)
            }
            _ => SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names),
        }?;

        Ok(RetrievedSourceReferences {
            updated_at: SYSTEM_TIME(),
            references,
            resolver,
        })
    }
}
impl RetrievedSourceReferences {
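    /// Converts the retrieved references into the plan-level `SourceReferences`
    /// representation, keeping each reference's name, namespace, and column names.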
    pub(super) fn available_source_references(self) -> SourceReferences {
        SourceReferences {
            updated_at: self.updated_at,
            references: self
                .references
                .into_iter()
                .map(|reference| match reference {
                    ReferenceMetadata::Postgres { table, .. } => SourceReference {
                        name: table.name,
                        namespace: Some(table.namespace),
                        columns: table.columns.into_iter().map(|c| c.name).collect(),
                    },
                    ReferenceMetadata::MySql(table) => SourceReference {
                        name: table.name,
                        namespace: Some(table.schema_name),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|column| column.name())
                            .collect(),
                    },
                    ReferenceMetadata::SqlServer { table, .. } => SourceReference {
                        name: table.name.to_string(),
                        namespace: Some(table.schema_name.to_string()),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|c| c.name.to_string())
                            .collect(),
                    },
                    ReferenceMetadata::Kafka(topic) => SourceReference {
                        name: topic,
                        namespace: None,
                        columns: vec![],
                    },
                    ReferenceMetadata::LoadGenerator {
                        name,
                        desc,
                        namespace,
                        ..
                    } => SourceReference {
                        name,
                        namespace: Some(namespace),
                        columns: desc
                            .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
                            .unwrap_or_default(),
                    },
                })
                .collect(),
        }
    }
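    /// Matches the user's requested external references (all, a subset of schemas,
    /// a subset of tables, or none) against the retrieved references and produces a
    /// `RequestedSourceExport` for each match, deriving the export's name from the
    /// provided alias or the reference itself.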
    pub(super) fn requested_source_exports<'a>(
        &'a self,
        requested: Option<&ExternalReferences>,
        source_name: &UnresolvedItemName,
    ) -> Result<Vec<RequestedSourceExport<&'a ReferenceMetadata>>, PlanError> {
        let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
            Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
            Some(ExternalReferences::SubsetSchemas(schemas)) => {
                let available_schemas: BTreeSet<&str> = self
                    .references
                    .iter()
                    .filter_map(|r| r.namespace())
                    .collect();
                let requested_schemas: BTreeSet<&str> =
                    schemas.iter().map(|s| s.as_str()).collect();

                let missing_schemas: Vec<_> = requested_schemas
                    .difference(&available_schemas)
                    .map(|s| s.to_string())
                    .collect();

                if !missing_schemas.is_empty() {
                    Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
                }

                self.references
                    .iter()
                    .filter_map(|reference| {
                        reference
                            .namespace()
                            .map(|namespace| {
                                requested_schemas
                                    .contains(namespace)
                                    .then_some((reference, None))
                            })
                            .flatten()
                    })
                    .collect()
            }
            Some(ExternalReferences::SubsetTables(requested_tables)) => {
                requested_tables
                    .into_iter()
                    .map(|requested_table| {
                        let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
                        Ok((&self.references[idx], requested_table.alias.as_ref()))
                    })
                    .collect::<Result<Vec<_>, PlanError>>()?
            }
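            // When no explicit references were requested the source must expose
            // exactly one reference (e.g. a Kafka topic); otherwise the choice is
            // ambiguous.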
            None => {
                if self.references.len() != 1 {
                    Err(ExternalReferenceResolutionError::Ambiguous {
                        name: "".to_string(),
                    })?
                }
                vec![(&self.references[0], None)]
            }
        };
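        // Determine the name for each export: an alias that already includes a
        // schema is used verbatim, while a bare alias or an un-aliased reference
        // gets a name generated relative to the source name.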
        filtered
            .into_iter()
            .map(|(reference, alias)| {
                let name = match alias {
                    Some(alias_name) => {
                        let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
                        match partial.schema {
                            Some(_) => alias_name.clone(),
                            None => super::source_export_name_gen(source_name, &partial.item)?,
                        }
                    }
                    None => {
                        super::source_export_name_gen(source_name, reference.name())?
                    }
                };

                Ok(RequestedSourceExport {
                    external_reference: reference.external_reference()?,
                    name,
                    meta: reference,
                })
            })
            .collect::<Result<Vec<_>, PlanError>>()
    }
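    /// Resolves a user-provided name to the matching retrieved reference.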
    pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
        let idx = self.resolver.resolve_idx(name)?;
        Ok(&self.references[idx])
    }
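    /// Returns all of the retrieved references.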
    pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
        &self.references
    }
}