1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;
use std::ops::DerefMut;

use mz_ore::now::SYSTEM_TIME;
use mz_repr::RelationDesc;
use mz_sql_parser::ast::{ExternalReferences, Ident, IdentError, UnresolvedItemName};
use mz_storage_types::sources::load_generator::{LoadGenerator, LoadGeneratorOutput};
use mz_storage_types::sources::{ExternalReferenceResolutionError, SourceReferenceResolver};

use crate::names::{FullItemName, RawDatabaseSpecifier};
use crate::plan::{PlanError, SourceReference, SourceReferences};

use super::{error::PgSourcePurificationError, RequestedSourceExport};

/// A client that allows determining all available source references and resolving
/// them to a user-specified source reference during purification.
///
/// Each variant carries the borrowed handles or identifiers that
/// `get_source_references` needs to enumerate the references of one source type.
pub(super) enum SourceReferenceClient<'a> {
    /// References are the tables reported for a Postgres publication.
    Postgres {
        /// Client handed to `mz_postgres_util::publication_info`.
        client: &'a mz_postgres_util::Client,
        /// Name of the publication whose tables are listed.
        publication: &'a str,
        /// Database name recorded on every retrieved reference and used as the
        /// database of the reference resolver.
        database: &'a str,
    },
    /// References are the tables reported by `mz_mysql_util::schema_info`.
    MySql {
        /// Connection used to query the upstream schema information.
        conn: &'a mut mz_mysql_util::MySqlConn,
        /// Sets whether to include tables from the built-in system schemas in the
        /// retrieved references.
        include_system_schemas: bool,
    },
    /// A Kafka source exposes exactly one reference: the topic itself.
    Kafka {
        /// Name of the topic, used as the name of the single reference.
        topic: &'a str,
    },
    /// References are the views of the load generator, or a single default
    /// output when the generator has no views.
    LoadGenerator {
        /// Generator whose `views()` and `schema_name()` describe the references.
        generator: &'a LoadGenerator,
    },
}

/// Metadata about an available source reference retrieved from the upstream system.
///
/// One value is created per upstream reference by
/// `SourceReferenceClient::get_source_references`.
#[derive(Clone)]
pub(super) enum ReferenceMetadata {
    /// A table that is part of a Postgres publication.
    Postgres {
        /// Description of the upstream table.
        table: mz_postgres_util::desc::PostgresTableDesc,
        /// Name of the database, copied from the client that retrieved the table.
        database: String,
    },
    /// A MySQL table, described by its upstream schema information.
    MySql(mz_mysql_util::MySqlTableSchema),
    /// A Kafka topic, identified by its name.
    Kafka(String),
    /// An output of a load generator.
    LoadGenerator {
        /// Name of the view, or the generator's schema name for a generator
        /// without views.
        name: String,
        /// Relation description of the view; `None` for the single default output
        /// of a generator without views.
        desc: Option<RelationDesc>,
        /// The generator's schema name.
        namespace: String,
        /// The load generator output this reference corresponds to.
        output: LoadGeneratorOutput,
    },
}

impl ReferenceMetadata {
    fn namespace(&self) -> Option<&str> {
        match self {
            ReferenceMetadata::Postgres { table, .. } => Some(&table.namespace),
            ReferenceMetadata::MySql(table) => Some(&table.schema_name),
            ReferenceMetadata::Kafka(_) => None,
            ReferenceMetadata::LoadGenerator { namespace, .. } => Some(namespace),
        }
    }

    fn name(&self) -> &str {
        match self {
            ReferenceMetadata::Postgres { table, .. } => &table.name,
            ReferenceMetadata::MySql(table) => &table.name,
            ReferenceMetadata::Kafka(topic) => topic,
            ReferenceMetadata::LoadGenerator { name, .. } => name,
        }
    }

    pub(super) fn postgres_desc(&self) -> Option<&mz_postgres_util::desc::PostgresTableDesc> {
        match self {
            ReferenceMetadata::Postgres { table, .. } => Some(table),
            _ => None,
        }
    }

    pub(super) fn mysql_table(&self) -> Option<&mz_mysql_util::MySqlTableSchema> {
        match self {
            ReferenceMetadata::MySql(table) => Some(table),
            _ => None,
        }
    }

    pub(super) fn load_generator_desc(&self) -> Option<&Option<RelationDesc>> {
        match self {
            ReferenceMetadata::LoadGenerator { desc, .. } => Some(desc),
            _ => None,
        }
    }

    pub(super) fn load_generator_output(&self) -> Option<&LoadGeneratorOutput> {
        match self {
            ReferenceMetadata::LoadGenerator { output, .. } => Some(output),
            _ => None,
        }
    }

