use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use mz_expr::MirScalarExpr;
use mz_postgres_util::desc::PostgresTableDesc;
use mz_proto::RustType;
use mz_repr::{SqlColumnType, SqlRelationType, SqlScalarType};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
    ExternalReferences, Ident, TableConstraint, UnresolvedItemName, Value, WithOptionValue,
};
use mz_storage_types::sources::SourceExportStatementDetails;
use mz_storage_types::sources::postgres::CastType;
use prost::Message;
use tokio_postgres::Client;
use tokio_postgres::types::Oid;

use crate::names::{Aug, ResolvedItemName};
use crate::normalize;
use crate::plan::hir::ColumnRef;
use crate::plan::typeconv::{CastContext, plan_cast};
use crate::plan::{
    ExprContext, HirScalarExpr, PlanError, QueryContext, QueryLifetime, Scope, StatementContext,
};

use super::error::PgSourcePurificationError;
use super::references::RetrievedSourceReferences;
use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};

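/// Ensures that the upstream PostgreSQL tables identified by `table_oids` can
/// be used as sources: the connected role must have the necessary privileges
/// and every table must use `REPLICA IDENTITY FULL`.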
pub(super) async fn validate_requested_references_privileges(
    client: &Client,
    table_oids: &[Oid],
) -> Result<(), PlanError> {
    privileges::check_table_privileges(client, table_oids).await?;
    replica_identity::check_replica_identity_full(client, table_oids).await?;

    Ok(())
}

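/// Builds a map from each referenced table's OID to the set of its columns
/// named in the `TEXT COLUMNS` option.
///
/// As a side effect, each entry of `text_columns` is rewritten in place to its
/// fully qualified form.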
pub(super) fn generate_text_columns(
    retrieved_references: &RetrievedSourceReferences,
    text_columns: &mut [UnresolvedItemName],
) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
    let mut text_cols_dict: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();

    for name in text_columns {
        let (qual, col) = match name.0.split_last().expect("must have at least one element") {
            (col, qual) if qual.is_empty() => {
                return Err(PlanError::InvalidOptionValue {
                    option_name: "TEXT COLUMNS".to_string(),
                    err: Box::new(PlanError::UnderqualifiedColumnName(
                        col.as_str().to_string(),
                    )),
                });
            }
            (col, qual) => (qual.to_vec(), col.as_str().to_string()),
        };

        let resolved_reference = retrieved_references.resolve_name(&qual)?;
        let mut fully_qualified_name =
            resolved_reference
                .external_reference()
                .map_err(|e| PlanError::InvalidOptionValue {
                    option_name: "TEXT COLUMNS".to_string(),
                    err: Box::new(e.into()),
                })?;

        let desc = resolved_reference
            .postgres_desc()
            .expect("known to be postgres");

        if !desc.columns.iter().any(|column| column.name == col) {
            let column = mz_repr::ColumnName::from(col);
            let similar = desc
                .columns
                .iter()
                .filter_map(|c| {
                    let c_name = mz_repr::ColumnName::from(c.name.clone());
                    c_name.is_similar(&column).then_some(c_name)
                })
                .collect();
            return Err(PlanError::InvalidOptionValue {
                option_name: "TEXT COLUMNS".to_string(),
                err: Box::new(PlanError::UnknownColumn {
                    table: Some(
                        normalize::unresolved_item_name(fully_qualified_name)
                            .expect("known to be of valid len"),
                    ),
                    column,
                    similar,
                }),
            });
        }

        // Rewrite the option value to its fully qualified form.
        let col_ident = Ident::new(col.as_str().to_string())?;
        fully_qualified_name.0.push(col_ident);
        *name = fully_qualified_name;

        let new = text_cols_dict
            .entry(desc.oid)
            .or_default()
            .insert(col.as_str().to_string());

        if !new {
            return Err(PlanError::InvalidOptionValue {
                option_name: "TEXT COLUMNS".to_string(),
                err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
            });
        }
    }

    Ok(text_cols_dict)
}

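/// Generates a `CREATE SUBSOURCE` statement for each requested export of a
/// PostgreSQL source.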
pub fn generate_create_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    let mut unsupported_cols = vec![];

    let mut subsources = Vec::with_capacity(requested_subsources.len());

    for (subsource_name, purified_export) in requested_subsources {
        let PostgresExportStatementValues {
            columns,
            constraints,
            text_columns,
            details,
            external_reference,
        } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;

        let mut with_options = vec![
            CreateSubsourceOption {
                name: CreateSubsourceOptionName::ExternalReference,
                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
            },
            CreateSubsourceOption {
                name: CreateSubsourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            },
        ];

        if let Some(text_columns) = text_columns {
            with_options.push(CreateSubsourceOption {
                name: CreateSubsourceOptionName::TextColumns,
                value: Some(WithOptionValue::Sequence(text_columns)),
            });
        }

        let subsource = CreateSubsourceStatement {
            name: subsource_name,
            columns,
            of_source: Some(source_name.clone()),
            constraints,
            if_not_exists: false,
            with_options,
        };
        subsources.push(subsource);
    }

    if !unsupported_cols.is_empty() {
        unsupported_cols.sort();
        Err(PgSourcePurificationError::UnrecognizedTypes {
            cols: unsupported_cols,
        })?;
    }

    Ok(subsources)
}

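/// The pieces of a `CREATE SUBSOURCE` statement that are derived from a single
/// purified PostgreSQL source export.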
pub(super) struct PostgresExportStatementValues {
    pub(super) columns: Vec<ColumnDef<Aug>>,
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    pub(super) details: SourceExportStatementDetails,
    pub(super) external_reference: UnresolvedItemName,
}

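/// Derives the column definitions, constraints, and statement options for a
/// single purified PostgreSQL source export.
///
/// Columns with unrepresentable types are recorded in `unsupported_cols`
/// rather than returned as an error, so that the caller can report them all at
/// once.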
pub(super) fn generate_source_export_statement_values(
    scx: &StatementContext,
    purified_export: PurifiedSourceExport,
    unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
) -> Result<PostgresExportStatementValues, PlanError> {
    let (text_columns, table) = match purified_export.details {
        PurifiedExportDetails::Postgres {
            text_columns,
            table,
        } => (text_columns, table),
        _ => unreachable!("purified export details must be postgres"),
    };

    let text_column_set = text_columns
        .as_ref()
        .map(|v| BTreeSet::from_iter(v.iter().map(Ident::as_str)));

    let mut columns = vec![];
    for c in table.columns.iter() {
        let name = Ident::new(c.name.clone())?;

        let ty = match text_column_set {
            Some(ref names) if names.contains(c.name.as_str()) => mz_pgrepr::Type::Text,
            _ => match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
                Ok(t) => t,
                Err(_) => {
                    let mut full_name = purified_export.external_reference.0.clone();
                    full_name.push(name);
                    unsupported_cols.push((
                        UnresolvedItemName(full_name).to_ast_string_simple(),
                        mz_repr::adt::system::Oid(c.type_oid),
                    ));
                    continue;
                }
            },
        };

        let data_type = scx.resolve_type(ty)?;
        let mut options = vec![];

        if !c.nullable {
            options.push(mz_sql_parser::ast::ColumnOptionDef {
                name: None,
                option: mz_sql_parser::ast::ColumnOption::NotNull,
            });
        }

        columns.push(ColumnDef {
            name,
            data_type,
            collation: None,
            options,
        });
    }

    let mut constraints = vec![];
    for key in table.keys.clone() {
        let mut key_columns = vec![];

        for col_num in key.cols {
            let ident = Ident::new(
                table
                    .columns
                    .iter()
                    .find(|col| col.col_num == col_num)
                    .expect("key exists as column")
                    .name
                    .clone(),
            )?;
            key_columns.push(ident);
        }

        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
            name: Some(Ident::new(key.name)?),
            columns: key_columns,
            is_primary: key.is_primary,
            nulls_not_distinct: key.nulls_not_distinct,
        };

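        // Put the primary key, if any, ahead of the other unique constraints.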
        if key.is_primary {
            constraints.insert(0, constraint);
        } else {
            constraints.push(constraint);
        }
    }
    let details = SourceExportStatementDetails::Postgres { table };

    let text_columns = text_columns.map(|mut columns| {
        columns.sort();
        columns
            .into_iter()
            .map(WithOptionValue::Ident::<Aug>)
            .collect()
    });

    Ok(PostgresExportStatementValues {
        columns,
        constraints,
        text_columns,
        details,
        external_reference: purified_export.external_reference,
    })
}

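/// The result of purifying the requested exports of a PostgreSQL source.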
pub(super) struct PurifiedSourceExports {
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// The `TEXT COLUMNS` option values, fully qualified and deduplicated, to
    /// write back into the source statement.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
}

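/// Purifies the requested external references of a PostgreSQL source:
/// validates them against the retrieved upstream references and the configured
/// reference policy, checks the required upstream privileges, and resolves the
/// `TEXT COLUMNS` option against the retrieved table descriptions.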
pub(super) async fn purify_source_exports(
    client: &Client,
    retrieved_references: &RetrievedSourceReferences,
    requested_references: &Option<ExternalReferences>,
    mut text_columns: Vec<UnresolvedItemName>,
    unresolved_source_name: &UnresolvedItemName,
    reference_policy: &SourceReferencePolicy,
) -> Result<PurifiedSourceExports, PlanError> {
    let requested_exports = match requested_references.as_ref() {
        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
            Err(PlanError::UseTablesForSources(requested.to_string()))?
        }
        Some(requested) => retrieved_references
            .requested_source_exports(Some(requested), unresolved_source_name)?,
        None => {
            if matches!(reference_policy, SourceReferencePolicy::Required) {
                Err(PgSourcePurificationError::RequiresExternalReferences)?
            }

            // `TEXT COLUMNS` has nothing to refer to when no external
            // references were requested.
            if !text_columns.is_empty() {
                Err(
                    PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
                        "TEXT COLUMNS".to_string(),
                    ),
                )?
            }

            return Ok(PurifiedSourceExports {
                source_exports: BTreeMap::new(),
                normalized_text_columns: vec![],
            });
        }
    };

    if requested_exports.is_empty() {
        sql_bail!(
            "[internal error]: Postgres reference {} did not match any tables",
            requested_references
                .as_ref()
                .unwrap()
                .to_ast_string_simple()
        );
    }

    super::validate_source_export_names(&requested_exports)?;

    let table_oids: Vec<_> = requested_exports
        .iter()
        .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
        .collect();

    validate_requested_references_privileges(client, &table_oids).await?;

    let mut text_column_map = generate_text_columns(retrieved_references, &mut text_columns)?;

    // `generate_text_columns` fully qualified the entries of `text_columns`;
    // deduplicate them before turning them into option values.
    text_columns.sort();
    text_columns.dedup();
    let normalized_text_columns: Vec<_> = text_columns
        .into_iter()
        .map(WithOptionValue::UnresolvedItemName)
        .collect();

    let source_exports = requested_exports
        .into_iter()
        .map(|r| {
            let desc = r.meta.postgres_desc().expect("known postgres");
            (
                r.name,
                PurifiedSourceExport {
                    external_reference: r.external_reference,
                    details: PurifiedExportDetails::Postgres {
                        text_columns: text_column_map.remove(&desc.oid).map(|v| {
                            v.into_iter()
                                .map(|s| Ident::new(s).expect("validated above"))
                                .collect()
                        }),
                        table: desc.clone(),
                    },
                },
            )
        })
        .collect();

    // Any OIDs left in the map belong to tables that are not among the
    // requested exports, so their `TEXT COLUMNS` entries are dangling.
    if !text_column_map.is_empty() {
        let mut dangling_text_column_refs = vec![];
        let all_references = retrieved_references.all_references();

        for id in text_column_map.keys() {
            let desc = all_references
                .iter()
                .find_map(|reference| {
                    let desc = reference.postgres_desc().expect("is postgres");
                    if desc.oid == *id { Some(desc) } else { None }
                })
                .expect("validated when generating text columns");

            dangling_text_column_refs.push(PartialItemName {
                database: None,
                schema: Some(desc.namespace.clone()),
                item: desc.name.clone(),
            });
        }

        dangling_text_column_refs.sort();
        Err(PgSourcePurificationError::DanglingTextColumns {
            items: dangling_text_column_refs,
        })?;
    }

    Ok(PurifiedSourceExports {
        source_exports,
        normalized_text_columns,
    })
}

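/// Plans the cast expressions that convert each of a PostgreSQL table's
/// text-encoded columns into its Materialize type.
///
/// Columns listed in `text_columns` are left as text; columns with unsupported
/// types are replaced by expressions that raise an error when evaluated.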
pub(crate) fn generate_column_casts(
    scx: &StatementContext,
    table: &PostgresTableDesc,
    text_columns: &Vec<Ident>,
) -> Result<Vec<(CastType, MirScalarExpr)>, PlanError> {
    // Build an expression context in which to plan the casts. The input
    // relation types every upstream column as a string, preserving its
    // nullability.
    let mut cast_scx = scx.clone();
    cast_scx.param_types = Default::default();
    let cast_qcx = QueryContext::root(&cast_scx, QueryLifetime::Source);
    let mut column_types = vec![];
    for column in table.columns.iter() {
        column_types.push(SqlColumnType {
            nullable: column.nullable,
            scalar_type: SqlScalarType::String,
        });
    }

    let cast_ecx = ExprContext {
        qcx: &cast_qcx,
        name: "plan_postgres_source_cast",
        scope: &Scope::empty(),
        relation_type: &SqlRelationType {
            column_types,
            keys: vec![],
        },
        allow_aggregates: false,
        allow_subqueries: false,
        allow_parameters: false,
        allow_windows: false,
    };

    let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));

    // Plan a cast from text to the target type for each column.
    let mut table_cast = vec![];
    for (i, column) in table.columns.iter().enumerate() {
        let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
            (CastType::Text, mz_pgrepr::Type::Text)
        } else {
            match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
                Ok(t) => (CastType::Natural, t),
                Err(_) => {
                    // The type is unsupported: emit an expression that errors
                    // whenever it is evaluated.
                    table_cast.push((
                        CastType::Natural,
                        HirScalarExpr::call_variadic(
                            mz_expr::VariadicFunc::ErrorIfNull,
                            vec![
                                HirScalarExpr::literal_null(SqlScalarType::String),
                                HirScalarExpr::literal(
                                    mz_repr::Datum::from(
                                        format!("Unsupported type with OID {}", column.type_oid)
                                            .as_str(),
                                    ),
                                    SqlScalarType::String,
                                ),
                            ],
                        )
                        .lower_uncorrelated()
                        .expect("no correlation"),
                    ));
                    continue;
                }
            }
        };

        let data_type = scx.resolve_type(ty)?;
        let scalar_type = crate::plan::query::scalar_type_from_sql(scx, &data_type)?;

        let col_expr = HirScalarExpr::named_column(
            ColumnRef {
                level: 0,
                column: i,
            },
            Arc::from(column.name.as_str()),
        );

        let cast_expr = plan_cast(&cast_ecx, CastContext::Explicit, col_expr, &scalar_type)?;

        let cast = if column.nullable {
            cast_expr
        } else {
            // Enforce the upstream NOT NULL constraint: error if NULL data
            // arrives anyway.
            HirScalarExpr::call_variadic(
                mz_expr::VariadicFunc::ErrorIfNull,
                vec![
                    cast_expr,
                    HirScalarExpr::literal(
                        mz_repr::Datum::from(
                            format!(
                                "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
                                table.namespace.clone(),
                                table.name.clone(),
                                column.name.clone()
                            )
                            .as_str(),
                        ),
                        SqlScalarType::String,
                    ),
                ],
            )
        };

        let mir_cast = cast.lower_uncorrelated().map_err(|_e| {
            tracing::info!(
                "cannot ingest {:?} data from PG source because cast is correlated",
                scalar_type
            );

            PlanError::TableContainsUningestableTypes {
                name: table.name.to_string(),
                type_: scx.humanize_scalar_type(&scalar_type, false),
                column: column.name.to_string(),
            }
        })?;

        table_cast.push((cast_type, mir_cast));
    }
    Ok(table_cast)
}

mod privileges {
    use mz_postgres_util::PostgresError;

    use super::*;
    use crate::plan::PlanError;
    use crate::pure::PgSourcePurificationError;

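    /// Verifies that the current role has `USAGE` on every schema that
    /// contains one of `table_oids`.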
    async fn check_schema_privileges(client: &Client, table_oids: &[Oid]) -> Result<(), PlanError> {
        let invalid_schema_privileges_rows = client
            .query(
                "
                WITH distinct_namespace AS (
                    SELECT
                        DISTINCT n.oid, n.nspname AS schema_name
                    FROM unnest($1::OID[]) AS oids (oid)
                    JOIN pg_class AS c ON c.oid = oids.oid
                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
                )
                SELECT d.schema_name
                FROM distinct_namespace AS d
                WHERE
                    NOT has_schema_privilege(CURRENT_USER::TEXT, d.oid, 'usage')",
                &[&table_oids],
            )
            .await
            .map_err(PostgresError::from)?;

        let mut invalid_schema_privileges = invalid_schema_privileges_rows
            .into_iter()
            .map(|row| row.get("schema_name"))
            .collect::<Vec<String>>();

        if invalid_schema_privileges.is_empty() {
            Ok(())
        } else {
            invalid_schema_privileges.sort();
            Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
                schemas: invalid_schema_privileges,
            })?
        }
    }

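    /// Verifies that the current role has `USAGE` on the containing schemas
    /// and `SELECT` on each table identified by `table_oids`.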
    pub async fn check_table_privileges(
        client: &Client,
        table_oids: &[Oid],
    ) -> Result<(), PlanError> {
        check_schema_privileges(client, table_oids).await?;

        let invalid_table_privileges_rows = client
            .query(
                "
                SELECT
                    format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
                FROM unnest($1::oid[]) AS oids (oid)
                JOIN
                    pg_class c ON c.oid = oids.oid
                JOIN
                    pg_namespace n ON c.relnamespace = n.oid
                WHERE NOT has_table_privilege(CURRENT_USER::text, c.oid, 'select')",
                &[&table_oids],
            )
            .await
            .map_err(PostgresError::from)?;

        let mut invalid_table_privileges = invalid_table_privileges_rows
            .into_iter()
            .map(|row| row.get("schema_qualified_table_name"))
            .collect::<Vec<String>>();

        if invalid_table_privileges.is_empty() {
            Ok(())
        } else {
            invalid_table_privileges.sort();
            Err(PgSourcePurificationError::UserLacksSelectOnTables {
                tables: invalid_table_privileges,
            })?
        }
    }
}

mod replica_identity {
    use mz_postgres_util::PostgresError;

    use super::*;
    use crate::plan::PlanError;
    use crate::pure::PgSourcePurificationError;

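    /// Verifies that every table identified by `table_oids` has
    /// `REPLICA IDENTITY FULL` (`relreplident = 'f'`).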
    pub async fn check_replica_identity_full(
        client: &Client,
        table_oids: &[Oid],
    ) -> Result<(), PlanError> {
        let invalid_replica_identity_rows = client
            .query(
                "
                SELECT
                    format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
                FROM unnest($1::oid[]) AS oids (oid)
                JOIN
                    pg_class c ON c.oid = oids.oid
                JOIN
                    pg_namespace n ON c.relnamespace = n.oid
                WHERE relreplident != 'f' OR relreplident IS NULL;",
                &[&table_oids],
            )
            .await
            .map_err(PostgresError::from)?;

        let mut invalid_replica_identity = invalid_replica_identity_rows
            .into_iter()
            .map(|row| row.get("schema_qualified_table_name"))
            .collect::<Vec<String>>();

        if invalid_replica_identity.is_empty() {
            Ok(())
        } else {
            invalid_replica_identity.sort();
            Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
                items: invalid_replica_identity,
            })?
        }
    }
}