mz_sql/pure/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Postgres utilities for SQL purification.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use mz_expr::MirScalarExpr;
15use mz_postgres_util::Config;
16use mz_postgres_util::desc::PostgresTableDesc;
17use mz_proto::RustType;
18use mz_repr::{ColumnType, RelationType, ScalarType};
19use mz_sql_parser::ast::display::AstDisplay;
20use mz_sql_parser::ast::{
21    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
22    ExternalReferences, Ident, TableConstraint, UnresolvedItemName, Value, WithOptionValue,
23};
24use mz_storage_types::sources::SourceExportStatementDetails;
25use mz_storage_types::sources::postgres::CastType;
26use prost::Message;
27use tokio_postgres::Client;
28use tokio_postgres::types::Oid;
29
30use crate::names::{Aug, ResolvedItemName};
31use crate::normalize;
32use crate::plan::hir::ColumnRef;
33use crate::plan::typeconv::{CastContext, plan_cast};
34use crate::plan::{
35    ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
36};
37
38use super::error::PgSourcePurificationError;
39use super::references::RetrievedSourceReferences;
40use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
41
42/// Ensure that we have select permissions on all tables; we have to do this before we
43/// start snapshotting because if we discover we cannot `COPY` from a table while
44/// snapshotting, we break the entire source.
45pub(super) async fn validate_requested_references_privileges(
46    config: &Config,
47    client: &Client,
48    table_oids: &[Oid],
49) -> Result<(), PlanError> {
50    privileges::check_table_privileges(config, client, table_oids).await?;
51    replica_identity::check_replica_identity_full(client, table_oids).await?;
52
53    Ok(())
54}
55
56/// Generate a mapping of `Oid`s to column names that should be ingested as text
57/// (rather than their type in the upstream database).
58///
59/// Additionally, modify `text_columns` so that they contain database-qualified
60/// references to the columns.
61pub(super) fn generate_text_columns(
62    retrieved_references: &RetrievedSourceReferences,
63    text_columns: &mut [UnresolvedItemName],
64) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
65    let mut text_cols_dict: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
66
67    for name in text_columns {
68        let (qual, col) = match name.0.split_last().expect("must have at least one element") {
69            (col, qual) if qual.is_empty() => {
70                return Err(PlanError::InvalidOptionValue {
71                    option_name: "TEXT COLUMNS".to_string(),
72                    err: Box::new(PlanError::UnderqualifiedColumnName(
73                        col.as_str().to_string(),
74                    )),
75                });
76            }
77            (col, qual) => (qual.to_vec(), col.as_str().to_string()),
78        };
79
80        let resolved_reference = retrieved_references.resolve_name(&qual)?;
81        let mut fully_qualified_name =
82            resolved_reference
83                .external_reference()
84                .map_err(|e| PlanError::InvalidOptionValue {
85                    option_name: "TEXT COLUMNS".to_string(),
86                    err: Box::new(e.into()),
87                })?;
88
89        let desc = resolved_reference
90            .postgres_desc()
91            .expect("known to be postgres");
92
93        if !desc.columns.iter().any(|column| column.name == col) {
94            let column = mz_repr::ColumnName::from(col);
95            let similar = desc
96                .columns
97                .iter()
98                .filter_map(|c| {
99                    let c_name = mz_repr::ColumnName::from(c.name.clone());
100                    c_name.is_similar(&column).then_some(c_name)
101                })
102                .collect();
103            return Err(PlanError::InvalidOptionValue {
104                option_name: "TEXT COLUMNS".to_string(),
105                err: Box::new(PlanError::UnknownColumn {
106                    table: Some(
107                        normalize::unresolved_item_name(fully_qualified_name)
108                            .expect("known to be of valid len"),
109                    ),
110                    column,
111                    similar,
112                }),
113            });
114        }
115
116        // Rewrite fully qualified name.
117        let col_ident = Ident::new(col.as_str().to_string())?;
118        fully_qualified_name.0.push(col_ident);
119        *name = fully_qualified_name;
120
121        let new = text_cols_dict
122            .entry(desc.oid)
123            .or_default()
124            .insert(col.as_str().to_string());
125
126        if !new {
127            return Err(PlanError::InvalidOptionValue {
128                option_name: "TEXT COLUMNS".to_string(),
129                err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
130            });
131        }
132    }
133
134    Ok(text_cols_dict)
135}
136
137pub fn generate_create_subsource_statements(
138    scx: &StatementContext,
139    source_name: ResolvedItemName,
140    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
141) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
142    // Aggregate all unrecognized types.
143    let mut unsupported_cols = vec![];
144
145    // Now that we have an explicit list of validated requested subsources we can create them
146    let mut subsources = Vec::with_capacity(requested_subsources.len());
147
148    for (subsource_name, purified_export) in requested_subsources {
149        let PostgresExportStatementValues {
150            columns,
151            constraints,
152            text_columns,
153            details,
154            external_reference,
155        } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
156
157        let mut with_options = vec![
158            CreateSubsourceOption {
159                name: CreateSubsourceOptionName::ExternalReference,
160                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
161            },
162            CreateSubsourceOption {
163                name: CreateSubsourceOptionName::Details,
164                value: Some(WithOptionValue::Value(Value::String(hex::encode(
165                    details.into_proto().encode_to_vec(),
166                )))),
167            },
168        ];
169
170        if let Some(text_columns) = text_columns {
171            with_options.push(CreateSubsourceOption {
172                name: CreateSubsourceOptionName::TextColumns,
173                value: Some(WithOptionValue::Sequence(text_columns)),
174            });
175        }
176
177        // Create the subsource statement
178        let subsource = CreateSubsourceStatement {
179            name: subsource_name,
180            columns,
181            // We might not know the primary source's `GlobalId` yet; if not,
182            // we'll fill it in once we generate it.
183            of_source: Some(source_name.clone()),
184            // TODO(petrosagg): nothing stops us from getting the constraints of the
185            // upstream tables and mirroring them here which will lead to more optimization
186            // opportunities if for example there is a primary key or an index.
187            //
188            // If we ever do that we must triple check that we will get notified *in the
189            // replication stream*, if our assumptions change. Failure to do that could
190            // mean that an upstream table that started with an index was then altered to
191            // one without and now we're producing garbage data.
192            constraints,
193            if_not_exists: false,
194            with_options,
195        };
196        subsources.push(subsource);
197    }
198
199    if !unsupported_cols.is_empty() {
200        unsupported_cols.sort();
201        Err(PgSourcePurificationError::UnrecognizedTypes {
202            cols: unsupported_cols,
203        })?;
204    }
205
206    Ok(subsources)
207}
208
/// The statement components derived from a single purified Postgres source
/// export by [`generate_source_export_statement_values`].
pub(super) struct PostgresExportStatementValues {
    /// Column definitions for the export, one per upstream column whose type
    /// could be determined (or that was requested as text).
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique constraints built from the upstream table's keys; a primary key
    /// is placed at the front of the list.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// The sorted names of the columns to ingest as text, or `None` if the
    /// export has no text columns.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// The export details wrapping the upstream table description.
    pub(super) details: SourceExportStatementDetails,
    /// The external reference carried over from the purified export.
    pub(super) external_reference: UnresolvedItemName,
}
216
217pub(super) fn generate_source_export_statement_values(
218    scx: &StatementContext,
219    purified_export: PurifiedSourceExport,
220    unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
221) -> Result<PostgresExportStatementValues, PlanError> {
222    let (text_columns, table) = match purified_export.details {
223        PurifiedExportDetails::Postgres {
224            text_columns,
225            table,
226        } => (text_columns, table),
227        _ => unreachable!("purified export details must be postgres"),
228    };
229
230    let text_column_set = text_columns
231        .as_ref()
232        .map(|v| BTreeSet::from_iter(v.iter().map(Ident::as_str)));
233
234    // Figure out the schema of the subsource
235    let mut columns = vec![];
236    for c in table.columns.iter() {
237        let name = Ident::new(c.name.clone())?;
238
239        let ty = match text_column_set {
240            Some(ref names) if names.contains(c.name.as_str()) => mz_pgrepr::Type::Text,
241            _ => match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
242                Ok(t) => t,
243                Err(_) => {
244                    let mut full_name = purified_export.external_reference.0.clone();
245                    full_name.push(name);
246                    unsupported_cols.push((
247                        UnresolvedItemName(full_name).to_ast_string_simple(),
248                        mz_repr::adt::system::Oid(c.type_oid),
249                    ));
250                    continue;
251                }
252            },
253        };
254
255        let data_type = scx.resolve_type(ty)?;
256        let mut options = vec![];
257
258        if !c.nullable {
259            options.push(mz_sql_parser::ast::ColumnOptionDef {
260                name: None,
261                option: mz_sql_parser::ast::ColumnOption::NotNull,
262            });
263        }
264
265        columns.push(ColumnDef {
266            name,
267            data_type,
268            collation: None,
269            options,
270        });
271    }
272
273    let mut constraints = vec![];
274    for key in table.keys.clone() {
275        let mut key_columns = vec![];
276
277        for col_num in key.cols {
278            let ident = Ident::new(
279                table
280                    .columns
281                    .iter()
282                    .find(|col| col.col_num == col_num)
283                    .expect("key exists as column")
284                    .name
285                    .clone(),
286            )?;
287            key_columns.push(ident);
288        }
289
290        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
291            name: Some(Ident::new(key.name)?),
292            columns: key_columns,
293            is_primary: key.is_primary,
294            nulls_not_distinct: key.nulls_not_distinct,
295        };
296
297        // We take the first constraint available to be the primary key.
298        if key.is_primary {
299            constraints.insert(0, constraint);
300        } else {
301            constraints.push(constraint);
302        }
303    }
304    let details = SourceExportStatementDetails::Postgres { table };
305
306    let text_columns = text_columns.map(|mut columns| {
307        columns.sort();
308        columns
309            .into_iter()
310            .map(WithOptionValue::Ident::<Aug>)
311            .collect()
312    });
313
314    Ok(PostgresExportStatementValues {
315        columns,
316        constraints,
317        text_columns,
318        details,
319        external_reference: purified_export.external_reference,
320    })
321}
322
/// The result of [`purify_source_exports`]: the purified exports together with
/// the normalized `TEXT COLUMNS` option values.
pub(super) struct PurifiedSourceExports {
    /// The purified exports, keyed by export name.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    // NOTE(roshan): The text columns are already part of their
    // appropriate `source_exports` above, but these are returned to allow
    // round-tripping a `CREATE SOURCE` statement while we still allow creating
    // implicit subsources from `CREATE SOURCE`. Remove once
    // fully deprecating that feature and forcing users to use explicit
    // `CREATE TABLE .. FROM SOURCE` statements.
    /// The fully qualified text column references, sorted and deduplicated.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
333
334// Purify the requested external references, returning a set of purified
335// source exports corresponding to external tables, and and additional
336// fields necessary to generate relevant statements and update statement options
337pub(super) async fn purify_source_exports(
338    client: &Client,
339    config: &mz_postgres_util::Config,
340    retrieved_references: &RetrievedSourceReferences,
341    requested_references: &Option<ExternalReferences>,
342    mut text_columns: Vec<UnresolvedItemName>,
343    unresolved_source_name: &UnresolvedItemName,
344    reference_policy: &SourceReferencePolicy,
345) -> Result<PurifiedSourceExports, PlanError> {
346    let requested_exports = match requested_references.as_ref() {
347        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
348            Err(PlanError::UseTablesForSources(requested.to_string()))?
349        }
350        Some(requested) => retrieved_references
351            .requested_source_exports(Some(requested), unresolved_source_name)?,
352        None => {
353            if matches!(reference_policy, SourceReferencePolicy::Required) {
354                Err(PgSourcePurificationError::RequiresExternalReferences)?
355            }
356
357            // If no external reference is specified, it does not make sense to include
358            // text columns.
359            if !text_columns.is_empty() {
360                Err(
361                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
362                        "TEXT COLUMNS".to_string(),
363                    ),
364                )?
365            }
366
367            return Ok(PurifiedSourceExports {
368                source_exports: BTreeMap::new(),
369                normalized_text_columns: vec![],
370            });
371        }
372    };
373
374    if requested_exports.is_empty() {
375        sql_bail!(
376            "[internal error]: Postgres reference {} did not match any tables",
377            requested_references
378                .as_ref()
379                .unwrap()
380                .to_ast_string_simple()
381        );
382    }
383
384    super::validate_source_export_names(&requested_exports)?;
385
386    let table_oids: Vec<_> = requested_exports
387        .iter()
388        .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
389        .collect();
390
391    validate_requested_references_privileges(config, client, &table_oids).await?;
392
393    let mut text_column_map = generate_text_columns(retrieved_references, &mut text_columns)?;
394
395    // Normalize options to contain full qualified values.
396    text_columns.sort();
397    text_columns.dedup();
398    let normalized_text_columns: Vec<_> = text_columns
399        .into_iter()
400        .map(WithOptionValue::UnresolvedItemName)
401        .collect();
402
403    let source_exports = requested_exports
404        .into_iter()
405        .map(|r| {
406            let desc = r.meta.postgres_desc().expect("known postgres");
407            (
408                r.name,
409                PurifiedSourceExport {
410                    external_reference: r.external_reference,
411                    details: PurifiedExportDetails::Postgres {
412                        text_columns: text_column_map.remove(&desc.oid).map(|v| {
413                            v.into_iter()
414                                .map(|s| Ident::new(s).expect("validated above"))
415                                .collect()
416                        }),
417                        table: desc.clone(),
418                    },
419                },
420            )
421        })
422        .collect();
423
424    if !text_column_map.is_empty() {
425        // If any any item was not removed from the text_column_map, it wasn't being
426        // added.
427        let mut dangling_text_column_refs = vec![];
428        let all_references = retrieved_references.all_references();
429
430        for id in text_column_map.keys() {
431            let desc = all_references
432                .iter()
433                .find_map(|reference| {
434                    let desc = reference.postgres_desc().expect("is postgres");
435                    if desc.oid == *id { Some(desc) } else { None }
436                })
437                .expect("validated when generating text columns");
438
439            dangling_text_column_refs.push(PartialItemName {
440                database: None,
441                schema: Some(desc.namespace.clone()),
442                item: desc.name.clone(),
443            });
444        }
445
446        dangling_text_column_refs.sort();
447        Err(PgSourcePurificationError::DanglingTextColumns {
448            items: dangling_text_column_refs,
449        })?;
450    }
451
452    Ok(PurifiedSourceExports {
453        source_exports,
454        normalized_text_columns,
455    })
456}
457
458pub(crate) fn generate_column_casts(
459    scx: &StatementContext,
460    table: &PostgresTableDesc,
461    text_columns: &Vec<Ident>,
462) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
463    // Generate the cast expressions required to convert the text encoded columns into
464    // the appropriate target types, creating a Vec<MirScalarExpr>
465    // The postgres source reader will then eval each of those on the incoming rows
466    // First, construct an expression context where the expression is evaluated on an
467    // imaginary row which has the same number of columns as the upstream table but all
468    // of the types are text
469    let mut cast_scx = scx.clone();
470    cast_scx.param_types = Default::default();
471    let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
472    let mut column_types = vec![];
473    for column in table.columns.iter() {
474        column_types.push(ColumnType {
475            nullable: column.nullable,
476            scalar_type: ScalarType::String,
477        });
478    }
479
480    let cast_ecx = ExprContext {
481        qcx: &cast_qcx,
482        name: "plan_postgres_source_cast",
483        scope: &Scope::empty(),
484        relation_type: &RelationType {
485            column_types,
486            keys: vec![],
487        },
488        allow_aggregates: false,
489        allow_subqueries: false,
490        allow_parameters: false,
491        allow_windows: false,
492    };
493
494    let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
495
496    // Then, for each column we will generate a MirRelationExpr that extracts the nth
497    // column and casts it to the appropriate target type
498    let mut table_cast = vec![];
499    for (i, column) in table.columns.iter().enumerate() {
500        let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
501            // Treat the column as text if it was referenced in
502            // `TEXT COLUMNS`. This is the only place we need to
503            // perform this logic; even if the type is unsupported,
504            // we'll be able to ingest its values as text in
505            // storage.
506            (CastType::Text, mz_pgrepr::Type::Text)
507        } else {
508            match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
509                Ok(t) => (CastType::Natural, t),
510                // If this reference survived purification, we
511                // do not expect it to be from a table that the
512                // user will consume., i.e. expect this table to
513                // be filtered out of table casts.
514                Err(_) => {
515                    table_cast.push((
516                        CastType::Natural,
517                        HirScalarExpr::CallVariadic {
518                            func: mz_expr::VariadicFunc::ErrorIfNull,
519                            exprs: vec![
520                                HirScalarExpr::literal_null(ScalarType::String),
521                                HirScalarExpr::literal(
522                                    mz_repr::Datum::from(
523                                        format!("Unsupported type with OID {}", column.type_oid)
524                                            .as_str(),
525                                    ),
526                                    ScalarType::String,
527                                ),
528                            ],
529                        }
530                        .lower_uncorrelated()
531                        .expect("no correlation"),
532                    ));
533                    continue;
534                }
535            }
536        };
537
538        let data_type = scx.resolve_type(ty)?;
539        let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
540
541        let col_expr = HirScalarExpr::Column(ColumnRef {
542            level: 0,
543            column: i,
544        });
545
546        let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
547
548        let cast = if column.nullable {
549            cast_expr
550        } else {
551            // We must enforce nullability constraint on cast
552            // because PG replication stream does not propagate
553            // constraint changes and we want to error subsource if
554            // e.g. the constraint is dropped and we don't notice
555            // it.
556            HirScalarExpr::CallVariadic {
557                            func: mz_expr::VariadicFunc::ErrorIfNull,
558                            exprs: vec![
559                                cast_expr,
560                                HirScalarExpr::literal(
561                                    mz_repr::Datum::from(
562                                        format!(
563                                            "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
564                                            table.namespace.clone(),
565                                            table.name.clone(),
566                                            column.name.clone())
567                                            .as_str(),
568                                    ),
569                                    ScalarType::String,
570                                ),
571                            ],
572                        }
573        };
574
575        // We expect only reg* types to encounter this issue. Users
576        // can ingest the data as text if they need to ingest it.
577        // This is acceptable because we don't expect the OIDs from
578        // an external PG source to be unilaterally usable in
579        // resolving item names in MZ.
580        let mir_cast = cast.lower_uncorrelated().map_err(|_e| {
581            tracing::info!(
582                "cannot ingest {:?} data from PG source because cast is correlated",
583                scalar_type
584            );
585
586            PlanError::TableContainsUningestableTypes {
587                name: table.name.to_string(),
588                type_: scx.humanize_scalar_type(&scalar_type, false),
589                column: column.name.to_string(),
590            }
591        })?;
592
593        table_cast.push((cast_type, mir_cast));
594    }
595    Ok(table_cast)
596}
597
598mod privileges {
599    use mz_postgres_util::{Config, PostgresError};
600
601    use super::*;
602    use crate::plan::PlanError;
603    use crate::pure::PgSourcePurificationError;
604
605    async fn check_schema_privileges(
606        config: &Config,
607        client: &Client,
608        table_oids: &[Oid],
609    ) -> Result<(), PlanError> {
610        let invalid_schema_privileges_rows = client
611            .query(
612                "
613                WITH distinct_namespace AS (
614                    SELECT
615                        DISTINCT n.oid, n.nspname AS schema_name
616                    FROM unnest($1::OID[]) AS oids (oid)
617                    JOIN pg_class AS c ON c.oid = oids.oid
618                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
619                )
620                SELECT d.schema_name
621                FROM distinct_namespace AS d
622                WHERE
623                    NOT has_schema_privilege($2::TEXT, d.oid, 'usage')",
624                &[
625                    &table_oids,
626                    &config.get_user().expect("connection specifies user"),
627                ],
628            )
629            .await
630            .map_err(PostgresError::from)?;
631
632        let mut invalid_schema_privileges = invalid_schema_privileges_rows
633            .into_iter()
634            .map(|row| row.get("schema_name"))
635            .collect::<Vec<String>>();
636
637        if invalid_schema_privileges.is_empty() {
638            Ok(())
639        } else {
640            invalid_schema_privileges.sort();
641            Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
642                user: config
643                    .get_user()
644                    .expect("connection specifies user")
645                    .to_string(),
646                schemas: invalid_schema_privileges,
647            })?
648        }
649    }
650
651    /// Ensure that the user specified in `config` has:
652    ///
653    /// -`SELECT` privileges for the identified `tables`.
654    ///
655    ///  `tables`'s elements should be of the structure `[<schema name>, <table name>]`.
656    ///
657    /// - `USAGE` privileges on the schemas references in `tables`.
658    ///
659    /// # Panics
660    /// If `config` does not specify a user.
661    pub async fn check_table_privileges(
662        config: &Config,
663        client: &Client,
664        table_oids: &[Oid],
665    ) -> Result<(), PlanError> {
666        check_schema_privileges(config, client, table_oids).await?;
667
668        let invalid_table_privileges_rows = client
669            .query(
670                "
671            SELECT
672                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
673             FROM unnest($1::oid[]) AS oids (oid)
674             JOIN
675                 pg_class c ON c.oid = oids.oid
676             JOIN
677                 pg_namespace n ON c.relnamespace = n.oid
678             WHERE NOT has_table_privilege($2::text, c.oid, 'select')",
679                &[
680                    &table_oids,
681                    &config.get_user().expect("connection specifies user"),
682                ],
683            )
684            .await
685            .map_err(PostgresError::from)?;
686
687        let mut invalid_table_privileges = invalid_table_privileges_rows
688            .into_iter()
689            .map(|row| row.get("schema_qualified_table_name"))
690            .collect::<Vec<String>>();
691
692        if invalid_table_privileges.is_empty() {
693            Ok(())
694        } else {
695            invalid_table_privileges.sort();
696            Err(PgSourcePurificationError::UserLacksSelectOnTables {
697                user: config
698                    .get_user()
699                    .expect("connection must specify user")
700                    .to_string(),
701                tables: invalid_table_privileges,
702            })?
703        }
704    }
705}
706
707mod replica_identity {
708    use mz_postgres_util::PostgresError;
709
710    use super::*;
711    use crate::plan::PlanError;
712    use crate::pure::PgSourcePurificationError;
713
714    /// Ensures that all provided OIDs are tables with `REPLICA IDENTITY FULL`.
715    pub async fn check_replica_identity_full(
716        client: &Client,
717        table_oids: &[Oid],
718    ) -> Result<(), PlanError> {
719        let invalid_replica_identity_rows = client
720            .query(
721                "
722            SELECT
723                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
724             FROM unnest($1::oid[]) AS oids (oid)
725             JOIN
726                 pg_class c ON c.oid = oids.oid
727             JOIN
728                 pg_namespace n ON c.relnamespace = n.oid
729             WHERE relreplident != 'f' OR relreplident IS NULL;",
730                &[&table_oids],
731            )
732            .await
733            .map_err(PostgresError::from)?;
734
735        let mut invalid_replica_identity = invalid_replica_identity_rows
736            .into_iter()
737            .map(|row| row.get("schema_qualified_table_name"))
738            .collect::<Vec<String>>();
739
740        if invalid_replica_identity.is_empty() {
741            Ok(())
742        } else {
743            invalid_replica_identity.sort();
744            Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
745                items: invalid_replica_identity,
746            })?
747        }
748    }
749}