    /// Convert the reference metadata into an `UnresolvedItemName` representing the
    /// external reference, normalized for each source type to be stored as part of
    /// the relevant statement in the catalog.
    pub(super) fn external_reference(&self) -> Result<UnresolvedItemName, IdentError> {
        match self {
            ReferenceMetadata::Postgres { table, database } => {
                Ok(UnresolvedItemName::qualified(&[
                    Ident::new(database)?,
                    Ident::new(&table.namespace)?,
                    Ident::new(&table.name)?,
                ]))
            }
            ReferenceMetadata::MySql(table) => Ok(UnresolvedItemName::qualified(&[
                Ident::new(&table.schema_name)?,
                Ident::new(&table.name)?,
            ])),
            ReferenceMetadata::Kafka(topic) => {
                Ok(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
            }
            ReferenceMetadata::LoadGenerator {
                name, namespace, ..
            } => {
                let name = FullItemName {
                    database: RawDatabaseSpecifier::Name(
                        mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
                            .to_owned(),
                    ),
                    schema: namespace.to_string(),
                    item: name.to_string(),
                };
                Ok(UnresolvedItemName::from(name))
            }
        }
    }
}

/// A set of resolved source references.
///
/// Produced by `SourceReferenceClient::get_source_references`.
#[derive(Clone)]
pub(super) struct RetrievedSourceReferences {
    /// Value of `SYSTEM_TIME()` taken when the references were retrieved.
    updated_at: u64,
    /// Metadata for every reference retrieved from the upstream system.
    references: Vec<ReferenceMetadata>,
    /// Resolver built from the (namespace, name) pairs of `references`; the
    /// indexes it returns are used to index into `references`.
    resolver: SourceReferenceResolver,
}

/// The name of the fake database that we use for non-Postgres sources
/// to fit the model of a 3-layer catalog used to resolve references
/// in the `SourceReferenceResolver`. This name isn't actually stored in
/// the catalog: `ReferenceMetadata::external_reference` uses the real
/// database name for Postgres sources, the dedicated load generator
/// database name for load generators, and no database component at all
/// for MySQL and Kafka sources.
pub(crate) static DATABASE_FAKE_NAME: &str = "database";

impl<'a> SourceReferenceClient<'a> {
    /// Get all available source references from the upstream system
    /// and return a `RetrievedSourceReferences` object that can be used
    /// to resolve user-specified source references and create `SourceReferences`
    /// for storage in the catalog.
    pub(super) async fn get_source_references(
        mut self,
    ) -> Result<RetrievedSourceReferences, PlanError> {
        let references = match self {
            SourceReferenceClient::Postgres {
                client,
                publication,
                database,
            } => {
                let tables = mz_postgres_util::publication_info(client, publication).await?;

                if tables.is_empty() {
                    Err(PgSourcePurificationError::EmptyPublication(
                        publication.to_string(),
                    ))?;
                }

                tables
                    .into_iter()
                    .map(|desc| ReferenceMetadata::Postgres {
                        table: desc,
                        database: database.to_string(),
                    })
                    .collect()
            }
            SourceReferenceClient::MySql {
                ref mut conn,
                include_system_schemas,
            } => {
                let request = if include_system_schemas {
                    mz_mysql_util::SchemaRequest::AllWithSystemSchemas
                } else {
                    mz_mysql_util::SchemaRequest::All
                };
                // NOTE: mysql will only expose the schemas of tables we have at least one privilege on
                // and we can't tell if a table exists without a privilege
                let tables = mz_mysql_util::schema_info((*conn).deref_mut(), &request).await?;

                tables.into_iter().map(ReferenceMetadata::MySql).collect()
            }
            SourceReferenceClient::Kafka { topic } => {
                vec![ReferenceMetadata::Kafka(topic.to_string())]
            }
            SourceReferenceClient::LoadGenerator { generator } => {
                let mut references = generator
                    .views()
                    .into_iter()
                    .map(
                        |(view, relation, output)| ReferenceMetadata::LoadGenerator {
                            name: view.to_string(),
                            desc: Some(relation),
                            namespace: generator.schema_name().to_string(),
                            output,
                        },
                    )
                    .collect::<Vec<_>>();

                if references.is_empty() {
                    // If there are no views then this load-generator just has a single output
                    // that uses the load-generator's schema name.
                    references.push(ReferenceMetadata::LoadGenerator {
                        name: generator.schema_name().to_string(),
                        desc: None,
                        namespace: generator.schema_name().to_string(),
                        output: LoadGeneratorOutput::Default,
                    });
                }
                references
            }
        };

        let reference_names: Vec<(&str, &str)> = references
            .iter()
            .map(|reference| {
                (
                    reference.namespace().unwrap_or(reference.name()),
                    reference.name(),
                )
            })
            .collect();
        let resolver = match self {
            SourceReferenceClient::Postgres { database, .. } => {
                SourceReferenceResolver::new(database, &reference_names)
            }
            _ => SourceReferenceResolver::new(DATABASE_FAKE_NAME, &reference_names),
        }?;

        Ok(RetrievedSourceReferences {
            updated_at: SYSTEM_TIME(),
            references,
            resolver,
        })
    }
}

impl RetrievedSourceReferences {
    /// Convert the resolved source references into a `SourceReferences` object
    /// for storage in the catalog.
    pub(super) fn available_source_references(self) -> SourceReferences {
        SourceReferences {
            updated_at: self.updated_at,
            references: self
                .references
                .into_iter()
                .map(|reference| match reference {
                    ReferenceMetadata::Postgres { table, .. } => SourceReference {
                        name: table.name,
                        namespace: Some(table.namespace),
                        columns: table.columns.into_iter().map(|c| c.name).collect(),
                    },
                    ReferenceMetadata::MySql(table) => SourceReference {
                        name: table.name,
                        namespace: Some(table.schema_name),
                        columns: table
                            .columns
                            .into_iter()
                            .map(|column| column.name())
                            .collect(),
                    },
                    ReferenceMetadata::Kafka(topic) => SourceReference {
                        name: topic,
                        namespace: None,
                        columns: vec![],
                    },
                    ReferenceMetadata::LoadGenerator {
                        name,
                        desc,
                        namespace,
                        ..
                    } => SourceReference {
                        name,
                        namespace: Some(namespace),
                        columns: desc
                            .map(|desc| desc.iter_names().map(|n| n.to_string()).collect())
                            .unwrap_or_default(),
                    },
                })
                .collect(),
        }
    }

