Skip to main content

mz_sql/pure/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Postgres utilities for SQL purification.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use mz_expr::MirScalarExpr;
15use mz_expr::func::variadic::ErrorIfNull;
16use mz_postgres_util::desc::PostgresTableDesc;
17use mz_proto::RustType;
18use mz_repr::{SqlColumnType, SqlRelationType, SqlScalarType};
19use mz_sql_parser::ast::display::AstDisplay;
20use mz_sql_parser::ast::{
21    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
22    ExternalReferences, Ident, PgConfigOptionName, TableConstraint, UnresolvedItemName, Value,
23    WithOptionValue,
24};
25use mz_storage_types::sources::SourceExportStatementDetails;
26use mz_storage_types::sources::postgres::CastType;
27use prost::Message;
28use tokio_postgres::Client;
29use tokio_postgres::types::Oid;
30
31use crate::names::{Aug, ResolvedItemName};
32use crate::normalize;
33use crate::plan::hir::ColumnRef;
34use crate::plan::typeconv::{CastContext, plan_cast};
35use crate::plan::{
36    ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
37};
38
39use super::error::PgSourcePurificationError;
40use super::references::RetrievedSourceReferences;
41use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
42
43/// Ensure that we have select permissions on all tables; we have to do this before we
44/// start snapshotting because if we discover we cannot `COPY` from a table while
45/// snapshotting, we break the entire source.
46pub(super) async fn validate_requested_references_privileges(
47    client: &Client,
48    table_oids: &[Oid],
49) -> Result<(), PlanError> {
50    privileges::check_table_privileges(client, table_oids).await?;
51    privileges::check_rls_privileges(client, table_oids).await?;
52    replica_identity::check_replica_identity_full(client, table_oids).await?;
53
54    Ok(())
55}
56
57/// Map a list of column references to a map of table oids to column names.
58///
59/// Additionally, modify `columns` so that they contain database-qualified
60/// references to the columns.
61pub(super) fn map_column_refs(
62    retrieved_references: &RetrievedSourceReferences,
63    columns: &mut [UnresolvedItemName],
64    option_type: PgConfigOptionName,
65) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
66    let mut cols_map: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
67
68    for name in columns {
69        let (qual, col) = match name.0.split_last().expect("must have at least one element") {
70            (col, qual) if qual.is_empty() => {
71                return Err(PlanError::InvalidOptionValue {
72                    option_name: option_type.to_ast_string_simple(),
73                    err: Box::new(PlanError::UnderqualifiedColumnName(
74                        col.as_str().to_string(),
75                    )),
76                });
77            }
78            (col, qual) => (qual.to_vec(), col.as_str().to_string()),
79        };
80
81        let resolved_reference = retrieved_references.resolve_name(&qual)?;
82        let mut fully_qualified_name =
83            resolved_reference
84                .external_reference()
85                .map_err(|e| PlanError::InvalidOptionValue {
86                    option_name: option_type.to_ast_string_simple(),
87                    err: Box::new(e.into()),
88                })?;
89
90        let desc = resolved_reference
91            .postgres_desc()
92            .expect("known to be postgres");
93
94        if !desc.columns.iter().any(|column| column.name == col) {
95            let column = mz_repr::ColumnName::from(col);
96            let similar = desc
97                .columns
98                .iter()
99                .filter_map(|c| {
100                    let c_name = mz_repr::ColumnName::from(c.name.clone());
101                    c_name.is_similar(&column).then_some(c_name)
102                })
103                .collect();
104            return Err(PlanError::InvalidOptionValue {
105                option_name: option_type.to_ast_string_simple(),
106                err: Box::new(PlanError::UnknownColumn {
107                    table: Some(
108                        normalize::unresolved_item_name(fully_qualified_name)
109                            .expect("known to be of valid len"),
110                    ),
111                    column,
112                    similar,
113                }),
114            });
115        }
116
117        // Rewrite fully qualified name.
118        let col_ident = Ident::new(col.as_str().to_string())?;
119        fully_qualified_name.0.push(col_ident);
120        *name = fully_qualified_name;
121
122        let new = cols_map
123            .entry(desc.oid)
124            .or_default()
125            .insert(col.as_str().to_string());
126
127        if !new {
128            return Err(PlanError::InvalidOptionValue {
129                option_name: option_type.to_ast_string_simple(),
130                err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
131            });
132        }
133    }
134
135    Ok(cols_map)
136}
137
138pub fn generate_create_subsource_statements(
139    scx: &StatementContext,
140    source_name: ResolvedItemName,
141    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
142) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
143    // Aggregate all unrecognized types.
144    let mut unsupported_cols = vec![];
145
146    // Now that we have an explicit list of validated requested subsources we can create them
147    let mut subsources = Vec::with_capacity(requested_subsources.len());
148
149    for (subsource_name, purified_export) in requested_subsources {
150        let PostgresExportStatementValues {
151            columns,
152            constraints,
153            text_columns,
154            exclude_columns,
155            details,
156            external_reference,
157        } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
158
159        let mut with_options = vec![
160            CreateSubsourceOption {
161                name: CreateSubsourceOptionName::ExternalReference,
162                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
163            },
164            CreateSubsourceOption {
165                name: CreateSubsourceOptionName::Details,
166                value: Some(WithOptionValue::Value(Value::String(hex::encode(
167                    details.into_proto().encode_to_vec(),
168                )))),
169            },
170        ];
171
172        if let Some(text_columns) = text_columns {
173            with_options.push(CreateSubsourceOption {
174                name: CreateSubsourceOptionName::TextColumns,
175                value: Some(WithOptionValue::Sequence(text_columns)),
176            });
177        }
178
179        if let Some(exclude_columns) = exclude_columns {
180            with_options.push(CreateSubsourceOption {
181                name: CreateSubsourceOptionName::ExcludeColumns,
182                value: Some(WithOptionValue::Sequence(exclude_columns)),
183            });
184        }
185
186        // Create the subsource statement
187        let subsource = CreateSubsourceStatement {
188            name: subsource_name,
189            columns,
190            // We might not know the primary source's `GlobalId` yet; if not,
191            // we'll fill it in once we generate it.
192            of_source: Some(source_name.clone()),
193            // TODO(petrosagg): nothing stops us from getting the constraints of the
194            // upstream tables and mirroring them here which will lead to more optimization
195            // opportunities if for example there is a primary key or an index.
196            //
197            // If we ever do that we must triple check that we will get notified *in the
198            // replication stream*, if our assumptions change. Failure to do that could
199            // mean that an upstream table that started with an index was then altered to
200            // one without and now we're producing garbage data.
201            constraints,
202            if_not_exists: false,
203            with_options,
204        };
205        subsources.push(subsource);
206    }
207
208    if !unsupported_cols.is_empty() {
209        unsupported_cols.sort();
210        Err(PgSourcePurificationError::UnrecognizedTypes {
211            cols: unsupported_cols,
212        })?;
213    }
214
215    Ok(subsources)
216}
217
/// The per-export pieces needed to render a `CREATE SUBSOURCE` statement for a
/// Postgres source export; produced by
/// `generate_source_export_statement_values`.
pub(super) struct PostgresExportStatementValues {
    /// Column definitions of the subsource's relation.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique/primary-key constraints mirrored from the upstream table.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// `TEXT COLUMNS` option values, if any were specified.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// `EXCLUDE COLUMNS` option values, if any were specified.
    pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Details to serialize into the statement's `DETAILS` option.
    pub(super) details: SourceExportStatementDetails,
    /// The upstream table this subsource refers to.
    pub(super) external_reference: UnresolvedItemName,
}
226
/// Produce the values needed to render a `CREATE SUBSOURCE` statement for a
/// single purified Postgres export: column definitions, mirrored key
/// constraints, sorted text/exclude column options, and serialized details.
///
/// Columns whose Postgres types we cannot represent are appended to
/// `unsupported_cols` (fully qualified name plus type OID) and omitted from
/// the generated columns, letting the caller report them all at once.
///
/// # Panics
/// If `purified_export.details` is not the `Postgres` variant.
pub(super) fn generate_source_export_statement_values(
    scx: &StatementContext,
    purified_export: PurifiedSourceExport,
    unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
) -> Result<PostgresExportStatementValues, PlanError> {
    let PurifiedExportDetails::Postgres {
        table,
        text_columns,
        exclude_columns,
    } = purified_export.details
    else {
        unreachable!("purified export details must be postgres")
    };

    // Sets over borrowed names for cheap membership checks below.
    let text_column_set = BTreeSet::from_iter(text_columns.iter().flatten().map(Ident::as_str));
    let exclude_column_set =
        BTreeSet::from_iter(exclude_columns.iter().flatten().map(Ident::as_str));

    // Figure out the schema of the subsource
    let mut columns = vec![];
    for c in table.columns.iter() {
        let name = Ident::new(c.name.clone())?;

        // Excluded columns are dropped from the subsource's relation entirely.
        if exclude_column_set.contains(c.name.as_str()) {
            continue;
        }

        // Text columns are ingested as `text` regardless of their upstream
        // type; otherwise resolve the type from its OID and typmod.
        let ty = if text_column_set.contains(c.name.as_str()) {
            mz_pgrepr::Type::Text
        } else {
            match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
                Ok(t) => t,
                Err(_) => {
                    // Unrecognized type: record it for the caller's aggregated
                    // error and skip the column.
                    let mut full_name = purified_export.external_reference.0.clone();
                    full_name.push(name);
                    unsupported_cols.push((
                        UnresolvedItemName(full_name).to_ast_string_simple(),
                        mz_repr::adt::system::Oid(c.type_oid),
                    ));
                    continue;
                }
            }
        };

        let data_type = scx.resolve_type(ty)?;
        let mut options = vec![];

        // Mirror the upstream NOT NULL constraint on the subsource column.
        if !c.nullable {
            options.push(mz_sql_parser::ast::ColumnOptionDef {
                name: None,
                option: mz_sql_parser::ast::ColumnOption::NotNull,
            });
        }

        columns.push(ColumnDef {
            name,
            data_type,
            collation: None,
            options,
        });
    }

    // Mirror each upstream key as a UNIQUE constraint on the subsource.
    let mut constraints = vec![];
    for key in table.keys.clone() {
        let mut key_columns = vec![];

        for col_num in key.cols {
            let ident = Ident::new(
                table
                    .columns
                    .iter()
                    .find(|col| col.col_num == col_num)
                    .expect("key exists as column")
                    .name
                    .clone(),
            )?;
            key_columns.push(ident);
        }

        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
            name: Some(Ident::new(key.name)?),
            columns: key_columns,
            is_primary: key.is_primary,
            nulls_not_distinct: key.nulls_not_distinct,
        };

        // We take the first constraint available to be the primary key.
        // Inserting primary keys at the front keeps them ahead of plain
        // unique constraints.
        if key.is_primary {
            constraints.insert(0, constraint);
        } else {
            constraints.push(constraint);
        }
    }
    let details = SourceExportStatementDetails::Postgres { table };

    // Sort the option lists so generated statements are deterministic.
    let text_columns = text_columns.map(|mut columns| {
        columns.sort();
        columns
            .into_iter()
            .map(WithOptionValue::Ident::<Aug>)
            .collect()
    });

    let exclude_columns = exclude_columns.map(|mut columns| {
        columns.sort();
        columns
            .into_iter()
            .map(WithOptionValue::Ident::<Aug>)
            .collect()
    });

    Ok(PostgresExportStatementValues {
        columns,
        constraints,
        text_columns,
        exclude_columns,
        details,
        external_reference: purified_export.external_reference,
    })
}
347
/// The result of purifying the external references of a Postgres source.
pub(super) struct PurifiedSourceExports {
    /// Purified exports keyed by the subsource name they will be created as.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    // NOTE(roshan): The text columns are already part of their
    // appropriate `source_exports` above, but these are returned to allow
    // round-tripping a `CREATE SOURCE` statement while we still allow creating
    // implicit subsources from `CREATE SOURCE`. Remove once
    // fully deprecating that feature and forcing users to use explicit
    // `CREATE TABLE .. FROM SOURCE` statements.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
358
359// Purify the requested external references, returning a set of purified
360// source exports corresponding to external tables, and and additional
361// fields necessary to generate relevant statements and update statement options
362pub(super) async fn purify_source_exports(
363    client: &Client,
364    retrieved_references: &RetrievedSourceReferences,
365    requested_references: &Option<ExternalReferences>,
366    mut text_columns: Vec<UnresolvedItemName>,
367    mut exclude_columns: Vec<UnresolvedItemName>,
368    unresolved_source_name: &UnresolvedItemName,
369    reference_policy: &SourceReferencePolicy,
370) -> Result<PurifiedSourceExports, PlanError> {
371    let requested_exports = match requested_references.as_ref() {
372        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
373            Err(PlanError::UseTablesForSources(requested.to_string()))?
374        }
375        Some(requested) => retrieved_references
376            .requested_source_exports(Some(requested), unresolved_source_name)?,
377        None => {
378            if matches!(reference_policy, SourceReferencePolicy::Required) {
379                Err(PgSourcePurificationError::RequiresExternalReferences)?
380            }
381
382            // If no external reference is specified, it does not make sense to include
383            // text columns.
384            if !text_columns.is_empty() {
385                Err(
386                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
387                        "TEXT COLUMNS".to_string(),
388                    ),
389                )?
390            }
391
392            // If no external reference is specified, it does not make sense to include
393            // exclude columns.
394            if !exclude_columns.is_empty() {
395                Err(
396                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
397                        "EXCLUDE COLUMNS".to_string(),
398                    ),
399                )?
400            }
401
402            return Ok(PurifiedSourceExports {
403                source_exports: BTreeMap::new(),
404                normalized_text_columns: vec![],
405            });
406        }
407    };
408
409    if requested_exports.is_empty() {
410        sql_bail!(
411            "[internal error]: Postgres reference {} did not match any tables",
412            requested_references
413                .as_ref()
414                .unwrap()
415                .to_ast_string_simple()
416        );
417    }
418
419    super::validate_source_export_names(&requested_exports)?;
420
421    let table_oids: Vec<_> = requested_exports
422        .iter()
423        .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
424        .collect();
425
426    validate_requested_references_privileges(client, &table_oids).await?;
427
428    let mut text_column_map = map_column_refs(
429        retrieved_references,
430        &mut text_columns,
431        PgConfigOptionName::TextColumns,
432    )?;
433    let mut exclude_column_map = map_column_refs(
434        retrieved_references,
435        &mut exclude_columns,
436        PgConfigOptionName::ExcludeColumns,
437    )?;
438
439    // Normalize options to contain full qualified values.
440    text_columns.sort();
441    text_columns.dedup();
442    let normalized_text_columns: Vec<_> = text_columns
443        .into_iter()
444        .map(WithOptionValue::UnresolvedItemName)
445        .collect();
446
447    let source_exports = requested_exports
448        .into_iter()
449        .map(|r| {
450            let mut desc = r.meta.postgres_desc().expect("known postgres").clone();
451            let text_columns = text_column_map.remove(&desc.oid);
452            let exclude_columns = exclude_column_map.remove(&desc.oid);
453
454            if let Some(exclude_cols) = &exclude_columns {
455                desc.columns.retain(|c| !exclude_cols.contains(&c.name));
456            }
457
458            if let (Some(text_cols), Some(exclude_cols)) = (&text_columns, &exclude_columns) {
459                let intersection: Vec<_> = text_cols.intersection(exclude_cols).collect();
460                if !intersection.is_empty() {
461                    return Err(PgSourcePurificationError::DuplicatedColumnNames(
462                        intersection.iter().map(|s| (*s).to_string()).collect(),
463                    ));
464                }
465            }
466            Ok((
467                r.name,
468                PurifiedSourceExport {
469                    external_reference: r.external_reference,
470                    details: PurifiedExportDetails::Postgres {
471                        text_columns: text_columns.map(|v| {
472                            v.into_iter()
473                                .map(|s| Ident::new(s).expect("validated above"))
474                                .collect()
475                        }),
476                        exclude_columns: exclude_columns.map(|v| {
477                            v.into_iter()
478                                .map(|s| Ident::new(s).expect("validated above"))
479                                .collect()
480                        }),
481                        table: desc,
482                    },
483                },
484            ))
485        })
486        .collect::<Result<BTreeMap<_, _>, _>>()?;
487
488    if !text_column_map.is_empty() {
489        // If any any item was not removed from the text_column_map, it wasn't being
490        // added.
491        let mut dangling_text_column_refs = vec![];
492        let all_references = retrieved_references.all_references();
493
494        for id in text_column_map.keys() {
495            let desc = all_references
496                .iter()
497                .find_map(|reference| {
498                    let desc = reference.postgres_desc().expect("is postgres");
499                    if desc.oid == *id { Some(desc) } else { None }
500                })
501                .expect("validated when generating text columns");
502
503            dangling_text_column_refs.push(PartialItemName {
504                database: None,
505                schema: Some(desc.namespace.clone()),
506                item: desc.name.clone(),
507            });
508        }
509
510        dangling_text_column_refs.sort();
511        return Err(PlanError::from(
512            PgSourcePurificationError::DanglingTextColumns {
513                items: dangling_text_column_refs,
514            },
515        ));
516    }
517
518    if !exclude_column_map.is_empty() {
519        // If any any item was not removed from the exclude_column_map, it wasn't being
520        // added.
521        let mut dangling_exclude_column_refs = vec![];
522        let all_references = retrieved_references.all_references();
523
524        for id in exclude_column_map.keys() {
525            let desc = all_references
526                .iter()
527                .find_map(|reference| {
528                    let desc = reference.postgres_desc().expect("is postgres");
529                    if desc.oid == *id { Some(desc) } else { None }
530                })
531                .expect("validated when generating exclude columns");
532
533            dangling_exclude_column_refs.push(PartialItemName {
534                database: None,
535                schema: Some(desc.namespace.clone()),
536                item: desc.name.clone(),
537            });
538        }
539
540        dangling_exclude_column_refs.sort();
541        return Err(PlanError::from(
542            PgSourcePurificationError::DanglingExcludeColumns {
543                items: dangling_exclude_column_refs,
544            },
545        ));
546    }
547
548    Ok(PurifiedSourceExports {
549        source_exports,
550        normalized_text_columns,
551    })
552}
553
554pub(crate) fn generate_column_casts(
555    scx: &StatementContext,
556    table: &PostgresTableDesc,
557    text_columns: &Vec<Ident>,
558) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
559    // Generate the cast expressions required to convert the text encoded columns into
560    // the appropriate target types, creating a Vec<MirScalarExpr>
561    // The postgres source reader will then eval each of those on the incoming rows
562    // First, construct an expression context where the expression is evaluated on an
563    // imaginary row which has the same number of columns as the upstream table but all
564    // of the types are text
565    let mut cast_scx = scx.clone();
566    cast_scx.param_types = Default::default();
567    let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
568    let mut column_types = vec![];
569    for column in table.columns.iter() {
570        column_types.push(SqlColumnType {
571            nullable: column.nullable,
572            scalar_type: SqlScalarType::String,
573        });
574    }
575
576    let cast_ecx = ExprContext {
577        qcx: &cast_qcx,
578        name: "plan_postgres_source_cast",
579        scope: &Scope::empty(),
580        relation_type: &SqlRelationType {
581            column_types,
582            keys: vec![],
583        },
584        allow_aggregates: false,
585        // Even though we set `allow_subqueries` to `true`, with don't actually support casts here
586        // that expand to subqueries (through `sql_impl_cast`). The reason we still specify true is
587        // that cast planning doesn't have a way to report fine-grained errors, but we want to
588        // specifically sniff out the situation when the cast failed because of a subquery. So, what
589        // we do is we allow the HIR planning to succeed despite the subquery, and then catch the
590        // failure of `lower_uncorrelated` later.
591        allow_subqueries: true,
592        allow_parameters: false,
593        allow_windows: false,
594    };
595
596    let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
597
598    // Then, for each column we will generate a MirRelationExpr that extracts the nth
599    // column and casts it to the appropriate target type
600    let mut table_cast = vec![];
601    for (i, column) in table.columns.iter().enumerate() {
602        let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
603            // Treat the column as text if it was referenced in
604            // `TEXT COLUMNS`. This is the only place we need to
605            // perform this logic; even if the type is unsupported,
606            // we'll be able to ingest its values as text in
607            // storage.
608            (CastType::Text, mz_pgrepr::Type::Text)
609        } else {
610            match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
611                Ok(t) => (CastType::Natural, t),
612                // If this reference survived purification, we
613                // do not expect it to be from a table that the
614                // user will consume., i.e. expect this table to
615                // be filtered out of table casts.
616                Err(_) => {
617                    table_cast.push((
618                        CastType::Natural,
619                        HirScalarExpr::call_variadic(
620                            ErrorIfNull,
621                            vec![
622                                HirScalarExpr::literal_null(SqlScalarType::String),
623                                HirScalarExpr::literal(
624                                    mz_repr::Datum::from(
625                                        format!("Unsupported type with OID {}", column.type_oid)
626                                            .as_str(),
627                                    ),
628                                    SqlScalarType::String,
629                                ),
630                            ],
631                        )
632                        .lower_uncorrelated(scx.catalog.system_vars())
633                        .expect("no correlation"),
634                    ));
635                    continue;
636                }
637            }
638        };
639
640        let data_type = scx.resolve_type(ty)?;
641        let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
642
643        let col_expr = HirScalarExpr::unnamed_column(ColumnRef {
644            level: 0,
645            column: i,
646        });
647
648        let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
649
650        let cast = if column.nullable {
651            cast_expr
652        } else {
653            // We must enforce nullability constraint on cast
654            // because PG replication stream does not propagate
655            // constraint changes and we want to error subsource if
656            // e.g. the constraint is dropped and we don't notice
657            // it.
658            let message = format!(
659                "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
660                table.namespace, table.name, column.name
661            );
662            let exprs = vec![
663                cast_expr,
664                HirScalarExpr::literal(
665                    mz_repr::Datum::from(message.as_str()),
666                    SqlScalarType::String,
667                ),
668            ];
669            HirScalarExpr::call_variadic(ErrorIfNull, exprs)
670        };
671
672        // We expect only reg* types to encounter this issue. Users
673        // can ingest the data as text if they need to ingest it.
674        // This is acceptable because we don't expect the OIDs from
675        // an external PG source to be unilaterally usable in
676        // resolving item names in MZ.
677        let mir_cast = cast
678            .lower_uncorrelated(scx.catalog.system_vars())
679            .map_err(|_e| {
680                // We allowed subqueries in the cast's HIR planning above, but we don't actually
681                // support them here.
682                tracing::info!(
683                    "cannot ingest {:?} data from PG source because cast is correlated",
684                    scalar_type
685                );
686
687                PlanError::TableContainsUningestableTypes {
688                    name: table.name.to_string(),
689                    type_: scx.humanize_scalar_type(&scalar_type, false),
690                    column: column.name.to_string(),
691                }
692            })?;
693
694        table_cast.push((cast_type, mir_cast));
695    }
696    Ok(table_cast)
697}
698
/// Checks that the Postgres role the source connects as has the privileges
/// required to snapshot and replicate the requested tables.
mod privileges {
    use mz_postgres_util::PostgresError;

