mz_sql/pure/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Postgres utilities for SQL purification.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use mz_expr::MirScalarExpr;
15use mz_postgres_util::desc::PostgresTableDesc;
16use mz_proto::RustType;
17use mz_repr::{SqlColumnType, SqlRelationType, SqlScalarType};
18use mz_sql_parser::ast::display::AstDisplay;
19use mz_sql_parser::ast::{
20    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
21    ExternalReferences, Ident, PgConfigOptionName, TableConstraint, UnresolvedItemName, Value,
22    WithOptionValue,
23};
24use mz_storage_types::sources::SourceExportStatementDetails;
25use mz_storage_types::sources::postgres::CastType;
26use prost::Message;
27use tokio_postgres::Client;
28use tokio_postgres::types::Oid;
29
30use crate::names::{Aug, ResolvedItemName};
31use crate::normalize;
32use crate::plan::hir::ColumnRef;
33use crate::plan::typeconv::{CastContext, plan_cast};
34use crate::plan::{
35    ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
36};
37
38use super::error::PgSourcePurificationError;
39use super::references::RetrievedSourceReferences;
40use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
41
/// Ensure that we can safely snapshot and replicate the identified tables:
/// that we have `SELECT` privileges on them (and `USAGE` on their schemas),
/// that row-level security will not silently filter rows, and that each table
/// has `REPLICA IDENTITY FULL`. We have to do this before we start
/// snapshotting because if we discover we cannot `COPY` from a table while
/// snapshotting, we break the entire source.
pub(super) async fn validate_requested_references_privileges(
    client: &Client,
    table_oids: &[Oid],
) -> Result<(), PlanError> {
    // These checks run sequentially against the same client; each one errors
    // with a purification error listing every offending table/schema.
    privileges::check_table_privileges(client, table_oids).await?;
    privileges::check_rls_privileges(client, table_oids).await?;
    replica_identity::check_replica_identity_full(client, table_oids).await?;

    Ok(())
}
55
/// Map a list of column references to a map of table oids to column names.
///
/// Additionally, modify `columns` so that they contain database-qualified
/// references to the columns.
///
/// # Errors
/// Returns an `InvalidOptionValue` (tagged with `option_type`) if a reference
/// is unqualified, names an unknown column, or duplicates a prior reference.
pub(super) fn map_column_refs(
    retrieved_references: &RetrievedSourceReferences,
    columns: &mut [UnresolvedItemName],
    option_type: PgConfigOptionName,
) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
    let mut cols_map: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();

    for name in columns {
        // Split the reference into its table qualification and the trailing
        // column name. A bare, unqualified column name is an error because we
        // cannot tell which table it belongs to.
        let (qual, col) = match name.0.split_last().expect("must have at least one element") {
            (col, qual) if qual.is_empty() => {
                return Err(PlanError::InvalidOptionValue {
                    option_name: option_type.to_ast_string_simple(),
                    err: Box::new(PlanError::UnderqualifiedColumnName(
                        col.as_str().to_string(),
                    )),
                });
            }
            (col, qual) => (qual.to_vec(), col.as_str().to_string()),
        };

        // Resolve the table qualification against the references retrieved
        // from the upstream database.
        let resolved_reference = retrieved_references.resolve_name(&qual)?;
        let mut fully_qualified_name =
            resolved_reference
                .external_reference()
                .map_err(|e| PlanError::InvalidOptionValue {
                    option_name: option_type.to_ast_string_simple(),
                    err: Box::new(e.into()),
                })?;

        let desc = resolved_reference
            .postgres_desc()
            .expect("known to be postgres");

        // If the upstream table has no such column, error and suggest
        // similarly named columns from the same table.
        if !desc.columns.iter().any(|column| column.name == col) {
            let column = mz_repr::ColumnName::from(col);
            let similar = desc
                .columns
                .iter()
                .filter_map(|c| {
                    let c_name = mz_repr::ColumnName::from(c.name.clone());
                    c_name.is_similar(&column).then_some(c_name)
                })
                .collect();
            return Err(PlanError::InvalidOptionValue {
                option_name: option_type.to_ast_string_simple(),
                err: Box::new(PlanError::UnknownColumn {
                    table: Some(
                        normalize::unresolved_item_name(fully_qualified_name)
                            .expect("known to be of valid len"),
                    ),
                    column,
                    similar,
                }),
            });
        }

        // Rewrite fully qualified name.
        let col_ident = Ident::new(col.as_str().to_string())?;
        fully_qualified_name.0.push(col_ident);
        *name = fully_qualified_name;

        // Record the column under its table's OID; `insert` returning `false`
        // means the same column was referenced twice, which is an error.
        let new = cols_map
            .entry(desc.oid)
            .or_default()
            .insert(col.as_str().to_string());

        if !new {
            return Err(PlanError::InvalidOptionValue {
                option_name: option_type.to_ast_string_simple(),
                err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
            });
        }
    }

    Ok(cols_map)
}
136
137pub fn generate_create_subsource_statements(
138    scx: &StatementContext,
139    source_name: ResolvedItemName,
140    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
141) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
142    // Aggregate all unrecognized types.
143    let mut unsupported_cols = vec![];
144
145    // Now that we have an explicit list of validated requested subsources we can create them
146    let mut subsources = Vec::with_capacity(requested_subsources.len());
147
148    for (subsource_name, purified_export) in requested_subsources {
149        let PostgresExportStatementValues {
150            columns,
151            constraints,
152            text_columns,
153            exclude_columns,
154            details,
155            external_reference,
156        } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
157
158        let mut with_options = vec![
159            CreateSubsourceOption {
160                name: CreateSubsourceOptionName::ExternalReference,
161                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
162            },
163            CreateSubsourceOption {
164                name: CreateSubsourceOptionName::Details,
165                value: Some(WithOptionValue::Value(Value::String(hex::encode(
166                    details.into_proto().encode_to_vec(),
167                )))),
168            },
169        ];
170
171        if let Some(text_columns) = text_columns {
172            with_options.push(CreateSubsourceOption {
173                name: CreateSubsourceOptionName::TextColumns,
174                value: Some(WithOptionValue::Sequence(text_columns)),
175            });
176        }
177
178        if let Some(exclude_columns) = exclude_columns {
179            with_options.push(CreateSubsourceOption {
180                name: CreateSubsourceOptionName::ExcludeColumns,
181                value: Some(WithOptionValue::Sequence(exclude_columns)),
182            });
183        }
184
185        // Create the subsource statement
186        let subsource = CreateSubsourceStatement {
187            name: subsource_name,
188            columns,
189            // We might not know the primary source's `GlobalId` yet; if not,
190            // we'll fill it in once we generate it.
191            of_source: Some(source_name.clone()),
192            // TODO(petrosagg): nothing stops us from getting the constraints of the
193            // upstream tables and mirroring them here which will lead to more optimization
194            // opportunities if for example there is a primary key or an index.
195            //
196            // If we ever do that we must triple check that we will get notified *in the
197            // replication stream*, if our assumptions change. Failure to do that could
198            // mean that an upstream table that started with an index was then altered to
199            // one without and now we're producing garbage data.
200            constraints,
201            if_not_exists: false,
202            with_options,
203        };
204        subsources.push(subsource);
205    }
206
207    if !unsupported_cols.is_empty() {
208        unsupported_cols.sort();
209        Err(PgSourcePurificationError::UnrecognizedTypes {
210            cols: unsupported_cols,
211        })?;
212    }
213
214    Ok(subsources)
215}
216
/// The pieces of a `CREATE SUBSOURCE` statement derived from a single
/// purified Postgres source export.
pub(super) struct PostgresExportStatementValues {
    /// Column definitions for the subsource's relation.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique/primary-key constraints mirrored from the upstream table's keys.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// Columns to decode as text (the `TEXT COLUMNS` option), if any.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Columns to omit from ingestion (the `EXCLUDE COLUMNS` option), if any.
    pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Details stored hex/proto-encoded in the statement's `DETAILS` option.
    pub(super) details: SourceExportStatementDetails,
    /// The fully qualified upstream table reference.
    pub(super) external_reference: UnresolvedItemName,
}
225
/// Derive the column definitions, key constraints, and statement options for
/// one purified Postgres source export.
///
/// Columns with unrecognized types are appended to `unsupported_cols` (unless
/// they are text or excluded columns) so the caller can report them all at
/// once rather than failing on the first.
pub(super) fn generate_source_export_statement_values(
    scx: &StatementContext,
    purified_export: PurifiedSourceExport,
    unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
) -> Result<PostgresExportStatementValues, PlanError> {
    let PurifiedExportDetails::Postgres {
        table,
        text_columns,
        exclude_columns,
    } = purified_export.details
    else {
        unreachable!("purified export details must be postgres")
    };

    let text_column_set = BTreeSet::from_iter(text_columns.iter().flatten().map(Ident::as_str));
    let exclude_column_set =
        BTreeSet::from_iter(exclude_columns.iter().flatten().map(Ident::as_str));

    // Figure out the schema of the subsource
    let mut columns = vec![];
    for c in table.columns.iter() {
        let name = Ident::new(c.name.clone())?;

        // Excluded columns are omitted from the subsource's schema entirely.
        if exclude_column_set.contains(c.name.as_str()) {
            continue;
        }

        // Text columns are ingested as text regardless of their upstream
        // type; other columns must have a recognizable type or they are
        // recorded as unsupported and skipped.
        let ty = if text_column_set.contains(c.name.as_str()) {
            mz_pgrepr::Type::Text
        } else {
            match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
                Ok(t) => t,
                Err(_) => {
                    let mut full_name = purified_export.external_reference.0.clone();
                    full_name.push(name);
                    unsupported_cols.push((
                        UnresolvedItemName(full_name).to_ast_string_simple(),
                        mz_repr::adt::system::Oid(c.type_oid),
                    ));
                    continue;
                }
            }
        };

        let data_type = scx.resolve_type(ty)?;
        let mut options = vec![];

        // Mirror the upstream NOT NULL constraint.
        if !c.nullable {
            options.push(mz_sql_parser::ast::ColumnOptionDef {
                name: None,
                option: mz_sql_parser::ast::ColumnOption::NotNull,
            });
        }

        columns.push(ColumnDef {
            name,
            data_type,
            collation: None,
            options,
        });
    }

    // Mirror the upstream table's keys as UNIQUE constraints, translating
    // column numbers back into column names.
    let mut constraints = vec![];
    for key in table.keys.clone() {
        let mut key_columns = vec![];

        for col_num in key.cols {
            let ident = Ident::new(
                table
                    .columns
                    .iter()
                    .find(|col| col.col_num == col_num)
                    .expect("key exists as column")
                    .name
                    .clone(),
            )?;
            key_columns.push(ident);
        }

        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
            name: Some(Ident::new(key.name)?),
            columns: key_columns,
            is_primary: key.is_primary,
            nulls_not_distinct: key.nulls_not_distinct,
        };

        // We take the first constraint available to be the primary key.
        if key.is_primary {
            constraints.insert(0, constraint);
        } else {
            constraints.push(constraint);
        }
    }
    let details = SourceExportStatementDetails::Postgres { table };

    // Sort the option column lists so statement output is deterministic.
    let text_columns = text_columns.map(|mut columns| {
        columns.sort();
        columns
            .into_iter()
            .map(WithOptionValue::Ident::<Aug>)
            .collect()
    });

    let exclude_columns = exclude_columns.map(|mut columns| {
        columns.sort();
        columns
            .into_iter()
            .map(WithOptionValue::Ident::<Aug>)
            .collect()
    });

    Ok(PostgresExportStatementValues {
        columns,
        constraints,
        text_columns,
        exclude_columns,
        details,
        external_reference: purified_export.external_reference,
    })
}
346
/// The result of purifying the requested external references of a Postgres
/// source.
pub(super) struct PurifiedSourceExports {
    /// Purified exports, keyed by subsource name.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    // NOTE(roshan): The text columns are already part of their
    // appropriate `source_exports` above, but these are returned to allow
    // round-tripping a `CREATE SOURCE` statement while we still allow creating
    // implicit subsources from `CREATE SOURCE`. Remove once
    // fully deprecating that feature and forcing users to use explicit
    // `CREATE TABLE .. FROM SOURCE` statements.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
357
// Purify the requested external references, returning a set of purified
// source exports corresponding to external tables, and additional
// fields necessary to generate relevant statements and update statement options
pub(super) async fn purify_source_exports(
    client: &Client,
    retrieved_references: &RetrievedSourceReferences,
    requested_references: &Option<ExternalReferences>,
    mut text_columns: Vec<UnresolvedItemName>,
    mut exclude_columns: Vec<UnresolvedItemName>,
    unresolved_source_name: &UnresolvedItemName,
    reference_policy: &SourceReferencePolicy,
) -> Result<PurifiedSourceExports, PlanError> {
    let requested_exports = match requested_references.as_ref() {
        // References were given but the policy forbids them.
        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
            Err(PlanError::UseTablesForSources(requested.to_string()))?
        }
        Some(requested) => retrieved_references
            .requested_source_exports(Some(requested), unresolved_source_name)?,
        None => {
            if matches!(reference_policy, SourceReferencePolicy::Required) {
                Err(PgSourcePurificationError::RequiresExternalReferences)?
            }

            // If no external reference is specified, it does not make sense to include
            // text columns.
            if !text_columns.is_empty() {
                Err(
                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
                        "TEXT COLUMNS".to_string(),
                    ),
                )?
            }

            // If no external reference is specified, it does not make sense to include
            // exclude columns.
            if !exclude_columns.is_empty() {
                Err(
                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
                        "EXCLUDE COLUMNS".to_string(),
                    ),
                )?
            }

