1use std::collections::{BTreeMap, BTreeSet};
13
14use mz_expr::MirScalarExpr;
15use mz_expr::func::variadic::ErrorIfNull;
16use mz_postgres_util::desc::PostgresTableDesc;
17use mz_proto::RustType;
18use mz_repr::{SqlColumnType, SqlRelationType, SqlScalarType};
19use mz_sql_parser::ast::display::AstDisplay;
20use mz_sql_parser::ast::{
21 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
22 ExternalReferences, Ident, PgConfigOptionName, TableConstraint, UnresolvedItemName, Value,
23 WithOptionValue,
24};
25use mz_storage_types::sources::SourceExportStatementDetails;
26use mz_storage_types::sources::postgres::CastType;
27use prost::Message;
28use tokio_postgres::Client;
29use tokio_postgres::types::Oid;
30
31use crate::names::{Aug, ResolvedItemName};
32use crate::normalize;
33use crate::plan::hir::ColumnRef;
34use crate::plan::typeconv::{CastContext, plan_cast};
35use crate::plan::{
36 ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
37};
38
39use super::error::PgSourcePurificationError;
40use super::references::RetrievedSourceReferences;
41use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
42
/// Validates that the connected Postgres role can ingest from each of the
/// given tables: `SELECT` on the tables (and `USAGE` on their schemas), no
/// blocking row-level-security requirements, and `REPLICA IDENTITY FULL`.
///
/// Returns the first failed check's error; later checks are not run once one
/// fails.
pub(super) async fn validate_requested_references_privileges(
    client: &Client,
    table_oids: &[Oid],
) -> Result<(), PlanError> {
    privileges::check_table_privileges(client, table_oids).await?;
    privileges::check_rls_privileges(client, table_oids).await?;
    replica_identity::check_replica_identity_full(client, table_oids).await?;

    Ok(())
}
56
57pub(super) fn map_column_refs(
62 retrieved_references: &RetrievedSourceReferences,
63 columns: &mut [UnresolvedItemName],
64 option_type: PgConfigOptionName,
65) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
66 let mut cols_map: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
67
68 for name in columns {
69 let (qual, col) = match name.0.split_last().expect("must have at least one element") {
70 (col, qual) if qual.is_empty() => {
71 return Err(PlanError::InvalidOptionValue {
72 option_name: option_type.to_ast_string_simple(),
73 err: Box::new(PlanError::UnderqualifiedColumnName(
74 col.as_str().to_string(),
75 )),
76 });
77 }
78 (col, qual) => (qual.to_vec(), col.as_str().to_string()),
79 };
80
81 let resolved_reference = retrieved_references.resolve_name(&qual)?;
82 let mut fully_qualified_name =
83 resolved_reference
84 .external_reference()
85 .map_err(|e| PlanError::InvalidOptionValue {
86 option_name: option_type.to_ast_string_simple(),
87 err: Box::new(e.into()),
88 })?;
89
90 let desc = resolved_reference
91 .postgres_desc()
92 .expect("known to be postgres");
93
94 if !desc.columns.iter().any(|column| column.name == col) {
95 let column = mz_repr::ColumnName::from(col);
96 let similar = desc
97 .columns
98 .iter()
99 .filter_map(|c| {
100 let c_name = mz_repr::ColumnName::from(c.name.clone());
101 c_name.is_similar(&column).then_some(c_name)
102 })
103 .collect();
104 return Err(PlanError::InvalidOptionValue {
105 option_name: option_type.to_ast_string_simple(),
106 err: Box::new(PlanError::UnknownColumn {
107 table: Some(
108 normalize::unresolved_item_name(fully_qualified_name)
109 .expect("known to be of valid len"),
110 ),
111 column,
112 similar,
113 }),
114 });
115 }
116
117 let col_ident = Ident::new(col.as_str().to_string())?;
119 fully_qualified_name.0.push(col_ident);
120 *name = fully_qualified_name;
121
122 let new = cols_map
123 .entry(desc.oid)
124 .or_default()
125 .insert(col.as_str().to_string());
126
127 if !new {
128 return Err(PlanError::InvalidOptionValue {
129 option_name: option_type.to_ast_string_simple(),
130 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
131 });
132 }
133 }
134
135 Ok(cols_map)
136}
137
138pub fn generate_create_subsource_statements(
139 scx: &StatementContext,
140 source_name: ResolvedItemName,
141 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
142) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
143 let mut unsupported_cols = vec![];
145
146 let mut subsources = Vec::with_capacity(requested_subsources.len());
148
149 for (subsource_name, purified_export) in requested_subsources {
150 let PostgresExportStatementValues {
151 columns,
152 constraints,
153 text_columns,
154 exclude_columns,
155 details,
156 external_reference,
157 } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
158
159 let mut with_options = vec![
160 CreateSubsourceOption {
161 name: CreateSubsourceOptionName::ExternalReference,
162 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
163 },
164 CreateSubsourceOption {
165 name: CreateSubsourceOptionName::Details,
166 value: Some(WithOptionValue::Value(Value::String(hex::encode(
167 details.into_proto().encode_to_vec(),
168 )))),
169 },
170 ];
171
172 if let Some(text_columns) = text_columns {
173 with_options.push(CreateSubsourceOption {
174 name: CreateSubsourceOptionName::TextColumns,
175 value: Some(WithOptionValue::Sequence(text_columns)),
176 });
177 }
178
179 if let Some(exclude_columns) = exclude_columns {
180 with_options.push(CreateSubsourceOption {
181 name: CreateSubsourceOptionName::ExcludeColumns,
182 value: Some(WithOptionValue::Sequence(exclude_columns)),
183 });
184 }
185
186 let subsource = CreateSubsourceStatement {
188 name: subsource_name,
189 columns,
190 of_source: Some(source_name.clone()),
193 constraints,
202 if_not_exists: false,
203 with_options,
204 };
205 subsources.push(subsource);
206 }
207
208 if !unsupported_cols.is_empty() {
209 unsupported_cols.sort();
210 Err(PgSourcePurificationError::UnrecognizedTypes {
211 cols: unsupported_cols,
212 })?;
213 }
214
215 Ok(subsources)
216}
217
/// The pieces needed to assemble a `CREATE SUBSOURCE` statement for a single
/// Postgres source export; produced by
/// `generate_source_export_statement_values`.
pub(super) struct PostgresExportStatementValues {
    /// Column definitions (excluded and unsupported columns omitted).
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique/primary-key constraints derived from the upstream table's keys.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// Sorted `TEXT COLUMNS` idents, if any, ready for a `WITH` option.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Sorted `EXCLUDE COLUMNS` idents, if any, ready for a `WITH` option.
    pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Serializable per-export details embedded in the statement.
    pub(super) details: SourceExportStatementDetails,
    /// The fully qualified upstream table reference.
    pub(super) external_reference: UnresolvedItemName,
}
226
/// Builds the column definitions, key constraints, and serialized details for
/// a single purified Postgres source export.
///
/// Columns named in the export's `exclude_columns` are skipped entirely, and
/// columns named in `text_columns` are emitted with type `text`. Columns whose
/// Postgres type cannot be represented are appended to `unsupported_cols`
/// (rather than erroring here) so the caller can report all of them at once.
///
/// # Panics
///
/// If `purified_export.details` is not `PurifiedExportDetails::Postgres`.
pub(super) fn generate_source_export_statement_values(
    scx: &StatementContext,
    purified_export: PurifiedSourceExport,
    unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
) -> Result<PostgresExportStatementValues, PlanError> {
    let PurifiedExportDetails::Postgres {
        table,
        text_columns,
        exclude_columns,
    } = purified_export.details
    else {
        unreachable!("purified export details must be postgres")
    };

    // Set views over the optional column lists for O(log n) membership tests.
    let text_column_set = BTreeSet::from_iter(text_columns.iter().flatten().map(Ident::as_str));
    let exclude_column_set =
        BTreeSet::from_iter(exclude_columns.iter().flatten().map(Ident::as_str));

    let mut columns = vec![];
    for c in table.columns.iter() {
        // Note: the identifier is validated before the exclude check, so an
        // invalid upstream column name errors even for excluded columns.
        let name = Ident::new(c.name.clone())?;

        if exclude_column_set.contains(c.name.as_str()) {
            continue;
        }

        let ty = if text_column_set.contains(c.name.as_str()) {
            // TEXT COLUMNS bypass type resolution and ingest as raw text.
            mz_pgrepr::Type::Text
        } else {
            match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
                Ok(t) => t,
                Err(_) => {
                    // Unrepresentable type: record the fully qualified column
                    // for the caller's aggregated error and omit the column.
                    let mut full_name = purified_export.external_reference.0.clone();
                    full_name.push(name);
                    unsupported_cols.push((
                        UnresolvedItemName(full_name).to_ast_string_simple(),
                        mz_repr::adt::system::Oid(c.type_oid),
                    ));
                    continue;
                }
            }
        };

        let data_type = scx.resolve_type(ty)?;
        let mut options = vec![];

        // Mirror the upstream NOT NULL constraint on the subsource column.
        if !c.nullable {
            options.push(mz_sql_parser::ast::ColumnOptionDef {
                name: None,
                option: mz_sql_parser::ast::ColumnOption::NotNull,
            });
        }

        columns.push(ColumnDef {
            name,
            data_type,
            collation: None,
            options,
        });
    }

    let mut constraints = vec![];
    for key in table.keys.clone() {
        let mut key_columns = vec![];

        // Translate the key's column numbers back into column names.
        for col_num in key.cols {
            let ident = Ident::new(
                table
                    .columns
                    .iter()
                    .find(|col| col.col_num == col_num)
                    .expect("key exists as column")
                    .name
                    .clone(),
            )?;
            key_columns.push(ident);
        }

        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
            name: Some(Ident::new(key.name)?),
            columns: key_columns,
            is_primary: key.is_primary,
            nulls_not_distinct: key.nulls_not_distinct,
        };

        // The primary key is placed first in the constraint list —
        // NOTE(review): presumably so downstream consumers prefer it; confirm.
        if key.is_primary {
            constraints.insert(0, constraint);
        } else {
            constraints.push(constraint);
        }
    }
    let details = SourceExportStatementDetails::Postgres { table };

    // Sort the optional column lists for deterministic statement output.
    let text_columns = text_columns.map(|mut columns| {
        columns.sort();
        columns
            .into_iter()
            .map(WithOptionValue::Ident::<Aug>)
            .collect()
    });

    let exclude_columns = exclude_columns.map(|mut columns| {
        columns.sort();
        columns
            .into_iter()
            .map(WithOptionValue::Ident::<Aug>)
            .collect()
    });

    Ok(PostgresExportStatementValues {
        columns,
        constraints,
        text_columns,
        exclude_columns,
        details,
        external_reference: purified_export.external_reference,
    })
}
347
/// The result of purifying the requested Postgres source exports.
pub(super) struct PurifiedSourceExports {
    /// Purified export definitions, keyed by their (Materialize-side) name.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// Normalized `TEXT COLUMNS` values: fully qualified, sorted, and
    /// deduplicated.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
358
/// Purifies the requested source exports for a Postgres source: resolves the
/// requested external references, validates upstream privileges, and applies
/// the `TEXT COLUMNS` / `EXCLUDE COLUMNS` options to each export.
///
/// With no requested references, succeeds with an empty result — unless the
/// reference policy requires references or column options were supplied
/// (both errors). With references, every supplied column option must target a
/// referenced table; leftovers produce "dangling" errors.
pub(super) async fn purify_source_exports(
    client: &Client,
    retrieved_references: &RetrievedSourceReferences,
    requested_references: &Option<ExternalReferences>,
    mut text_columns: Vec<UnresolvedItemName>,
    mut exclude_columns: Vec<UnresolvedItemName>,
    unresolved_source_name: &UnresolvedItemName,
    reference_policy: &SourceReferencePolicy,
) -> Result<PurifiedSourceExports, PlanError> {
    let requested_exports = match requested_references.as_ref() {
        // References supplied but forbidden by policy.
        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
            Err(PlanError::UseTablesForSources(requested.to_string()))?
        }
        Some(requested) => retrieved_references
            .requested_source_exports(Some(requested), unresolved_source_name)?,
        None => {
            if matches!(reference_policy, SourceReferencePolicy::Required) {
                Err(PgSourcePurificationError::RequiresExternalReferences)?
            }

            // Column options are meaningless without references to apply
            // them to; reject rather than silently ignore.
            if !text_columns.is_empty() {
                Err(
                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
                        "TEXT COLUMNS".to_string(),
                    ),
                )?
            }

            if !exclude_columns.is_empty() {
                Err(
                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
                        "EXCLUDE COLUMNS".to_string(),
                    ),
                )?
            }

