mz_sql/pure/references.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::ops::DerefMut;
use std::sync::Arc;

use mz_ore::now::SYSTEM_TIME;
use mz_repr::RelationDesc;
use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
use mz_sql_server_util::SqlServerError;
use mz_sql_server_util::desc::SqlServerTableRaw;
use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorOutput};
use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};

use crate::names::{FullItemName, RawDatabaseSpecifier};
use crate::plan::{PlanError, SourceReference, SourceReferences};

use super::{RequestedSourceExport, error::PgSourcePurificationError};

/// A client that allows determining all available source references and resolving
/// them to a user-specified source reference during purification.
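///
/// For example, a rough construction sketch (not a compiled doctest;
/// `generator` is assumed to be a `LoadGenerator` already in scope):
///
/// ```ignore
/// let client = SourceReferenceClient::LoadGenerator { generator: &generator };
/// ```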
pub(super) enum SourceReferenceClient<'a> {
    Postgres {
        client: &'a mz_postgres_util::Client,
        publication: &'a str,
        database: &'a str,
    },
    MySql {
        conn: &'a mut mz_mysql_util::MySqlConn,
        /// Whether to include tables from the built-in system schemas in the
        /// retrieved references.
        include_system_schemas: bool,
    },
    SqlServer {
        client: &'a mut mz_sql_server_util::Client,
        database: Arc<str>,
    },
    Kafka {
        topic: &'a str,
    },
    LoadGenerator {
        generator: &'a LoadGenerator,
    },
}

/// Metadata about an available source reference retrieved from the upstream system.
#[derive(Clone, Debug)]
pub(super) enum ReferenceMetadata {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
        database: String,
    },
    MySql(mz_mysql_util::MySqlTableSchema),
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        database: Arc<str>,
        capture_instance: Arc<str>,
    },
    Kafka(String),
    LoadGenerator {
        name: String,
        desc: Option<RelationDesc>,
        namespace: String,
        output: LoadGeneratorOutput,
    },
}

impl ReferenceMetadata {
    fn namespace(&self) -> Option<&str> {
        match self {
            ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
            ReferenceMetadata::MySql(table) => Some(&table.schema_name),
            ReferenceMetadata::SqlServer { table, .. } => Some(table.schema_name.as_ref()),
            ReferenceMetadata::Kafka(_) => None,
            ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
        }
    }

    fn name(&self) -> &str {
        match self {
            ReferenceMetadata::Postgres { table, .. } => &table.name,
            ReferenceMetadata::MySql(table) => &table.name,
            ReferenceMetadata::SqlServer { table, .. } => table.name.as_ref(),
            ReferenceMetadata::Kafka(topic) => topic,
            ReferenceMetadata::LoadGenerator { name, .. } => name,
        }
    }

    pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
        match self {
            ReferenceMetadata::Postgres { table, .. } => Some(table),
            _ => None,
        }
    }

    pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
        match self {
            ReferenceMetadata::MySql(table) => Some(table),
            _ => None,
        }
    }

    pub(super) fn sql_server_table(&self) -> Option<&mz_sql_server_util::desc::SqlServerTableDesc> {
        match self {
            ReferenceMetadata::SqlServer { table, .. } => Some(table),
            _ => None,
        }
    }

    pub(super) fn sql_server_capture_instance(&self) -> Option<&Arc<str>> {
        match self {
            ReferenceMetadata::SqlServer {
                capture_instance, ..
            } => Some(capture_instance),
            _ => None,
        }
    }

    pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
        match self {
            ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
            _ => None,
        }
    }

    pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
        match self {
            ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
            _ => None,
        }
    }

    /// Convert the reference metadata into an `UnresolvedItemName` representing the
    /// external reference, normalized for each source type to be stored as part of
    /// the relevant statement in the catalog.
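    ///
    /// For example (illustrative): a Postgres table `public.users` in database
    /// `mydb` normalizes to `mydb.public.users`, a MySQL table keeps only its
    /// schema and table name, and a Kafka reference is just the topic name.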
    pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
        match self {
            ReferenceMetadata::Postgres { table, database } => {
                Ok(UnresolvedItemName::qualified(&[
                    Ident::new(database)?,
                    Ident::new(&table.namespace)?,
                    Ident::new(&table.name)?,
                ]))
            }
            ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
                Ident::new(&table.schema_name)?,
                Ident::new(&table.name)?,
            ])),
            ReferenceMetadata::SqlServer {
                table,
                database,
                capture_instance: _,
            } => Ok(UnresolvedItemName::qualified(&[
                Ident::new(database.as_ref())?,
                Ident::new(table.schema_name.as_ref())?,
                Ident::new(table.name.as_ref())?,
            ])),
            ReferenceMetadata::Kafka(topic) => {
                Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
            }
            ReferenceMetadata::LoadGenerator {
                name, namespace, ..
            } => {
                let name = FullItemName {
                    database: RawDatabaseSpecifier::Name(
                        mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                            .to_owned(),
                    ),
                    schema: namespace.to_string(),
                    item: name.to_string(),
                };
                Ok(UnresolvedItemName::from(name))
            }
        }
    }
}

/// The set of source references retrieved from an upstream system, along with
/// a resolver for mapping user-specified names onto them.
#[derive(Clone, Debug)]
pub(super) struct RetrievedSourceReferences {
    updated_at: u64,
    references: Vec<ReferenceMetadata>,
    resolver: SourceReferenceResolver,
}

/// The name of the fake database that we use for non-Postgres sources
/// to fit the 3-layer catalog model that the `SourceReferenceResolver`
/// uses to resolve references. This name is never actually stored in
/// the catalog, since `ReferenceMetadata::external_reference` never
/// includes this placeholder in the external references it generates.
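///
/// For example (illustrative): a MySQL table `db1.t1` is resolved as if it
/// lived at `database.db1.t1`, but the external reference stored in the
/// catalog remains `db1.t1`.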
pub(crate) static DATABASE_FAKE_NAME: &str = "database";

impl<'a> SourceReferenceClient<'a> {
    /// Get all available source references from the upstream system
    /// and return a `RetrievedSourceReferences` object that can be used
    /// to resolve user-specified source references and create `SourceReferences`
    /// for storage in the catalog.
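    ///
    /// A rough usage sketch (not a compiled doctest; assumes a live Kafka
    /// topic named `"events"`):
    ///
    /// ```ignore
    /// let retrieved = SourceReferenceClient::Kafka { topic: "events" }
    ///     .get_source_references()
    ///     .await?;
    /// let available = retrieved.available_source_references();
    /// ```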
    pub(super) async fn get_source_references(
        mut self,
    ) -> Result<RetrievedSourceReferences, PlanError> {
        let references = match self {
            SourceReferenceClient::Postgres {
                client,
                publication,
                database,
            } => {
                let tables = mz_postgres_util::publication_info(client, publication, None).await?;

                if tables.is_empty() {
                    Err(PgSourcePurificationError::EmptyPublication(
                        publication.to_string(),
                    ))?;
                }

                tables
                    .into_iter()
                    .map(|(_oid, desc)| ReferenceMetadata::Postgres {
                        table: desc,
                        database: database.to_string(),
                    })
                    .collect()
            }
            SourceReferenceClient::MySql {
                ref mut conn,
                include_system_schemas,
            } => {
                let request = if include_system_schemas {
                    mz_mysql_util::SchemaRequest::AllWithSystemSchemas
                } else {
                    mz_mysql_util::SchemaRequest::All
                };
                // NOTE: MySQL only exposes the schemas of tables on which we hold at least one
                // privilege, so we cannot tell whether a table we lack privileges on even exists.
                let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;

                tables.into_iter().map(ReferenceMetadata::MySql).collect()
            }
            SourceReferenceClient::SqlServer {
                ref mut client,
                ref database,
            } => {
                let tables = mz_sql_server_util::inspect::get_tables(client).await?;

                let mut unique_tables: BTreeMap<(Arc<str>, Arc<str>), SqlServerTableRaw> =
                    BTreeMap::default();
                for table in tables {
                    let key = (Arc::clone(&table.schema_name), Arc::clone(&table.name));

                    unique_tables
                        .entry(key)
                        .and_modify(|chosen_table: &mut SqlServerTableRaw| {
                            // When multiple capture instances exist for the same table,
                            // we select deterministically based on:
                            // 1. Most recent create_date (newer capture instance)
                            // 2. If dates are equal, lexicographically greatest capture_instance name
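                            // For example (hypothetical instances): between `orders_v1`
                            // (older create_date) and `orders_v2` (newer), `orders_v2` is
                            // chosen; if both shared a create_date, the lexicographically
                            // greater name would win.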
                            if chosen_table.capture_instance.create_date
                                < table.capture_instance.create_date
                                || (chosen_table.capture_instance.create_date
                                    == table.capture_instance.create_date
                                    && chosen_table.capture_instance.name
                                        < table.capture_instance.name)
                            {
                                *chosen_table = table.clone();
                            }
                        })
                        .or_insert(table);
                }

                let mut constraints_by_table =
                    mz_sql_server_util::inspect::get_constraints_for_tables(
                        client,
                        unique_tables.keys(),
                    )
                    .await?;
                unique_tables
                    .into_iter()
                    .map(|(qualified_table_name, raw_table)| {
                        let constraints = constraints_by_table
                            .remove(&qualified_table_name)
                            .unwrap_or_default();
                        let capture_instance = Arc::clone(&raw_table.capture_instance.name);
                        let database = Arc::clone(database);
                        let table = mz_sql_server_util::desc::SqlServerTableDesc::new(
                            raw_table,
                            constraints,
                        )?;
                        Ok(ReferenceMetadata::SqlServer {
                            table,
                            database,
                            capture_instance,
                        })
                    })
                    .collect::<Result<_, SqlServerError>>()?
            }
            SourceReferenceClient::Kafka { topic } => {
                vec![ReferenceMetadata::Kafka(topic.to_string())]
            }
            SourceReferenceClient::LoadGenerator { generator } => {
                let mut references = generator
                    .views()
                    .into_iter()
                    .map(
                        |(view, relation, output)| ReferenceMetadata::LoadGenerator {
                            name: view.to_string(),
                            desc: Some(relation),
                            namespace: generator.schema_name().to_string(),
                            output,
                        },
                    )
                    .collect::<Vec<_>>();

                if references.is_empty() {
                    // If there are no views then this load-generator just has a single output
                    // that uses the load-generator's schema name.
                    references.push(ReferenceMetadata::LoadGenerator {
                        name: generator.schema_name().to_string(),
                        desc: None,
                        namespace: generator.schema_name().to_string(),
                        output: LoadGeneratorOutput::Default,
                    });
                }
                references
            }
        };

        let reference_names: Vec<(&str, &str)> = references
            .iter()
            .map(|reference| {
                (
                    reference.namespace().unwrap_or_else(|| reference.name()),
                    reference.name(),
                )
            })
            .collect();
        let resolver = match self {
            SourceReferenceClient::Postgres { database, .. } => {
                SourceReferenceResolver::new(database, &reference_names)
            }
            _ => SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names),
        }?;

        Ok(RetrievedSourceReferences {
            updated_at: SYSTEM_TIME(),
            references,
            resolver,
        })
    }
}

impl RetrievedSourceReferences {
    /// Convert the resolved source references into a `SourceReferences` object
    /// for storage in the catalog.
    pub(super) fn available_source_references(self) -> SourceReferences {
        SourceReferences {
            updated_at: self.updated_at,
            references: self
                .references
                .into_iter()
                .map(|reference| match reference {
                    ReferenceMetadata::Postgres { table, .. } => SourceReference {
                        name: table.name,
                        namespace: Some(table.namespace),
                        columns: table.columns.into_iter().map(|c| c.name).collect(),
                    },
                    ReferenceMetadata::MySql(table) => SourceReference {
                        name: table.name,
                        namespace: Some(table.schema_name),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|column| column.name())
                            .collect(),
                    },
                    ReferenceMetadata::SqlServer { table, .. } => SourceReference {
                        name: table.name.to_string(),
                        namespace: Some(table.schema_name.to_string()),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|c| c.name.to_string())
                            .collect(),
                    },
                    ReferenceMetadata::Kafka(topic) => SourceReference {
                        name: topic,
                        namespace: None,
                        columns: vec![],
                    },
                    ReferenceMetadata::LoadGenerator {
                        name,
                        desc,
                        namespace,
                        ..
                    } => SourceReference {
                        name,
                        namespace: Some(namespace),
                        columns: desc
                            .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
                            .unwrap_or_default(),
                    },
                })
                .collect(),
        }
    }

    /// Resolve the requested external references to their appropriate source exports.
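    ///
    /// A rough sketch (not a compiled doctest; `retrieved` and `source_name`
    /// are assumed to be in scope):
    ///
    /// ```ignore
    /// // e.g. `FOR ALL TABLES`: request an export for every retrieved reference.
    /// let exports = retrieved
    ///     .requested_source_exports(Some(&ExternalReferences::All), &source_name)?;
    /// ```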
    pub(super) fn requested_source_exports<'a>(
        &'a self,
        requested: Option<&ExternalReferences>,
        source_name: &UnresolvedItemName,
    ) -> Result<Vec<RequestedSourceExport<&'a ReferenceMetadata>>, PlanError> {
        // Filter all available references to those requested by the `ExternalReferences`
        // specification and include any alias that the user has specified.
        // TODO(database-issues#8620): The alias handling can be removed once subsources are removed.
        let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
            Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
            Some(ExternalReferences::SubsetSchemas(schemas)) => {
                let available_schemas: BTreeSet<&str> = self
                    .references
                    .iter()
                    .filter_map(|r| r.namespace())
                    .collect();
                let requested_schemas: BTreeSet<&str> =
                    schemas.iter().map(|s| s.as_str()).collect();

                let missing_schemas: Vec<_> = requested_schemas
                    .difference(&available_schemas)
                    .map(|s| s.to_string())
                    .collect();

                if !missing_schemas.is_empty() {
                    Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
                }

                self.references
                    .iter()
                    .filter_map(|reference| {
                        reference
                            .namespace()
                            .map(|namespace| {
                                requested_schemas
                                    .contains(namespace)
                                    .then_some((reference, None))
                            })
                            .flatten()
                    })
                    .collect()
            }
            Some(ExternalReferences::SubsetTables(requested_tables)) => {
                // Use the `SourceReferenceResolver` to resolve the requested tables to their
                // appropriate index in the available references.
                requested_tables
                    .into_iter()
                    .map(|requested_table| {
                        let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
                        Ok((&self.references[idx], requested_table.alias.as_ref()))
                    })
                    .collect::<Result<Vec<_>, PlanError>>()?
            }
            None => {
                // If no reference is requested, we must validate that exactly one reference is
                // available; otherwise we cannot determine which reference the user means.
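                // For example, a Kafka source always yields exactly one reference (its
                // topic), so `None` unambiguously resolves to it.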
                if self.references.len() != 1 {
                    Err(ExternalReferenceResolutionError::Ambiguous {
                        name: "".to_string(),
                    })?
                }
                vec![(&self.references[0], None)]
            }
        };

        // Convert the filtered references to their appropriate `RequestedSourceExport` form.
        filtered
            .into_iter()
            .map(|(reference, alias)| {
                let name = match alias {
                    Some(alias_name) => {
                        let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
                        match partial.schema {
                            Some(_) => alias_name.clone(),
                            // When no schema prefix is provided for the aliased name, fall
                            // back to qualifying the given item name with the source's schema.
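                            // For example (illustrative), a bare alias `t2` for a source
                            // named `db.sch.src` would yield the export name `db.sch.t2`.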
                            None => super::source_export_name_gen(source_name, &partial.item)?,
                        }
                    }
                    None => {
                        // Just use the reference's item name, creating it in the current schema
                        // (or the source's schema, if one was provided) rather than mirroring
                        // the schema of the reference.
                        super::source_export_name_gen(source_name, reference.name())?
                    }
                };

                Ok(RequestedSourceExport {
                    external_reference: reference.external_reference()?,
                    name,
                    meta: reference,
                })
            })
            .collect::<Result<Vec<_>, PlanError>>()
    }

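    /// Resolve a (possibly partially qualified) name to the metadata of the
    /// reference it identifies.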
    pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
        let idx = self.resolver.resolve_idx(name)?;
        Ok(&self.references[idx])
    }

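    /// Return all references retrieved from the upstream system.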
    pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
        &self.references
    }
}