1use std::collections::{BTreeMap, BTreeSet};
11use std::ops::DerefMut;
12use std::sync::Arc;
13
14use mz_ore::now::SYSTEM_TIME;
15use mz_repr::RelationDesc;
16use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
17use mz_sql_server_util::desc::SqlServerTableRaw;
18use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorOutput};
19use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};
20
21use crate::names::{FullItemName, RawDatabaseSpecifier};
22use crate::plan::{PlanError, SourceReference, SourceReferences};
23
24use super::{RequestedSourceExport, error::PgSourcePurificationError};
25
/// A handle to an upstream system from which the set of available source references can be
/// retrieved via `get_source_references`, with one variant per supported source type.
pub(super) enum SourceReferenceClient<'a> {
    /// Lists the tables of a Postgres publication.
    Postgres {
        /// Client used to query the publication's tables.
        client: &'a mz_postgres_util::Client,
        /// Name of the publication whose tables are listed.
        publication: &'a str,
        /// Database name recorded on every retrieved reference and handed to the resolver.
        database: &'a str,
    },
    /// Lists the table schemas visible through a MySQL connection.
    MySql {
        /// Connection used to fetch the schema information.
        conn: &'a mut mz_mysql_util::MySqlConn,
        /// When `true`, `SchemaRequest::AllWithSystemSchemas` is requested instead of
        /// `SchemaRequest::All`.
        include_system_schemas: bool,
    },
    /// Lists the tables reported by `mz_sql_server_util::inspect::get_tables`.
    SqlServer {
        /// Client used to inspect the upstream tables.
        client: &'a mut mz_sql_server_util::Client,
        /// Database name recorded on every retrieved reference.
        database: Arc<str>,
    },
    /// A Kafka source; it exposes exactly one reference, named after the topic.
    Kafka {
        /// Topic name used as the name of the single reference.
        topic: &'a str,
    },
    /// A load generator; its views (or a single synthesized output) become the references.
    LoadGenerator {
        /// Generator whose `views()` and `schema_name()` describe the references.
        generator: &'a LoadGenerator,
    },
}
51
#[derive(Clone, Debug)]
/// Metadata describing a single reference made available by an upstream system, with one
/// variant per source type.
pub(super) enum ReferenceMetadata {
    /// A table that is part of a Postgres publication.
    Postgres {
        /// Description of the upstream table.
        table: mz_postgres_util::desc::PostgresTableDesc,
        /// Name of the database the table was retrieved from.
        database: String,
    },
    /// A MySQL table schema.
    MySql(mz_mysql_util::MySqlTableSchema),
    /// A SQL Server table together with the capture instance selected for it.
    SqlServer {
        /// Description of the upstream table.
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        /// Name of the database the table was retrieved from.
        database: Arc<str>,
        /// Name of the capture instance that was selected for this table.
        capture_instance: Arc<str>,
    },
    /// A Kafka topic, identified by its name.
    Kafka(String),
    /// An output of a load generator.
    LoadGenerator {
        /// Name of the output (a view name, or the generator's schema name when the
        /// generator has no views).
        name: String,
        /// Relation description of the output; `None` for the output synthesized when the
        /// generator has no views.
        desc: Option<RelationDesc>,
        /// Schema name of the generator.
        namespace: String,
        /// Which load generator output this reference corresponds to.
        output: LoadGeneratorOutput,
    },
}
73
74impl ReferenceMetadata {
75 fn namespace(&self) -> Option<&str> {
76 match self {
77 ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
78 ReferenceMetadata::MySql(table) => Some(&table.schema_name),
79 ReferenceMetadata::SqlServer { table, .. } => Some(table.schema_name.as_ref()),
80 ReferenceMetadata::Kafka(_) => None,
81 ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
82 }
83 }
84
85 fn name(&self) -> &str {
86 match self {
87 ReferenceMetadata::Postgres { table, .. } => &table.name,
88 ReferenceMetadata::MySql(table) => &table.name,
89 ReferenceMetadata::SqlServer { table, .. } => table.name.as_ref(),
90 ReferenceMetadata::Kafka(topic) => topic,
91 ReferenceMetadata::LoadGenerator { name, .. } => name,
92 }
93 }
94
95 pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
96 match self {
97 ReferenceMetadata::Postgres { table, .. } => Some(table),
98 _ => None,
99 }
100 }
101
102 pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
103 match self {
104 ReferenceMetadata::MySql(table) => Some(table),
105 _ => None,
106 }
107 }
108
109 pub(super) fn sql_server_table(&self) -> Option<&mz_sql_server_util::desc::SqlServerTableDesc> {
110 match self {
111 ReferenceMetadata::SqlServer { table, .. } => Some(table),
112 _ => None,
113 }
114 }
115
116 pub(super) fn sql_server_capture_instance(&self) -> Option<&Arc<str>> {
117 match self {
118 ReferenceMetadata::SqlServer {
119 capture_instance, ..
120 } => Some(capture_instance),
121 _ => None,
122 }
123 }
124
125 pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
126 match self {
127 ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
128 _ => None,
129 }
130 }
131
132 pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
133 match self {
134 ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
135 _ => None,
136 }
137 }
138
139 pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
143 match self {
144 ReferenceMetadata::Postgres { table, database } => {
145 Ok(UnresolvedItemName::qualified(&[
146 Ident::new(database)?,
147 Ident::new(&table.namespace)?,
148 Ident::new(&table.name)?,
149 ]))
150 }
151 ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
152 Ident::new(&table.schema_name)?,
153 Ident::new(&table.name)?,
154 ])),
155 ReferenceMetadata::SqlServer {
156 table,
157 database,
158 capture_instance: _,
159 } => Ok(UnresolvedItemName::qualified(&[
160 Ident::new(database.as_ref())?,
161 Ident::new(table.schema_name.as_ref())?,
162 Ident::new(table.name.as_ref())?,
163 ])),
164 ReferenceMetadata::Kafka(topic) => {
165 Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
166 }
167 ReferenceMetadata::LoadGenerator {
168 name, namespace, ..
169 } => {
170 let name = FullItemName {
171 database: RawDatabaseSpecifier::Name(
172 mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
173 .to_owned(),
174 ),
175 schema: namespace.to_string(),
176 item: name.to_string(),
177 };
178 Ok(UnresolvedItemName::from(name))
179 }
180 }
181 }
182}
183
#[derive(Clone, Debug)]
/// The references retrieved from an upstream system, together with a resolver that maps
/// requested names onto them.
pub(super) struct RetrievedSourceReferences {
    /// Value of `SYSTEM_TIME()` taken when the references were retrieved.
    updated_at: u64,
    /// Metadata for every retrieved reference; indices returned by `resolver` point into
    /// this vector.
    references: Vec<ReferenceMetadata>,
    /// Resolver built from the `(namespace, name)` pair of every entry in `references`.
    resolver: SourceReferenceResolver,
}
191
/// Placeholder database name handed to the `SourceReferenceResolver` for every source type
/// other than Postgres (which supplies its real database name instead).
pub(crate) static DATABASE_FAKE_NAME: &str = "database";
198
199impl<'a> SourceReferenceClient<'a> {
200 pub(super) async fn get_source_references(
205 mut self,
206 ) -> Result<RetrievedSourceReferences, PlanError> {
207 let references = match self {
208 SourceReferenceClient::Postgres {
209 client,
210 publication,
211 database,
212 } => {
213 let tables = mz_postgres_util::publication_info(client, publication, None).await?;
214
215 if tables.is_empty() {
216 Err(PgSourcePurificationError::EmptyPublication(
217 publication.to_string(),
218 ))?;
219 }
220
221 tables
222 .into_iter()
223 .map(|(_oid, desc)| ReferenceMetadata::Postgres {
224 table: desc,
225 database: database.to_string(),
226 })
227 .collect()
228 }
229 SourceReferenceClient::MySql {
230 ref mut conn,
231 include_system_schemas,
232 } => {
233 let request = if include_system_schemas {
234 mz_mysql_util::SchemaRequest::AllWithSystemSchemas
235 } else {
236 mz_mysql_util::SchemaRequest::All
237 };
238 let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;
241
242 tables.into_iter().map(ReferenceMetadata::MySql).collect()
243 }
244 SourceReferenceClient::SqlServer {
245 ref mut client,
246 ref database,
247 } => {
248 let tables = mz_sql_server_util::inspect::get_tables(client).await?;
249
250 let mut unique_tables: BTreeMap<(Arc<str>, Arc<str>), SqlServerTableRaw> =
251 BTreeMap::default();
252 for table in tables {
253 let key = (Arc::clone(&table.schema_name), Arc::clone(&table.name));
254
255 unique_tables
256 .entry(key)
257 .and_modify(|chosen_table: &mut SqlServerTableRaw| {
258 if chosen_table.capture_instance.create_date
263 < table.capture_instance.create_date
264 || (chosen_table.capture_instance.create_date
265 == table.capture_instance.create_date
266 && chosen_table.capture_instance.name
267 < table.capture_instance.name)
268 {
269 *chosen_table = table.clone();
270 }
271 })
272 .or_insert(table);
273 }
274
275 unique_tables
276 .into_values()
277 .map(|raw| {
278 let capture_instance = Arc::clone(&raw.capture_instance.name);
279 let database = Arc::clone(database);
280 let table = mz_sql_server_util::desc::SqlServerTableDesc::new(raw);
281 ReferenceMetadata::SqlServer {
282 table,
283 database,
284 capture_instance,
285 }
286 })
287 .collect()
288 }
289 SourceReferenceClient::Kafka { topic } => {
290 vec![ReferenceMetadata::Kafka(topic.to_string())]
291 }
292 SourceReferenceClient::LoadGenerator { generator } => {
293 let mut references = generator
294 .views()
295 .into_iter()
296 .map(
297 |(view, relation, output)| ReferenceMetadata::LoadGenerator {
298 name: view.to_string(),
299 desc: Some(relation),
300 namespace: generator.schema_name().to_string(),
301 output,
302 },
303 )
304 .collect::<Vec<_>>();
305
306 if references.is_empty() {
307 references.push(ReferenceMetadata::LoadGenerator {
310 name: generator.schema_name().to_string(),
311 desc: None,
312 namespace: generator.schema_name().to_string(),
313 output: LoadGeneratorOutput::Default,
314 });
315 }
316 references
317 }
318 };
319
320 let reference_names: Vec<(&str, &str)> = references
321 .iter()
322 .map(|reference| {
323 (
324 reference.namespace().unwrap_or_else(|| reference.name()),
325 reference.name(),
326 )
327 })
328 .collect();
329 let resolver = match self {
330 SourceReferenceClient::Postgres { database, .. } => {
331 SourceReferenceResolver::new(database, &reference_names)
332 }
333 _ => SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names),
334 }?;
335
336 Ok(RetrievedSourceReferences {
337 updated_at: SYSTEM_TIME(),
338 references,
339 resolver,
340 })
341 }
342}
343
344impl RetrievedSourceReferences {
345 pub(super) fn available_source_references(self) -> SourceReferences {
348 SourceReferences {
349 updated_at: self.updated_at,
350 references: self
351 .references
352 .into_iter()
353 .map(|reference| match reference {
354 ReferenceMetadata::Postgres { table, .. } => SourceReference {
355 name: table.name,
356 namespace: Some(table.namespace),
357 columns: table.columns.into_iter().map(|c| c.name).collect(),
358 },
359 ReferenceMetadata::MySql(table) => SourceReference {
360 name: table.name,
361 namespace: Some(table.schema_name),
362 columns: table
363 .columns
364 .into_iter()
365 .map(|column| column.name())
366 .collect(),
367 },
368 ReferenceMetadata::SqlServer { table, .. } => SourceReference {
369 name: table.name.to_string(),
370 namespace: Some(table.schema_name.to_string()),
371 columns: table
372 .columns
373 .into_iter()
374 .map(|c| c.name.to_string())
375 .collect(),
376 },
377 ReferenceMetadata::Kafka(topic) => SourceReference {
378 name: topic,
379 namespace: None,
380 columns: vec![],
381 },
382 ReferenceMetadata::LoadGenerator {
383 name,
384 desc,
385 namespace,
386 ..
387 } => SourceReference {
388 name,
389 namespace: Some(namespace),
390 columns: desc
391 .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
392 .unwrap_or_default(),
393 },
394 })
395 .collect(),
396 }
397 }
398
399 pub(super) fn requested_source_exports<'a>(
401 &'a self,
402 requested: Option<&ExternalReferences>,
403 source_name: &UnresolvedItemName,
404 ) -> Result<Vec<RequestedSourceExport<&'a ReferenceMetadata>>, PlanError> {
405 let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
409 Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
410 Some(ExternalReferences::SubsetSchemas(schemas)) => {
411 let available_schemas: BTreeSet<&str> = self
412 .references
413 .iter()
414 .filter_map(|r| r.namespace())
415 .collect();
416 let requested_schemas: BTreeSet<&str> =
417 schemas.iter().map(|s| s.as_str()).collect();
418
419 let missing_schemas: Vec<_> = requested_schemas
420 .difference(&available_schemas)
421 .map(|s| s.to_string())
422 .collect();
423
424 if !missing_schemas.is_empty() {
425 Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
426 }
427
428 self.references
429 .iter()
430 .filter_map(|reference| {
431 reference
432 .namespace()
433 .map(|namespace| {
434 requested_schemas
435 .contains(namespace)
436 .then_some((reference, None))
437 })
438 .flatten()
439 })
440 .collect()
441 }
442 Some(ExternalReferences::SubsetTables(requested_tables)) => {
443 requested_tables
446 .into_iter()
447 .map(|requested_table| {
448 let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
449 Ok((&self.references[idx], requested_table.alias.as_ref()))
450 })
451 .collect::<Result<Vec<_>, PlanError>>()?
452 }
453 None => {
454 if self.references.len() != 1 {
457 Err(ExternalReferenceResolutionError::Ambiguous {
458 name: "".to_string(),
459 })?
460 }
461 vec![(&self.references[0], None)]
462 }
463 };
464
465 filtered
467 .into_iter()
468 .map(|(reference, alias)| {
469 let name = match alias {
470 Some(alias_name) => {
471 let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
472 match partial.schema {
473 Some(_) => alias_name.clone(),
474 None => super::source_export_name_gen(source_name, &partial.item)?,
477 }
478 }
479 None => {
480 super::source_export_name_gen(source_name, reference.name())?
484 }
485 };
486
487 Ok(RequestedSourceExport {
488 external_reference: reference.external_reference()?,
489 name,
490 meta: reference,
491 })
492 })
493 .collect::<Result<Vec<_>, PlanError>>()
494 }
495
496 pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
497 let idx = self.resolver.resolve_idx(name)?;
498 Ok(&self.references[idx])
499 }
500
501 pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
502 &self.references
503 }
504}