            // No references requested: nothing to purify.
            return Ok(PurifiedSourceExports {
                source_exports: BTreeMap::new(),
                normalized_text_columns: vec![],
            });
        }
    };

    if requested_exports.is_empty() {
        sql_bail!(
            "[internal error]: Postgres reference {} did not match any tables",
            requested_references
                .as_ref()
                .unwrap()
                .to_ast_string_simple()
        );
    }

    super::validate_source_export_names(&requested_exports)?;

    let table_oids: Vec<_> = requested_exports
        .iter()
        .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
        .collect();

    // Ensure the upstream tables are readable and replicable before we
    // attempt to snapshot them.
    validate_requested_references_privileges(client, &table_oids).await?;

    // Resolve and database-qualify the TEXT/EXCLUDE COLUMNS options,
    // producing per-table-OID column sets (and rewriting the option values
    // in place to their fully qualified forms).
    let mut text_column_map = map_column_refs(
        retrieved_references,
        &mut text_columns,
        PgConfigOptionName::TextColumns,
    )?;
    let mut exclude_column_map = map_column_refs(
        retrieved_references,
        &mut exclude_columns,
        PgConfigOptionName::ExcludeColumns,
    )?;

    // Normalize options to contain full qualified values.
    text_columns.sort();
    text_columns.dedup();
    let normalized_text_columns: Vec<_> = text_columns
        .into_iter()
        .map(WithOptionValue::UnresolvedItemName)
        .collect();

    let source_exports = requested_exports
        .into_iter()
        .map(|r| {
            let mut desc = r.meta.postgres_desc().expect("known postgres").clone();
            // Claim this table's entries from the option maps; any entries
            // left over afterwards are dangling references (checked below).
            let text_columns = text_column_map.remove(&desc.oid);
            let exclude_columns = exclude_column_map.remove(&desc.oid);

            // Excluded columns are dropped from the table descriptor.
            if let Some(exclude_cols) = &exclude_columns {
                desc.columns.retain(|c| !exclude_cols.contains(&c.name));
            }

            // A column cannot be both a text column and an exclude column.
            if let (Some(text_cols), Some(exclude_cols)) = (&text_columns, &exclude_columns) {
                let intersection: Vec<_> = text_cols.intersection(exclude_cols).collect();
                if !intersection.is_empty() {
                    return Err(PgSourcePurificationError::DuplicatedColumnNames(
                        intersection.iter().map(|s| (*s).to_string()).collect(),
                    ));
                }
            }
            Ok((
                r.name,
                PurifiedSourceExport {
                    external_reference: r.external_reference,
                    details: PurifiedExportDetails::Postgres {
                        text_columns: text_columns.map(|v| {
                            v.into_iter()
                                .map(|s| Ident::new(s).expect("validated above"))
                                .collect()
                        }),
                        exclude_columns: exclude_columns.map(|v| {
                            v.into_iter()
                                .map(|s| Ident::new(s).expect("validated above"))
                                .collect()
                        }),
                        table: desc,
                    },
                },
            ))
        })
        .collect::<Result<BTreeMap<_, _>, _>>()?;

    if !text_column_map.is_empty() {
        // If any item was not removed from the text_column_map, it wasn't being
        // added.
        let mut dangling_text_column_refs = vec![];
        let all_references = retrieved_references.all_references();

        for id in text_column_map.keys() {
            let desc = all_references
                .iter()
                .find_map(|reference| {
                    let desc = reference.postgres_desc().expect("is postgres");
                    if desc.oid == *id { Some(desc) } else { None }
                })
                .expect("validated when generating text columns");

            dangling_text_column_refs.push(PartialItemName {
                database: None,
                schema: Some(desc.namespace.clone()),
                item: desc.name.clone(),
            });
        }

        dangling_text_column_refs.sort();
        return Err(PlanError::from(
            PgSourcePurificationError::DanglingTextColumns {
                items: dangling_text_column_refs,
            },
        ));
    }

    if !exclude_column_map.is_empty() {
        // If any item was not removed from the exclude_column_map, it wasn't being
        // added.
        let mut dangling_exclude_column_refs = vec![];
        let all_references = retrieved_references.all_references();

        for id in exclude_column_map.keys() {
            let desc = all_references
                .iter()
                .find_map(|reference| {
                    let desc = reference.postgres_desc().expect("is postgres");
                    if desc.oid == *id { Some(desc) } else { None }
                })
                .expect("validated when generating exclude columns");

            dangling_exclude_column_refs.push(PartialItemName {
                database: None,
                schema: Some(desc.namespace.clone()),
                item: desc.name.clone(),
            });
        }

        dangling_exclude_column_refs.sort();
        return Err(PlanError::from(
            PgSourcePurificationError::DanglingExcludeColumns {
                items: dangling_exclude_column_refs,
            },
        ));
    }

    Ok(PurifiedSourceExports {
        source_exports,
        normalized_text_columns,
    })
}
552
553pub(crate) fn generate_column_casts(
554    scx: &StatementContext,
555    table: &PostgresTableDesc,
556    text_columns: &Vec<Ident>,
557) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
558    // Generate the cast expressions required to convert the text encoded columns into
559    // the appropriate target types, creating a Vec<MirScalarExpr>
560    // The postgres source reader will then eval each of those on the incoming rows
561    // First, construct an expression context where the expression is evaluated on an
562    // imaginary row which has the same number of columns as the upstream table but all
563    // of the types are text
564    let mut cast_scx = scx.clone();
565    cast_scx.param_types = Default::default();
566    let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
567    let mut column_types = vec![];
568    for column in table.columns.iter() {
569        column_types.push(SqlColumnType {
570            nullable: column.nullable,
571            scalar_type: SqlScalarType::String,
572        });
573    }
574
575    let cast_ecx = ExprContext {
576        qcx: &cast_qcx,
577        name: "plan_postgres_source_cast",
578        scope: &Scope::empty(),
579        relation_type: &SqlRelationType {
580            column_types,
581            keys: vec![],
582        },
583        allow_aggregates: false,
584        allow_subqueries: false,
585        allow_parameters: false,
586        allow_windows: false,
587    };
588
589    let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
590
591    // Then, for each column we will generate a MirRelationExpr that extracts the nth
592    // column and casts it to the appropriate target type
593    let mut table_cast = vec![];
594    for (i, column) in table.columns.iter().enumerate() {
595        let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
596            // Treat the column as text if it was referenced in
597            // `TEXT COLUMNS`. This is the only place we need to
598            // perform this logic; even if the type is unsupported,
599            // we'll be able to ingest its values as text in
600            // storage.
601            (CastType::Text, mz_pgrepr::Type::Text)
602        } else {
603            match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
604                Ok(t) => (CastType::Natural, t),
605                // If this reference survived purification, we
606                // do not expect it to be from a table that the
607                // user will consume., i.e. expect this table to
608                // be filtered out of table casts.
609                Err(_) => {
610                    table_cast.push((
611                        CastType::Natural,
612                        HirScalarExpr::call_variadic(
613                            mz_expr::VariadicFunc::ErrorIfNull,
614                            vec![
615                                HirScalarExpr::literal_null(SqlScalarType::String),
616                                HirScalarExpr::literal(
617                                    mz_repr::Datum::from(
618                                        format!("Unsupported type with OID {}", column.type_oid)
619                                            .as_str(),
620                                    ),
621                                    SqlScalarType::String,
622                                ),
623                            ],
624                        )
625                        .lower_uncorrelated()
626                        .expect("no correlation"),
627                    ));
628                    continue;
629                }
630            }
631        };
632
633        let data_type = scx.resolve_type(ty)?;
634        let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
635
636        let col_expr = HirScalarExpr::unnamed_column(ColumnRef {
637            level: 0,
638            column: i,
639        });
640
641        let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
642
643        let cast = if column.nullable {
644            cast_expr
645        } else {
646            // We must enforce nullability constraint on cast
647            // because PG replication stream does not propagate
648            // constraint changes and we want to error subsource if
649            // e.g. the constraint is dropped and we don't notice
650            // it.
651            HirScalarExpr::call_variadic(mz_expr::VariadicFunc::ErrorIfNull, vec![
652                                cast_expr,
653                                HirScalarExpr::literal(
654                                    mz_repr::Datum::from(
655                                        format!(
656                                            "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
657                                            table.namespace.clone(),
658                                            table.name.clone(),
659                                            column.name.clone())
660                                            .as_str(),
661                                    ),
662                                    SqlScalarType::String,
663                                ),
664                            ],
665                            )
666        };
667
668        // We expect only reg* types to encounter this issue. Users
669        // can ingest the data as text if they need to ingest it.
670        // This is acceptable because we don't expect the OIDs from
671        // an external PG source to be unilaterally usable in
672        // resolving item names in MZ.
673        let mir_cast = cast.lower_uncorrelated().map_err(|_e| {
674            tracing::info!(
675                "cannot ingest {:?} data from PG source because cast is correlated",
676                scalar_type
677            );
678
679            PlanError::TableContainsUningestableTypes {
680                name: table.name.to_string(),
681                type_: scx.humanize_scalar_type(&scalar_type, false),
682                column: column.name.to_string(),
683            }
684        })?;
685
686        table_cast.push((cast_type, mir_cast));
687    }
688    Ok(table_cast)
689}
690
mod privileges {
    use mz_postgres_util::PostgresError;