    /// Resolve the requested external references to their appropriate source exports.
    pub(super) fn requested_source_exports<'a>(
        &'a self,
        requested: Option<&ExternalReferences>,
        source_name: &UnresolvedItemName,
    ) -> Result<Vec<RequestedSourceExport<&ReferenceMetadata>>, PlanError> {
        // Filter all available references to those requested by the `ExternalReferences`
        // specification and include any alias that the user has specified.
        // TODO(database-issues#8620): The alias handling can be removed once subsources are removed.
        let filtered: Vec<(&ReferenceMetadata, Option<&UnresolvedItemName>)> = match requested {
            Some(ExternalReferences::All) => self.references.iter().map(|r| (r, None)).collect(),
            Some(ExternalReferences::SubsetSchemas(schemas)) => {
                let available_schemas: BTreeSet<&str> = self
                    .references
                    .iter()
                    .filter_map(|r| r.namespace())
                    .collect();
                let requested_schemas: BTreeSet<&str> =
                    schemas.iter().map(|s| s.as_str()).collect();

                let missing_schemas: Vec<_> = requested_schemas
                    .difference(&available_schemas)
                    .map(|s| s.to_string())
                    .collect();

                if !missing_schemas.is_empty() {
                    Err(PlanError::NoTablesFoundForSchemas(missing_schemas))?
                }

                self.references
                    .iter()
                    .filter_map(|reference| {
                        reference
                            .namespace()
                            .map(|namespace| {
                                requested_schemas
                                    .contains(namespace)
                                    .then_some((reference, None))
                            })
                            .flatten()
                    })
                    .collect()
            }
            Some(ExternalReferences::SubsetTables(requested_tables)) => {
                // Use the `SourceReferenceResolver` to resolve the requested tables to their
                // appropriate index in the available references.
                requested_tables
                    .into_iter()
                    .map(|requested_table| {
                        let idx = self.resolver.resolve_idx(&requested_table.reference.0)?;
                        Ok((&self.references[idx], requested_table.alias.as_ref()))
                    })
                    .collect::<Result<Vec<_>, PlanError>>()?
            }
            None => {
                // If no reference is requested we must validate that only one reference is
                // available, else we cannot determine which reference the user is referring to.
                if self.references.len() != 1 {
                    Err(ExternalReferenceResolutionError::Ambiguous {
                        name: "".to_string(),
                    })?
                }
                vec![(&self.references[0], None)]
            }
        };

        // Convert the filtered references to their appropriate `RequestedSourceExport` form.
        filtered
            .into_iter()
            .map(|(reference, alias)| {
                let name = match alias {
                    Some(alias_name) => {
                        let partial = crate::normalize::unresolved_item_name(alias_name.clone())?;
                        match partial.schema {
                            Some(_) => alias_name.clone(),
                            // In cases when a prefix is not provided for the aliased name
                            // fallback to using the schema of the source with the given name
                            None => super::source_export_name_gen(source_name, &partial.item)?,
                        }
                    }
                    None => {
                        // Just use the item name for this reference and ensure it's created in
                        // the current schema or the source's schema if provided, not mirroring
                        // the schema of the reference.
                        super::source_export_name_gen(source_name, reference.name())?
                    }
                };

                Ok(RequestedSourceExport {
                    external_reference: reference.external_reference()?,
                    name,
                    meta: reference,
                })
            })
            .collect::<Result<Vec<_>, PlanError>>()
    }

    pub(super) fn resolve_name(&self, name: &[Ident]) -> Result<&ReferenceMetadata, PlanError> {
        let idx = self.resolver.resolve_idx(name)?;
        Ok(&self.references[idx])
    }

    pub(super) fn all_references(&self) -> &Vec<ReferenceMetadata> {
        &self.references
    }
}