mz_sql/pure/
references.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::ops::DerefMut;
12use std::sync::Arc;
13
14use mz_ore::now::SYSTEM_TIME;
15use mz_repr::RelationDesc;
16use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
17use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorOutput};
18use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};
19
20use crate::names::{FullItemName, RawDatabaseSpecifier};
21use crate::plan::{PlanError, SourceReference, SourceReferences};
22
23use super::{RequestedSourceExport, error::PgSourcePurificationError};
24
/// A client that allows determining all available source references and resolving
/// them to a user-specified source reference during purification.
///
/// Each variant carries what `get_source_references` needs in order to list the
/// references exposed by one kind of upstream system.
pub(super) enum SourceReferenceClient<'a> {
    /// Lists the tables that are part of a Postgres publication.
    Postgres {
        /// Client used to query the publication's tables.
        client: &'a mz_postgres_util::Client,
        /// Name of the publication whose tables are listed.
        publication: &'a str,
        /// Database name that is recorded on every retrieved reference and used
        /// as the database component when resolving references.
        database: &'a str,
    },
    /// Lists the table schemas visible through a MySQL connection.
    MySql {
        /// Connection used to fetch the table schemas.
        conn: &'a mut mz_mysql_util::MySqlConn,
        /// Sets whether to include tables from the built-in system schemas in the
        /// retrieved references.
        include_system_schemas: bool,
    },
    /// Lists the tables reported by a SQL Server client.
    SqlServer {
        /// Client used to fetch the tables.
        client: &'a mut mz_sql_server_util::Client,
        /// Database name that is recorded on every retrieved reference.
        database: Arc<str>,
    },
    /// Exposes a single reference named after the Kafka topic.
    Kafka {
        /// Name of the topic.
        topic: &'a str,
    },
    /// Exposes one reference per view of the load generator, or a single
    /// default reference when the generator has no views.
    LoadGenerator {
        /// Generator whose views and schema name are inspected.
        generator: &'a LoadGenerator,
    },
}
50
/// Metadata about an available source reference retrieved from the upstream system.
#[derive(Clone, Debug)]
pub(super) enum ReferenceMetadata {
    /// A table that is part of a Postgres publication.
    Postgres {
        /// Description of the table as returned by the publication lookup.
        table: mz_postgres_util::desc::PostgresTableDesc,
        /// Name of the database, copied from the client that retrieved the table.
        database: String,
    },
    /// The schema of a MySQL table.
    MySql(mz_mysql_util::MySqlTableSchema),
    /// A SQL Server table.
    SqlServer {
        /// Description of the table, built from the raw upstream table info.
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        /// Name of the database, copied from the client that retrieved the table.
        database: Arc<str>,
        /// Capture instance reported by the upstream for this table.
        capture_instance: Arc<str>,
    },
    /// A Kafka topic, identified by its name.
    Kafka(String),
    /// An output of a load generator.
    LoadGenerator {
        /// Name of the view, or the generator's schema name when the generator
        /// has no views.
        name: String,
        /// Relation description of the view; `None` for the single default
        /// output of a generator without views.
        desc: Option<RelationDesc>,
        /// Schema name of the load generator.
        namespace: String,
        /// The generator output this reference maps to;
        /// `LoadGeneratorOutput::Default` for a generator without views.
        output: LoadGeneratorOutput,
    },
}
72
73impl ReferenceMetadata {
74    fn namespace(&self) -> Option<&str> {
75        match self {
76            ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
77            ReferenceMetadata::MySql(table) => Some(&table.schema_name),
78            ReferenceMetadata::SqlServer { table, .. } => Some(table.schema_name.as_ref()),
79            ReferenceMetadata::Kafka(_) => None,
80            ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
81        }
82    }
83
84    fn name(&self) -> &str {
85        match self {
86            ReferenceMetadata::Postgres { table, .. } => &table.name,
87            ReferenceMetadata::MySql(table) => &table.name,
88            ReferenceMetadata::SqlServer { table, .. } => table.name.as_ref(),
89            ReferenceMetadata::Kafka(topic) => topic,
90            ReferenceMetadata::LoadGenerator { name, .. } => name,
91        }
92    }
93
94    pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
95        match self {
96            ReferenceMetadata::Postgres { table, .. } => Some(table),
97            _ => None,
98        }
99    }
100
101    pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
102        match self {
103            ReferenceMetadata::MySql(table) => Some(table),
104            _ => None,
105        }
106    }
107
108    pub(super) fn sql_server_table(&self) -> Option<&mz_sql_server_util::desc::SqlServerTableDesc> {
109        match self {
110            ReferenceMetadata::SqlServer { table, .. } => Some(table),
111            _ => None,
112        }
113    }
114
115    pub(super) fn sql_server_capture_instance(&self) -> Option<&Arc<str>> {
116        match self {
117            ReferenceMetadata::SqlServer {
118                capture_instance, ..
119            } => Some(capture_instance),
120            _ => None,
121        }
122    }
123
124    pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
125        match self {
126            ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
127            _ => None,
128        }
129    }
130
131    pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
132        match self {
133            ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
134            _ => None,
135        }
136    }
137
138    /// Convert the reference metadata into an `UnresolvedItemName` representing the
139    /// external reference, normalized for each source type to be stored as part of
140    /// the relevant statement in the catalog.
141    pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
142        match self {
143            ReferenceMetadata::Postgres { table, database } => {
144                Ok(UnresolvedItemName::qualified(&[
145                    Ident::new(database)?,
146                    Ident::new(&table.namespace)?,
147                    Ident::new(&table.name)?,
148                ]))
149            }
150            ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
151                Ident::new(&table.schema_name)?,
152                Ident::new(&table.name)?,
153            ])),
154            ReferenceMetadata::SqlServer {
155                table,
156                database,
157                capture_instance: _,
158            } => Ok(UnresolvedItemName::qualified(&[
159                Ident::new(database.as_ref())?,
160                Ident::new(table.schema_name.as_ref())?,
161                Ident::new(table.name.as_ref())?,
162            ])),
163            ReferenceMetadata::Kafka(topic) => {
164                Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
165            }
166            ReferenceMetadata::LoadGenerator {
167                name, namespace, ..
168            } => {
169                let name = FullItemName {
170                    database: RawDatabaseSpecifier::Name(
171                        mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
172                            .to_owned(),
173                    ),
174                    schema: namespace.to_string(),
175                    item: name.to_string(),
176                };
177                Ok(UnresolvedItemName::from(name))
178            }
179        }
180    }
181}
182
/// A set of resolved source references.
#[derive(Clone, Debug)]
pub(super) struct RetrievedSourceReferences {
    /// Value of `SYSTEM_TIME()` taken when the references were retrieved.
    updated_at: u64,
    /// Every reference that was retrieved from the upstream system.
    references: Vec<ReferenceMetadata>,
    /// Resolver built from the `(namespace, name)` pairs of `references`; the
    /// indices it returns are used to index into `references`.
    resolver: SourceReferenceResolver,
}
190
/// The placeholder database name used to fit sources into the 3-layer
/// (database, schema, item) model that the `SourceReferenceResolver` uses to
/// resolve references. `get_source_references` passes it to the resolver for
/// every source type except Postgres, which supplies its real database name.
///
/// This placeholder is never stored in the catalog:
/// `ReferenceMetadata::external_reference` emits the real database name for
/// Postgres and SQL Server references, the load generator database name for
/// load generator references, and no database component at all for MySQL and
/// Kafka references.
pub(crate) static DATABASE_FAKE_NAME: &str = "database";
197
198impl<'a> SourceReferenceClient<'a> {
199    /// Get all available source references from the upstream system
200    /// and return a `RetrievedSourceReferences` object that can be used
201    /// to resolve user-specified source references and create `SourceReferences`
202    /// for storage in the catalog.
203    pub(super) async fn get_source_references(
204        mut self,
205    ) -> Result<RetrievedSourceReferences, PlanError> {
206        let references = match self {
207            SourceReferenceClient::Postgres {
208                client,
209                publication,
210                database,
211            } => {
212                let tables = mz_postgres_util::publication_info(client, publication, None).await?;
213
214                if tables.is_empty() {
215                    Err(PgSourcePurificationError::EmptyPublication(
216                        publication.to_string(),
217                    ))?;
218                }
219
220                tables
221                    .into_iter()
222                    .map(|(_oid, desc)| ReferenceMetadata::Postgres {
223                        table: desc,
224                        database: database.to_string(),
225                    })
226                    .collect()
227            }
228            SourceReferenceClient::MySql {
229                ref mut conn,
230                include_system_schemas,
231            } => {
232                let request = if include_system_schemas {
233                    mz_mysql_util::SchemaRequest::AllWithSystemSchemas
234                } else {
235                    mz_mysql_util::SchemaRequest::All
236                };
237                // NOTE: mysql will only expose the schemas of tables we have at least one privilege on
238                // and we can't tell if a table exists without a privilege
239                let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;
240
241                tables.into_iter().map(ReferenceMetadata::MySql).collect()
242            }
243            SourceReferenceClient::SqlServer {
244                ref mut client,
245                ref database,
246            } => {
247                let tables = mz_sql_server_util::inspect::get_tables(client).await?;
248
249                // TODO(sql_server2): Figure out how to handle a single table with
250                // multiple capture instances.
251                let mut unique_tables = BTreeMap::default();
252                for table in tables {
253                    let key = (Arc::clone(&table.schema_name), Arc::clone(&table.name));
254                    if unique_tables.contains_key(&key) {
255                        tracing::warn!(?table, "filtering out other instance of table");
256                    } else {
257                        unique_tables.insert(key, table);
258                    }
259                }
260
261                unique_tables
262                    .into_values()
263                    .map(|raw| {
264                        let capture_instance = Arc::clone(&raw.capture_instance);
265                        let database = Arc::clone(database);
266                        let table = mz_sql_server_util::desc::SqlServerTableDesc::new(raw);
267                        ReferenceMetadata::SqlServer {
268                            table,
269                            database,
270                            capture_instance,
271                        }
272                    })
273                    .collect()
274            }
275            SourceReferenceClient::Kafka { topic } => {
276                vec![ReferenceMetadata::Kafka(topic.to_string())]
277            }
278            SourceReferenceClient::LoadGenerator { generator } => {
279                let mut references = generator
280                    .views()
281                    .into_iter()
282                    .map(
283                        |(view, relation, output)| ReferenceMetadata::LoadGenerator {
284                            name: view.to_string(),
285                            desc: Some(relation),
286                            namespace: generator.schema_name().to_string(),
287                            output,
288                        },
289                    )
290                    .collect::<Vec<_>>();
291
292                if references.is_empty() {
293                    // If there are no views then this load-generator just has a single output
294                    // that uses the load-generator's schema name.
295                    references.push(ReferenceMetadata::LoadGenerator {
296                        name: generator.schema_name().to_string(),
297                        desc: None,
298                        namespace: generator.schema_name().to_string(),
299                        output: LoadGeneratorOutput::Default,
300                    });
301                }
302                references
303            }
304        };
305
306        let reference_names: Vec<(&str, &str)> = references
307            .iter()
308            .map(|reference| {
309                (
310                    reference.namespace().unwrap_or_else(|| reference.name()),
311                    reference.name(),
312                )
313            })
314            .collect();
315        let resolver = match self {
316            SourceReferenceClient::Postgres { database, .. } => {
317                SourceReferenceResolver::new(database, &reference_names)
318            }
319            _ => SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names),
320        }?;
321
322        Ok(RetrievedSourceReferences {
323            updated_at: SYSTEM_TIME(),
324            references,
325            resolver,
326        })
327    }
328}
329
330impl RetrievedSourceReferences {
331    /// Convert the resolved source references into a `SourceReferences` object
332    /// for storage in the catalog.
333    pub(super) fn available_source_references(self) -> SourceReferences {
334        SourceReferences {
335            updated_at: self.updated_at,
336            references: self
337                .references
338                .into_iter()
339                .map(|reference| match reference {
340                    ReferenceMetadata::Postgres { table, .. } => SourceReference {
341                        name: table.name,
342                        namespace: Some(table.namespace),
343                        columns: table.columns.into_iter().map(|c| c.name).collect(),
344                    },
345                    ReferenceMetadata::MySql(table) => SourceReference {
346                        name: table.name,
347                        namespace: Some(table.schema_name),
348                        columns: table
349                            .columns
350                            .into_iter()
351                            .map(|column| column.name())
352                            .collect(),
353                    },
354                    ReferenceMetadata::SqlServer { table, .. } => SourceReference {
355                        name: table.name.to_string(),
356                        namespace: Some(table.schema_name.to_string()),
357                        columns: table
358                            .columns
359                            .into_iter()
360                            .map(|c| c.name.to_string())
361                            .collect(),
362                    },
363                    ReferenceMetadata::Kafka(topic) => SourceReference {
364                        name: topic,
365                        namespace: None,
366                        columns: vec![],
367                    },
368                    ReferenceMetadata::LoadGenerator {
369                        name,
370                        desc,
371                        namespace,
372                        ..
373                    } => SourceReference {
374                        name,
375                        namespace: Some(namespace),
376                        columns: desc
377                            .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
378                            .unwrap_or_default(),
379                    },
380                })
381                .collect(),
382        }
383    }
384
385    /// Resolve the requested external references to their appropriate source exports.
386    pub(super) fn requested_source_exports<'a>(
387        &'a self,
388        requested: Option<&ExternalReferences>,
389        source_name: &UnresolvedItemName,
390    ) -> Result<Vec<RequestedSourceExport<&'a ReferenceMetadata>>, PlanError> {
391        // Filter all available references to those requested by the `ExternalReferences`
392        // specification and include any alias that the user has specified.
393        // TODO(database-issues#8620): The alias handling can be removed once subsources are removed.
394        let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
395            Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
396            Some(ExternalReferences::SubsetSchemas(schemas)) => {
397                let available_schemas: BTreeSet<&str> = self
398                    .references
399                    .iter()
400                    .filter_map(|r| r.namespace())
401                    .collect();
402                let requested_schemas: BTreeSet<&str> =
403                    schemas.iter().map(|s| s.as_str()).collect();
404
405                let missing_schemas: Vec<_> = requested_schemas
406                    .difference(&available_schemas)
407                    .map(|s| s.to_string())
408                    .collect();
409
410                if !missing_schemas.is_empty() {
411                    Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
412                }
413
414                self.references
415                    .iter()
416                    .filter_map(|reference| {
417                        reference
418                            .namespace()
419                            .map(|namespace| {
420                                requested_schemas
421                                    .contains(namespace)
422                                    .then_some((reference, None))
423                            })
424                            .flatten()
425                    })
426                    .collect()
427            }
428            Some(ExternalReferences::SubsetTables(requested_tables)) => {
429                // Use the `SourceReferenceResolver` to resolve the requested tables to their
430                // appropriate index in the available references.
431                requested_tables
432                    .into_iter()
433                    .map(|requested_table| {
434                        let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
435                        Ok((&self.references[idx], requested_table.alias.as_ref()))
436                    })
437                    .collect::<Result<Vec<_>, PlanError>>()?
438            }
439            None => {
440                // If no reference is requested we must validate that only one reference is
441                // available, else we cannot determine which reference the user is referring to.
442                if self.references.len() != 1 {
443                    Err(ExternalReferenceResolutionError::Ambiguous {
444                        name: "".to_string(),
445                    })?
446                }
447                vec![(&self.references[0], None)]
448            }
449        };
450
451        // Convert the filtered references to their appropriate `RequestedSourceExport` form.
452        filtered
453            .into_iter()
454            .map(|(reference, alias)| {
455                let name = match alias {
456                    Some(alias_name) => {
457                        let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
458                        match partial.schema {
459                            Some(_) => alias_name.clone(),
460                            // In cases when a prefix is not provided for the aliased name
461                            // fallback to using the schema of the source with the given name
462                            None => super::source_export_name_gen(source_name, &partial.item)?,
463                        }
464                    }
465                    None => {
466                        // Just use the item name for this reference and ensure it's created in
467                        // the current schema or the source's schema if provided, not mirroring
468                        // the schema of the reference.
469                        super::source_export_name_gen(source_name, reference.name())?
470                    }
471                };
472
473                Ok(RequestedSourceExport {
474                    external_reference: reference.external_reference()?,
475                    name,
476                    meta: reference,
477                })
478            })
479            .collect::<Result<Vec<_>, PlanError>>()
480    }
481
482    pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
483        let idx = self.resolver.resolve_idx(name)?;
484        Ok(&self.references[idx])
485    }
486
487    pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
488        &self.references
489    }
490}