    use super::*;
    use crate::plan::PlanError;
    use crate::pure::PgSourcePurificationError;

    /// Ensure the connected PostgreSQL user has `USAGE` privileges on every
    /// schema containing a table identified by `table_oids`; errors with the
    /// sorted list of offending schema names otherwise.
    async fn check_schema_privileges(client: &Client, table_oids: &[Oid]) -> Result<(), PlanError> {
        let invalid_schema_privileges_rows = client
            .query(
                "
                WITH distinct_namespace AS (
                    SELECT
                        DISTINCT n.oid, n.nspname AS schema_name
                    FROM unnest($1::OID[]) AS oids (oid)
                    JOIN pg_class AS c ON c.oid = oids.oid
                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
                )
                SELECT d.schema_name
                FROM distinct_namespace AS d
                WHERE
                    NOT has_schema_privilege(CURRENT_USER::TEXT, d.oid, 'usage')",
                &[&table_oids],
            )
            .await
            .map_err(PostgresError::from)?;

        let mut invalid_schema_privileges = invalid_schema_privileges_rows
            .into_iter()
            .map(|row| row.get("schema_name"))
            .collect::<Vec<String>>();

        if invalid_schema_privileges.is_empty() {
            Ok(())
        } else {
            invalid_schema_privileges.sort();
            Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
                schemas: invalid_schema_privileges,
            })?
        }
    }

    /// Ensure that the connected PostgreSQL user has:
    ///
    /// - `SELECT` privileges on the tables identified by `table_oids`.
    ///
    /// - `USAGE` privileges on the schemas containing those tables.
    ///
    /// # Errors
    /// Errors with the sorted list of offending schemas or tables otherwise.
    pub async fn check_table_privileges(
        client: &Client,
        table_oids: &[Oid],
    ) -> Result<(), PlanError> {
        check_schema_privileges(client, table_oids).await?;

        let invalid_table_privileges_rows = client
            .query(
                "
            SELECT
                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
             FROM unnest($1::oid[]) AS oids (oid)
             JOIN
                 pg_class c ON c.oid = oids.oid
             JOIN
                 pg_namespace n ON c.relnamespace = n.oid
             WHERE NOT has_table_privilege(CURRENT_USER::text, c.oid, 'select')",
                &[&table_oids],
            )
            .await
            .map_err(PostgresError::from)?;

        let mut invalid_table_privileges = invalid_table_privileges_rows
            .into_iter()
            .map(|row| row.get("schema_qualified_table_name"))
            .collect::<Vec<String>>();

        if invalid_table_privileges.is_empty() {
            Ok(())
        } else {
            invalid_table_privileges.sort();
            Err(PgSourcePurificationError::UserLacksSelectOnTables {
                tables: invalid_table_privileges,
            })?
        }
    }

    /// Ensure that the connected PostgreSQL user can read data from tables if row level security
    /// (RLS) is enabled. If the user/role does not have the BYPASSRLS attribute set, there is
    /// the possibility that MZ may not be able to read all data during the snapshot, which would
    /// result in missing data.
    pub async fn check_rls_privileges(
        client: &Client,
        table_oids: &[Oid],
    ) -> Result<(), PlanError> {
        match mz_postgres_util::validate_no_rls_policies(client, table_oids).await {
            Ok(_) => Ok(()),
            Err(err) => match err {
                // This is a little gross to do, but PlanError::PostgresConnectionErr implements
                // From<PostgresError>, and the error in that case would be
                // "failed to connect to PostgreSQL database", which doesn't make any sense.
                PostgresError::BypassRLSRequired(tables) => {
                    Err(PgSourcePurificationError::BypassRLSRequired { tables })?
                }
                _ => Err(err)?,
            },
        }
    }
}
802
803mod replica_identity {
804    use mz_postgres_util::PostgresError;
805
806    use super::*;
807    use crate::plan::PlanError;
808    use crate::pure::PgSourcePurificationError;
809
810    /// Ensures that all provided OIDs are tables with `REPLICA IDENTITY FULL`.
811    pub async fn check_replica_identity_full(
812        client: &Client,
813        table_oids: &[Oid],
814    ) -> Result<(), PlanError> {
815        let invalid_replica_identity_rows = client
816            .query(
817                "
818            SELECT
819                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
820             FROM unnest($1::oid[]) AS oids (oid)
821             JOIN
822                 pg_class c ON c.oid = oids.oid
823             JOIN
824                 pg_namespace n ON c.relnamespace = n.oid
825             WHERE relreplident != 'f' OR relreplident IS NULL;",
826                &[&table_oids],
827            )
828            .await
829            .map_err(PostgresError::from)?;
830
831        let mut invalid_replica_identity = invalid_replica_identity_rows
832            .into_iter()
833            .map(|row| row.get("schema_qualified_table_name"))
834            .collect::<Vec<String>>();
835
836        if invalid_replica_identity.is_empty() {
837            Ok(())
838        } else {
839            invalid_replica_identity.sort();
840            Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
841                items: invalid_replica_identity,
842            })?
843        }
844    }
845}