mz_sql/pure/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Postgres utilities for SQL purification.
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::sync::Arc;
14
15use mz_expr::MirScalarExpr;
16use mz_postgres_util::desc::PostgresTableDesc;
17use mz_proto::RustType;
18use mz_repr::{SqlColumnType, SqlRelationType, SqlScalarType};
19use mz_sql_parser::ast::display::AstDisplay;
20use mz_sql_parser::ast::{
21    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
22    ExternalReferences, Ident, TableConstraint, UnresolvedItemName, Value, WithOptionValue,
23};
24use mz_storage_types::sources::SourceExportStatementDetails;
25use mz_storage_types::sources::postgres::CastType;
26use prost::Message;
27use tokio_postgres::Client;
28use tokio_postgres::types::Oid;
29
30use crate::names::{Aug, ResolvedItemName};
31use crate::normalize;
32use crate::plan::hir::ColumnRef;
33use crate::plan::typeconv::{CastContext, plan_cast};
34use crate::plan::{
35    ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
36};
37
38use super::error::PgSourcePurificationError;
39use super::references::RetrievedSourceReferences;
40use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
41
42/// Ensure that we have select permissions on all tables; we have to do this before we
43/// start snapshotting because if we discover we cannot `COPY` from a table while
44/// snapshotting, we break the entire source.
45pub(super) async fn validate_requested_references_privileges(
46    client: &Client,
47    table_oids: &[Oid],
48) -> Result<(), PlanError> {
49    privileges::check_table_privileges(client, table_oids).await?;
50    replica_identity::check_replica_identity_full(client, table_oids).await?;
51
52    Ok(())
53}
54
55/// Generate a mapping of `Oid`s to column names that should be ingested as text
56/// (rather than their type in the upstream database).
57///
58/// Additionally, modify `text_columns` so that they contain database-qualified
59/// references to the columns.
60pub(super) fn generate_text_columns(
61    retrieved_references: &RetrievedSourceReferences,
62    text_columns: &mut [UnresolvedItemName],
63) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
64    let mut text_cols_dict: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
65
66    for name in text_columns {
67        let (qual, col) = match name.0.split_last().expect("must have at least one element") {
68            (col, qual) if qual.is_empty() => {
69                return Err(PlanError::InvalidOptionValue {
70                    option_name: "TEXT COLUMNS".to_string(),
71                    err: Box::new(PlanError::UnderqualifiedColumnName(
72                        col.as_str().to_string(),
73                    )),
74                });
75            }
76            (col, qual) => (qual.to_vec(), col.as_str().to_string()),
77        };
78
79        let resolved_reference = retrieved_references.resolve_name(&qual)?;
80        let mut fully_qualified_name =
81            resolved_reference
82                .external_reference()
83                .map_err(|e| PlanError::InvalidOptionValue {
84                    option_name: "TEXT COLUMNS".to_string(),
85                    err: Box::new(e.into()),
86                })?;
87
88        let desc = resolved_reference
89            .postgres_desc()
90            .expect("known to be postgres");
91
92        if !desc.columns.iter().any(|column| column.name == col) {
93            let column = mz_repr::ColumnName::from(col);
94            let similar = desc
95                .columns
96                .iter()
97                .filter_map(|c| {
98                    let c_name = mz_repr::ColumnName::from(c.name.clone());
99                    c_name.is_similar(&column).then_some(c_name)
100                })
101                .collect();
102            return Err(PlanError::InvalidOptionValue {
103                option_name: "TEXT COLUMNS".to_string(),
104                err: Box::new(PlanError::UnknownColumn {
105                    table: Some(
106                        normalize::unresolved_item_name(fully_qualified_name)
107                            .expect("known to be of valid len"),
108                    ),
109                    column,
110                    similar,
111                }),
112            });
113        }
114
115        // Rewrite fully qualified name.
116        let col_ident = Ident::new(col.as_str().to_string())?;
117        fully_qualified_name.0.push(col_ident);
118        *name = fully_qualified_name;
119
120        let new = text_cols_dict
121            .entry(desc.oid)
122            .or_default()
123            .insert(col.as_str().to_string());
124
125        if !new {
126            return Err(PlanError::InvalidOptionValue {
127                option_name: "TEXT COLUMNS".to_string(),
128                err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
129            });
130        }
131    }
132
133    Ok(text_cols_dict)
134}
135
136pub fn generate_create_subsource_statements(
137    scx: &StatementContext,
138    source_name: ResolvedItemName,
139    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
140) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
141    // Aggregate all unrecognized types.
142    let mut unsupported_cols = vec![];
143
144    // Now that we have an explicit list of validated requested subsources we can create them
145    let mut subsources = Vec::with_capacity(requested_subsources.len());
146
147    for (subsource_name, purified_export) in requested_subsources {
148        let PostgresExportStatementValues {
149            columns,
150            constraints,
151            text_columns,
152            details,
153            external_reference,
154        } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
155
156        let mut with_options = vec![
157            CreateSubsourceOption {
158                name: CreateSubsourceOptionName::ExternalReference,
159                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
160            },
161            CreateSubsourceOption {
162                name: CreateSubsourceOptionName::Details,
163                value: Some(WithOptionValue::Value(Value::String(hex::encode(
164                    details.into_proto().encode_to_vec(),
165                )))),
166            },
167        ];
168
169        if let Some(text_columns) = text_columns {
170            with_options.push(CreateSubsourceOption {
171                name: CreateSubsourceOptionName::TextColumns,
172                value: Some(WithOptionValue::Sequence(text_columns)),
173            });
174        }
175
176        // Create the subsource statement
177        let subsource = CreateSubsourceStatement {
178            name: subsource_name,
179            columns,
180            // We might not know the primary source's `GlobalId` yet; if not,
181            // we'll fill it in once we generate it.
182            of_source: Some(source_name.clone()),
183            // TODO(petrosagg): nothing stops us from getting the constraints of the
184            // upstream tables and mirroring them here which will lead to more optimization
185            // opportunities if for example there is a primary key or an index.
186            //
187            // If we ever do that we must triple check that we will get notified *in the
188            // replication stream*, if our assumptions change. Failure to do that could
189            // mean that an upstream table that started with an index was then altered to
190            // one without and now we're producing garbage data.
191            constraints,
192            if_not_exists: false,
193            with_options,
194        };
195        subsources.push(subsource);
196    }
197
198    if !unsupported_cols.is_empty() {
199        unsupported_cols.sort();
200        Err(PgSourcePurificationError::UnrecognizedTypes {
201            cols: unsupported_cols,
202        })?;
203    }
204
205    Ok(subsources)
206}
207
/// The pieces of a source-export statement derived from a single upstream
/// PostgreSQL table by [`generate_source_export_statement_values`].
pub(super) struct PostgresExportStatementValues {
    /// Column definitions; upstream columns whose type cannot be mapped (and
    /// that are not text columns) are omitted.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique constraints mirrored from the upstream table's keys, with the
    /// primary key (if any) placed first.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// The table's `TEXT COLUMNS` as sorted identifiers, if any were requested.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Statement details wrapping the upstream table description.
    pub(super) details: SourceExportStatementDetails,
    /// The upstream reference carried over from the purified export.
    pub(super) external_reference: UnresolvedItemName,
}
215
216pub(super) fn generate_source_export_statement_values(
217    scx: &StatementContext,
218    purified_export: PurifiedSourceExport,
219    unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
220) -> Result<PostgresExportStatementValues, PlanError> {
221    let (text_columns, table) = match purified_export.details {
222        PurifiedExportDetails::Postgres {
223            text_columns,
224            table,
225        } => (text_columns, table),
226        _ => unreachable!("purified export details must be postgres"),
227    };
228
229    let text_column_set = text_columns
230        .as_ref()
231        .map(|v| BTreeSet::from_iter(v.iter().map(Ident::as_str)));
232
233    // Figure out the schema of the subsource
234    let mut columns = vec![];
235    for c in table.columns.iter() {
236        let name = Ident::new(c.name.clone())?;
237
238        let ty = match text_column_set {
239            Some(ref names) if names.contains(c.name.as_str()) => mz_pgrepr::Type::Text,
240            _ => match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
241                Ok(t) => t,
242                Err(_) => {
243                    let mut full_name = purified_export.external_reference.0.clone();
244                    full_name.push(name);
245                    unsupported_cols.push((
246                        UnresolvedItemName(full_name).to_ast_string_simple(),
247                        mz_repr::adt::system::Oid(c.type_oid),
248                    ));
249                    continue;
250                }
251            },
252        };
253
254        let data_type = scx.resolve_type(ty)?;
255        let mut options = vec![];
256
257        if !c.nullable {
258            options.push(mz_sql_parser::ast::ColumnOptionDef {
259                name: None,
260                option: mz_sql_parser::ast::ColumnOption::NotNull,
261            });
262        }
263
264        columns.push(ColumnDef {
265            name,
266            data_type,
267            collation: None,
268            options,
269        });
270    }
271
272    let mut constraints = vec![];
273    for key in table.keys.clone() {
274        let mut key_columns = vec![];
275
276        for col_num in key.cols {
277            let ident = Ident::new(
278                table
279                    .columns
280                    .iter()
281                    .find(|col| col.col_num == col_num)
282                    .expect("key exists as column")
283                    .name
284                    .clone(),
285            )?;
286            key_columns.push(ident);
287        }
288
289        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
290            name: Some(Ident::new(key.name)?),
291            columns: key_columns,
292            is_primary: key.is_primary,
293            nulls_not_distinct: key.nulls_not_distinct,
294        };
295
296        // We take the first constraint available to be the primary key.
297        if key.is_primary {
298            constraints.insert(0, constraint);
299        } else {
300            constraints.push(constraint);
301        }
302    }
303    let details = SourceExportStatementDetails::Postgres { table };
304
305    let text_columns = text_columns.map(|mut columns| {
306        columns.sort();
307        columns
308            .into_iter()
309            .map(WithOptionValue::Ident::<Aug>)
310            .collect()
311    });
312
313    Ok(PostgresExportStatementValues {
314        columns,
315        constraints,
316        text_columns,
317        details,
318        external_reference: purified_export.external_reference,
319    })
320}
321
/// The result of [`purify_source_exports`].
pub(super) struct PurifiedSourceExports {
    /// One purified export per requested upstream table, keyed by the name
    /// assigned to that export.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// Fully qualified `TEXT COLUMNS` values, sorted and de-duplicated.
    //
    // NOTE(roshan): The text columns are already part of their
    // appropriate `source_exports` above, but these are returned to allow
    // round-tripping a `CREATE SOURCE` statement while we still allow creating
    // implicit subsources from `CREATE SOURCE`. Remove once that feature is
    // fully deprecated and users are forced to use explicit
    // `CREATE TABLE .. FROM SOURCE` statements.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
332
333// Purify the requested external references, returning a set of purified
334// source exports corresponding to external tables, and and additional
335// fields necessary to generate relevant statements and update statement options
336pub(super) async fn purify_source_exports(
337    client: &Client,
338    retrieved_references: &RetrievedSourceReferences,
339    requested_references: &Option<ExternalReferences>,
340    mut text_columns: Vec<UnresolvedItemName>,
341    unresolved_source_name: &UnresolvedItemName,
342    reference_policy: &SourceReferencePolicy,
343) -> Result<PurifiedSourceExports, PlanError> {
344    let requested_exports = match requested_references.as_ref() {
345        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
346            Err(PlanError::UseTablesForSources(requested.to_string()))?
347        }
348        Some(requested) => retrieved_references
349            .requested_source_exports(Some(requested), unresolved_source_name)?,
350        None => {
351            if matches!(reference_policy, SourceReferencePolicy::Required) {
352                Err(PgSourcePurificationError::RequiresExternalReferences)?
353            }
354
355            // If no external reference is specified, it does not make sense to include
356            // text columns.
357            if !text_columns.is_empty() {
358                Err(
359                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
360                        "TEXT COLUMNS".to_string(),
361                    ),
362                )?
363            }
364
365            return Ok(PurifiedSourceExports {
366                source_exports: BTreeMap::new(),
367                normalized_text_columns: vec![],
368            });
369        }
370    };
371
372    if requested_exports.is_empty() {
373        sql_bail!(
374            "[internal error]: Postgres reference {} did not match any tables",
375            requested_references
376                .as_ref()
377                .unwrap()
378                .to_ast_string_simple()
379        );
380    }
381
382    super::validate_source_export_names(&requested_exports)?;
383
384    let table_oids: Vec<_> = requested_exports
385        .iter()
386        .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
387        .collect();
388
389    validate_requested_references_privileges(client, &table_oids).await?;
390
391    let mut text_column_map = generate_text_columns(retrieved_references, &mut text_columns)?;
392
393    // Normalize options to contain full qualified values.
394    text_columns.sort();
395    text_columns.dedup();
396    let normalized_text_columns: Vec<_> = text_columns
397        .into_iter()
398        .map(WithOptionValue::UnresolvedItemName)
399        .collect();
400
401    let source_exports = requested_exports
402        .into_iter()
403        .map(|r| {
404            let desc = r.meta.postgres_desc().expect("known postgres");
405            (
406                r.name,
407                PurifiedSourceExport {
408                    external_reference: r.external_reference,
409                    details: PurifiedExportDetails::Postgres {
410                        text_columns: text_column_map.remove(&desc.oid).map(|v| {
411                            v.into_iter()
412                                .map(|s| Ident::new(s).expect("validated above"))
413                                .collect()
414                        }),
415                        table: desc.clone(),
416                    },
417                },
418            )
419        })
420        .collect();
421
422    if !text_column_map.is_empty() {
423        // If any any item was not removed from the text_column_map, it wasn't being
424        // added.
425        let mut dangling_text_column_refs = vec![];
426        let all_references = retrieved_references.all_references();
427
428        for id in text_column_map.keys() {
429            let desc = all_references
430                .iter()
431                .find_map(|reference| {
432                    let desc = reference.postgres_desc().expect("is postgres");
433                    if desc.oid == *id { Some(desc) } else { None }
434                })
435                .expect("validated when generating text columns");
436
437            dangling_text_column_refs.push(PartialItemName {
438                database: None,
439                schema: Some(desc.namespace.clone()),
440                item: desc.name.clone(),
441            });
442        }
443
444        dangling_text_column_refs.sort();
445        Err(PgSourcePurificationError::DanglingTextColumns {
446            items: dangling_text_column_refs,
447        })?;
448    }
449
450    Ok(PurifiedSourceExports {
451        source_exports,
452        normalized_text_columns,
453    })
454}
455
456pub(crate) fn generate_column_casts(
457    scx: &StatementContext,
458    table: &PostgresTableDesc,
459    text_columns: &Vec<Ident>,
460) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
461    // Generate the cast expressions required to convert the text encoded columns into
462    // the appropriate target types, creating a Vec<MirScalarExpr>
463    // The postgres source reader will then eval each of those on the incoming rows
464    // First, construct an expression context where the expression is evaluated on an
465    // imaginary row which has the same number of columns as the upstream table but all
466    // of the types are text
467    let mut cast_scx = scx.clone();
468    cast_scx.param_types = Default::default();
469    let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
470    let mut column_types = vec![];
471    for column in table.columns.iter() {
472        column_types.push(SqlColumnType {
473            nullable: column.nullable,
474            scalar_type: SqlScalarType::String,
475        });
476    }
477
478    let cast_ecx = ExprContext {
479        qcx: &cast_qcx,
480        name: "plan_postgres_source_cast",
481        scope: &Scope::empty(),
482        relation_type: &SqlRelationType {
483            column_types,
484            keys: vec![],
485        },
486        allow_aggregates: false,
487        allow_subqueries: false,
488        allow_parameters: false,
489        allow_windows: false,
490    };
491
492    let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
493
494    // Then, for each column we will generate a MirRelationExpr that extracts the nth
495    // column and casts it to the appropriate target type
496    let mut table_cast = vec![];
497    for (i, column) in table.columns.iter().enumerate() {
498        let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
499            // Treat the column as text if it was referenced in
500            // `TEXT COLUMNS`. This is the only place we need to
501            // perform this logic; even if the type is unsupported,
502            // we'll be able to ingest its values as text in
503            // storage.
504            (CastType::Text, mz_pgrepr::Type::Text)
505        } else {
506            match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
507                Ok(t) => (CastType::Natural, t),
508                // If this reference survived purification, we
509                // do not expect it to be from a table that the
510                // user will consume., i.e. expect this table to
511                // be filtered out of table casts.
512                Err(_) => {
513                    table_cast.push((
514                        CastType::Natural,
515                        HirScalarExpr::call_variadic(
516                            mz_expr::VariadicFunc::ErrorIfNull,
517                            vec![
518                                HirScalarExpr::literal_null(SqlScalarType::String),
519                                HirScalarExpr::literal(
520                                    mz_repr::Datum::from(
521                                        format!("Unsupported type with OID {}", column.type_oid)
522                                            .as_str(),
523                                    ),
524                                    SqlScalarType::String,
525                                ),
526                            ],
527                        )
528                        .lower_uncorrelated()
529                        .expect("no correlation"),
530                    ));
531                    continue;
532                }
533            }
534        };
535
536        let data_type = scx.resolve_type(ty)?;
537        let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
538
539        let col_expr = HirScalarExpr::named_column(
540            ColumnRef {
541                level: 0,
542                column: i,
543            },
544            Arc::from(column.name.as_str()),
545        );
546
547        let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
548
549        let cast = if column.nullable {
550            cast_expr
551        } else {
552            // We must enforce nullability constraint on cast
553            // because PG replication stream does not propagate
554            // constraint changes and we want to error subsource if
555            // e.g. the constraint is dropped and we don't notice
556            // it.
557            HirScalarExpr::call_variadic(mz_expr::VariadicFunc::ErrorIfNull, vec![
558                                cast_expr,
559                                HirScalarExpr::literal(
560                                    mz_repr::Datum::from(
561                                        format!(
562                                            "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
563                                            table.namespace.clone(),
564                                            table.name.clone(),
565                                            column.name.clone())
566                                            .as_str(),
567                                    ),
568                                    SqlScalarType::String,
569                                ),
570                            ],
571                            )
572        };
573
574        // We expect only reg* types to encounter this issue. Users
575        // can ingest the data as text if they need to ingest it.
576        // This is acceptable because we don't expect the OIDs from
577        // an external PG source to be unilaterally usable in
578        // resolving item names in MZ.
579        let mir_cast = cast.lower_uncorrelated().map_err(|_e| {
580            tracing::info!(
581                "cannot ingest {:?} data from PG source because cast is correlated",
582                scalar_type
583            );
584
585            PlanError::TableContainsUningestableTypes {
586                name: table.name.to_string(),
587                type_: scx.humanize_scalar_type(&scalar_type, false),
588                column: column.name.to_string(),
589            }
590        })?;
591
592        table_cast.push((cast_type, mir_cast));
593    }
594    Ok(table_cast)
595}
596
597mod privileges {
598    use mz_postgres_util::PostgresError;
599
600    use super::*;
601    use crate::plan::PlanError;
602    use crate::pure::PgSourcePurificationError;
603
604    async fn check_schema_privileges(client: &Client, table_oids: &[Oid]) -> Result<(), PlanError> {
605        let invalid_schema_privileges_rows = client
606            .query(
607                "
608                WITH distinct_namespace AS (
609                    SELECT
610                        DISTINCT n.oid, n.nspname AS schema_name
611                    FROM unnest($1::OID[]) AS oids (oid)
612                    JOIN pg_class AS c ON c.oid = oids.oid
613                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
614                )
615                SELECT d.schema_name
616                FROM distinct_namespace AS d
617                WHERE
618                    NOT has_schema_privilege(CURRENT_USER::TEXT, d.oid, 'usage')",
619                &[&table_oids],
620            )
621            .await
622            .map_err(PostgresError::from)?;
623
624        let mut invalid_schema_privileges = invalid_schema_privileges_rows
625            .into_iter()
626            .map(|row| row.get("schema_name"))
627            .collect::<Vec<String>>();
628
629        if invalid_schema_privileges.is_empty() {
630            Ok(())
631        } else {
632            invalid_schema_privileges.sort();
633            Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
634                schemas: invalid_schema_privileges,
635            })?
636        }
637    }
638
639    /// Ensure that the user specified in `config` has:
640    ///
641    /// -`SELECT` privileges for the identified `tables`.
642    ///
643    ///  `tables`'s elements should be of the structure `[<schema name>, <table name>]`.
644    ///
645    /// - `USAGE` privileges on the schemas references in `tables`.
646    ///
647    /// # Panics
648    /// If `config` does not specify a user.
649    pub async fn check_table_privileges(
650        client: &Client,
651        table_oids: &[Oid],
652    ) -> Result<(), PlanError> {
653        check_schema_privileges(client, table_oids).await?;
654
655        let invalid_table_privileges_rows = client
656            .query(
657                "
658            SELECT
659                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
660             FROM unnest($1::oid[]) AS oids (oid)
661             JOIN
662                 pg_class c ON c.oid = oids.oid
663             JOIN
664                 pg_namespace n ON c.relnamespace = n.oid
665             WHERE NOT has_table_privilege(CURRENT_USER::text, c.oid, 'select')",
666                &[&table_oids],
667            )
668            .await
669            .map_err(PostgresError::from)?;
670
671        let mut invalid_table_privileges = invalid_table_privileges_rows
672            .into_iter()
673            .map(|row| row.get("schema_qualified_table_name"))
674            .collect::<Vec<String>>();
675
676        if invalid_table_privileges.is_empty() {
677            Ok(())
678        } else {
679            invalid_table_privileges.sort();
680            Err(PgSourcePurificationError::UserLacksSelectOnTables {
681                tables: invalid_table_privileges,
682            })?
683        }
684    }
685}
686
687mod replica_identity {
688    use mz_postgres_util::PostgresError;
689
690    use super::*;
691    use crate::plan::PlanError;
692    use crate::pure::PgSourcePurificationError;
693
694    /// Ensures that all provided OIDs are tables with `REPLICA IDENTITY FULL`.
695    pub async fn check_replica_identity_full(
696        client: &Client,
697        table_oids: &[Oid],
698    ) -> Result<(), PlanError> {
699        let invalid_replica_identity_rows = client
700            .query(
701                "
702            SELECT
703                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
704             FROM unnest($1::oid[]) AS oids (oid)
705             JOIN
706                 pg_class c ON c.oid = oids.oid
707             JOIN
708                 pg_namespace n ON c.relnamespace = n.oid
709             WHERE relreplident != 'f' OR relreplident IS NULL;",
710                &[&table_oids],
711            )
712            .await
713            .map_err(PostgresError::from)?;
714
715        let mut invalid_replica_identity = invalid_replica_identity_rows
716            .into_iter()
717            .map(|row| row.get("schema_qualified_table_name"))
718            .collect::<Vec<String>>();
719
720        if invalid_replica_identity.is_empty() {
721            Ok(())
722        } else {
723            invalid_replica_identity.sort();
724            Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
725                items: invalid_replica_identity,
726            })?
727        }
728    }
729}