            // Nothing requested: nothing to purify.
            return Ok(PurifiedSourceExports {
                source_exports: BTreeMap::new(),
                normalized_text_columns: vec![],
            });
        }
    };

    if requested_exports.is_empty() {
        // `unwrap` is safe: the `None` arm above returned early.
        sql_bail!(
            "[internal error]: Postgres reference {} did not match any tables",
            requested_references
                .as_ref()
                .unwrap()
                .to_ast_string_simple()
        );
    }

    super::validate_source_export_names(&requested_exports)?;

    let table_oids: Vec<_> = requested_exports
        .iter()
        .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
        .collect();

    validate_requested_references_privileges(client, &table_oids).await?;

    // `map_column_refs` also rewrites the column lists in place to their
    // fully-qualified forms, which the normalization below relies on.
    let mut text_column_map = map_column_refs(
        retrieved_references,
        &mut text_columns,
        PgConfigOptionName::TextColumns,
    )?;
    let mut exclude_column_map = map_column_refs(
        retrieved_references,
        &mut exclude_columns,
        PgConfigOptionName::ExcludeColumns,
    )?;

    text_columns.sort();
    text_columns.dedup();
    let normalized_text_columns: Vec<_> = text_columns
        .into_iter()
        .map(WithOptionValue::UnresolvedItemName)
        .collect();

    let source_exports = requested_exports
        .into_iter()
        .map(|r| {
            let mut desc = r.meta.postgres_desc().expect("known postgres").clone();
            // Drain the per-table entries; whatever remains afterwards is a
            // dangling reference (reported below).
            let text_columns = text_column_map.remove(&desc.oid);
            let exclude_columns = exclude_column_map.remove(&desc.oid);

            // Excluded columns are removed from the table description itself.
            if let Some(exclude_cols) = &exclude_columns {
                desc.columns.retain(|c| !exclude_cols.contains(&c.name));
            }

            // A column cannot be both a text column and excluded.
            if let (Some(text_cols), Some(exclude_cols)) = (&text_columns, &exclude_columns) {
                let intersection: Vec<_> = text_cols.intersection(exclude_cols).collect();
                if !intersection.is_empty() {
                    return Err(PgSourcePurificationError::DuplicatedColumnNames(
                        intersection.iter().map(|s| (*s).to_string()).collect(),
                    ));
                }
            }
            Ok((
                r.name,
                PurifiedSourceExport {
                    external_reference: r.external_reference,
                    details: PurifiedExportDetails::Postgres {
                        text_columns: text_columns.map(|v| {
                            v.into_iter()
                                .map(|s| Ident::new(s).expect("validated above"))
                                .collect()
                        }),
                        exclude_columns: exclude_columns.map(|v| {
                            v.into_iter()
                                .map(|s| Ident::new(s).expect("validated above"))
                                .collect()
                        }),
                        table: desc,
                    },
                },
            ))
        })
        .collect::<Result<BTreeMap<_, _>, _>>()?;

    // Any TEXT COLUMNS entries left in the map reference tables that were not
    // among the requested exports; report them by table name.
    if !text_column_map.is_empty() {
        let mut dangling_text_column_refs = vec![];
        let all_references = retrieved_references.all_references();

        for id in text_column_map.keys() {
            let desc = all_references
                .iter()
                .find_map(|reference| {
                    let desc = reference.postgres_desc().expect("is postgres");
                    if desc.oid == *id { Some(desc) } else { None }
                })
                .expect("validated when generating text columns");

            dangling_text_column_refs.push(PartialItemName {
                database: None,
                schema: Some(desc.namespace.clone()),
                item: desc.name.clone(),
            });
        }

        dangling_text_column_refs.sort();
        return Err(PlanError::from(
            PgSourcePurificationError::DanglingTextColumns {
                items: dangling_text_column_refs,
            },
        ));
    }

    // Same check for leftover EXCLUDE COLUMNS entries.
    if !exclude_column_map.is_empty() {
        let mut dangling_exclude_column_refs = vec![];
        let all_references = retrieved_references.all_references();

        for id in exclude_column_map.keys() {
            let desc = all_references
                .iter()
                .find_map(|reference| {
                    let desc = reference.postgres_desc().expect("is postgres");
                    if desc.oid == *id { Some(desc) } else { None }
                })
                .expect("validated when generating exclude columns");

            dangling_exclude_column_refs.push(PartialItemName {
                database: None,
                schema: Some(desc.namespace.clone()),
                item: desc.name.clone(),
            });
        }

        dangling_exclude_column_refs.sort();
        return Err(PlanError::from(
            PgSourcePurificationError::DanglingExcludeColumns {
                items: dangling_exclude_column_refs,
            },
        ));
    }

    Ok(PurifiedSourceExports {
        source_exports,
        normalized_text_columns,
    })
}
553
554pub(crate) fn generate_column_casts(
555 scx: &StatementContext,
556 table: &PostgresTableDesc,
557 text_columns: &Vec<Ident>,
558) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
559 let mut cast_scx = scx.clone();
566 cast_scx.param_types = Default::default();
567 let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
568 let mut column_types = vec![];
569 for column in table.columns.iter() {
570 column_types.push(SqlColumnType {
571 nullable: column.nullable,
572 scalar_type: SqlScalarType::String,
573 });
574 }
575
576 let cast_ecx = ExprContext {
577 qcx: &cast_qcx,
578 name: "plan_postgres_source_cast",
579 scope: &Scope::empty(),
580 relation_type: &SqlRelationType {
581 column_types,
582 keys: vec![],
583 },
584 allow_aggregates: false,
585 allow_subqueries: true,
592 allow_parameters: false,
593 allow_windows: false,
594 };
595
596 let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
597
598 let mut table_cast = vec![];
601 for (i, column) in table.columns.iter().enumerate() {
602 let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
603 (CastType::Text, mz_pgrepr::Type::Text)
609 } else {
610 match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
611 Ok(t) => (CastType::Natural, t),
612 Err(_) => {
617 table_cast.push((
618 CastType::Natural,
619 HirScalarExpr::call_variadic(
620 ErrorIfNull,
621 vec![
622 HirScalarExpr::literal_null(SqlScalarType::String),
623 HirScalarExpr::literal(
624 mz_repr::Datum::from(
625 format!("Unsupported type with OID {}", column.type_oid)
626 .as_str(),
627 ),
628 SqlScalarType::String,
629 ),
630 ],
631 )
632 .lower_uncorrelated(scx.catalog.system_vars())
633 .expect("no correlation"),
634 ));
635 continue;
636 }
637 }
638 };
639
640 let data_type = scx.resolve_type(ty)?;
641 let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
642
643 let col_expr = HirScalarExpr::unnamed_column(ColumnRef {
644 level: 0,
645 column: i,
646 });
647
648 let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
649
650 let cast = if column.nullable {
651 cast_expr
652 } else {
653 let message = format!(
659 "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
660 table.namespace, table.name, column.name
661 );
662 let exprs = vec![
663 cast_expr,
664 HirScalarExpr::literal(
665 mz_repr::Datum::from(message.as_str()),
666 SqlScalarType::String,
667 ),
668 ];
669 HirScalarExpr::call_variadic(ErrorIfNull, exprs)
670 };
671
672 let mir_cast = cast
678 .lower_uncorrelated(scx.catalog.system_vars())
679 .map_err(|_e| {
680 tracing::info!(
683 "cannot ingest {:?} data from PG source because cast is correlated",
684 scalar_type
685 );
686
687 PlanError::TableContainsUningestableTypes {
688 name: table.name.to_string(),
689 type_: scx.humanize_scalar_type(&scalar_type, false),
690 column: column.name.to_string(),
691 }
692 })?;
693
694 table_cast.push((cast_type, mir_cast));
695 }
696 Ok(table_cast)
697}
698
699mod privileges {
700 use mz_postgres_util::PostgresError;
701
702 use super::*;
703 use crate::plan::PlanError;
704 use crate::pure::PgSourcePurificationError;
705
706 async fn check_schema_privileges(client: &Client, table_oids: &[Oid]) -> Result<(), PlanError> {
707 let invalid_schema_privileges_rows = client
708 .query(
709 "
710 WITH distinct_namespace AS (
711 SELECT
712 DISTINCT n.oid, n.nspname AS schema_name
713 FROM unnest($1::OID[]) AS oids (oid)
714 JOIN pg_class AS c ON c.oid = oids.oid
715 JOIN pg_namespace AS n ON c.relnamespace = n.oid
716 )
717 SELECT d.schema_name
718 FROM distinct_namespace AS d
719 WHERE
720 NOT has_schema_privilege(CURRENT_USER::TEXT, d.oid, 'usage')",
721 &[&table_oids],
722 )
723 .await
724 .map_err(PostgresError::from)?;
725
726 let mut invalid_schema_privileges = invalid_schema_privileges_rows
727 .into_iter()
728 .map(|row| row.get("schema_name"))
729 .collect::<Vec<String>>();
730
731 if invalid_schema_privileges.is_empty() {
732 Ok(())
733 } else {
734 invalid_schema_privileges.sort();
735 Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
736 schemas: invalid_schema_privileges,
737 })?
738 }
739 }
740
741 pub async fn check_table_privileges(
752 client: &Client,
753 table_oids: &[Oid],
754 ) -> Result<(), PlanError> {
755 check_schema_privileges(client, table_oids).await?;
756
757 let invalid_table_privileges_rows = client
758 .query(
759 "
760 SELECT
761 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
762 FROM unnest($1::oid[]) AS oids (oid)
763 JOIN
764 pg_class c ON c.oid = oids.oid
765 JOIN
766 pg_namespace n ON c.relnamespace = n.oid
767 WHERE NOT has_table_privilege(CURRENT_USER::text, c.oid, 'select')",
768 &[&table_oids],
769 )
770 .await
771 .map_err(PostgresError::from)?;
772
773 let mut invalid_table_privileges = invalid_table_privileges_rows
774 .into_iter()
775 .map(|row| row.get("schema_qualified_table_name"))
776 .collect::<Vec<String>>();
777
778 if invalid_table_privileges.is_empty() {
779 Ok(())
780 } else {
781 invalid_table_privileges.sort();
782 Err(PgSourcePurificationError::UserLacksSelectOnTables {
783 tables: invalid_table_privileges,
784 })?
785 }
786 }
787
788 pub async fn check_rls_privileges(
793 client: &Client,
794 table_oids: &[Oid],
795 ) -> Result<(), PlanError> {
796 match mz_postgres_util::validate_no_rls_policies(client, table_oids).await {
797 Ok(_) => Ok(()),
798 Err(err) => match err {
799 PostgresError::BypassRLSRequired(tables) => {
803 Err(PgSourcePurificationError::BypassRLSRequired { tables })?
804 }
805 _ => Err(err)?,
806 },
807 }
808 }
809}
810
811mod replica_identity {
812 use mz_postgres_util::PostgresError;
813
814 use super::*;
815 use crate::plan::PlanError;
816 use crate::pure::PgSourcePurificationError;
817
818 pub async fn check_replica_identity_full(
820 client: &Client,
821 table_oids: &[Oid],
822 ) -> Result<(), PlanError> {
823 let invalid_replica_identity_rows = client
824 .query(
825 "
826 SELECT
827 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
828 FROM unnest($1::oid[]) AS oids (oid)
829 JOIN
830 pg_class c ON c.oid = oids.oid
831 JOIN
832 pg_namespace n ON c.relnamespace = n.oid
833 WHERE relreplident != 'f' OR relreplident IS NULL;",
834 &[&table_oids],
835 )
836 .await
837 .map_err(PostgresError::from)?;
838
839 let mut invalid_replica_identity = invalid_replica_identity_rows
840 .into_iter()
841 .map(|row| row.get("schema_qualified_table_name"))
842 .collect::<Vec<String>>();
843
844 if invalid_replica_identity.is_empty() {
845 Ok(())
846 } else {
847 invalid_replica_identity.sort();
848 Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
849 items: invalid_replica_identity,
850 })?
851 }
852 }
853}