Skip to main content

mz_sql/pure/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Postgres utilities for SQL purification.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use mz_postgres_util::desc::PostgresTableDesc;
15use mz_proto::RustType;
16use mz_repr::{Datum, ReprColumnType, ReprScalarType, Row, SqlScalarType};
17use mz_sql_parser::ast::display::AstDisplay;
18use mz_sql_parser::ast::{
19    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
20    ExternalReferences, Ident, PgConfigOptionName, TableConstraint, UnresolvedItemName, Value,
21    WithOptionValue,
22};
23use mz_storage_types::sources::SourceExportStatementDetails;
24use mz_storage_types::sources::casts::{CastFunc, StorageScalarExpr};
25use mz_storage_types::sources::postgres::CastType;
26use prost::Message;
27use tokio_postgres::Client;
28use tokio_postgres::types::Oid;
29
30use crate::names::{Aug, ResolvedItemName};
31use crate::normalize;
32use crate::plan::{PlanError, StatementContext};
33
34use super::error::PgSourcePurificationError;
35use super::references::RetrievedSourceReferences;
36use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
37
/// Ensure that we have select permissions on all tables; we have to do this before we
/// start snapshotting because if we discover we cannot `COPY` from a table while
/// snapshotting, we break the entire source.
///
/// Besides the table-privilege check, this also runs
/// `privileges::check_rls_privileges` and
/// `replica_identity::check_replica_identity_full` against the same
/// `table_oids` (presumably row-level-security and `REPLICA IDENTITY FULL`
/// validation -- see those helpers for the exact rules).
///
/// The checks run sequentially in that order and the first failure is
/// returned; `Ok(())` means every check passed for every OID.
pub(super) async fn validate_requested_references_privileges(
    client: &Client,
    table_oids: &[Oid],
) -> Result<(), PlanError> {
    privileges::check_table_privileges(client, table_oids).await?;
    privileges::check_rls_privileges(client, table_oids).await?;
    replica_identity::check_replica_identity_full(client, table_oids).await?;

    Ok(())
}
51
52/// Map a list of column references to a map of table oids to column names.
53///
54/// Additionally, modify `columns` so that they contain database-qualified
55/// references to the columns.
56pub(super) fn map_column_refs(
57    retrieved_references: &RetrievedSourceReferences,
58    columns: &mut [UnresolvedItemName],
59    option_type: PgConfigOptionName,
60) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
61    let mut cols_map: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
62
63    for name in columns {
64        let (qual, col) = match name.0.split_last().expect("must have at least one element") {
65            (col, []) => {
66                return Err(PlanError::InvalidOptionValue {
67                    option_name: option_type.to_ast_string_simple(),
68                    err: Box::new(PlanError::UnderqualifiedColumnName(
69                        col.as_str().to_string(),
70                    )),
71                });
72            }
73            (col, qual) => (qual.to_vec(), col.as_str().to_string()),
74        };
75
76        let resolved_reference = retrieved_references.resolve_name(&qual)?;
77        let mut fully_qualified_name =
78            resolved_reference
79                .external_reference()
80                .map_err(|e| PlanError::InvalidOptionValue {
81                    option_name: option_type.to_ast_string_simple(),
82                    err: Box::new(e.into()),
83                })?;
84
85        let desc = resolved_reference
86            .postgres_desc()
87            .expect("known to be postgres");
88
89        if !desc.columns.iter().any(|column| column.name == col) {
90            let column = mz_repr::ColumnName::from(col);
91            let similar = desc
92                .columns
93                .iter()
94                .filter_map(|c| {
95                    let c_name = mz_repr::ColumnName::from(c.name.clone());
96                    c_name.is_similar(&column).then_some(c_name)
97                })
98                .collect();
99            return Err(PlanError::InvalidOptionValue {
100                option_name: option_type.to_ast_string_simple(),
101                err: Box::new(PlanError::UnknownColumn {
102                    table: Some(
103                        normalize::unresolved_item_name(fully_qualified_name)
104                            .expect("known to be of valid len"),
105                    ),
106                    column,
107                    similar,
108                }),
109            });
110        }
111
112        // Rewrite fully qualified name.
113        let col_ident = Ident::new(col.as_str().to_string())?;
114        fully_qualified_name.0.push(col_ident);
115        *name = fully_qualified_name;
116
117        let new = cols_map
118            .entry(desc.oid)
119            .or_default()
120            .insert(col.as_str().to_string());
121
122        if !new {
123            return Err(PlanError::InvalidOptionValue {
124                option_name: option_type.to_ast_string_simple(),
125                err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
126            });
127        }
128    }
129
130    Ok(cols_map)
131}
132
133pub fn generate_create_subsource_statements(
134    scx: &StatementContext,
135    source_name: ResolvedItemName,
136    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
137) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
138    // Aggregate all unrecognized types.
139    let mut unsupported_cols = vec![];
140
141    // Now that we have an explicit list of validated requested subsources we can create them
142    let mut subsources = Vec::with_capacity(requested_subsources.len());
143
144    for (subsource_name, purified_export) in requested_subsources {
145        let PostgresExportStatementValues {
146            columns,
147            constraints,
148            text_columns,
149            exclude_columns,
150            details,
151            external_reference,
152        } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
153
154        let mut with_options = vec![
155            CreateSubsourceOption {
156                name: CreateSubsourceOptionName::ExternalReference,
157                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
158            },
159            CreateSubsourceOption {
160                name: CreateSubsourceOptionName::Details,
161                value: Some(WithOptionValue::Value(Value::String(hex::encode(
162                    details.into_proto().encode_to_vec(),
163                )))),
164            },
165        ];
166
167        if let Some(text_columns) = text_columns {
168            with_options.push(CreateSubsourceOption {
169                name: CreateSubsourceOptionName::TextColumns,
170                value: Some(WithOptionValue::Sequence(text_columns)),
171            });
172        }
173
174        if let Some(exclude_columns) = exclude_columns {
175            with_options.push(CreateSubsourceOption {
176                name: CreateSubsourceOptionName::ExcludeColumns,
177                value: Some(WithOptionValue::Sequence(exclude_columns)),
178            });
179        }
180
181        // Create the subsource statement
182        let subsource = CreateSubsourceStatement {
183            name: subsource_name,
184            columns,
185            // We might not know the primary source's `GlobalId` yet; if not,
186            // we'll fill it in once we generate it.
187            of_source: Some(source_name.clone()),
188            // TODO(petrosagg): nothing stops us from getting the constraints of the
189            // upstream tables and mirroring them here which will lead to more optimization
190            // opportunities if for example there is a primary key or an index.
191            //
192            // If we ever do that we must triple check that we will get notified *in the
193            // replication stream*, if our assumptions change. Failure to do that could
194            // mean that an upstream table that started with an index was then altered to
195            // one without and now we're producing garbage data.
196            constraints,
197            if_not_exists: false,
198            with_options,
199        };
200        subsources.push(subsource);
201    }
202
203    if !unsupported_cols.is_empty() {
204        unsupported_cols.sort();
205        Err(PgSourcePurificationError::UnrecognizedTypes {
206            cols: unsupported_cols,
207        })?;
208    }
209
210    Ok(subsources)
211}
212
/// The statement components derived from a single purified Postgres export
/// by `generate_source_export_statement_values`.
pub(super) struct PostgresExportStatementValues {
    /// Column definitions for the export; excluded columns and columns with
    /// unrecognized types are not included.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique/primary-key constraints mirrored from the upstream table's keys,
    /// with a primary key (if any) placed first.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// Sorted `TEXT COLUMNS` identifiers, or `None` if none were specified.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Sorted `EXCLUDE COLUMNS` identifiers, or `None` if none were specified.
    pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Export details wrapping the upstream table description.
    pub(super) details: SourceExportStatementDetails,
    /// The export's reference to the upstream table.
    pub(super) external_reference: UnresolvedItemName,
}
221
222pub(super) fn generate_source_export_statement_values(
223    scx: &StatementContext,
224    purified_export: PurifiedSourceExport,
225    unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
226) -> Result<PostgresExportStatementValues, PlanError> {
227    let PurifiedExportDetails::Postgres {
228        table,
229        text_columns,
230        exclude_columns,
231    } = purified_export.details
232    else {
233        bail_internal!("purified export details must be postgres");
234    };
235
236    let text_column_set = BTreeSet::from_iter(text_columns.iter().flatten().map(Ident::as_str));
237    let exclude_column_set =
238        BTreeSet::from_iter(exclude_columns.iter().flatten().map(Ident::as_str));
239
240    // Figure out the schema of the subsource
241    let mut columns = vec![];
242    for c in table.columns.iter() {
243        let name = Ident::new(c.name.clone())?;
244
245        if exclude_column_set.contains(c.name.as_str()) {
246            continue;
247        }
248
249        let ty = if text_column_set.contains(c.name.as_str()) {
250            mz_pgrepr::Type::Text
251        } else {
252            match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
253                Ok(t) => t,
254                Err(_) => {
255                    let mut full_name = purified_export.external_reference.0.clone();
256                    full_name.push(name);
257                    unsupported_cols.push((
258                        UnresolvedItemName(full_name).to_ast_string_simple(),
259                        mz_repr::adt::system::Oid(c.type_oid),
260                    ));
261                    continue;
262                }
263            }
264        };
265
266        let data_type = scx.resolve_type(ty)?;
267        let mut options = vec![];
268
269        if !c.nullable {
270            options.push(mz_sql_parser::ast::ColumnOptionDef {
271                name: None,
272                option: mz_sql_parser::ast::ColumnOption::NotNull,
273            });
274        }
275
276        columns.push(ColumnDef {
277            name,
278            data_type,
279            collation: None,
280            options,
281        });
282    }
283
284    let mut constraints = vec![];
285    for key in table.keys.clone() {
286        let mut key_columns = vec![];
287        let mut all_key_cols_included = true;
288
289        for col_num in key.cols {
290            match table.columns.iter().find(|col| col.col_num == col_num) {
291                Some(col) => {
292                    let ident = Ident::new(col.name.clone())?;
293                    key_columns.push(ident);
294                }
295                None => {
296                    all_key_cols_included = false;
297                    break;
298                }
299            }
300        }
301        if !all_key_cols_included {
302            continue;
303        }
304
305        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
306            name: Some(Ident::new(key.name)?),
307            columns: key_columns,
308            is_primary: key.is_primary,
309            nulls_not_distinct: key.nulls_not_distinct,
310        };
311
312        // We take the first constraint available to be the primary key.
313        if key.is_primary {
314            constraints.insert(0, constraint);
315        } else {
316            constraints.push(constraint);
317        }
318    }
319    let details = SourceExportStatementDetails::Postgres { table };
320
321    let text_columns = text_columns.map(|mut columns| {
322        columns.sort();
323        columns
324            .into_iter()
325            .map(WithOptionValue::Ident::<Aug>)
326            .collect()
327    });
328
329    let exclude_columns = exclude_columns.map(|mut columns| {
330        columns.sort();
331        columns
332            .into_iter()
333            .map(WithOptionValue::Ident::<Aug>)
334            .collect()
335    });
336
337    Ok(PostgresExportStatementValues {
338        columns,
339        constraints,
340        text_columns,
341        exclude_columns,
342        details,
343        external_reference: purified_export.external_reference,
344    })
345}
346
/// The result of purifying the external references requested for a Postgres
/// source, as returned by `purify_source_exports`.
pub(super) struct PurifiedSourceExports {
    /// The purified exports, keyed by the name each export will be given.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    // NOTE(roshan): The text columns are already part of their
    // appropriate `source_exports` above, but these are returned to allow
    // round-tripping a `CREATE SOURCE` statement while we still allow creating
    // implicit subsources from `CREATE SOURCE`. Remove once
    // fully deprecating that feature and forcing users to use explicit
    // `CREATE TABLE .. FROM SOURCE` statements.
    /// The `TEXT COLUMNS` references, fully qualified, sorted, and
    /// deduplicated.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
357
358// Purify the requested external references, returning a set of purified
359// source exports corresponding to external tables, and and additional
360// fields necessary to generate relevant statements and update statement options
361pub(super) async fn purify_source_exports(
362    client: &Client,
363    retrieved_references: &RetrievedSourceReferences,
364    requested_references: &Option<ExternalReferences>,
365    mut text_columns: Vec<UnresolvedItemName>,
366    mut exclude_columns: Vec<UnresolvedItemName>,
367    unresolved_source_name: &UnresolvedItemName,
368    reference_policy: &SourceReferencePolicy,
369) -> Result<PurifiedSourceExports, PlanError> {
370    let requested_exports = match requested_references.as_ref() {
371        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
372            Err(PlanError::UseTablesForSources(requested.to_string()))?
373        }
374        Some(requested) => retrieved_references
375            .requested_source_exports(Some(requested), unresolved_source_name)?,
376        None => {
377            if matches!(reference_policy, SourceReferencePolicy::Required) {
378                Err(PgSourcePurificationError::RequiresExternalReferences)?
379            }
380
381            // If no external reference is specified, it does not make sense to include
382            // text columns.
383            if !text_columns.is_empty() {
384                Err(
385                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
386                        "TEXT COLUMNS".to_string(),
387                    ),
388                )?
389            }
390
391            // If no external reference is specified, it does not make sense to include
392            // exclude columns.
393            if !exclude_columns.is_empty() {
394                Err(
395                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
396                        "EXCLUDE COLUMNS".to_string(),
397                    ),
398                )?
399            }
400
401            return Ok(PurifiedSourceExports {
402                source_exports: BTreeMap::new(),
403                normalized_text_columns: vec![],
404            });
405        }
406    };
407
408    if requested_exports.is_empty() {
409        sql_bail!(
410            "[internal error]: Postgres reference {} did not match any tables",
411            requested_references
412                .as_ref()
413                .unwrap()
414                .to_ast_string_simple()
415        );
416    }
417
418    super::validate_source_export_names(&requested_exports)?;
419
420    let table_oids: Vec<_> = requested_exports
421        .iter()
422        .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
423        .collect();
424
425    validate_requested_references_privileges(client, &table_oids).await?;
426
427    let mut text_column_map = map_column_refs(
428        retrieved_references,
429        &mut text_columns,
430        PgConfigOptionName::TextColumns,
431    )?;
432    let mut exclude_column_map = map_column_refs(
433        retrieved_references,
434        &mut exclude_columns,
435        PgConfigOptionName::ExcludeColumns,
436    )?;
437
438    // Normalize options to contain full qualified values.
439    text_columns.sort();
440    text_columns.dedup();
441    let normalized_text_columns: Vec<_> = text_columns
442        .into_iter()
443        .map(WithOptionValue::UnresolvedItemName)
444        .collect();
445
446    let source_exports = requested_exports
447        .into_iter()
448        .map(|r| {
449            let mut desc = r.meta.postgres_desc().expect("known postgres").clone();
450            let text_columns = text_column_map.remove(&desc.oid);
451            let exclude_columns = exclude_column_map.remove(&desc.oid);
452
453            if let Some(exclude_cols) = &exclude_columns {
454                desc.columns.retain(|c| !exclude_cols.contains(&c.name));
455            }
456
457            if let (Some(text_cols), Some(exclude_cols)) = (&text_columns, &exclude_columns) {
458                let intersection: Vec<_> = text_cols.intersection(exclude_cols).collect();
459                if !intersection.is_empty() {
460                    return Err(PgSourcePurificationError::DuplicatedColumnNames(
461                        intersection.iter().map(|s| (*s).to_string()).collect(),
462                    ));
463                }
464            }
465            Ok((
466                r.name,
467                PurifiedSourceExport {
468                    external_reference: r.external_reference,
469                    details: PurifiedExportDetails::Postgres {
470                        text_columns: text_columns.map(|v| {
471                            v.into_iter()
472                                .map(|s| Ident::new(s).expect("validated above"))
473                                .collect()
474                        }),
475                        exclude_columns: exclude_columns.map(|v| {
476                            v.into_iter()
477                                .map(|s| Ident::new(s).expect("validated above"))
478                                .collect()
479                        }),
480                        table: desc,
481                    },
482                },
483            ))
484        })
485        .collect::<Result<BTreeMap<_, _>, _>>()?;
486
487    if !text_column_map.is_empty() {
488        // If any any item was not removed from the text_column_map, it wasn't being
489        // added.
490        let mut dangling_text_column_refs = vec![];
491        let all_references = retrieved_references.all_references();
492
493        for id in text_column_map.keys() {
494            let desc = all_references
495                .iter()
496                .find_map(|reference| {
497                    let desc = reference.postgres_desc().expect("is postgres");
498                    if desc.oid == *id { Some(desc) } else { None }
499                })
500                .expect("validated when generating text columns");
501
502            dangling_text_column_refs.push(PartialItemName {
503                database: None,
504                schema: Some(desc.namespace.clone()),
505                item: desc.name.clone(),
506            });
507        }
508
509        dangling_text_column_refs.sort();
510        return Err(PlanError::from(
511            PgSourcePurificationError::DanglingTextColumns {
512                items: dangling_text_column_refs,
513            },
514        ));
515    }
516
517    if !exclude_column_map.is_empty() {
518        // If any any item was not removed from the exclude_column_map, it wasn't being
519        // added.
520        let mut dangling_exclude_column_refs = vec![];
521        let all_references = retrieved_references.all_references();
522
523        for id in exclude_column_map.keys() {
524            let desc = all_references
525                .iter()
526                .find_map(|reference| {
527                    let desc = reference.postgres_desc().expect("is postgres");
528                    if desc.oid == *id { Some(desc) } else { None }
529                })
530                .expect("validated when generating exclude columns");
531
532            dangling_exclude_column_refs.push(PartialItemName {
533                database: None,
534                schema: Some(desc.namespace.clone()),
535                item: desc.name.clone(),
536            });
537        }
538
539        dangling_exclude_column_refs.sort();
540        return Err(PlanError::from(
541            PgSourcePurificationError::DanglingExcludeColumns {
542                items: dangling_exclude_column_refs,
543            },
544        ));
545    }
546
547    Ok(PurifiedSourceExports {
548        source_exports,
549        normalized_text_columns,
550    })
551}
552
553pub(crate) fn generate_column_casts(
554    scx: &StatementContext,
555    table: &PostgresTableDesc,
556    text_columns: &Vec<Ident>,
557) -> Result<Vec<(CastType, StorageScalarExpr)>, PlanError> {
558    // Generate the cast expressions required to convert the text encoded columns into
559    // the appropriate target types, creating a Vec<StorageScalarExpr>.
560    // The postgres source reader will then eval each of those on the incoming rows.
561
562    let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
563
564    let mut table_cast = vec![];
565    for (i, column) in table.columns.iter().enumerate() {
566        let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
567            // Treat the column as text if it was referenced in
568            // `TEXT COLUMNS`. This is the only place we need to
569            // perform this logic; even if the type is unsupported,
570            // we'll be able to ingest its values as text in
571            // storage.
572            (CastType::Text, mz_pgrepr::Type::Text)
573        } else {
574            match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
575                Ok(t) => (CastType::Natural, t),
576                // If this reference survived purification, we
577                // do not expect it to be from a table that the
578                // user will consume., i.e. expect this table to
579                // be filtered out of table casts.
580                Err(_) => {
581                    table_cast.push((
582                        CastType::Natural,
583                        StorageScalarExpr::ErrorIfNull(
584                            Box::new(StorageScalarExpr::Literal(
585                                Row::pack_slice(&[Datum::Null]),
586                                ReprColumnType {
587                                    nullable: true,
588                                    scalar_type: ReprScalarType::String,
589                                },
590                            )),
591                            format!("Unsupported type with OID {}", column.type_oid),
592                        ),
593                    ));
594                    continue;
595                }
596            }
597        };
598
599        let cast_expr = match pg_type_to_cast_func(scx, &ty) {
600            Ok(None) => {
601                // No cast needed (e.g. Text → String identity).
602                StorageScalarExpr::Column(i)
603            }
604            Ok(Some(cast_func)) => {
605                StorageScalarExpr::CallUnary(cast_func, Box::new(StorageScalarExpr::Column(i)))
606            }
607            Err(PlanError::TableContainsUningestableTypes { type_, .. }) => {
608                // We expect only reg* types and similar to encounter
609                // this. Users can ingest the data as text if they need
610                // to. This is acceptable because we don't expect the
611                // OIDs from an external PG source to be unilaterally
612                // usable in resolving item names in MZ.
613                return Err(PlanError::TableContainsUningestableTypes {
614                    name: table.name.to_string(),
615                    type_,
616                    column: column.name.to_string(),
617                });
618            }
619            Err(e) => return Err(e),
620        };
621
622        let cast = if column.nullable {
623            cast_expr
624        } else {
625            // We must enforce nullability constraint on cast
626            // because PG replication stream does not propagate
627            // constraint changes and we want to error subsource if
628            // e.g. the constraint is dropped and we don't notice
629            // it.
630            let message = format!(
631                "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
632                table.namespace, table.name, column.name
633            );
634            StorageScalarExpr::ErrorIfNull(Box::new(cast_expr), message)
635        };
636
637        table_cast.push((cast_type, cast));
638    }
639    Ok(table_cast)
640}
641
642/// Resolve a PG type to its corresponding `SqlScalarType` via the catalog.
643fn resolve_pg_type_to_scalar_type(
644    scx: &StatementContext,
645    ty: &mz_pgrepr::Type,
646) -> Result<SqlScalarType, PlanError> {
647    let data_type = scx.resolve_type(ty.clone())?;
648    crate::plan::query::scalar_type_from_sql(scx, &data_type)
649}
650
651/// Map a PG type to the corresponding `CastFunc` variant. Returns:
652/// - `Ok(Some(func))` for types that need a cast
653/// - `Ok(None)` for types that need no cast (Text → String identity)
654/// - `Err(PlanError::TableContainsUningestableTypes { .. })` for types
655///   that cannot be ingested. The error uses placeholder strings for
656///   table/column name; callers with context should use
657///   `pg_type_to_cast_func_or_uningestable` instead.
658fn pg_type_to_cast_func(
659    scx: &StatementContext,
660    ty: &mz_pgrepr::Type,
661) -> Result<Option<CastFunc>, PlanError> {
662    use mz_pgrepr::Type;
663
664    let cast_func = match ty {
665        Type::Bool => CastFunc::CastStringToBool,
666        Type::Bytea => CastFunc::CastStringToBytes,
667        Type::Char => CastFunc::CastStringToPgLegacyChar,
668        Type::Date => CastFunc::CastStringToDate,
669        Type::Float4 => CastFunc::CastStringToFloat32,
670        Type::Float8 => CastFunc::CastStringToFloat64,
671        Type::Int2 => CastFunc::CastStringToInt16,
672        Type::Int4 => CastFunc::CastStringToInt32,
673        Type::Int8 => CastFunc::CastStringToInt64,
674        Type::UInt2 => CastFunc::CastStringToUint16,
675        Type::UInt4 => CastFunc::CastStringToUint32,
676        Type::UInt8 => CastFunc::CastStringToUint64,
677        Type::Interval { .. } => CastFunc::CastStringToInterval,
678        Type::Jsonb => CastFunc::CastStringToJsonb,
679        Type::Name => CastFunc::CastStringToPgLegacyName,
680        Type::Numeric { .. } => {
681            // Resolve through the catalog to get the repr NumericMaxScale type.
682            let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
683            match scalar_type {
684                SqlScalarType::Numeric { max_scale } => CastFunc::CastStringToNumeric(max_scale),
685                _ => unreachable!("Numeric must resolve to Numeric"),
686            }
687        }
688        Type::Oid => CastFunc::CastStringToOid,
689        Type::Text => return Ok(None),
690        Type::BpChar { .. } => {
691            // Resolve through the catalog to get the repr CharLength type.
692            let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
693            match scalar_type {
694                SqlScalarType::Char { length } => CastFunc::CastStringToChar {
695                    length,
696                    fail_on_len: true,
697                },
698                _ => unreachable!("BpChar must resolve to Char"),
699            }
700        }
701        Type::VarChar { .. } => {
702            // Resolve through the catalog to get the repr VarCharMaxLength type.
703            let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
704            match scalar_type {
705                SqlScalarType::VarChar { max_length } => CastFunc::CastStringToVarChar {
706                    length: max_length,
707                    fail_on_len: true,
708                },
709                _ => unreachable!("VarChar must resolve to VarChar"),
710            }
711        }
712        Type::Time { .. } => {
713            // Time precision is not yet fully supported; resolve_type strips precision.
714            CastFunc::CastStringToTime
715        }
716        Type::Timestamp { .. } => {
717            // Resolve through the catalog to get the repr TimestampPrecision type.
718            let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
719            match scalar_type {
720                SqlScalarType::Timestamp { precision } => {
721                    CastFunc::CastStringToTimestamp(precision)
722                }
723                _ => unreachable!("Timestamp must resolve to Timestamp"),
724            }
725        }
726        Type::TimestampTz { .. } => {
727            // Resolve through the catalog to get the repr TimestampPrecision type.
728            let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
729            match scalar_type {
730                SqlScalarType::TimestampTz { precision } => {
731                    CastFunc::CastStringToTimestampTz(precision)
732                }
733                _ => unreachable!("TimestampTz must resolve to TimestampTz"),
734            }
735        }
736        Type::Uuid => CastFunc::CastStringToUuid,
737        Type::Int2Vector => CastFunc::CastStringToInt2Vector,
738        Type::MzTimestamp => CastFunc::CastStringToMzTimestamp,
739        // JSON is ingested as JSONB (same as the old plan_cast path).
740        Type::Json => CastFunc::CastStringToJsonb,
741        Type::Array(elem) => {
742            let return_ty = resolve_pg_type_to_scalar_type(scx, ty)?;
743            let elem_cast = build_element_cast_expr(scx, elem)?;
744            CastFunc::CastStringToArray {
745                return_ty,
746                cast_expr: Box::new(elem_cast),
747            }
748        }
749        Type::List(elem) => {
750            let return_ty = resolve_pg_type_to_scalar_type(scx, ty)?;
751            let elem_cast = build_element_cast_expr(scx, elem)?;
752            CastFunc::CastStringToList {
753                return_ty,
754                cast_expr: Box::new(elem_cast),
755            }
756        }
757        Type::Map { value_type } => {
758            let return_ty = resolve_pg_type_to_scalar_type(scx, ty)?;
759            let value_cast = build_element_cast_expr(scx, value_type)?;
760            CastFunc::CastStringToMap {
761                return_ty,
762                cast_expr: Box::new(value_cast),
763            }
764        }
765        Type::Range { element_type } => {
766            let return_ty = resolve_pg_type_to_scalar_type(scx, ty)?;
767            let elem_cast = build_element_cast_expr(scx, element_type)?;
768            CastFunc::CastStringToRange {
769                return_ty,
770                cast_expr: Box::new(elem_cast),
771            }
772        }
773        // reg* types require subquery-based casts that storage cannot
774        // evaluate. Users can ingest them as text via TEXT COLUMNS.
775        Type::RegType | Type::RegClass | Type::RegProc => {
776            return Err(PlanError::TableContainsUningestableTypes {
777                name: String::new(),
778                type_: ty.name().to_string(),
779                column: String::new(),
780            });
781        }
782        other => {
783            return Err(PlanError::TableContainsUningestableTypes {
784                name: String::new(),
785                type_: other.name().to_string(),
786                column: String::new(),
787            });
788        }
789    };
790    Ok(Some(cast_func))
791}
792
793/// Build the element cast expression for container types (Array, List, Map,
794/// Range). The element expression operates on a single-column input row
795/// containing the text-encoded element at column 0.
796fn build_element_cast_expr(
797    scx: &StatementContext,
798    elem_ty: &mz_pgrepr::Type,
799) -> Result<StorageScalarExpr, PlanError> {
800    match pg_type_to_cast_func(scx, elem_ty)? {
801        None => Ok(StorageScalarExpr::Column(0)),
802        Some(cast_func) => Ok(StorageScalarExpr::CallUnary(
803            cast_func,
804            Box::new(StorageScalarExpr::Column(0)),
805        )),
806    }
807}
808
809mod privileges {
810    use mz_postgres_util::{PostgresError, query, sql};
811
812    use super::*;
813    use crate::plan::PlanError;
814    use crate::pure::PgSourcePurificationError;
815
816    async fn check_schema_privileges(client: &Client, table_oids: &[Oid]) -> Result<(), PlanError> {
817        let invalid_schema_privileges_rows = query(
818            client,
819            sql!(
820                "
821                WITH distinct_namespace AS (
822                    SELECT
823                        DISTINCT n.oid, n.nspname AS schema_name
824                    FROM unnest($1::OID[]) AS oids (oid)
825                    JOIN pg_class AS c ON c.oid = oids.oid
826                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
827                )
828                SELECT d.schema_name
829                FROM distinct_namespace AS d
830                WHERE
831                    NOT has_schema_privilege(CURRENT_USER::TEXT, d.oid, 'usage')"
832            ),
833            &[&table_oids],
834        )
835        .await?;
836
837        let mut invalid_schema_privileges = invalid_schema_privileges_rows
838            .into_iter()
839            .map(|row| row.get("schema_name"))
840            .collect::<Vec<String>>();
841
842        if invalid_schema_privileges.is_empty() {
843            Ok(())
844        } else {
845            invalid_schema_privileges.sort();
846            Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
847                schemas: invalid_schema_privileges,
848            })?
849        }
850    }
851
852    /// Ensure that the user specified in `config` has:
853    ///
854    /// -`SELECT` privileges for the identified `tables`.
855    ///
856    ///  `tables`'s elements should be of the structure `[<schema name>, <table name>]`.
857    ///
858    /// - `USAGE` privileges on the schemas references in `tables`.
859    ///
860    /// # Panics
861    /// If `config` does not specify a user.
862    pub async fn check_table_privileges(
863        client: &Client,
864        table_oids: &[Oid],
865    ) -> Result<(), PlanError> {
866        check_schema_privileges(client, table_oids).await?;
867
868        let invalid_table_privileges_rows = query(
869            client,
870            sql!(
871                "
872            SELECT
873                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
874             FROM unnest($1::oid[]) AS oids (oid)
875             JOIN
876                 pg_class c ON c.oid = oids.oid
877             JOIN
878                 pg_namespace n ON c.relnamespace = n.oid
879             WHERE NOT has_table_privilege(CURRENT_USER::text, c.oid, 'select')"
880            ),
881            &[&table_oids],
882        )
883        .await?;
884
885        let mut invalid_table_privileges = invalid_table_privileges_rows
886            .into_iter()
887            .map(|row| row.get("schema_qualified_table_name"))
888            .collect::<Vec<String>>();
889
890        if invalid_table_privileges.is_empty() {
891            Ok(())
892        } else {
893            invalid_table_privileges.sort();
894            Err(PgSourcePurificationError::UserLacksSelectOnTables {
895                tables: invalid_table_privileges,
896            })?
897        }
898    }
899
900    /// Ensure that the user specified in `config` can read data from tables if row level security
901    /// (RLS) is enabled. If the user/role does not have the BYPASSRLS attribute set, there is
902    /// the possibility that MZ may not be able to read all data during the snapshot, which would
903    /// result in missing data.
904    pub async fn check_rls_privileges(
905        client: &Client,
906        table_oids: &[Oid],
907    ) -> Result<(), PlanError> {
908        match mz_postgres_util::validate_no_rls_policies(client, table_oids).await {
909            Ok(_) => Ok(()),
910            Err(err) => match err {
911                // This is a little gross to do, but PlanError::PostgresConnectionErr implements
912                // From<PostgresError>, and the error in that case would be
913                // "failed to connect to PostgreSQL database", which doesn't make any sense.
914                PostgresError::BypassRLSRequired(tables) => {
915                    Err(PgSourcePurificationError::BypassRLSRequired { tables })?
916                }
917                _ => Err(err)?,
918            },
919        }
920    }
921}
922
923mod replica_identity {
924    use mz_postgres_util::{query, sql};
925
926    use super::*;
927    use crate::plan::PlanError;
928    use crate::pure::PgSourcePurificationError;
929
930    /// Ensures that all provided OIDs are tables with `REPLICA IDENTITY FULL`.
931    pub async fn check_replica_identity_full(
932        client: &Client,
933        table_oids: &[Oid],
934    ) -> Result<(), PlanError> {
935        let invalid_replica_identity_rows = query(
936            client,
937            sql!(
938                "
939            SELECT
940                format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
941             FROM unnest($1::oid[]) AS oids (oid)
942             JOIN
943                 pg_class c ON c.oid = oids.oid
944             JOIN
945                 pg_namespace n ON c.relnamespace = n.oid
946             WHERE relreplident != 'f' OR relreplident IS NULL;"
947            ),
948            &[&table_oids],
949        )
950        .await?;
951
952        let mut invalid_replica_identity = invalid_replica_identity_rows
953            .into_iter()
954            .map(|row| row.get("schema_qualified_table_name"))
955            .collect::<Vec<String>>();
956
957        if invalid_replica_identity.is_empty() {
958            Ok(())
959        } else {
960            invalid_replica_identity.sort();
961            Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
962                items: invalid_replica_identity,
963            })?
964        }
965    }
966}