1use std::collections::{BTreeMap, BTreeSet};
13
14use mz_expr::MirScalarExpr;
15use mz_postgres_util::desc::PostgresTableDesc;
16use mz_proto::RustType;
17use mz_repr::{SqlColumnType, SqlRelationType, SqlScalarType};
18use mz_sql_parser::ast::display::AstDisplay;
19use mz_sql_parser::ast::{
20 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
21 ExternalReferences, Ident, PgConfigOptionName, TableConstraint, UnresolvedItemName, Value,
22 WithOptionValue,
23};
24use mz_storage_types::sources::SourceExportStatementDetails;
25use mz_storage_types::sources::postgres::CastType;
26use prost::Message;
27use tokio_postgres::Client;
28use tokio_postgres::types::Oid;
29
30use crate::names::{Aug, ResolvedItemName};
31use crate::normalize;
32use crate::plan::hir::ColumnRef;
33use crate::plan::typeconv::{CastContext, plan_cast};
34use crate::plan::{
35 ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
36};
37
38use super::error::PgSourcePurificationError;
39use super::references::RetrievedSourceReferences;
40use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
41
42pub(super) async fn validate_requested_references_privileges(
46 client: &Client,
47 table_oids: &[Oid],
48) -> Result<(), PlanError> {
49 privileges::check_table_privileges(client, table_oids).await?;
50 privileges::check_rls_privileges(client, table_oids).await?;
51 replica_identity::check_replica_identity_full(client, table_oids).await?;
52
53 Ok(())
54}
55
56pub(super) fn map_column_refs(
61 retrieved_references: &RetrievedSourceReferences,
62 columns: &mut [UnresolvedItemName],
63 option_type: PgConfigOptionName,
64) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
65 let mut cols_map: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
66
67 for name in columns {
68 let (qual, col) = match name.0.split_last().expect("must have at least one element") {
69 (col, qual) if qual.is_empty() => {
70 return Err(PlanError::InvalidOptionValue {
71 option_name: option_type.to_ast_string_simple(),
72 err: Box::new(PlanError::UnderqualifiedColumnName(
73 col.as_str().to_string(),
74 )),
75 });
76 }
77 (col, qual) => (qual.to_vec(), col.as_str().to_string()),
78 };
79
80 let resolved_reference = retrieved_references.resolve_name(&qual)?;
81 let mut fully_qualified_name =
82 resolved_reference
83 .external_reference()
84 .map_err(|e| PlanError::InvalidOptionValue {
85 option_name: option_type.to_ast_string_simple(),
86 err: Box::new(e.into()),
87 })?;
88
89 let desc = resolved_reference
90 .postgres_desc()
91 .expect("known to be postgres");
92
93 if !desc.columns.iter().any(|column| column.name == col) {
94 let column = mz_repr::ColumnName::from(col);
95 let similar = desc
96 .columns
97 .iter()
98 .filter_map(|c| {
99 let c_name = mz_repr::ColumnName::from(c.name.clone());
100 c_name.is_similar(&column).then_some(c_name)
101 })
102 .collect();
103 return Err(PlanError::InvalidOptionValue {
104 option_name: option_type.to_ast_string_simple(),
105 err: Box::new(PlanError::UnknownColumn {
106 table: Some(
107 normalize::unresolved_item_name(fully_qualified_name)
108 .expect("known to be of valid len"),
109 ),
110 column,
111 similar,
112 }),
113 });
114 }
115
116 let col_ident = Ident::new(col.as_str().to_string())?;
118 fully_qualified_name.0.push(col_ident);
119 *name = fully_qualified_name;
120
121 let new = cols_map
122 .entry(desc.oid)
123 .or_default()
124 .insert(col.as_str().to_string());
125
126 if !new {
127 return Err(PlanError::InvalidOptionValue {
128 option_name: option_type.to_ast_string_simple(),
129 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
130 });
131 }
132 }
133
134 Ok(cols_map)
135}
136
137pub fn generate_create_subsource_statements(
138 scx: &StatementContext,
139 source_name: ResolvedItemName,
140 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
141) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
142 let mut unsupported_cols = vec![];
144
145 let mut subsources = Vec::with_capacity(requested_subsources.len());
147
148 for (subsource_name, purified_export) in requested_subsources {
149 let PostgresExportStatementValues {
150 columns,
151 constraints,
152 text_columns,
153 exclude_columns,
154 details,
155 external_reference,
156 } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
157
158 let mut with_options = vec![
159 CreateSubsourceOption {
160 name: CreateSubsourceOptionName::ExternalReference,
161 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
162 },
163 CreateSubsourceOption {
164 name: CreateSubsourceOptionName::Details,
165 value: Some(WithOptionValue::Value(Value::String(hex::encode(
166 details.into_proto().encode_to_vec(),
167 )))),
168 },
169 ];
170
171 if let Some(text_columns) = text_columns {
172 with_options.push(CreateSubsourceOption {
173 name: CreateSubsourceOptionName::TextColumns,
174 value: Some(WithOptionValue::Sequence(text_columns)),
175 });
176 }
177
178 if let Some(exclude_columns) = exclude_columns {
179 with_options.push(CreateSubsourceOption {
180 name: CreateSubsourceOptionName::ExcludeColumns,
181 value: Some(WithOptionValue::Sequence(exclude_columns)),
182 });
183 }
184
185 let subsource = CreateSubsourceStatement {
187 name: subsource_name,
188 columns,
189 of_source: Some(source_name.clone()),
192 constraints,
201 if_not_exists: false,
202 with_options,
203 };
204 subsources.push(subsource);
205 }
206
207 if !unsupported_cols.is_empty() {
208 unsupported_cols.sort();
209 Err(PgSourcePurificationError::UnrecognizedTypes {
210 cols: unsupported_cols,
211 })?;
212 }
213
214 Ok(subsources)
215}
216
/// The statement components derived from a single purified Postgres export,
/// as produced by `generate_source_export_statement_values`.
pub(super) struct PostgresExportStatementValues {
    /// Column definitions for the exported table. Excluded columns and
    /// columns whose type could not be recognized are not included.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique constraints built from the table's keys; a primary key, if
    /// any, is placed first.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// Sorted `TEXT COLUMNS` option values, if any were requested.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Sorted `EXCLUDE COLUMNS` option values, if any were requested.
    pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Export details wrapping the upstream table description.
    pub(super) details: SourceExportStatementDetails,
    /// The external reference of the exported upstream table.
    pub(super) external_reference: UnresolvedItemName,
}
225
226pub(super) fn generate_source_export_statement_values(
227 scx: &StatementContext,
228 purified_export: PurifiedSourceExport,
229 unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
230) -> Result<PostgresExportStatementValues, PlanError> {
231 let PurifiedExportDetails::Postgres {
232 table,
233 text_columns,
234 exclude_columns,
235 } = purified_export.details
236 else {
237 unreachable!("purified export details must be postgres")
238 };
239
240 let text_column_set = BTreeSet::from_iter(text_columns.iter().flatten().map(Ident::as_str));
241 let exclude_column_set =
242 BTreeSet::from_iter(exclude_columns.iter().flatten().map(Ident::as_str));
243
244 let mut columns = vec![];
246 for c in table.columns.iter() {
247 let name = Ident::new(c.name.clone())?;
248
249 if exclude_column_set.contains(c.name.as_str()) {
250 continue;
251 }
252
253 let ty = if text_column_set.contains(c.name.as_str()) {
254 mz_pgrepr::Type::Text
255 } else {
256 match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
257 Ok(t) => t,
258 Err(_) => {
259 let mut full_name = purified_export.external_reference.0.clone();
260 full_name.push(name);
261 unsupported_cols.push((
262 UnresolvedItemName(full_name).to_ast_string_simple(),
263 mz_repr::adt::system::Oid(c.type_oid),
264 ));
265 continue;
266 }
267 }
268 };
269
270 let data_type = scx.resolve_type(ty)?;
271 let mut options = vec![];
272
273 if !c.nullable {
274 options.push(mz_sql_parser::ast::ColumnOptionDef {
275 name: None,
276 option: mz_sql_parser::ast::ColumnOption::NotNull,
277 });
278 }
279
280 columns.push(ColumnDef {
281 name,
282 data_type,
283 collation: None,
284 options,
285 });
286 }
287
288 let mut constraints = vec![];
289 for key in table.keys.clone() {
290 let mut key_columns = vec![];
291
292 for col_num in key.cols {
293 let ident = Ident::new(
294 table
295 .columns
296 .iter()
297 .find(|col| col.col_num == col_num)
298 .expect("key exists as column")
299 .name
300 .clone(),
301 )?;
302 key_columns.push(ident);
303 }
304
305 let constraint = mz_sql_parser::ast::TableConstraint::Unique {
306 name: Some(Ident::new(key.name)?),
307 columns: key_columns,
308 is_primary: key.is_primary,
309 nulls_not_distinct: key.nulls_not_distinct,
310 };
311
312 if key.is_primary {
314 constraints.insert(0, constraint);
315 } else {
316 constraints.push(constraint);
317 }
318 }
319 let details = SourceExportStatementDetails::Postgres { table };
320
321 let text_columns = text_columns.map(|mut columns| {
322 columns.sort();
323 columns
324 .into_iter()
325 .map(WithOptionValue::Ident::<Aug>)
326 .collect()
327 });
328
329 let exclude_columns = exclude_columns.map(|mut columns| {
330 columns.sort();
331 columns
332 .into_iter()
333 .map(WithOptionValue::Ident::<Aug>)
334 .collect()
335 });
336
337 Ok(PostgresExportStatementValues {
338 columns,
339 constraints,
340 text_columns,
341 exclude_columns,
342 details,
343 external_reference: purified_export.external_reference,
344 })
345}
346
/// The result of `purify_source_exports`.
pub(super) struct PurifiedSourceExports {
    /// The purified exports, keyed by the name each export was requested under.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// The text column references after being rewritten to their fully
    /// qualified form, sorted and de-duplicated.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
357
358pub(super) async fn purify_source_exports(
362 client: &Client,
363 retrieved_references: &RetrievedSourceReferences,
364 requested_references: &Option<ExternalReferences>,
365 mut text_columns: Vec<UnresolvedItemName>,
366 mut exclude_columns: Vec<UnresolvedItemName>,
367 unresolved_source_name: &UnresolvedItemName,
368 reference_policy: &SourceReferencePolicy,
369) -> Result<PurifiedSourceExports, PlanError> {
370 let requested_exports = match requested_references.as_ref() {
371 Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
372 Err(PlanError::UseTablesForSources(requested.to_string()))?
373 }
374 Some(requested) => retrieved_references
375 .requested_source_exports(Some(requested), unresolved_source_name)?,
376 None => {
377 if matches!(reference_policy, SourceReferencePolicy::Required) {
378 Err(PgSourcePurificationError::RequiresExternalReferences)?
379 }
380
381 if !text_columns.is_empty() {
384 Err(
385 PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
386 "TEXT COLUMNS".to_string(),
387 ),
388 )?
389 }
390
391 if !exclude_columns.is_empty() {
394 Err(
395 PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
396 "EXCLUDE COLUMNS".to_string(),
397 ),
398 )?
399 }
400
401 return Ok(PurifiedSourceExports {
402 source_exports: BTreeMap::new(),
403 normalized_text_columns: vec![],
404 });
405 }
406 };
407
408 if requested_exports.is_empty() {
409 sql_bail!(
410 "[internal error]: Postgres reference {} did not match any tables",
411 requested_references
412 .as_ref()
413 .unwrap()
414 .to_ast_string_simple()
415 );
416 }
417
418 super::validate_source_export_names(&requested_exports)?;
419
420 let table_oids: Vec<_> = requested_exports
421 .iter()
422 .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
423 .collect();
424
425 validate_requested_references_privileges(client, &table_oids).await?;
426
427 let mut text_column_map = map_column_refs(
428 retrieved_references,
429 &mut text_columns,
430 PgConfigOptionName::TextColumns,
431 )?;
432 let mut exclude_column_map = map_column_refs(
433 retrieved_references,
434 &mut exclude_columns,
435 PgConfigOptionName::ExcludeColumns,
436 )?;
437
438 text_columns.sort();
440 text_columns.dedup();
441 let normalized_text_columns: Vec<_> = text_columns
442 .into_iter()
443 .map(WithOptionValue::UnresolvedItemName)
444 .collect();
445
446 let source_exports = requested_exports
447 .into_iter()
448 .map(|r| {
449 let mut desc = r.meta.postgres_desc().expect("known postgres").clone();
450 let text_columns = text_column_map.remove(&desc.oid);
451 let exclude_columns = exclude_column_map.remove(&desc.oid);
452
453 if let Some(exclude_cols) = &exclude_columns {
454 desc.columns.retain(|c| !exclude_cols.contains(&c.name));
455 }
456
457 if let (Some(text_cols), Some(exclude_cols)) = (&text_columns, &exclude_columns) {
458 let intersection: Vec<_> = text_cols.intersection(exclude_cols).collect();
459 if !intersection.is_empty() {
460 return Err(PgSourcePurificationError::DuplicatedColumnNames(
461 intersection.iter().map(|s| (*s).to_string()).collect(),
462 ));
463 }
464 }
465 Ok((
466 r.name,
467 PurifiedSourceExport {
468 external_reference: r.external_reference,
469 details: PurifiedExportDetails::Postgres {
470 text_columns: text_columns.map(|v| {
471 v.into_iter()
472 .map(|s| Ident::new(s).expect("validated above"))
473 .collect()
474 }),
475 exclude_columns: exclude_columns.map(|v| {
476 v.into_iter()
477 .map(|s| Ident::new(s).expect("validated above"))
478 .collect()
479 }),
480 table: desc,
481 },
482 },
483 ))
484 })
485 .collect::<Result<BTreeMap<_, _>, _>>()?;
486
487 if !text_column_map.is_empty() {
488 let mut dangling_text_column_refs = vec![];
491 let all_references = retrieved_references.all_references();
492
493 for id in text_column_map.keys() {
494 let desc = all_references
495 .iter()
496 .find_map(|reference| {
497 let desc = reference.postgres_desc().expect("is postgres");
498 if desc.oid == *id { Some(desc) } else { None }
499 })
500 .expect("validated when generating text columns");
501
502 dangling_text_column_refs.push(PartialItemName {
503 database: None,
504 schema: Some(desc.namespace.clone()),
505 item: desc.name.clone(),
506 });
507 }
508
509 dangling_text_column_refs.sort();
510 return Err(PlanError::from(
511 PgSourcePurificationError::DanglingTextColumns {
512 items: dangling_text_column_refs,
513 },
514 ));
515 }
516
517 if !exclude_column_map.is_empty() {
518 let mut dangling_exclude_column_refs = vec![];
521 let all_references = retrieved_references.all_references();
522
523 for id in exclude_column_map.keys() {
524 let desc = all_references
525 .iter()
526 .find_map(|reference| {
527 let desc = reference.postgres_desc().expect("is postgres");
528 if desc.oid == *id { Some(desc) } else { None }
529 })
530 .expect("validated when generating exclude columns");
531
532 dangling_exclude_column_refs.push(PartialItemName {
533 database: None,
534 schema: Some(desc.namespace.clone()),
535 item: desc.name.clone(),
536 });
537 }
538
539 dangling_exclude_column_refs.sort();
540 return Err(PlanError::from(
541 PgSourcePurificationError::DanglingExcludeColumns {
542 items: dangling_exclude_column_refs,
543 },
544 ));
545 }
546
547 Ok(PurifiedSourceExports {
548 source_exports,
549 normalized_text_columns,
550 })
551}
552
553pub(crate) fn generate_column_casts(
554 scx: &StatementContext,
555 table: &PostgresTableDesc,
556 text_columns: &Vec<Ident>,
557) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
558 let mut cast_scx = scx.clone();
565 cast_scx.param_types = Default::default();
566 let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
567 let mut column_types = vec![];
568 for column in table.columns.iter() {
569 column_types.push(SqlColumnType {
570 nullable: column.nullable,
571 scalar_type: SqlScalarType::String,
572 });
573 }
574
575 let cast_ecx = ExprContext {
576 qcx: &cast_qcx,
577 name: "plan_postgres_source_cast",
578 scope: &Scope::empty(),
579 relation_type: &SqlRelationType {
580 column_types,
581 keys: vec![],
582 },
583 allow_aggregates: false,
584 allow_subqueries: false,
585 allow_parameters: false,
586 allow_windows: false,
587 };
588
589 let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
590
591 let mut table_cast = vec![];
594 for (i, column) in table.columns.iter().enumerate() {
595 let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
596 (CastType::Text, mz_pgrepr::Type::Text)
602 } else {
603 match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
604 Ok(t) => (CastType::Natural, t),
605 Err(_) => {
610 table_cast.push((
611 CastType::Natural,
612 HirScalarExpr::call_variadic(
613 mz_expr::VariadicFunc::ErrorIfNull,
614 vec![
615 HirScalarExpr::literal_null(SqlScalarType::String),
616 HirScalarExpr::literal(
617 mz_repr::Datum::from(
618 format!("Unsupported type with OID {}", column.type_oid)
619 .as_str(),
620 ),
621 SqlScalarType::String,
622 ),
623 ],
624 )
625 .lower_uncorrelated()
626 .expect("no correlation"),
627 ));
628 continue;
629 }
630 }
631 };
632
633 let data_type = scx.resolve_type(ty)?;
634 let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
635
636 let col_expr = HirScalarExpr::unnamed_column(ColumnRef {
637 level: 0,
638 column: i,
639 });
640
641 let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
642
643 let cast = if column.nullable {
644 cast_expr
645 } else {
646 HirScalarExpr::call_variadic(mz_expr::VariadicFunc::ErrorIfNull, vec![
652 cast_expr,
653 HirScalarExpr::literal(
654 mz_repr::Datum::from(
655 format!(
656 "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
657 table.namespace.clone(),
658 table.name.clone(),
659 column.name.clone())
660 .as_str(),
661 ),
662 SqlScalarType::String,
663 ),
664 ],
665 )
666 };
667
668 let mir_cast = cast.lower_uncorrelated().map_err(|_e| {
674 tracing::info!(
675 "cannot ingest {:?} data from PG source because cast is correlated",
676 scalar_type
677 );
678
679 PlanError::TableContainsUningestableTypes {
680 name: table.name.to_string(),
681 type_: scx.humanize_scalar_type(&scalar_type, false),
682 column: column.name.to_string(),
683 }
684 })?;
685
686 table_cast.push((cast_type, mir_cast));
687 }
688 Ok(table_cast)
689}
690
691mod privileges {
692 use mz_postgres_util::PostgresError;
693
694 use super::*;
695 use crate::plan::PlanError;
696 use crate::pure::PgSourcePurificationError;
697
698 async fn check_schema_privileges(client: &Client, table_oids: &[Oid]) -> Result<(), PlanError> {
699 let invalid_schema_privileges_rows = client
700 .query(
701 "
702 WITH distinct_namespace AS (
703 SELECT
704 DISTINCT n.oid, n.nspname AS schema_name
705 FROM unnest($1::OID[]) AS oids (oid)
706 JOIN pg_class AS c ON c.oid = oids.oid
707 JOIN pg_namespace AS n ON c.relnamespace = n.oid
708 )
709 SELECT d.schema_name
710 FROM distinct_namespace AS d
711 WHERE
712 NOT has_schema_privilege(CURRENT_USER::TEXT, d.oid, 'usage')",
713 &[&table_oids],
714 )
715 .await
716 .map_err(PostgresError::from)?;
717
718 let mut invalid_schema_privileges = invalid_schema_privileges_rows
719 .into_iter()
720 .map(|row| row.get("schema_name"))
721 .collect::<Vec<String>>();
722
723 if invalid_schema_privileges.is_empty() {
724 Ok(())
725 } else {
726 invalid_schema_privileges.sort();
727 Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
728 schemas: invalid_schema_privileges,
729 })?
730 }
731 }
732
733 pub async fn check_table_privileges(
744 client: &Client,
745 table_oids: &[Oid],
746 ) -> Result<(), PlanError> {
747 check_schema_privileges(client, table_oids).await?;
748
749 let invalid_table_privileges_rows = client
750 .query(
751 "
752 SELECT
753 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
754 FROM unnest($1::oid[]) AS oids (oid)
755 JOIN
756 pg_class c ON c.oid = oids.oid
757 JOIN
758 pg_namespace n ON c.relnamespace = n.oid
759 WHERE NOT has_table_privilege(CURRENT_USER::text, c.oid, 'select')",
760 &[&table_oids],
761 )
762 .await
763 .map_err(PostgresError::from)?;
764
765 let mut invalid_table_privileges = invalid_table_privileges_rows
766 .into_iter()
767 .map(|row| row.get("schema_qualified_table_name"))
768 .collect::<Vec<String>>();
769
770 if invalid_table_privileges.is_empty() {
771 Ok(())
772 } else {
773 invalid_table_privileges.sort();
774 Err(PgSourcePurificationError::UserLacksSelectOnTables {
775 tables: invalid_table_privileges,
776 })?
777 }
778 }
779
780 pub async fn check_rls_privileges(
785 client: &Client,
786 table_oids: &[Oid],
787 ) -> Result<(), PlanError> {
788 match mz_postgres_util::validate_no_rls_policies(client, table_oids).await {
789 Ok(_) => Ok(()),
790 Err(err) => match err {
791 PostgresError::BypassRLSRequired(tables) => {
795 Err(PgSourcePurificationError::BypassRLSRequired { tables })?
796 }
797 _ => Err(err)?,
798 },
799 }
800 }
801}
802
803mod replica_identity {
804 use mz_postgres_util::PostgresError;
805
806 use super::*;
807 use crate::plan::PlanError;
808 use crate::pure::PgSourcePurificationError;
809
810 pub async fn check_replica_identity_full(
812 client: &Client,
813 table_oids: &[Oid],
814 ) -> Result<(), PlanError> {
815 let invalid_replica_identity_rows = client
816 .query(
817 "
818 SELECT
819 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
820 FROM unnest($1::oid[]) AS oids (oid)
821 JOIN
822 pg_class c ON c.oid = oids.oid
823 JOIN
824 pg_namespace n ON c.relnamespace = n.oid
825 WHERE relreplident != 'f' OR relreplident IS NULL;",
826 &[&table_oids],
827 )
828 .await
829 .map_err(PostgresError::from)?;
830
831 let mut invalid_replica_identity = invalid_replica_identity_rows
832 .into_iter()
833 .map(|row| row.get("schema_qualified_table_name"))
834 .collect::<Vec<String>>();
835
836 if invalid_replica_identity.is_empty() {
837 Ok(())
838 } else {
839 invalid_replica_identity.sort();
840 Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
841 items: invalid_replica_identity,
842 })?
843 }
844 }
845}