    use super::*;
    use crate::plan::PlanError;
    use crate::pure::PgSourcePurificationError;

    /// Ensure that the connected user (`CURRENT_USER` on `client`'s session)
    /// has `USAGE` privileges on every schema containing one of `table_oids`.
    async fn check_schema_privileges(client: &Client, table_oids: &[Oid]) -> Result<(), PlanError> {
        // Resolve each OID to its schema, deduplicate, and keep only schemas
        // the current user lacks `usage` on.
        let invalid_schema_privileges_rows = client
            .query(
                "
                WITH distinct_namespace AS (
                    SELECT
                        DISTINCT n.oid, n.nspname AS schema_name
                    FROM unnest($1::OID[]) AS oids (oid)
                    JOIN pg_class AS c ON c.oid = oids.oid
                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
                )
                SELECT d.schema_name
                FROM distinct_namespace AS d
                WHERE
                    NOT has_schema_privilege(CURRENT_USER::TEXT, d.oid, 'usage')",
                &[&table_oids],
            )
            .await
            .map_err(PostgresError::from)?;

        let mut invalid_schema_privileges = invalid_schema_privileges_rows
            .into_iter()
            .map(|row| row.get("schema_name"))
            .collect::<Vec<String>>();

        if invalid_schema_privileges.is_empty() {
            Ok(())
        } else {
            // Sort for a deterministic error message.
            invalid_schema_privileges.sort();
            Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
                schemas: invalid_schema_privileges,
            })?
        }
    }

    /// Ensure that the connected user (`CURRENT_USER` on `client`'s session)
    /// has:
    ///
    /// - `USAGE` privileges on the schemas containing the tables identified by
    ///   `table_oids`.
    ///
    /// - `SELECT` privileges on those tables.
    ///
    /// # Errors
    /// Returns a purification error naming the offending schemas or tables.
    pub async fn check_table_privileges(
        client: &Client,
        table_oids: &[Oid],
    ) -> Result<(), PlanError> {
        check_schema_privileges(client, table_oids).await?;

        // Collect the schema-qualified names of tables the current user
        // cannot `select` from.
        let invalid_table_privileges_rows = client
            .query(
                "
            SELECT
                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
             FROM unnest($1::oid[]) AS oids (oid)
             JOIN
                 pg_class c ON c.oid = oids.oid
             JOIN
                 pg_namespace n ON c.relnamespace = n.oid
             WHERE NOT has_table_privilege(CURRENT_USER::text, c.oid, 'select')",
                &[&table_oids],
            )
            .await
            .map_err(PostgresError::from)?;

        let mut invalid_table_privileges = invalid_table_privileges_rows
            .into_iter()
            .map(|row| row.get("schema_qualified_table_name"))
            .collect::<Vec<String>>();

        if invalid_table_privileges.is_empty() {
            Ok(())
        } else {
            // Sort for a deterministic error message.
            invalid_table_privileges.sort();
            Err(PgSourcePurificationError::UserLacksSelectOnTables {
                tables: invalid_table_privileges,
            })?
        }
    }

    /// Ensure that the connected user can read data from tables if row level security
    /// (RLS) is enabled. If the user/role does not have the BYPASSRLS attribute set, there is
    /// the possibility that MZ may not be able to read all data during the snapshot, which would
    /// result in missing data.
    pub async fn check_rls_privileges(
        client: &Client,
        table_oids: &[Oid],
    ) -> Result<(), PlanError> {
        match mz_postgres_util::validate_no_rls_policies(client, table_oids).await {
            Ok(_) => Ok(()),
            Err(err) => match err {
                // This is a little gross to do, but PlanError::PostgresConnectionErr implements
                // From<PostgresError>, and the error in that case would be
                // "failed to connect to PostgreSQL database", which doesn't make any sense.
                PostgresError::BypassRLSRequired(tables) => {
                    Err(PgSourcePurificationError::BypassRLSRequired { tables })?
                }
                _ => Err(err)?,
            },
        }
    }
}
810
/// Checks related to the `REPLICA IDENTITY` setting of upstream tables, which
/// must be `FULL` for the replication stream to carry complete row data.
mod replica_identity {
    use mz_postgres_util::PostgresError;

