mz_sql/pure/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Postgres utilities for SQL purification.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::sync::Arc;
14
15use mz_expr::MirScalarExpr;
16use mz_postgres_util::Config;
17use mz_postgres_util::desc::PostgresTableDesc;
18use mz_proto::RustType;
19use mz_repr::{ColumnType, RelationType, ScalarType};
20use mz_sql_parser::ast::display::AstDisplay;
21use mz_sql_parser::ast::{
22    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
23    ExternalReferences, Ident, TableConstraint, UnresolvedItemName, Value, WithOptionValue,
24};
25use mz_storage_types::sources::SourceExportStatementDetails;
26use mz_storage_types::sources::postgres::CastType;
27use prost::Message;
28use tokio_postgres::Client;
29use tokio_postgres::types::Oid;
30
31use crate::names::{Aug, ResolvedItemName};
32use crate::normalize;
33use crate::plan::hir::ColumnRef;
34use crate::plan::typeconv::{CastContext, plan_cast};
35use crate::plan::{
36    ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
37};
38
39use super::error::PgSourcePurificationError;
40use super::references::RetrievedSourceReferences;
41use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
42
43/// Ensure that we have select permissions on all tables; we have to do this before we
44/// start snapshotting because if we discover we cannot `COPY` from a table while
45/// snapshotting, we break the entire source.
46pub(super) async fn validate_requested_references_privileges(
47    config: &Config,
48    client: &Client,
49    table_oids: &[Oid],
50) -> Result<(), PlanError> {
51    privileges::check_table_privileges(config, client, table_oids).await?;
52    replica_identity::check_replica_identity_full(client, table_oids).await?;
53
54    Ok(())
55}
56
57/// Generate a mapping of `Oid`s to column names that should be ingested as text
58/// (rather than their type in the upstream database).
59///
60/// Additionally, modify `text_columns` so that they contain database-qualified
61/// references to the columns.
62pub(super) fn generate_text_columns(
63    retrieved_references: &RetrievedSourceReferences,
64    text_columns: &mut [UnresolvedItemName],
65) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
66    let mut text_cols_dict: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
67
68    for name in text_columns {
69        let (qual, col) = match name.0.split_last().expect("must have at least one element") {
70            (col, qual) if qual.is_empty() => {
71                return Err(PlanError::InvalidOptionValue {
72                    option_name: "TEXT COLUMNS".to_string(),
73                    err: Box::new(PlanError::UnderqualifiedColumnName(
74                        col.as_str().to_string(),
75                    )),
76                });
77            }
78            (col, qual) => (qual.to_vec(), col.as_str().to_string()),
79        };
80
81        let resolved_reference = retrieved_references.resolve_name(&qual)?;
82        let mut fully_qualified_name =
83            resolved_reference
84                .external_reference()
85                .map_err(|e| PlanError::InvalidOptionValue {
86                    option_name: "TEXT COLUMNS".to_string(),
87                    err: Box::new(e.into()),
88                })?;
89
90        let desc = resolved_reference
91            .postgres_desc()
92            .expect("known to be postgres");
93
94        if !desc.columns.iter().any(|column| column.name == col) {
95            let column = mz_repr::ColumnName::from(col);
96            let similar = desc
97                .columns
98                .iter()
99                .filter_map(|c| {
100                    let c_name = mz_repr::ColumnName::from(c.name.clone());
101                    c_name.is_similar(&column).then_some(c_name)
102                })
103                .collect();
104            return Err(PlanError::InvalidOptionValue {
105                option_name: "TEXT COLUMNS".to_string(),
106                err: Box::new(PlanError::UnknownColumn {
107                    table: Some(
108                        normalize::unresolved_item_name(fully_qualified_name)
109                            .expect("known to be of valid len"),
110                    ),
111                    column,
112                    similar,
113                }),
114            });
115        }
116
117        // Rewrite fully qualified name.
118        let col_ident = Ident::new(col.as_str().to_string())?;
119        fully_qualified_name.0.push(col_ident);
120        *name = fully_qualified_name;
121
122        let new = text_cols_dict
123            .entry(desc.oid)
124            .or_default()
125            .insert(col.as_str().to_string());
126
127        if !new {
128            return Err(PlanError::InvalidOptionValue {
129                option_name: "TEXT COLUMNS".to_string(),
130                err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
131            });
132        }
133    }
134
135    Ok(text_cols_dict)
136}
137
138pub fn generate_create_subsource_statements(
139    scx: &StatementContext,
140    source_name: ResolvedItemName,
141    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
142) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
143    // Aggregate all unrecognized types.
144    let mut unsupported_cols = vec![];
145
146    // Now that we have an explicit list of validated requested subsources we can create them
147    let mut subsources = Vec::with_capacity(requested_subsources.len());
148
149    for (subsource_name, purified_export) in requested_subsources {
150        let PostgresExportStatementValues {
151            columns,
152            constraints,
153            text_columns,
154            details,
155            external_reference,
156        } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
157
158        let mut with_options = vec![
159            CreateSubsourceOption {
160                name: CreateSubsourceOptionName::ExternalReference,
161                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
162            },
163            CreateSubsourceOption {
164                name: CreateSubsourceOptionName::Details,
165                value: Some(WithOptionValue::Value(Value::String(hex::encode(
166                    details.into_proto().encode_to_vec(),
167                )))),
168            },
169        ];
170
171        if let Some(text_columns) = text_columns {
172            with_options.push(CreateSubsourceOption {
173                name: CreateSubsourceOptionName::TextColumns,
174                value: Some(WithOptionValue::Sequence(text_columns)),
175            });
176        }
177
178        // Create the subsource statement
179        let subsource = CreateSubsourceStatement {
180            name: subsource_name,
181            columns,
182            // We might not know the primary source's `GlobalId` yet; if not,
183            // we'll fill it in once we generate it.
184            of_source: Some(source_name.clone()),
185            // TODO(petrosagg): nothing stops us from getting the constraints of the
186            // upstream tables and mirroring them here which will lead to more optimization
187            // opportunities if for example there is a primary key or an index.
188            //
189            // If we ever do that we must triple check that we will get notified *in the
190            // replication stream*, if our assumptions change. Failure to do that could
191            // mean that an upstream table that started with an index was then altered to
192            // one without and now we're producing garbage data.
193            constraints,
194            if_not_exists: false,
195            with_options,
196        };
197        subsources.push(subsource);
198    }
199
200    if !unsupported_cols.is_empty() {
201        unsupported_cols.sort();
202        Err(PgSourcePurificationError::UnrecognizedTypes {
203            cols: unsupported_cols,
204        })?;
205    }
206
207    Ok(subsources)
208}
209
/// The pieces needed to build a statement for a single Postgres source export, as
/// produced by `generate_source_export_statement_values`.
pub(super) struct PostgresExportStatementValues {
    /// Column definitions derived from the upstream table's columns. Columns whose
    /// type could not be recognized are left out (they are reported separately).
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// One `Unique` constraint per upstream key, with a primary key placed first.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// The export's text columns, sorted and wrapped as option values; `None` when
    /// the export has no text columns.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// The export details carrying the upstream table description.
    pub(super) details: SourceExportStatementDetails,
    /// The external reference of the purified export.
    pub(super) external_reference: UnresolvedItemName,
}
217
218pub(super) fn generate_source_export_statement_values(
219    scx: &StatementContext,
220    purified_export: PurifiedSourceExport,
221    unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
222) -> Result<PostgresExportStatementValues, PlanError> {
223    let (text_columns, table) = match purified_export.details {
224        PurifiedExportDetails::Postgres {
225            text_columns,
226            table,
227        } => (text_columns, table),
228        _ => unreachable!("purified export details must be postgres"),
229    };
230
231    let text_column_set = text_columns
232        .as_ref()
233        .map(|v| BTreeSet::from_iter(v.iter().map(Ident::as_str)));
234
235    // Figure out the schema of the subsource
236    let mut columns = vec![];
237    for c in table.columns.iter() {
238        let name = Ident::new(c.name.clone())?;
239
240        let ty = match text_column_set {
241            Some(ref names) if names.contains(c.name.as_str()) => mz_pgrepr::Type::Text,
242            _ => match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
243                Ok(t) => t,
244                Err(_) => {
245                    let mut full_name = purified_export.external_reference.0.clone();
246                    full_name.push(name);
247                    unsupported_cols.push((
248                        UnresolvedItemName(full_name).to_ast_string_simple(),
249                        mz_repr::adt::system::Oid(c.type_oid),
250                    ));
251                    continue;
252                }
253            },
254        };
255
256        let data_type = scx.resolve_type(ty)?;
257        let mut options = vec![];
258
259        if !c.nullable {
260            options.push(mz_sql_parser::ast::ColumnOptionDef {
261                name: None,
262                option: mz_sql_parser::ast::ColumnOption::NotNull,
263            });
264        }
265
266        columns.push(ColumnDef {
267            name,
268            data_type,
269            collation: None,
270            options,
271        });
272    }
273
274    let mut constraints = vec![];
275    for key in table.keys.clone() {
276        let mut key_columns = vec![];
277
278        for col_num in key.cols {
279            let ident = Ident::new(
280                table
281                    .columns
282                    .iter()
283                    .find(|col| col.col_num == col_num)
284                    .expect("key exists as column")
285                    .name
286                    .clone(),
287            )?;
288            key_columns.push(ident);
289        }
290
291        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
292            name: Some(Ident::new(key.name)?),
293            columns: key_columns,
294            is_primary: key.is_primary,
295            nulls_not_distinct: key.nulls_not_distinct,
296        };
297
298        // We take the first constraint available to be the primary key.
299        if key.is_primary {
300            constraints.insert(0, constraint);
301        } else {
302            constraints.push(constraint);
303        }
304    }
305    let details = SourceExportStatementDetails::Postgres { table };
306
307    let text_columns = text_columns.map(|mut columns| {
308        columns.sort();
309        columns
310            .into_iter()
311            .map(WithOptionValue::Ident::<Aug>)
312            .collect()
313    });
314
315    Ok(PostgresExportStatementValues {
316        columns,
317        constraints,
318        text_columns,
319        details,
320        external_reference: purified_export.external_reference,
321    })
322}
323
/// The result of purifying the external references requested for a Postgres source.
pub(super) struct PurifiedSourceExports {
    /// The purified exports, keyed by the name of each export.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// The fully qualified `TEXT COLUMNS` references, sorted and deduplicated.
    // NOTE(roshan): The text columns are already part of their
    // appropriate `source_exports` above, but these are returned to allow
    // round-tripping a `CREATE SOURCE` statement while we still allow creating
    // implicit subsources from `CREATE SOURCE`. Remove once
    // fully deprecating that feature and forcing users to use explicit
    // `CREATE TABLE .. FROM SOURCE` statements.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
334
335// Purify the requested external references, returning a set of purified
336// source exports corresponding to external tables, and and additional
337// fields necessary to generate relevant statements and update statement options
338pub(super) async fn purify_source_exports(
339    client: &Client,
340    config: &mz_postgres_util::Config,
341    retrieved_references: &RetrievedSourceReferences,
342    requested_references: &Option<ExternalReferences>,
343    mut text_columns: Vec<UnresolvedItemName>,
344    unresolved_source_name: &UnresolvedItemName,
345    reference_policy: &SourceReferencePolicy,
346) -> Result<PurifiedSourceExports, PlanError> {
347    let requested_exports = match requested_references.as_ref() {
348        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
349            Err(PlanError::UseTablesForSources(requested.to_string()))?
350        }
351        Some(requested) => retrieved_references
352            .requested_source_exports(Some(requested), unresolved_source_name)?,
353        None => {
354            if matches!(reference_policy, SourceReferencePolicy::Required) {
355                Err(PgSourcePurificationError::RequiresExternalReferences)?
356            }
357
358            // If no external reference is specified, it does not make sense to include
359            // text columns.
360            if !text_columns.is_empty() {
361                Err(
362                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
363                        "TEXT COLUMNS".to_string(),
364                    ),
365                )?
366            }
367
368            return Ok(PurifiedSourceExports {
369                source_exports: BTreeMap::new(),
370                normalized_text_columns: vec![],
371            });
372        }
373    };
374
375    if requested_exports.is_empty() {
376        sql_bail!(
377            "[internal error]: Postgres reference {} did not match any tables",
378            requested_references
379                .as_ref()
380                .unwrap()
381                .to_ast_string_simple()
382        );
383    }
384
385    super::validate_source_export_names(&requested_exports)?;
386
387    let table_oids: Vec<_> = requested_exports
388        .iter()
389        .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
390        .collect();
391
392    validate_requested_references_privileges(config, client, &table_oids).await?;
393
394    let mut text_column_map = generate_text_columns(retrieved_references, &mut text_columns)?;
395
396    // Normalize options to contain full qualified values.
397    text_columns.sort();
398    text_columns.dedup();
399    let normalized_text_columns: Vec<_> = text_columns
400        .into_iter()
401        .map(WithOptionValue::UnresolvedItemName)
402        .collect();
403
404    let source_exports = requested_exports
405        .into_iter()
406        .map(|r| {
407            let desc = r.meta.postgres_desc().expect("known postgres");
408            (
409                r.name,
410                PurifiedSourceExport {
411                    external_reference: r.external_reference,
412                    details: PurifiedExportDetails::Postgres {
413                        text_columns: text_column_map.remove(&desc.oid).map(|v| {
414                            v.into_iter()
415                                .map(|s| Ident::new(s).expect("validated above"))
416                                .collect()
417                        }),
418                        table: desc.clone(),
419                    },
420                },
421            )
422        })
423        .collect();
424
425    if !text_column_map.is_empty() {
426        // If any any item was not removed from the text_column_map, it wasn't being
427        // added.
428        let mut dangling_text_column_refs = vec![];
429        let all_references = retrieved_references.all_references();
430
431        for id in text_column_map.keys() {
432            let desc = all_references
433                .iter()
434                .find_map(|reference| {
435                    let desc = reference.postgres_desc().expect("is postgres");
436                    if desc.oid == *id { Some(desc) } else { None }
437                })
438                .expect("validated when generating text columns");
439
440            dangling_text_column_refs.push(PartialItemName {
441                database: None,
442                schema: Some(desc.namespace.clone()),
443                item: desc.name.clone(),
444            });
445        }
446
447        dangling_text_column_refs.sort();
448        Err(PgSourcePurificationError::DanglingTextColumns {
449            items: dangling_text_column_refs,
450        })?;
451    }
452
453    Ok(PurifiedSourceExports {
454        source_exports,
455        normalized_text_columns,
456    })
457}
458
459pub(crate) fn generate_column_casts(
460    scx: &StatementContext,
461    table: &PostgresTableDesc,
462    text_columns: &Vec<Ident>,
463) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
464    // Generate the cast expressions required to convert the text encoded columns into
465    // the appropriate target types, creating a Vec<MirScalarExpr>
466    // The postgres source reader will then eval each of those on the incoming rows
467    // First, construct an expression context where the expression is evaluated on an
468    // imaginary row which has the same number of columns as the upstream table but all
469    // of the types are text
470    let mut cast_scx = scx.clone();
471    cast_scx.param_types = Default::default();
472    let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
473    let mut column_types = vec![];
474    for column in table.columns.iter() {
475        column_types.push(ColumnType {
476            nullable: column.nullable,
477            scalar_type: ScalarType::String,
478        });
479    }
480
481    let cast_ecx = ExprContext {
482        qcx: &cast_qcx,
483        name: "plan_postgres_source_cast",
484        scope: &Scope::empty(),
485        relation_type: &RelationType {
486            column_types,
487            keys: vec![],
488        },
489        allow_aggregates: false,
490        allow_subqueries: false,
491        allow_parameters: false,
492        allow_windows: false,
493    };
494
495    let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
496
497    // Then, for each column we will generate a MirRelationExpr that extracts the nth
498    // column and casts it to the appropriate target type
499    let mut table_cast = vec![];
500    for (i, column) in table.columns.iter().enumerate() {
501        let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
502            // Treat the column as text if it was referenced in
503            // `TEXT COLUMNS`. This is the only place we need to
504            // perform this logic; even if the type is unsupported,
505            // we'll be able to ingest its values as text in
506            // storage.
507            (CastType::Text, mz_pgrepr::Type::Text)
508        } else {
509            match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
510                Ok(t) => (CastType::Natural, t),
511                // If this reference survived purification, we
512                // do not expect it to be from a table that the
513                // user will consume., i.e. expect this table to
514                // be filtered out of table casts.
515                Err(_) => {
516                    table_cast.push((
517                        CastType::Natural,
518                        HirScalarExpr::call_variadic(
519                            mz_expr::VariadicFunc::ErrorIfNull,
520                            vec![
521                                HirScalarExpr::literal_null(ScalarType::String),
522                                HirScalarExpr::literal(
523                                    mz_repr::Datum::from(
524                                        format!("Unsupported type with OID {}", column.type_oid)
525                                            .as_str(),
526                                    ),
527                                    ScalarType::String,
528                                ),
529                            ],
530                        )
531                        .lower_uncorrelated()
532                        .expect("no correlation"),
533                    ));
534                    continue;
535                }
536            }
537        };
538
539        let data_type = scx.resolve_type(ty)?;
540        let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
541
542        let col_expr = HirScalarExpr::named_column(
543            ColumnRef {
544                level: 0,
545                column: i,
546            },
547            Arc::from(column.name.as_str()),
548        );
549
550        let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
551
552        let cast = if column.nullable {
553            cast_expr
554        } else {
555            // We must enforce nullability constraint on cast
556            // because PG replication stream does not propagate
557            // constraint changes and we want to error subsource if
558            // e.g. the constraint is dropped and we don't notice
559            // it.
560            HirScalarExpr::call_variadic(mz_expr::VariadicFunc::ErrorIfNull, vec![
561                                cast_expr,
562                                HirScalarExpr::literal(
563                                    mz_repr::Datum::from(
564                                        format!(
565                                            "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
566                                            table.namespace.clone(),
567                                            table.name.clone(),
568                                            column.name.clone())
569                                            .as_str(),
570                                    ),
571                                    ScalarType::String,
572                                ),
573                            ],
574                            )
575        };
576
577        // We expect only reg* types to encounter this issue. Users
578        // can ingest the data as text if they need to ingest it.
579        // This is acceptable because we don't expect the OIDs from
580        // an external PG source to be unilaterally usable in
581        // resolving item names in MZ.
582        let mir_cast = cast.lower_uncorrelated().map_err(|_e| {
583            tracing::info!(
584                "cannot ingest {:?} data from PG source because cast is correlated",
585                scalar_type
586            );
587
588            PlanError::TableContainsUningestableTypes {
589                name: table.name.to_string(),
590                type_: scx.humanize_scalar_type(&scalar_type, false),
591                column: column.name.to_string(),
592            }
593        })?;
594
595        table_cast.push((cast_type, mir_cast));
596    }
597    Ok(table_cast)
598}
599
600mod privileges {
601    use mz_postgres_util::{Config, PostgresError};
602
603    use super::*;
604    use crate::plan::PlanError;
605    use crate::pure::PgSourcePurificationError;
606
607    async fn check_schema_privileges(
608        config: &Config,
609        client: &Client,
610        table_oids: &[Oid],
611    ) -> Result<(), PlanError> {
612        let invalid_schema_privileges_rows = client
613            .query(
614                "
615                WITH distinct_namespace AS (
616                    SELECT
617                        DISTINCT n.oid, n.nspname AS schema_name
618                    FROM unnest($1::OID[]) AS oids (oid)
619                    JOIN pg_class AS c ON c.oid = oids.oid
620                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
621                )
622                SELECT d.schema_name
623                FROM distinct_namespace AS d
624                WHERE
625                    NOT has_schema_privilege($2::TEXT, d.oid, 'usage')",
626                &[
627                    &table_oids,
628                    &config.get_user().expect("connection specifies user"),
629                ],
630            )
631            .await
632            .map_err(PostgresError::from)?;
633
634        let mut invalid_schema_privileges = invalid_schema_privileges_rows
635            .into_iter()
636            .map(|row| row.get("schema_name"))
637            .collect::<Vec<String>>();
638
639        if invalid_schema_privileges.is_empty() {
640            Ok(())
641        } else {
642            invalid_schema_privileges.sort();
643            Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
644                user: config
645                    .get_user()
646                    .expect("connection specifies user")
647                    .to_string(),
648                schemas: invalid_schema_privileges,
649            })?
650        }
651    }
652
653    /// Ensure that the user specified in `config` has:
654    ///
655    /// -`SELECT` privileges for the identified `tables`.
656    ///
657    ///  `tables`'s elements should be of the structure `[<schema name>, <table name>]`.
658    ///
659    /// - `USAGE` privileges on the schemas references in `tables`.
660    ///
661    /// # Panics
662    /// If `config` does not specify a user.
663    pub async fn check_table_privileges(
664        config: &Config,
665        client: &Client,
666        table_oids: &[Oid],
667    ) -> Result<(), PlanError> {
668        check_schema_privileges(config, client, table_oids).await?;
669
670        let invalid_table_privileges_rows = client
671            .query(
672                "
673            SELECT
674                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
675             FROM unnest($1::oid[]) AS oids (oid)
676             JOIN
677                 pg_class c ON c.oid = oids.oid
678             JOIN
679                 pg_namespace n ON c.relnamespace = n.oid
680             WHERE NOT has_table_privilege($2::text, c.oid, 'select')",
681                &[
682                    &table_oids,
683                    &config.get_user().expect("connection specifies user"),
684                ],
685            )
686            .await
687            .map_err(PostgresError::from)?;
688
689        let mut invalid_table_privileges = invalid_table_privileges_rows
690            .into_iter()
691            .map(|row| row.get("schema_qualified_table_name"))
692            .collect::<Vec<String>>();
693
694        if invalid_table_privileges.is_empty() {
695            Ok(())
696        } else {
697            invalid_table_privileges.sort();
698            Err(PgSourcePurificationError::UserLacksSelectOnTables {
699                user: config
700                    .get_user()
701                    .expect("connection must specify user")
702                    .to_string(),
703                tables: invalid_table_privileges,
704            })?
705        }
706    }
707}
708
709mod replica_identity {
710    use mz_postgres_util::PostgresError;
711
712    use super::*;
713    use crate::plan::PlanError;
714    use crate::pure::PgSourcePurificationError;
715
716    /// Ensures that all provided OIDs are tables with `REPLICA IDENTITY FULL`.
717    pub async fn check_replica_identity_full(
718        client: &Client,
719        table_oids: &[Oid],
720    ) -> Result<(), PlanError> {
721        let invalid_replica_identity_rows = client
722            .query(
723                "
724            SELECT
725                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
726             FROM unnest($1::oid[]) AS oids (oid)
727             JOIN
728                 pg_class c ON c.oid = oids.oid
729             JOIN
730                 pg_namespace n ON c.relnamespace = n.oid
731             WHERE relreplident != 'f' OR relreplident IS NULL;",
732                &[&table_oids],
733            )
734            .await
735            .map_err(PostgresError::from)?;
736
737        let mut invalid_replica_identity = invalid_replica_identity_rows
738            .into_iter()
739            .map(|row| row.get("schema_qualified_table_name"))
740            .collect::<Vec<String>>();
741
742        if invalid_replica_identity.is_empty() {
743            Ok(())
744        } else {
745            invalid_replica_identity.sort();
746            Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
747                items: invalid_replica_identity,
748            })?
749        }
750    }
751}