1use std::collections::{BTreeMap, BTreeSet};
13
14use mz_expr::MirScalarExpr;
15use mz_postgres_util::Config;
16use mz_postgres_util::desc::PostgresTableDesc;
17use mz_proto::RustType;
18use mz_repr::{ColumnType, RelationType, ScalarType};
19use mz_sql_parser::ast::display::AstDisplay;
20use mz_sql_parser::ast::{
21 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
22 ExternalReferences, Ident, TableConstraint, UnresolvedItemName, Value, WithOptionValue,
23};
24use mz_storage_types::sources::SourceExportStatementDetails;
25use mz_storage_types::sources::postgres::CastType;
26use prost::Message;
27use tokio_postgres::Client;
28use tokio_postgres::types::Oid;
29
30use crate::names::{Aug, ResolvedItemName};
31use crate::normalize;
32use crate::plan::hir::ColumnRef;
33use crate::plan::typeconv::{CastContext, plan_cast};
34use crate::plan::{
35 ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
36};
37
38use super::error::PgSourcePurificationError;
39use super::references::RetrievedSourceReferences;
40use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
41
42pub(super) async fn validate_requested_references_privileges(
46 config: &Config,
47 client: &Client,
48 table_oids: &[Oid],
49) -> Result<(), PlanError> {
50 privileges::check_table_privileges(config, client, table_oids).await?;
51 replica_identity::check_replica_identity_full(client, table_oids).await?;
52
53 Ok(())
54}
55
56pub(super) fn generate_text_columns(
62 retrieved_references: &RetrievedSourceReferences,
63 text_columns: &mut [UnresolvedItemName],
64) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
65 let mut text_cols_dict: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
66
67 for name in text_columns {
68 let (qual, col) = match name.0.split_last().expect("must have at least one element") {
69 (col, qual) if qual.is_empty() => {
70 return Err(PlanError::InvalidOptionValue {
71 option_name: "TEXT COLUMNS".to_string(),
72 err: Box::new(PlanError::UnderqualifiedColumnName(
73 col.as_str().to_string(),
74 )),
75 });
76 }
77 (col, qual) => (qual.to_vec(), col.as_str().to_string()),
78 };
79
80 let resolved_reference = retrieved_references.resolve_name(&qual)?;
81 let mut fully_qualified_name =
82 resolved_reference
83 .external_reference()
84 .map_err(|e| PlanError::InvalidOptionValue {
85 option_name: "TEXT COLUMNS".to_string(),
86 err: Box::new(e.into()),
87 })?;
88
89 let desc = resolved_reference
90 .postgres_desc()
91 .expect("known to be postgres");
92
93 if !desc.columns.iter().any(|column| column.name == col) {
94 let column = mz_repr::ColumnName::from(col);
95 let similar = desc
96 .columns
97 .iter()
98 .filter_map(|c| {
99 let c_name = mz_repr::ColumnName::from(c.name.clone());
100 c_name.is_similar(&column).then_some(c_name)
101 })
102 .collect();
103 return Err(PlanError::InvalidOptionValue {
104 option_name: "TEXT COLUMNS".to_string(),
105 err: Box::new(PlanError::UnknownColumn {
106 table: Some(
107 normalize::unresolved_item_name(fully_qualified_name)
108 .expect("known to be of valid len"),
109 ),
110 column,
111 similar,
112 }),
113 });
114 }
115
116 let col_ident = Ident::new(col.as_str().to_string())?;
118 fully_qualified_name.0.push(col_ident);
119 *name = fully_qualified_name;
120
121 let new = text_cols_dict
122 .entry(desc.oid)
123 .or_default()
124 .insert(col.as_str().to_string());
125
126 if !new {
127 return Err(PlanError::InvalidOptionValue {
128 option_name: "TEXT COLUMNS".to_string(),
129 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
130 });
131 }
132 }
133
134 Ok(text_cols_dict)
135}
136
137pub fn generate_create_subsource_statements(
138 scx: &StatementContext,
139 source_name: ResolvedItemName,
140 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
141) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
142 let mut unsupported_cols = vec![];
144
145 let mut subsources = Vec::with_capacity(requested_subsources.len());
147
148 for (subsource_name, purified_export) in requested_subsources {
149 let PostgresExportStatementValues {
150 columns,
151 constraints,
152 text_columns,
153 details,
154 external_reference,
155 } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
156
157 let mut with_options = vec![
158 CreateSubsourceOption {
159 name: CreateSubsourceOptionName::ExternalReference,
160 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
161 },
162 CreateSubsourceOption {
163 name: CreateSubsourceOptionName::Details,
164 value: Some(WithOptionValue::Value(Value::String(hex::encode(
165 details.into_proto().encode_to_vec(),
166 )))),
167 },
168 ];
169
170 if let Some(text_columns) = text_columns {
171 with_options.push(CreateSubsourceOption {
172 name: CreateSubsourceOptionName::TextColumns,
173 value: Some(WithOptionValue::Sequence(text_columns)),
174 });
175 }
176
177 let subsource = CreateSubsourceStatement {
179 name: subsource_name,
180 columns,
181 of_source: Some(source_name.clone()),
184 constraints,
193 if_not_exists: false,
194 with_options,
195 };
196 subsources.push(subsource);
197 }
198
199 if !unsupported_cols.is_empty() {
200 unsupported_cols.sort();
201 Err(PgSourcePurificationError::UnrecognizedTypes {
202 cols: unsupported_cols,
203 })?;
204 }
205
206 Ok(subsources)
207}
208
/// The statement components produced by
/// [`generate_source_export_statement_values`] for a single Postgres export.
pub(super) struct PostgresExportStatementValues {
    /// Column definitions for the upstream columns whose types were recognized.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique constraints derived from the upstream table's keys, with a
    /// primary key placed first.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// The export's text columns, sorted and wrapped as option values; `None`
    /// when the export has no text columns.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Export details wrapping the upstream table description.
    pub(super) details: SourceExportStatementDetails,
    /// The external reference taken from the purified export.
    pub(super) external_reference: UnresolvedItemName,
}
216
217pub(super) fn generate_source_export_statement_values(
218 scx: &StatementContext,
219 purified_export: PurifiedSourceExport,
220 unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
221) -> Result<PostgresExportStatementValues, PlanError> {
222 let (text_columns, table) = match purified_export.details {
223 PurifiedExportDetails::Postgres {
224 text_columns,
225 table,
226 } => (text_columns, table),
227 _ => unreachable!("purified export details must be postgres"),
228 };
229
230 let text_column_set = text_columns
231 .as_ref()
232 .map(|v| BTreeSet::from_iter(v.iter().map(Ident::as_str)));
233
234 let mut columns = vec![];
236 for c in table.columns.iter() {
237 let name = Ident::new(c.name.clone())?;
238
239 let ty = match text_column_set {
240 Some(ref names) if names.contains(c.name.as_str()) => mz_pgrepr::Type::Text,
241 _ => match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
242 Ok(t) => t,
243 Err(_) => {
244 let mut full_name = purified_export.external_reference.0.clone();
245 full_name.push(name);
246 unsupported_cols.push((
247 UnresolvedItemName(full_name).to_ast_string_simple(),
248 mz_repr::adt::system::Oid(c.type_oid),
249 ));
250 continue;
251 }
252 },
253 };
254
255 let data_type = scx.resolve_type(ty)?;
256 let mut options = vec![];
257
258 if !c.nullable {
259 options.push(mz_sql_parser::ast::ColumnOptionDef {
260 name: None,
261 option: mz_sql_parser::ast::ColumnOption::NotNull,
262 });
263 }
264
265 columns.push(ColumnDef {
266 name,
267 data_type,
268 collation: None,
269 options,
270 });
271 }
272
273 let mut constraints = vec![];
274 for key in table.keys.clone() {
275 let mut key_columns = vec![];
276
277 for col_num in key.cols {
278 let ident = Ident::new(
279 table
280 .columns
281 .iter()
282 .find(|col| col.col_num == col_num)
283 .expect("key exists as column")
284 .name
285 .clone(),
286 )?;
287 key_columns.push(ident);
288 }
289
290 let constraint = mz_sql_parser::ast::TableConstraint::Unique {
291 name: Some(Ident::new(key.name)?),
292 columns: key_columns,
293 is_primary: key.is_primary,
294 nulls_not_distinct: key.nulls_not_distinct,
295 };
296
297 if key.is_primary {
299 constraints.insert(0, constraint);
300 } else {
301 constraints.push(constraint);
302 }
303 }
304 let details = SourceExportStatementDetails::Postgres { table };
305
306 let text_columns = text_columns.map(|mut columns| {
307 columns.sort();
308 columns
309 .into_iter()
310 .map(WithOptionValue::Ident::<Aug>)
311 .collect()
312 });
313
314 Ok(PostgresExportStatementValues {
315 columns,
316 constraints,
317 text_columns,
318 details,
319 external_reference: purified_export.external_reference,
320 })
321}
322
/// The result of [`purify_source_exports`].
pub(super) struct PurifiedSourceExports {
    /// The purified exports, keyed by export name.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// The `TEXT COLUMNS` references rewritten to their fully qualified form,
    /// sorted and deduplicated, wrapped as option values.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}
333
334pub(super) async fn purify_source_exports(
338 client: &Client,
339 config: &mz_postgres_util::Config,
340 retrieved_references: &RetrievedSourceReferences,
341 requested_references: &Option<ExternalReferences>,
342 mut text_columns: Vec<UnresolvedItemName>,
343 unresolved_source_name: &UnresolvedItemName,
344 reference_policy: &SourceReferencePolicy,
345) -> Result<PurifiedSourceExports, PlanError> {
346 let requested_exports = match requested_references.as_ref() {
347 Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
348 Err(PlanError::UseTablesForSources(requested.to_string()))?
349 }
350 Some(requested) => retrieved_references
351 .requested_source_exports(Some(requested), unresolved_source_name)?,
352 None => {
353 if matches!(reference_policy, SourceReferencePolicy::Required) {
354 Err(PgSourcePurificationError::RequiresExternalReferences)?
355 }
356
357 if !text_columns.is_empty() {
360 Err(
361 PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
362 "TEXT COLUMNS".to_string(),
363 ),
364 )?
365 }
366
367 return Ok(PurifiedSourceExports {
368 source_exports: BTreeMap::new(),
369 normalized_text_columns: vec![],
370 });
371 }
372 };
373
374 if requested_exports.is_empty() {
375 sql_bail!(
376 "[internal error]: Postgres reference {} did not match any tables",
377 requested_references
378 .as_ref()
379 .unwrap()
380 .to_ast_string_simple()
381 );
382 }
383
384 super::validate_source_export_names(&requested_exports)?;
385
386 let table_oids: Vec<_> = requested_exports
387 .iter()
388 .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
389 .collect();
390
391 validate_requested_references_privileges(config, client, &table_oids).await?;
392
393 let mut text_column_map = generate_text_columns(retrieved_references, &mut text_columns)?;
394
395 text_columns.sort();
397 text_columns.dedup();
398 let normalized_text_columns: Vec<_> = text_columns
399 .into_iter()
400 .map(WithOptionValue::UnresolvedItemName)
401 .collect();
402
403 let source_exports = requested_exports
404 .into_iter()
405 .map(|r| {
406 let desc = r.meta.postgres_desc().expect("known postgres");
407 (
408 r.name,
409 PurifiedSourceExport {
410 external_reference: r.external_reference,
411 details: PurifiedExportDetails::Postgres {
412 text_columns: text_column_map.remove(&desc.oid).map(|v| {
413 v.into_iter()
414 .map(|s| Ident::new(s).expect("validated above"))
415 .collect()
416 }),
417 table: desc.clone(),
418 },
419 },
420 )
421 })
422 .collect();
423
424 if !text_column_map.is_empty() {
425 let mut dangling_text_column_refs = vec![];
428 let all_references = retrieved_references.all_references();
429
430 for id in text_column_map.keys() {
431 let desc = all_references
432 .iter()
433 .find_map(|reference| {
434 let desc = reference.postgres_desc().expect("is postgres");
435 if desc.oid == *id { Some(desc) } else { None }
436 })
437 .expect("validated when generating text columns");
438
439 dangling_text_column_refs.push(PartialItemName {
440 database: None,
441 schema: Some(desc.namespace.clone()),
442 item: desc.name.clone(),
443 });
444 }
445
446 dangling_text_column_refs.sort();
447 Err(PgSourcePurificationError::DanglingTextColumns {
448 items: dangling_text_column_refs,
449 })?;
450 }
451
452 Ok(PurifiedSourceExports {
453 source_exports,
454 normalized_text_columns,
455 })
456}
457
458pub(crate) fn generate_column_casts(
459 scx: &StatementContext,
460 table: &PostgresTableDesc,
461 text_columns: &Vec<Ident>,
462) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
463 let mut cast_scx = scx.clone();
470 cast_scx.param_types = Default::default();
471 let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
472 let mut column_types = vec![];
473 for column in table.columns.iter() {
474 column_types.push(ColumnType {
475 nullable: column.nullable,
476 scalar_type: ScalarType::String,
477 });
478 }
479
480 let cast_ecx = ExprContext {
481 qcx: &cast_qcx,
482 name: "plan_postgres_source_cast",
483 scope: &Scope::empty(),
484 relation_type: &RelationType {
485 column_types,
486 keys: vec![],
487 },
488 allow_aggregates: false,
489 allow_subqueries: false,
490 allow_parameters: false,
491 allow_windows: false,
492 };
493
494 let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
495
496 let mut table_cast = vec![];
499 for (i, column) in table.columns.iter().enumerate() {
500 let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
501 (CastType::Text, mz_pgrepr::Type::Text)
507 } else {
508 match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
509 Ok(t) => (CastType::Natural, t),
510 Err(_) => {
515 table_cast.push((
516 CastType::Natural,
517 HirScalarExpr::CallVariadic {
518 func: mz_expr::VariadicFunc::ErrorIfNull,
519 exprs: vec![
520 HirScalarExpr::literal_null(ScalarType::String),
521 HirScalarExpr::literal(
522 mz_repr::Datum::from(
523 format!("Unsupported type with OID {}", column.type_oid)
524 .as_str(),
525 ),
526 ScalarType::String,
527 ),
528 ],
529 }
530 .lower_uncorrelated()
531 .expect("no correlation"),
532 ));
533 continue;
534 }
535 }
536 };
537
538 let data_type = scx.resolve_type(ty)?;
539 let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;
540
541 let col_expr = HirScalarExpr::Column(ColumnRef {
542 level: 0,
543 column: i,
544 });
545
546 let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;
547
548 let cast = if column.nullable {
549 cast_expr
550 } else {
551 HirScalarExpr::CallVariadic {
557 func: mz_expr::VariadicFunc::ErrorIfNull,
558 exprs: vec![
559 cast_expr,
560 HirScalarExpr::literal(
561 mz_repr::Datum::from(
562 format!(
563 "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
564 table.namespace.clone(),
565 table.name.clone(),
566 column.name.clone())
567 .as_str(),
568 ),
569 ScalarType::String,
570 ),
571 ],
572 }
573 };
574
575 let mir_cast = cast.lower_uncorrelated().map_err(|_e| {
581 tracing::info!(
582 "cannot ingest {:?} data from PG source because cast is correlated",
583 scalar_type
584 );
585
586 PlanError::TableContainsUningestableTypes {
587 name: table.name.to_string(),
588 type_: scx.humanize_scalar_type(&scalar_type, false),
589 column: column.name.to_string(),
590 }
591 })?;
592
593 table_cast.push((cast_type, mir_cast));
594 }
595 Ok(table_cast)
596}
597
598mod privileges {
599 use mz_postgres_util::{Config, PostgresError};
600
601 use super::*;
602 use crate::plan::PlanError;
603 use crate::pure::PgSourcePurificationError;
604
605 async fn check_schema_privileges(
606 config: &Config,
607 client: &Client,
608 table_oids: &[Oid],
609 ) -> Result<(), PlanError> {
610 let invalid_schema_privileges_rows = client
611 .query(
612 "
613 WITH distinct_namespace AS (
614 SELECT
615 DISTINCT n.oid, n.nspname AS schema_name
616 FROM unnest($1::OID[]) AS oids (oid)
617 JOIN pg_class AS c ON c.oid = oids.oid
618 JOIN pg_namespace AS n ON c.relnamespace = n.oid
619 )
620 SELECT d.schema_name
621 FROM distinct_namespace AS d
622 WHERE
623 NOT has_schema_privilege($2::TEXT, d.oid, 'usage')",
624 &[
625 &table_oids,
626 &config.get_user().expect("connection specifies user"),
627 ],
628 )
629 .await
630 .map_err(PostgresError::from)?;
631
632 let mut invalid_schema_privileges = invalid_schema_privileges_rows
633 .into_iter()
634 .map(|row| row.get("schema_name"))
635 .collect::<Vec<String>>();
636
637 if invalid_schema_privileges.is_empty() {
638 Ok(())
639 } else {
640 invalid_schema_privileges.sort();
641 Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
642 user: config
643 .get_user()
644 .expect("connection specifies user")
645 .to_string(),
646 schemas: invalid_schema_privileges,
647 })?
648 }
649 }
650
651 pub async fn check_table_privileges(
662 config: &Config,
663 client: &Client,
664 table_oids: &[Oid],
665 ) -> Result<(), PlanError> {
666 check_schema_privileges(config, client, table_oids).await?;
667
668 let invalid_table_privileges_rows = client
669 .query(
670 "
671 SELECT
672 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
673 FROM unnest($1::oid[]) AS oids (oid)
674 JOIN
675 pg_class c ON c.oid = oids.oid
676 JOIN
677 pg_namespace n ON c.relnamespace = n.oid
678 WHERE NOT has_table_privilege($2::text, c.oid, 'select')",
679 &[
680 &table_oids,
681 &config.get_user().expect("connection specifies user"),
682 ],
683 )
684 .await
685 .map_err(PostgresError::from)?;
686
687 let mut invalid_table_privileges = invalid_table_privileges_rows
688 .into_iter()
689 .map(|row| row.get("schema_qualified_table_name"))
690 .collect::<Vec<String>>();
691
692 if invalid_table_privileges.is_empty() {
693 Ok(())
694 } else {
695 invalid_table_privileges.sort();
696 Err(PgSourcePurificationError::UserLacksSelectOnTables {
697 user: config
698 .get_user()
699 .expect("connection must specify user")
700 .to_string(),
701 tables: invalid_table_privileges,
702 })?
703 }
704 }
705}
706
707mod replica_identity {
708 use mz_postgres_util::PostgresError;
709
710 use super::*;
711 use crate::plan::PlanError;
712 use crate::pure::PgSourcePurificationError;
713
714 pub async fn check_replica_identity_full(
716 client: &Client,
717 table_oids: &[Oid],
718 ) -> Result<(), PlanError> {
719 let invalid_replica_identity_rows = client
720 .query(
721 "
722 SELECT
723 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
724 FROM unnest($1::oid[]) AS oids (oid)
725 JOIN
726 pg_class c ON c.oid = oids.oid
727 JOIN
728 pg_namespace n ON c.relnamespace = n.oid
729 WHERE relreplident != 'f' OR relreplident IS NULL;",
730 &[&table_oids],
731 )
732 .await
733 .map_err(PostgresError::from)?;
734
735 let mut invalid_replica_identity = invalid_replica_identity_rows
736 .into_iter()
737 .map(|row| row.get("schema_qualified_table_name"))
738 .collect::<Vec<String>>();
739
740 if invalid_replica_identity.is_empty() {
741 Ok(())
742 } else {
743 invalid_replica_identity.sort();
744 Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
745 items: invalid_replica_identity,
746 })?
747 }
748 }
749}