Skip to main content

mz_sql/pure/
references.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::ops::DerefMut;
12use std::sync::Arc;
13
14use mz_ore::now::SYSTEM_TIME;
15use mz_repr::RelationDesc;
16use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
17use mz_sql_server_util::SqlServerError;
18use mz_sql_server_util::desc::SqlServerTableRaw;
19use mz_storage_types::sources::load_generator::{
20    LOAD_GENERATOR_DATABASE_NAME, LoadGenerator, LoadGeneratorOutput,
21};
22use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};
23
24use crate::names::{FullItemName, RawDatabaseSpecifier};
25use crate::plan::{PlanError, SourceReference, SourceReferences};
26
27use super::{RequestedSourceExport, error::PgSourcePurificationError};
28
/// A client that allows determining all available source references and resolving
/// them to a user-specified source reference during purification.
///
/// Each variant carries what is needed to enumerate the references of one source
/// type. The client is consumed by `get_source_references`.
pub(super) enum SourceReferenceClient<'a> {
    /// References are the tables of a PostgreSQL publication.
    Postgres {
        /// Connection used to look up the publication's tables.
        client: &'a mz_postgres_util::Client,
        /// Name of the publication whose tables are enumerated.
        publication: &'a str,
        /// Database name recorded on every retrieved reference and used as the
        /// database of the reference resolver.
        database: &'a str,
    },
    /// References are the MySQL tables whose schemas can be retrieved over `conn`.
    MySql {
        /// Connection used to retrieve table schemas.
        conn: &'a mut mz_mysql_util::MySqlConn,
        /// Sets whether to include tables from the built-in system schemas in the
        /// retrieved references.
        include_system_schemas: bool,
    },
    /// References are the SQL Server tables reported by the upstream, one per
    /// `(schema, table)` pair.
    SqlServer {
        /// Connection used to list tables and their constraints.
        client: &'a mut mz_sql_server_util::Client,
        /// Database name recorded on every retrieved reference and used as the
        /// database of the reference resolver.
        database: Arc<str>,
    },
    /// The single reference is the Kafka topic itself.
    Kafka {
        /// Name of the topic.
        topic: &'a str,
    },
    /// References are the views of a load generator, or a single default output
    /// when the generator has no views.
    LoadGenerator {
        /// Generator whose views and schema name are inspected.
        generator: &'a LoadGenerator,
    },
}
54
/// Metadata about an available source reference retrieved from the upstream system.
#[derive(Clone, Debug)]
pub(super) enum ReferenceMetadata {
    /// A table that is part of a PostgreSQL publication.
    Postgres {
        /// Description of the upstream table.
        table: mz_postgres_util::desc::PostgresTableDesc,
        /// Database name supplied by the client; it becomes the first component
        /// of the external reference.
        database: String,
    },
    /// A MySQL table, identified by its schema name and table name.
    MySql(mz_mysql_util::MySqlTableSchema),
    /// A SQL Server table together with the capture instance selected for it.
    SqlServer {
        /// Description of the upstream table, including its constraints.
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        /// Database name supplied by the client; it becomes the first component
        /// of the external reference.
        database: Arc<str>,
        /// Name of the capture instance chosen for this table.
        capture_instance: Arc<str>,
    },
    /// A Kafka topic, identified by its name.
    Kafka(String),
    /// An output of a load generator.
    LoadGenerator {
        /// Name of the view, or the generator's schema name for the default output.
        name: String,
        /// Relation description of the view; `None` for the default output.
        desc: Option<RelationDesc>,
        /// Schema name of the load generator.
        namespace: String,
        /// Which load generator output this reference corresponds to.
        output: LoadGeneratorOutput,
    },
}
76
77impl ReferenceMetadata {
78    fn namespace(&self) -> Option<&str> {
79        match self {
80            ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
81            ReferenceMetadata::MySql(table) => Some(&table.schema_name),
82            ReferenceMetadata::SqlServer { table, .. } => Some(table.schema_name.as_ref()),
83            ReferenceMetadata::Kafka(_) => None,
84            ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
85        }
86    }
87
88    fn name(&self) -> &str {
89        match self {
90            ReferenceMetadata::Postgres { table, .. } => &table.name,
91            ReferenceMetadata::MySql(table) => &table.name,
92            ReferenceMetadata::SqlServer { table, .. } => table.name.as_ref(),
93            ReferenceMetadata::Kafka(topic) => topic,
94            ReferenceMetadata::LoadGenerator { name, .. } => name,
95        }
96    }
97
98    pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
99        match self {
100            ReferenceMetadata::Postgres { table, .. } => Some(table),
101            _ => None,
102        }
103    }
104
105    pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
106        match self {
107            ReferenceMetadata::MySql(table) => Some(table),
108            _ => None,
109        }
110    }
111
112    pub(super) fn sql_server_table(&self) -> Option<&mz_sql_server_util::desc::SqlServerTableDesc> {
113        match self {
114            ReferenceMetadata::SqlServer { table, .. } => Some(table),
115            _ => None,
116        }
117    }
118
119    pub(super) fn sql_server_capture_instance(&self) -> Option<&Arc<str>> {
120        match self {
121            ReferenceMetadata::SqlServer {
122                capture_instance, ..
123            } => Some(capture_instance),
124            _ => None,
125        }
126    }
127
128    pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
129        match self {
130            ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
131            _ => None,
132        }
133    }
134
135    pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
136        match self {
137            ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
138            _ => None,
139        }
140    }
141
142    /// Convert the reference metadata into an `UnresolvedItemName` representing the
143    /// external reference, normalized for each source type to be stored as part of
144    /// the relevant statement in the catalog.
145    pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
146        match self {
147            ReferenceMetadata::Postgres { table, database } => {
148                Ok(UnresolvedItemName::qualified(&[
149                    Ident::new(database)?,
150                    Ident::new(&table.namespace)?,
151                    Ident::new(&table.name)?,
152                ]))
153            }
154            ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
155                Ident::new(&table.schema_name)?,
156                Ident::new(&table.name)?,
157            ])),
158            ReferenceMetadata::SqlServer {
159                table,
160                database,
161                capture_instance: _,
162            } => Ok(UnresolvedItemName::qualified(&[
163                Ident::new(database.as_ref())?,
164                Ident::new(table.schema_name.as_ref())?,
165                Ident::new(table.name.as_ref())?,
166            ])),
167            ReferenceMetadata::Kafka(topic) => {
168                Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
169            }
170            ReferenceMetadata::LoadGenerator {
171                name, namespace, ..
172            } => {
173                let name = FullItemName {
174                    database: RawDatabaseSpecifier::Name(LOAD_GENERATOR_DATABASE_NAME.to_owned()),
175                    schema: namespace.to_string(),
176                    item: name.to_string(),
177                };
178                Ok(UnresolvedItemName::from(name))
179            }
180        }
181    }
182}
183
/// A set of resolved source references.
#[derive(Clone, Debug)]
pub(super) struct RetrievedSourceReferences {
    /// Value of `SYSTEM_TIME()` taken when the references were retrieved.
    updated_at: u64,
    /// Every reference reported by the upstream system.
    references: Vec<ReferenceMetadata>,
    /// Resolver built from the `(namespace, name)` pairs of `references`, in the
    /// same order; the indexes it returns are used to index into `references`.
    resolver: SourceReferenceResolver,
}
191
/// The name of the fake database used to fit references into the 3-layer catalog
/// model of the [`SourceReferenceResolver`] for source types whose
/// [`ReferenceMetadata::external_reference`] stores no database component
/// (MySQL and Kafka). Because those references are never fully qualified with a
/// database, the resolver's database name is never matched against and this
/// placeholder is never stored in the catalog.
///
/// Note: this is *not* usable for every non-Postgres source. SQL Server and load
/// generators do store a database in their external reference (the real upstream
/// database and the synthetic `mz_load_generators`, respectively), so their
/// resolvers must be built with that same database for the stored reference to
/// resolve. The resolver is constructed in
/// `SourceReferenceClient::get_source_references`.
pub(crate) static DATABASE_FAKE_NAME: &str = "database";
205
206impl<'a> SourceReferenceClient<'a> {
207    /// Get all available source references from the upstream system
208    /// and return a `RetrievedSourceReferences` object that can be used
209    /// to resolve user-specified source references and create `SourceReferences`
210    /// for storage in the catalog.
211    pub(super) async fn get_source_references(
212        mut self,
213    ) -> Result<RetrievedSourceReferences, PlanError> {
214        let references = match self {
215            SourceReferenceClient::Postgres {
216                client,
217                publication,
218                database,
219            } => {
220                let tables = mz_postgres_util::publication_info(client, publication, None).await?;
221
222                if tables.is_empty() {
223                    Err(PgSourcePurificationError::EmptyPublication(
224                        publication.to_string(),
225                    ))?;
226                }
227
228                tables
229                    .into_iter()
230                    .map(|(_oid, desc)| ReferenceMetadata::Postgres {
231                        table: desc,
232                        database: database.to_string(),
233                    })
234                    .collect()
235            }
236            SourceReferenceClient::MySql {
237                ref mut conn,
238                include_system_schemas,
239            } => {
240                let request = if include_system_schemas {
241                    mz_mysql_util::SchemaRequest::AllWithSystemSchemas
242                } else {
243                    mz_mysql_util::SchemaRequest::All
244                };
245                // NOTE: mysql will only expose the schemas of tables we have at least one privilege on
246                // and we can't tell if a table exists without a privilege
247                let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;
248
249                tables.into_iter().map(ReferenceMetadata::MySql).collect()
250            }
251            SourceReferenceClient::SqlServer {
252                ref mut client,
253                ref database,
254            } => {
255                let tables = mz_sql_server_util::inspect::get_tables(client).await?;
256
257                let mut unique_tables: BTreeMap<(Arc<str>, Arc<str>), SqlServerTableRaw> =
258                    BTreeMap::default();
259                for table in tables {
260                    let key = (Arc::clone(&table.schema_name), Arc::clone(&table.name));
261
262                    unique_tables
263                        .entry(key)
264                        .and_modify(|chosen_table: &mut SqlServerTableRaw| {
265                            // When multiple capture instances exist for the same table,
266                            // we select deterministically based on:
267                            // 1. Most recent create_date (newer capture instance)
268                            // 2. If dates are equal, lexicographically greatest capture_instance name
269                            if chosen_table.capture_instance.create_date
270                                < table.capture_instance.create_date
271                                || (chosen_table.capture_instance.create_date
272                                    == table.capture_instance.create_date
273                                    && chosen_table.capture_instance.name
274                                        < table.capture_instance.name)
275                            {
276                                *chosen_table = table.clone();
277                            }
278                        })
279                        .or_insert(table);
280                }
281
282                let mut constraints_by_table =
283                    mz_sql_server_util::inspect::get_constraints_for_tables(
284                        client,
285                        unique_tables.keys(),
286                    )
287                    .await?;
288                unique_tables
289                    .into_iter()
290                    .map(|(qualified_table_name, raw_table)| {
291                        let constraints = constraints_by_table
292                            .remove(&qualified_table_name)
293                            .unwrap_or_default();
294                        let capture_instance = Arc::clone(&raw_table.capture_instance.name);
295                        let database = Arc::clone(database);
296                        let table = mz_sql_server_util::desc::SqlServerTableDesc::new(
297                            raw_table,
298                            constraints,
299                        )?;
300                        Ok(ReferenceMetadata::SqlServer {
301                            table,
302                            database,
303                            capture_instance,
304                        })
305                    })
306                    .collect::<Result<_, SqlServerError>>()?
307            }
308            SourceReferenceClient::Kafka { topic } => {
309                vec![ReferenceMetadata::Kafka(topic.to_string())]
310            }
311            SourceReferenceClient::LoadGenerator { generator } => {
312                let mut references = generator
313                    .views()
314                    .into_iter()
315                    .map(
316                        |(view, relation, output)| ReferenceMetadata::LoadGenerator {
317                            name: view.to_string(),
318                            desc: Some(relation),
319                            namespace: generator.schema_name().to_string(),
320                            output,
321                        },
322                    )
323                    .collect::<Vec<_>>();
324
325                if references.is_empty() {
326                    // If there are no views then this load-generator just has a single output
327                    // that uses the load-generator's schema name.
328                    references.push(ReferenceMetadata::LoadGenerator {
329                        name: generator.schema_name().to_string(),
330                        desc: None,
331                        namespace: generator.schema_name().to_string(),
332                        output: LoadGeneratorOutput::Default,
333                    });
334                }
335                references
336            }
337        };
338
339        let reference_names: Vec<(&str, &str)> = references
340            .iter()
341            .map(|reference| {
342                (
343                    reference.namespace().unwrap_or_else(|| reference.name()),
344                    reference.name(),
345                )
346            })
347            .collect();
348        // The resolver's database must match the database that each source type
349        // embeds in its `ReferenceMetadata::external_reference()`, otherwise the
350        // fully-qualified reference we store (and print in `SHOW CREATE TABLE`)
351        // won't resolve when fed back in. Postgres and SQL Server store the real
352        // upstream database; load generators store the synthetic
353        // `mz_load_generators` database. MySQL and Kafka store no database
354        // component, so the resolver's database is never matched against and the
355        // fake name is fine.
356        let resolver = match self {
357            SourceReferenceClient::Postgres { database, .. } => {
358                SourceReferenceResolver::new(database, &reference_names)
359            }
360            SourceReferenceClient::SqlServer { database, .. } => {
361                SourceReferenceResolver::new(&database, &reference_names)
362            }
363            SourceReferenceClient::LoadGenerator { .. } => {
364                SourceReferenceResolver::new(LOAD_GENERATOR_DATABASE_NAME, &reference_names)
365            }
366            SourceReferenceClient::MySql { .. } | SourceReferenceClient::Kafka { .. } => {
367                SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names)
368            }
369        }?;
370
371        Ok(RetrievedSourceReferences {
372            updated_at: SYSTEM_TIME(),
373            references,
374            resolver,
375        })
376    }
377}
378
379impl RetrievedSourceReferences {
380    /// Convert the resolved source references into a `SourceReferences` object
381    /// for storage in the catalog.
382    pub(super) fn available_source_references(self) -> SourceReferences {
383        SourceReferences {
384            updated_at: self.updated_at,
385            references: self
386                .references
387                .into_iter()
388                .map(|reference| match reference {
389                    ReferenceMetadata::Postgres { table, .. } => SourceReference {
390                        name: table.name,
391                        namespace: Some(table.namespace),
392                        columns: table.columns.into_iter().map(|c| c.name).collect(),
393                    },
394                    ReferenceMetadata::MySql(table) => SourceReference {
395                        name: table.name,
396                        namespace: Some(table.schema_name),
397                        columns: table
398                            .columns
399                            .into_iter()
400                            .map(|column| column.name())
401                            .collect(),
402                    },
403                    ReferenceMetadata::SqlServer { table, .. } => SourceReference {
404                        name: table.name.to_string(),
405                        namespace: Some(table.schema_name.to_string()),
406                        columns: table
407                            .columns
408                            .into_iter()
409                            .map(|c| c.name.to_string())
410                            .collect(),
411                    },
412                    ReferenceMetadata::Kafka(topic) => SourceReference {
413                        name: topic,
414                        namespace: None,
415                        columns: vec![],
416                    },
417                    ReferenceMetadata::LoadGenerator {
418                        name,
419                        desc,
420                        namespace,
421                        ..
422                    } => SourceReference {
423                        name,
424                        namespace: Some(namespace),
425                        columns: desc
426                            .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
427                            .unwrap_or_default(),
428                    },
429                })
430                .collect(),
431        }
432    }
433
434    /// Resolve the requested external references to their appropriate source exports.
435    pub(super) fn requested_source_exports<'a>(
436        &'a self,
437        requested: Option<&ExternalReferences>,
438        source_name: &UnresolvedItemName,
439    ) -> Result<Vec<RequestedSourceExport<&'a ReferenceMetadata>>, PlanError> {
440        // Filter all available references to those requested by the `ExternalReferences`
441        // specification and include any alias that the user has specified.
442        // TODO(database-issues#8620): The alias handling can be removed once subsources are removed.
443        let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
444            Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
445            Some(ExternalReferences::SubsetSchemas(schemas)) => {
446                let available_schemas: BTreeSet<&str> = self
447                    .references
448                    .iter()
449                    .filter_map(|r| r.namespace())
450                    .collect();
451                let requested_schemas: BTreeSet<&str> =
452                    schemas.iter().map(|s| s.as_str()).collect();
453
454                let missing_schemas: Vec<_> = requested_schemas
455                    .difference(&available_schemas)
456                    .map(|s| s.to_string())
457                    .collect();
458
459                if !missing_schemas.is_empty() {
460                    Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
461                }
462
463                self.references
464                    .iter()
465                    .filter_map(|reference| {
466                        reference
467                            .namespace()
468                            .map(|namespace| {
469                                requested_schemas
470                                    .contains(namespace)
471                                    .then_some((reference, None))
472                            })
473                            .flatten()
474                    })
475                    .collect()
476            }
477            Some(ExternalReferences::SubsetTables(requested_tables)) => {
478                // Use the `SourceReferenceResolver` to resolve the requested tables to their
479                // appropriate index in the available references.
480                requested_tables
481                    .into_iter()
482                    .map(|requested_table| {
483                        let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
484                        Ok((&self.references[idx], requested_table.alias.as_ref()))
485                    })
486                    .collect::<Result<Vec<_>, PlanError>>()?
487            }
488            None => {
489                // If no reference is requested we must validate that only one reference is
490                // available, else we cannot determine which reference the user is referring to.
491                if self.references.len() != 1 {
492                    Err(ExternalReferenceResolutionError::Ambiguous {
493                        name: "".to_string(),
494                    })?
495                }
496                vec![(&self.references[0], None)]
497            }
498        };
499
500        // Convert the filtered references to their appropriate `RequestedSourceExport` form.
501        filtered
502            .into_iter()
503            .map(|(reference, alias)| {
504                let name = match alias {
505                    Some(alias_name) => {
506                        let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
507                        match partial.schema {
508                            Some(_) => alias_name.clone(),
509                            // In cases when a prefix is not provided for the aliased name
510                            // fallback to using the schema of the source with the given name
511                            None => super::source_export_name_gen(source_name, &partial.item)?,
512                        }
513                    }
514                    None => {
515                        // Just use the item name for this reference and ensure it's created in
516                        // the current schema or the source's schema if provided, not mirroring
517                        // the schema of the reference.
518                        super::source_export_name_gen(source_name, reference.name())?
519                    }
520                };
521
522                Ok(RequestedSourceExport {
523                    external_reference: reference.external_reference()?,
524                    name,
525                    meta: reference,
526                })
527            })
528            .collect::<Result<Vec<_>, PlanError>>()
529    }
530
531    pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
532        let idx = self.resolver.resolve_idx(name)?;
533        Ok(&self.references[idx])
534    }
535
536    pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
537        &self.references
538    }
539}