    use super::*;
    use crate::plan::PlanError;
    use crate::pure::PgSourcePurificationError;

    /// Ensures that all provided OIDs are tables with `REPLICA IDENTITY FULL`.
    pub async fn check_replica_identity_full(
        client: &Client,
        table_oids: &[Oid],
    ) -> Result<(), PlanError> {
        // Collect schema-qualified names of relations whose `relreplident` is
        // not 'f' (FULL).
        let invalid_replica_identity_rows = client
            .query(
                "
            SELECT
                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
             FROM unnest($1::oid[]) AS oids (oid)
             JOIN
                 pg_class c ON c.oid = oids.oid
             JOIN
                 pg_namespace n ON c.relnamespace = n.oid
             WHERE relreplident != 'f' OR relreplident IS NULL;",
                &[&table_oids],
            )
            .await
            .map_err(PostgresError::from)?;

        let mut invalid_replica_identity = invalid_replica_identity_rows
            .into_iter()
            .map(|row| row.get("schema_qualified_table_name"))
            .collect::<Vec<String>>();

        if invalid_replica_identity.is_empty() {
            Ok(())
        } else {
            // Sort for a deterministic error message.
            invalid_replica_identity.sort();
            Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
                items: invalid_replica_identity,
            })?
        }
    }
}
853}