mz_sql/pure/references.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::ops::DerefMut;
use std::sync::Arc;

use mz_ore::now::SYSTEM_TIME;
use mz_repr::RelationDesc;
use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
use mz_sql_server_util::desc::SqlServerTableRaw;
use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorOutput};
use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};

use crate::names::{FullItemName, RawDatabaseSpecifier};
use crate::plan::{PlanError, SourceReference, SourceReferences};

use super::{RequestedSourceExport, error::PgSourcePurificationError};

/// A client used during purification to determine all available source
/// references in an upstream system and to resolve user-specified source
/// references against them.
pub(super) enum SourceReferenceClient<'a> {
    Postgres {
        client: &'a mz_postgres_util::Client,
        publication: &'a str,
        database: &'a str,
    },
    MySql {
        conn: &'a mut mz_mysql_util::MySqlConn,
        /// Whether to include tables from the built-in system schemas in the
        /// retrieved references.
        include_system_schemas: bool,
    },
    SqlServer {
        client: &'a mut mz_sql_server_util::Client,
        database: Arc<str>,
    },
    Kafka {
        topic: &'a str,
    },
    LoadGenerator {
        generator: &'a LoadGenerator,
    },
}
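
// A minimal usage sketch (not a call site from this file; `"my_topic"` is a
// placeholder and error handling is elided):
//
//     let retrieved = SourceReferenceClient::Kafka { topic: "my_topic" }
//         .get_source_references()
//         .await?;
//     let available = retrieved.available_source_references();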

/// Metadata about an available source reference retrieved from the upstream system.
#[derive(Clone, Debug)]
pub(super) enum ReferenceMetadata {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
        database: String,
    },
    MySql(mz_mysql_util::MySqlTableSchema),
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        database: Arc<str>,
        capture_instance: Arc<str>,
    },
    Kafka(String),
    LoadGenerator {
        name: String,
        desc: Option<RelationDesc>,
        namespace: String,
        output: LoadGeneratorOutput,
    },
}

impl ReferenceMetadata {
    fn namespace(&self) -> Option<&str> {
        match self {
            ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
            ReferenceMetadata::MySql(table) => Some(&table.schema_name),
            ReferenceMetadata::SqlServer { table, .. } => Some(table.schema_name.as_ref()),
            ReferenceMetadata::Kafka(_) => None,
            ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
        }
    }

    fn name(&self) -> &str {
        match self {
            ReferenceMetadata::Postgres { table, .. } => &table.name,
            ReferenceMetadata::MySql(table) => &table.name,
            ReferenceMetadata::SqlServer { table, .. } => table.name.as_ref(),
            ReferenceMetadata::Kafka(topic) => topic,
            ReferenceMetadata::LoadGenerator { name, .. } => name,
        }
    }

    pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
        match self {
            ReferenceMetadata::Postgres { table, .. } => Some(table),
            _ => None,
        }
    }

    pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
        match self {
            ReferenceMetadata::MySql(table) => Some(table),
            _ => None,
        }
    }

    pub(super) fn sql_server_table(&self) -> Option<&mz_sql_server_util::desc::SqlServerTableDesc> {
        match self {
            ReferenceMetadata::SqlServer { table, .. } => Some(table),
            _ => None,
        }
    }

    pub(super) fn sql_server_capture_instance(&self) -> Option<&Arc<str>> {
        match self {
            ReferenceMetadata::SqlServer {
                capture_instance, ..
            } => Some(capture_instance),
            _ => None,
        }
    }

    pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
        match self {
            ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
            _ => None,
        }
    }

    pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
        match self {
            ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
            _ => None,
        }
    }

    /// Convert the reference metadata into an `UnresolvedItemName` representing the
    /// external reference, normalized for each source type to be stored as part of
    /// the relevant statement in the catalog.
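    ///
    /// For example (illustrative names only): a Postgres table `public.users` in
    /// database `mydb` normalizes to the three-part `mydb.public.users`, a MySQL
    /// table to the two-part `schema_name.table_name`, and a Kafka topic to a
    /// single-component name.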
    pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
        match self {
            ReferenceMetadata::Postgres { table, database } => {
                Ok(UnresolvedItemName::qualified(&[
                    Ident::new(database)?,
                    Ident::new(&table.namespace)?,
                    Ident::new(&table.name)?,
                ]))
            }
            ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
                Ident::new(&table.schema_name)?,
                Ident::new(&table.name)?,
            ])),
            ReferenceMetadata::SqlServer {
                table,
                database,
                capture_instance: _,
            } => Ok(UnresolvedItemName::qualified(&[
                Ident::new(database.as_ref())?,
                Ident::new(table.schema_name.as_ref())?,
                Ident::new(table.name.as_ref())?,
            ])),
            ReferenceMetadata::Kafka(topic) => {
                Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
            }
            ReferenceMetadata::LoadGenerator {
                name, namespace, ..
            } => {
                let name = FullItemName {
                    database: RawDatabaseSpecifier::Name(
                        mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                            .to_owned(),
                    ),
                    schema: namespace.to_string(),
                    item: name.to_string(),
                };
                Ok(UnresolvedItemName::from(name))
            }
        }
    }
}

/// A set of resolved source references.
#[derive(Clone, Debug)]
pub(super) struct RetrievedSourceReferences {
    updated_at: u64,
    references: Vec<ReferenceMetadata>,
    resolver: SourceReferenceResolver,
}

/// The name of the fake database that we use for non-Postgres sources
/// to fit the model of a 3-layer catalog used to resolve references
/// in the `SourceReferenceResolver`. This name is never stored in the
/// catalog: `ReferenceMetadata::external_reference` either includes a
/// real database name (Postgres, SQL Server, load generators) or omits
/// the database component entirely (MySQL, Kafka).
pub(crate) static DATABASE_FAKE_NAME: &str = "database";
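
// For non-Postgres sources, `get_source_references` below builds its resolver
// roughly as follows (a sketch with placeholder identifiers):
//
//     SourceReferenceResolver::new(DATABASE_FAKE_NAME, &[("schema", "table")])
//
// so a user-supplied `schema.table` resolves as if it were
// `database.schema.table`, without the fake name ever reaching the catalog.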

impl<'a> SourceReferenceClient<'a> {
    /// Get all available source references from the upstream system
    /// and return a `RetrievedSourceReferences` object that can be used
    /// to resolve user-specified source references and create `SourceReferences`
    /// for storage in the catalog.
    pub(super) async fn get_source_references(
        mut self,
    ) -> Result<RetrievedSourceReferences, PlanError> {
        let references = match self {
            SourceReferenceClient::Postgres {
                client,
                publication,
                database,
            } => {
                let tables = mz_postgres_util::publication_info(client, publication, None).await?;

                if tables.is_empty() {
                    Err(PgSourcePurificationError::EmptyPublication(
                        publication.to_string(),
                    ))?;
                }

                tables
                    .into_iter()
                    .map(|(_oid, desc)| ReferenceMetadata::Postgres {
                        table: desc,
                        database: database.to_string(),
                    })
                    .collect()
            }
            SourceReferenceClient::MySql {
                ref mut conn,
                include_system_schemas,
            } => {
                let request = if include_system_schemas {
                    mz_mysql_util::SchemaRequest::AllWithSystemSchemas
                } else {
                    mz_mysql_util::SchemaRequest::All
                };
                // NOTE: MySQL only exposes the schemas of tables on which we have at least one
                // privilege, and we cannot tell whether a table exists without such a privilege.
                let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;

                tables.into_iter().map(ReferenceMetadata::MySql).collect()
            }
            SourceReferenceClient::SqlServer {
                ref mut client,
                ref database,
            } => {
                let tables = mz_sql_server_util::inspect::get_tables(client).await?;

                let mut unique_tables: BTreeMap<(Arc<str>, Arc<str>), SqlServerTableRaw> =
                    BTreeMap::default();
                for table in tables {
                    let key = (Arc::clone(&table.schema_name), Arc::clone(&table.name));

                    unique_tables
                        .entry(key)
                        .and_modify(|chosen_table: &mut SqlServerTableRaw| {
                            // When multiple capture instances exist for the same table,
                            // we select deterministically based on:
                            // 1. Most recent create_date (newer capture instance)
                            // 2. If dates are equal, lexicographically greatest capture_instance name
                            if chosen_table.capture_instance.create_date
                                < table.capture_instance.create_date
                                || (chosen_table.capture_instance.create_date
                                    == table.capture_instance.create_date
                                    && chosen_table.capture_instance.name
                                        < table.capture_instance.name)
                            {
                                *chosen_table = table.clone();
                            }
                        })
                        .or_insert(table);
                }
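
                // Net effect of the dedup above (placeholder values): capture instances
                // `dbo_users_v1` (created 2024-01-01) and `dbo_users_v2` (created
                // 2024-02-01) for the same table resolve to `dbo_users_v2`; if the
                // create dates tie, the lexicographically greater name wins.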

                unique_tables
                    .into_values()
                    .map(|raw| {
                        let capture_instance = Arc::clone(&raw.capture_instance.name);
                        let database = Arc::clone(database);
                        let table = mz_sql_server_util::desc::SqlServerTableDesc::new(raw);
                        ReferenceMetadata::SqlServer {
                            table,
                            database,
                            capture_instance,
                        }
                    })
                    .collect()
            }
            SourceReferenceClient::Kafka { topic } => {
                vec![ReferenceMetadata::Kafka(topic.to_string())]
            }
            SourceReferenceClient::LoadGenerator { generator } => {
                let mut references = generator
                    .views()
                    .into_iter()
                    .map(
                        |(view, relation, output)| ReferenceMetadata::LoadGenerator {
                            name: view.to_string(),
                            desc: Some(relation),
                            namespace: generator.schema_name().to_string(),
                            output,
                        },
                    )
                    .collect::<Vec<_>>();

                if references.is_empty() {
                    // If there are no views then this load-generator just has a single output
                    // that uses the load-generator's schema name.
                    references.push(ReferenceMetadata::LoadGenerator {
                        name: generator.schema_name().to_string(),
                        desc: None,
                        namespace: generator.schema_name().to_string(),
                        output: LoadGeneratorOutput::Default,
                    });
                }
                references
            }
        };

        let reference_names: Vec<(&str, &str)> = references
            .iter()
            .map(|reference| {
                (
                    reference.namespace().unwrap_or_else(|| reference.name()),
                    reference.name(),
                )
            })
            .collect();
        let resolver = match self {
            SourceReferenceClient::Postgres { database, .. } => {
                SourceReferenceResolver::new(database, &reference_names)
            }
            _ => SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names),
        }?;

        Ok(RetrievedSourceReferences {
            updated_at: SYSTEM_TIME(),
            references,
            resolver,
        })
    }
}

impl RetrievedSourceReferences {
    /// Convert the resolved source references into a `SourceReferences` object
    /// for storage in the catalog.
    pub(super) fn available_source_references(self) -> SourceReferences {
        SourceReferences {
            updated_at: self.updated_at,
            references: self
                .references
                .into_iter()
                .map(|reference| match reference {
                    ReferenceMetadata::Postgres { table, .. } => SourceReference {
                        name: table.name,
                        namespace: Some(table.namespace),
                        columns: table.columns.into_iter().map(|c| c.name).collect(),
                    },
                    ReferenceMetadata::MySql(table) => SourceReference {
                        name: table.name,
                        namespace: Some(table.schema_name),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|column| column.name())
                            .collect(),
                    },
                    ReferenceMetadata::SqlServer { table, .. } => SourceReference {
                        name: table.name.to_string(),
                        namespace: Some(table.schema_name.to_string()),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|c| c.name.to_string())
                            .collect(),
                    },
                    ReferenceMetadata::Kafka(topic) => SourceReference {
                        name: topic,
                        namespace: None,
                        columns: vec![],
                    },
                    ReferenceMetadata::LoadGenerator {
                        name,
                        desc,
                        namespace,
                        ..
                    } => SourceReference {
                        name,
                        namespace: Some(namespace),
                        columns: desc
                            .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
                            .unwrap_or_default(),
                    },
                })
                .collect(),
        }
    }

    /// Resolve the requested external references to their appropriate source exports.
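    ///
    /// For example (placeholder names): given available references `public.users`
    /// and `public.orders`, an `ExternalReferences::SubsetTables` request naming
    /// `public.users` (optionally with an alias) yields that single export,
    /// `SubsetSchemas` naming `public` yields both, and `All` yields every
    /// retrieved reference.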
    pub(super) fn requested_source_exports<'a>(
        &'a self,
        requested: Option<&ExternalReferences>,
        source_name: &UnresolvedItemName,
    ) -> Result<Vec<RequestedSourceExport<&'a ReferenceMetadata>>, PlanError> {
        // Filter all available references to those requested by the `ExternalReferences`
        // specification and include any alias that the user has specified.
        // TODO(database-issues#8620): The alias handling can be removed once subsources are removed.
        let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
            Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
            Some(ExternalReferences::SubsetSchemas(schemas)) => {
                let available_schemas: BTreeSet<&str> = self
                    .references
                    .iter()
                    .filter_map(|r| r.namespace())
                    .collect();
                let requested_schemas: BTreeSet<&str> =
                    schemas.iter().map(|s| s.as_str()).collect();

                let missing_schemas: Vec<_> = requested_schemas
                    .difference(&available_schemas)
                    .map(|s| s.to_string())
                    .collect();

                if !missing_schemas.is_empty() {
                    Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
                }

                self.references
                    .iter()
                    .filter_map(|reference| {
                        reference
                            .namespace()
                            .map(|namespace| {
                                requested_schemas
                                    .contains(namespace)
                                    .then_some((reference, None))
                            })
                            .flatten()
                    })
                    .collect()
            }
            Some(ExternalReferences::SubsetTables(requested_tables)) => {
                // Use the `SourceReferenceResolver` to resolve the requested tables to their
                // appropriate index in the available references.
                requested_tables
                    .into_iter()
                    .map(|requested_table| {
                        let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
                        Ok((&self.references[idx], requested_table.alias.as_ref()))
                    })
                    .collect::<Result<Vec<_>, PlanError>>()?
            }
            None => {
                // If no reference is requested we must validate that only one reference is
                // available, else we cannot determine which reference the user is referring to.
                if self.references.len() != 1 {
                    Err(ExternalReferenceResolutionError::Ambiguous {
                        name: "".to_string(),
                    })?
                }
                vec![(&self.references[0], None)]
            }
        };

        // Convert the filtered references to their appropriate `RequestedSourceExport` form.
        filtered
            .into_iter()
            .map(|(reference, alias)| {
                let name = match alias {
                    Some(alias_name) => {
                        let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
                        match partial.schema {
                            Some(_) => alias_name.clone(),
                            // If no schema prefix was provided for the aliased name,
                            // fall back to the source's schema with the given item name.
                            None => super::source_export_name_gen(source_name, &partial.item)?,
                        }
                    }
                    None => {
                        // Just use the item name for this reference and ensure it's created in
                        // the current schema or the source's schema if provided, not mirroring
                        // the schema of the reference.
                        super::source_export_name_gen(source_name, reference.name())?
                    }
                };

                Ok(RequestedSourceExport {
                    external_reference: reference.external_reference()?,
                    name,
                    meta: reference,
                })
            })
            .collect::<Result<Vec<_>, PlanError>>()
    }

    /// Resolve a user-provided multi-part name against the retrieved references.
    pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
        let idx = self.resolver.resolve_idx(name)?;
        Ok(&self.references[idx])
    }

    /// All references retrieved from the upstream system.
    pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
        &self.references
    }
}