1use std::collections::{BTreeMap, BTreeSet};
13use std::sync::Arc;
14
15use mz_expr::MirScalarExpr;
16use mz_postgres_util::Config;
17use mz_postgres_util::desc::PostgresTableDesc;
18use mz_proto::RustType;
19use mz_repr::{ColumnType, RelationType, ScalarType};
20use mz_sql_parser::ast::display::AstDisplay;
21use mz_sql_parser::ast::{
22 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
23 ExternalReferences, Ident, TableConstraint, UnresolvedItemName, Value, WithOptionValue,
24};
25use mz_storage_types::sources::SourceExportStatementDetails;
26use mz_storage_types::sources::postgres::CastType;
27use prost::Message;
28use tokio_postgres::Client;
29use tokio_postgres::types::Oid;
30
31use crate::names::{Aug, ResolvedItemName};
32use crate::normalize;
33use crate::plan::hir::ColumnRef;
34use crate::plan::typeconv::{CastContext, plan_cast};
35use crate::plan::{
36 ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
37};
38
39use super::error::PgSourcePurificationError;
40use super::references::RetrievedSourceReferences;
41use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
42
43pub(super) async fn validate_requested_references_privileges(
47 config: &Config,
48 client: &Client,
49 table_oids: &[Oid],
50) -> Result<(), PlanError> {
51 privileges::check_table_privileges(config, client, table_oids).await?;
52 replica_identity::check_replica_identity_full(client, table_oids).await?;
53
54 Ok(())
55}
56
57pub(super) fn generate_text_columns(
63 retrieved_references: &RetrievedSourceReferences,
64 text_columns: &mut [UnresolvedItemName],
65) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
66 let mut text_cols_dict: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
67
68 for name in text_columns {
69 let (qual, col) = match name.0.split_last().expect("must have at least one element") {
70 (col, qual) if qual.is_empty() => {
71 return Err(PlanError::InvalidOptionValue {
72 option_name: "TEXT COLUMNS".to_string(),
73 err: Box::new(PlanError::UnderqualifiedColumnName(
74 col.as_str().to_string(),
75 )),
76 });
77 }
78 (col, qual) => (qual.to_vec(), col.as_str().to_string()),
79 };
80
81 let resolved_reference = retrieved_references.resolve_name(&qual)?;
82 let mut fully_qualified_name =
83 resolved_reference
84 .external_reference()
85 .map_err(|e| PlanError::InvalidOptionValue {
86 option_name: "TEXT COLUMNS".to_string(),
87 err: Box::new(e.into()),
88 })?;
89
90 let desc = resolved_reference
91 .postgres_desc()
92 .expect("known to be postgres");
93
94 if !desc.columns.iter().any(|column| column.name == col) {
95 let column = mz_repr::ColumnName::from(col);
96 let similar = desc
97 .columns
98 .iter()
99 .filter_map(|c| {
100 let c_name = mz_repr::ColumnName::from(c.name.clone());
101 c_name.is_similar(&column).then_some(c_name)
102 })
103 .collect();
104 return Err(PlanError::InvalidOptionValue {
105 option_name: "TEXT COLUMNS".to_string(),
106 err: Box::new(PlanError::UnknownColumn {
107 table: Some(
108 normalize::unresolved_item_name(fully_qualified_name)
109 .expect("known to be of valid len"),
110 ),
111 column,
112 similar,
113 }),
114 });
115 }
116
117 let col_ident = Ident::new(col.as_str().to_string())?;
119 fully_qualified_name.0.push(col_ident);
120 *name = fully_qualified_name;
121
122 let new = text_cols_dict
123 .entry(desc.oid)
124 .or_default()
125 .insert(col.as_str().to_string());
126
127 if !new {
128 return Err(PlanError::InvalidOptionValue {
129 option_name: "TEXT COLUMNS".to_string(),
130 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
131 });
132 }
133 }
134
135 Ok(text_cols_dict)
136}
137
138pub fn generate_create_subsource_statements(
139 scx: &StatementContext,
140 source_name: ResolvedItemName,
141 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
142) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
143 let mut unsupported_cols = vec![];
145
146 let mut subsources = Vec::with_capacity(requested_subsources.len());
148
149 for (subsource_name, purified_export) in requested_subsources {
150 let PostgresExportStatementValues {
151 columns,
152 constraints,
153 text_columns,
154 details,
155 external_reference,
156 } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
157
158 let mut with_options = vec![
159 CreateSubsourceOption {
160 name: CreateSubsourceOptionName::ExternalReference,
161 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
162 },
163 CreateSubsourceOption {
164 name: CreateSubsourceOptionName::Details,
165 value: Some(WithOptionValue::Value(Value::String(hex::encode(
166 details.into_proto().encode_to_vec(),
167 )))),
168 },
169 ];
170
171 if let Some(text_columns) = text_columns {
172 with_options.push(CreateSubsourceOption {
173 name: CreateSubsourceOptionName::TextColumns,
174 value: Some(WithOptionValue::Sequence(text_columns)),
175 });
176 }
177
178 let subsource = CreateSubsourceStatement {
180 name: subsource_name,
181 columns,
182 of_source: Some(source_name.clone()),
185 constraints,
194 if_not_exists: false,
195 with_options,
196 };
197 subsources.push(subsource);
198 }
199
200 if !unsupported_cols.is_empty() {
201 unsupported_cols.sort();
202 Err(PgSourcePurificationError::UnrecognizedTypes {
203 cols: unsupported_cols,
204 })?;
205 }
206
207 Ok(subsources)
208}
209
210pub(super) struct PostgresExportStatementValues {
211 pub(super) columns: Vec<ColumnDef<Aug>>,
212 pub(super) constraints: Vec<TableConstraint<Aug>>,
213 pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
214 pub(super) details: SourceExportStatementDetails,
215 pub(super) external_reference: UnresolvedItemName,
216}
217
218pub(super) fn generate_source_export_statement_values(
219 scx: &StatementContext,
220 purified_export: PurifiedSourceExport,
221 unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
222) -> Result<PostgresExportStatementValues, PlanError> {
223 let (text_columns, table) = match purified_export.details {
224 PurifiedExportDetails::Postgres {
225 text_columns,
226 table,
227 } => (text_columns, table),
228 _ => unreachable!("purified export details must be postgres"),
229 };
230
231 let text_column_set = text_columns
232 .as_ref()
233 .map(|v| BTreeSet::from_iter(v.iter().map(Ident::as_str)));
234
235 let mut columns = vec![];
237 for c in table.columns.iter() {
238 let name = Ident::new(c.name.clone())?;
239
240 let ty = match text_column_set {
241 Some(ref names) if names.contains(c.name.as_str()) => mz_pgrepr::Type::Text,
242 _ => match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
243 Ok(t) => t,
244 Err(_) => {
245 let mut full_name = purified_export.external_reference.0.clone();
246 full_name.push(name);
247 unsupported_cols.push((
248 UnresolvedItemName(full_name).to_ast_string_simple(),
249 mz_repr::adt::system::Oid(c.type_oid),
250 ));
251 continue;
252 }
253 },
254 };
255
256 let data_type = scx.resolve_type(ty)?;
257 let mut options = vec![];
258
259 if !c.nullable {
260 options.push(mz_sql_parser::ast::ColumnOptionDef {
261 name: None,
262 option: mz_sql_parser::ast::ColumnOption::NotNull,
263 });
264 }
265
266 columns.push(ColumnDef {
267 name,
268 data_type,
269 collation: None,
270 options,
271 });
272 }
273
274 let mut constraints = vec![];
275 for key in table.keys.clone() {
276 let mut key_columns = vec![];
277
278 for col_num in key.cols {
279 let ident = Ident::new(
280 table
281 .columns
282 .iter()
283 .find(|col| col.col_num == col_num)
284 .expect("key exists as column")
285 .name
286 .clone(),
287 )?;
288 key_columns.push(ident);
289 }
290
291 let constraint = mz_sql_parser::ast::TableConstraint::Unique {
292 name: Some(Ident::new(key.name)?),
293 columns: key_columns,
294 is_primary: key.is_primary,
295 nulls_not_distinct: key.nulls_not_distinct,
296 };
297
298 if key.is_primary {
300 constraints.insert(0, constraint);
301 } else {
302 constraints.push(constraint);
303 }
304 }
305 let details = SourceExportStatementDetails::Postgres { table };
306
307 let text_columns = text_columns.map(|mut columns| {
308 columns.sort();
309 columns
310 .into_iter()
311 .map(WithOptionValue::Ident::<Aug>)
312 .collect()
313 });
314
315 Ok(PostgresExportStatementValues {
316 columns,
317 constraints,
318 text_columns,
319 details,
320 external_reference: purified_export.external_reference,
321 })
322}
323
324pub(super) struct PurifiedSourceExports {
325 pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
326 pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
333}
334
335pub(super) async fn purify_source_exports(
339 client: &Client,
340 config: &mz_postgres_util::Config,
341 retrieved_references: &RetrievedSourceReferences,
342 requested_references: &Option<ExternalReferences>,
343 mut text_columns: Vec<UnresolvedItemName>,
344 unresolved_source_name: &UnresolvedItemName,
345 reference_policy: &SourceReferencePolicy,
346) -> Result<PurifiedSourceExports, PlanError> {
347 let requested_exports = match requested_references.as_ref() {
348 Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
349 Err(PlanError::UseTablesForSources(requested.to_string()))?
350 }
351 Some(requested) => retrieved_references
352 .requested_source_exports(Some(requested), unresolved_source_name)?,
353 None => {
354 if matches!(reference_policy, SourceReferencePolicy::Required) {
355 Err(PgSourcePurificationError::RequiresExternalReferences)?
356 }
357
358 if !text_columns.is_empty() {
361 Err(
362 PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
363 "TEXT COLUMNS".to_string(),
364 ),
365 )?
366 }
367
368 return Ok(PurifiedSourceExports {
369 source_exports: BTreeMap::new(),
370 normalized_text_columns: vec![],
371 });
372 }
373 };
374
375 if requested_exports.is_empty() {
376 sql_bail!(
377 "[internal error]: Postgres reference {} did not match any tables",
378 requested_references
379 .as_ref()
380 .unwrap()
381 .to_ast_string_simple()
382 );
383 }
384
385 super::validate_source_export_names(&requested_exports)?;
386
387 let table_oids: Vec<_> = requested_exports
388 .iter()
389 .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
390 .collect();
391
392 validate_requested_references_privileges(config, client, &table_oids).await?;
393
394 let mut text_column_map = generate_text_columns(retrieved_references, &mut text_columns)?;
395
396 text_columns.sort();
398 text_columns.dedup();
399 let normalized_text_columns: Vec<_> = text_columns
400 .into_iter()
401 .map(WithOptionValue::UnresolvedItemName)
402 .collect();
403
404 let source_exports = requested_exports
405 .into_iter()
406 .map(|r| {
407 let desc = r.meta.postgres_desc().expect("known postgres");
408 (
409 r.name,
410 PurifiedSourceExport {
411 external_reference: r.external_reference,
412 details: PurifiedExportDetails::Postgres {
413 text_columns: text_column_map.remove(&desc.oid).map(|v| {
414 v.into_iter()
415 .map(|s| Ident::new(s).expect("validated above"))
416 .collect()
417 }),
418 table: desc.clone(),
419 },
420 },
421 )
422 })
423 .collect();
424
425 if !text_column_map.is_empty() {
426 let mut dangling_text_column_refs = vec![];
429 let all_references = retrieved_references.all_references();
430
431 for id in text_column_map.keys() {
432 let desc = all_references
433 .iter()
434 .find_map(|reference| {
435 let desc = reference.postgres_desc().expect("is postgres");
436 if desc.oid == *id { Some(desc) } else { None }
437 })
438 .expect("validated when generating text columns");
439
440 dangling_text_column_refs.push(PartialItemName {
441 database: None,
442 schema: Some(desc.namespace.clone()),
443 item: desc.name.clone(),
444 });
445 }
446
447 dangling_text_column_refs.sort();
448 Err(PgSourcePurificationError::DanglingTextColumns {
449 items: dangling_text_column_refs,
450 })?;
451 }
452
453 Ok(PurifiedSourceExports {
454 source_exports,
455 normalized_text_columns,
456 })
457}
458
459pub(crate) fn generate_column_casts(
460 scx: &StatementContext,
461 table: &PostgresTableDesc,
462 text_columns: &Vec<Ident>,
463) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
464 let mut cast_scx = scx.clone();
471 cast_scx.param_types = Default::default();
472 let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
473 let mut column_types = vec![];
474 for column in table.columns.iter() {
475 column_types.push(ColumnType {
476 nullable: column.nullable,
477 scalar_type: ScalarType::String,
478 });
479 }
480
481 let cast_ecx = ExprContext {
482 qcx: &cast_qcx,
483 name: "plan_postgres_source_cast",
484 scope: &Scope::empty(),
485 relation_type: &RelationType {
486 column_types,
487 keys: vec![],
488 },
489 allow_aggregates: false,
490 allow_subqueries: false,
491 allow_parameters: false,
492 allow_windows: false,
493 };
494
495 let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
496
497 let mut table_cast = vec![];
500 for (i, column) in table.columns.iter().enumerate() {
501 let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
502 (CastType::Text, mz_pgrepr::Type::Text)
508 } else {
509 match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
510 Ok(t) => (CastType::Natural, t),
511 Err(_) => {
516 table_cast.push((
517 CastType::Natural,
518 HirScalarExpr::call_variadic(
519 mz_expr::VariadicFunc::ErrorIfNull,
520 vec![
521 HirScalarExpr::literal_null(ScalarType::String),
522 HirScalarExpr::literal(
523 mz_repr::Datum::from(
524 format!("Unsupported type with OID {}", column.type_oid)
525 .as_str(),
526 ),
527 ScalarType::String,
528 ),
529 ],
530 )
531 .lower_uncorrelated()
532 .expect("no correlation"),
533 ));
534 continue;
535 }
536 }
537 };
538
539 let data_type = scx.resolve_type(ty)?;
540 let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
541
542 let col_expr = HirScalarExpr::named_column(
543 ColumnRef {
544 level: 0,
545 column: i,
546 },
547 Arc::from(column.name.as_str()),
548 );
549
550 let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
551
552 let cast = if column.nullable {
553 cast_expr
554 } else {
555 HirScalarExpr::call_variadic(mz_expr::VariadicFunc::ErrorIfNull, vec![
561 cast_expr,
562 HirScalarExpr::literal(
563 mz_repr::Datum::from(
564 format!(
565 "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
566 table.namespace.clone(),
567 table.name.clone(),
568 column.name.clone())
569 .as_str(),
570 ),
571 ScalarType::String,
572 ),
573 ],
574 )
575 };
576
577 let mir_cast = cast.lower_uncorrelated().map_err(|_e| {
583 tracing::info!(
584 "cannot ingest {:?} data from PG source because cast is correlated",
585 scalar_type
586 );
587
588 PlanError::TableContainsUningestableTypes {
589 name: table.name.to_string(),
590 type_: scx.humanize_scalar_type(&scalar_type, false),
591 column: column.name.to_string(),
592 }
593 })?;
594
595 table_cast.push((cast_type, mir_cast));
596 }
597 Ok(table_cast)
598}
599
600mod privileges {
601 use mz_postgres_util::{Config, PostgresError};
602
603 use super::*;
604 use crate::plan::PlanError;
605 use crate::pure::PgSourcePurificationError;
606
607 async fn check_schema_privileges(
608 config: &Config,
609 client: &Client,
610 table_oids: &[Oid],
611 ) -> Result<(), PlanError> {
612 let invalid_schema_privileges_rows = client
613 .query(
614 "
615 WITH distinct_namespace AS (
616 SELECT
617 DISTINCT n.oid, n.nspname AS schema_name
618 FROM unnest($1::OID[]) AS oids (oid)
619 JOIN pg_class AS c ON c.oid = oids.oid
620 JOIN pg_namespace AS n ON c.relnamespace = n.oid
621 )
622 SELECT d.schema_name
623 FROM distinct_namespace AS d
624 WHERE
625 NOT has_schema_privilege($2::TEXT, d.oid, 'usage')",
626 &[
627 &table_oids,
628 &config.get_user().expect("connection specifies user"),
629 ],
630 )
631 .await
632 .map_err(PostgresError::from)?;
633
634 let mut invalid_schema_privileges = invalid_schema_privileges_rows
635 .into_iter()
636 .map(|row| row.get("schema_name"))
637 .collect::<Vec<String>>();
638
639 if invalid_schema_privileges.is_empty() {
640 Ok(())
641 } else {
642 invalid_schema_privileges.sort();
643 Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
644 user: config
645 .get_user()
646 .expect("connection specifies user")
647 .to_string(),
648 schemas: invalid_schema_privileges,
649 })?
650 }
651 }
652
653 pub async fn check_table_privileges(
664 config: &Config,
665 client: &Client,
666 table_oids: &[Oid],
667 ) -> Result<(), PlanError> {
668 check_schema_privileges(config, client, table_oids).await?;
669
670 let invalid_table_privileges_rows = client
671 .query(
672 "
673 SELECT
674 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
675 FROM unnest($1::oid[]) AS oids (oid)
676 JOIN
677 pg_class c ON c.oid = oids.oid
678 JOIN
679 pg_namespace n ON c.relnamespace = n.oid
680 WHERE NOT has_table_privilege($2::text, c.oid, 'select')",
681 &[
682 &table_oids,
683 &config.get_user().expect("connection specifies user"),
684 ],
685 )
686 .await
687 .map_err(PostgresError::from)?;
688
689 let mut invalid_table_privileges = invalid_table_privileges_rows
690 .into_iter()
691 .map(|row| row.get("schema_qualified_table_name"))
692 .collect::<Vec<String>>();
693
694 if invalid_table_privileges.is_empty() {
695 Ok(())
696 } else {
697 invalid_table_privileges.sort();
698 Err(PgSourcePurificationError::UserLacksSelectOnTables {
699 user: config
700 .get_user()
701 .expect("connection must specify user")
702 .to_string(),
703 tables: invalid_table_privileges,
704 })?
705 }
706 }
707}
708
709mod replica_identity {
710 use mz_postgres_util::PostgresError;
711
712 use super::*;
713 use crate::plan::PlanError;
714 use crate::pure::PgSourcePurificationError;
715
716 pub async fn check_replica_identity_full(
718 client: &Client,
719 table_oids: &[Oid],
720 ) -> Result<(), PlanError> {
721 let invalid_replica_identity_rows = client
722 .query(
723 "
724 SELECT
725 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
726 FROM unnest($1::oid[]) AS oids (oid)
727 JOIN
728 pg_class c ON c.oid = oids.oid
729 JOIN
730 pg_namespace n ON c.relnamespace = n.oid
731 WHERE relreplident != 'f' OR relreplident IS NULL;",
732 &[&table_oids],
733 )
734 .await
735 .map_err(PostgresError::from)?;
736
737 let mut invalid_replica_identity = invalid_replica_identity_rows
738 .into_iter()
739 .map(|row| row.get("schema_qualified_table_name"))
740 .collect::<Vec<String>>();
741
742 if invalid_replica_identity.is_empty() {
743 Ok(())
744 } else {
745 invalid_replica_identity.sort();
746 Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
747 items: invalid_replica_identity,
748 })?
749 }
750 }
751}