1use std::collections::{BTreeMap, BTreeSet};
13
14use mz_postgres_util::desc::PostgresTableDesc;
15use mz_proto::RustType;
16use mz_repr::{Datum, ReprColumnType, ReprScalarType, Row, SqlScalarType};
17use mz_sql_parser::ast::display::AstDisplay;
18use mz_sql_parser::ast::{
19 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
20 ExternalReferences, Ident, PgConfigOptionName, TableConstraint, UnresolvedItemName, Value,
21 WithOptionValue,
22};
23use mz_storage_types::sources::SourceExportStatementDetails;
24use mz_storage_types::sources::casts::{CastFunc, StorageScalarExpr};
25use mz_storage_types::sources::postgres::CastType;
26use prost::Message;
27use tokio_postgres::Client;
28use tokio_postgres::types::Oid;
29
30use crate::names::{Aug, ResolvedItemName};
31use crate::normalize;
32use crate::plan::{PlanError, StatementContext};
33
34use super::error::PgSourcePurificationError;
35use super::references::RetrievedSourceReferences;
36use super::{PartialItemName, PurifiedExportDetails, PurifiedSourceExport, SourceReferencePolicy};
37
38pub(super) async fn validate_requested_references_privileges(
42 client: &Client,
43 table_oids: &[Oid],
44) -> Result<(), PlanError> {
45 privileges::check_table_privileges(client, table_oids).await?;
46 privileges::check_rls_privileges(client, table_oids).await?;
47 replica_identity::check_replica_identity_full(client, table_oids).await?;
48
49 Ok(())
50}
51
52pub(super) fn map_column_refs(
57 retrieved_references: &RetrievedSourceReferences,
58 columns: &mut [UnresolvedItemName],
59 option_type: PgConfigOptionName,
60) -> Result<BTreeMap<u32, BTreeSet<String>>, PlanError> {
61 let mut cols_map: BTreeMap<u32, BTreeSet<String>> = BTreeMap::new();
62
63 for name in columns {
64 let (qual, col) = match name.0.split_last().expect("must have at least one element") {
65 (col, []) => {
66 return Err(PlanError::InvalidOptionValue {
67 option_name: option_type.to_ast_string_simple(),
68 err: Box::new(PlanError::UnderqualifiedColumnName(
69 col.as_str().to_string(),
70 )),
71 });
72 }
73 (col, qual) => (qual.to_vec(), col.as_str().to_string()),
74 };
75
76 let resolved_reference = retrieved_references.resolve_name(&qual)?;
77 let mut fully_qualified_name =
78 resolved_reference
79 .external_reference()
80 .map_err(|e| PlanError::InvalidOptionValue {
81 option_name: option_type.to_ast_string_simple(),
82 err: Box::new(e.into()),
83 })?;
84
85 let desc = resolved_reference
86 .postgres_desc()
87 .expect("known to be postgres");
88
89 if !desc.columns.iter().any(|column| column.name == col) {
90 let column = mz_repr::ColumnName::from(col);
91 let similar = desc
92 .columns
93 .iter()
94 .filter_map(|c| {
95 let c_name = mz_repr::ColumnName::from(c.name.clone());
96 c_name.is_similar(&column).then_some(c_name)
97 })
98 .collect();
99 return Err(PlanError::InvalidOptionValue {
100 option_name: option_type.to_ast_string_simple(),
101 err: Box::new(PlanError::UnknownColumn {
102 table: Some(
103 normalize::unresolved_item_name(fully_qualified_name)
104 .expect("known to be of valid len"),
105 ),
106 column,
107 similar,
108 }),
109 });
110 }
111
112 let col_ident = Ident::new(col.as_str().to_string())?;
114 fully_qualified_name.0.push(col_ident);
115 *name = fully_qualified_name;
116
117 let new = cols_map
118 .entry(desc.oid)
119 .or_default()
120 .insert(col.as_str().to_string());
121
122 if !new {
123 return Err(PlanError::InvalidOptionValue {
124 option_name: option_type.to_ast_string_simple(),
125 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
126 });
127 }
128 }
129
130 Ok(cols_map)
131}
132
133pub fn generate_create_subsource_statements(
134 scx: &StatementContext,
135 source_name: ResolvedItemName,
136 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
137) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
138 let mut unsupported_cols = vec![];
140
141 let mut subsources = Vec::with_capacity(requested_subsources.len());
143
144 for (subsource_name, purified_export) in requested_subsources {
145 let PostgresExportStatementValues {
146 columns,
147 constraints,
148 text_columns,
149 exclude_columns,
150 details,
151 external_reference,
152 } = generate_source_export_statement_values(scx, purified_export, &mut unsupported_cols)?;
153
154 let mut with_options = vec![
155 CreateSubsourceOption {
156 name: CreateSubsourceOptionName::ExternalReference,
157 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
158 },
159 CreateSubsourceOption {
160 name: CreateSubsourceOptionName::Details,
161 value: Some(WithOptionValue::Value(Value::String(hex::encode(
162 details.into_proto().encode_to_vec(),
163 )))),
164 },
165 ];
166
167 if let Some(text_columns) = text_columns {
168 with_options.push(CreateSubsourceOption {
169 name: CreateSubsourceOptionName::TextColumns,
170 value: Some(WithOptionValue::Sequence(text_columns)),
171 });
172 }
173
174 if let Some(exclude_columns) = exclude_columns {
175 with_options.push(CreateSubsourceOption {
176 name: CreateSubsourceOptionName::ExcludeColumns,
177 value: Some(WithOptionValue::Sequence(exclude_columns)),
178 });
179 }
180
181 let subsource = CreateSubsourceStatement {
183 name: subsource_name,
184 columns,
185 of_source: Some(source_name.clone()),
188 constraints,
197 if_not_exists: false,
198 with_options,
199 };
200 subsources.push(subsource);
201 }
202
203 if !unsupported_cols.is_empty() {
204 unsupported_cols.sort();
205 Err(PgSourcePurificationError::UnrecognizedTypes {
206 cols: unsupported_cols,
207 })?;
208 }
209
210 Ok(subsources)
211}
212
213pub(super) struct PostgresExportStatementValues {
214 pub(super) columns: Vec<ColumnDef<Aug>>,
215 pub(super) constraints: Vec<TableConstraint<Aug>>,
216 pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
217 pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
218 pub(super) details: SourceExportStatementDetails,
219 pub(super) external_reference: UnresolvedItemName,
220}
221
222pub(super) fn generate_source_export_statement_values(
223 scx: &StatementContext,
224 purified_export: PurifiedSourceExport,
225 unsupported_cols: &mut Vec<(String, mz_repr::adt::system::Oid)>,
226) -> Result<PostgresExportStatementValues, PlanError> {
227 let PurifiedExportDetails::Postgres {
228 table,
229 text_columns,
230 exclude_columns,
231 } = purified_export.details
232 else {
233 bail_internal!("purified export details must be postgres");
234 };
235
236 let text_column_set = BTreeSet::from_iter(text_columns.iter().flatten().map(Ident::as_str));
237 let exclude_column_set =
238 BTreeSet::from_iter(exclude_columns.iter().flatten().map(Ident::as_str));
239
240 let mut columns = vec![];
242 for c in table.columns.iter() {
243 let name = Ident::new(c.name.clone())?;
244
245 if exclude_column_set.contains(c.name.as_str()) {
246 continue;
247 }
248
249 let ty = if text_column_set.contains(c.name.as_str()) {
250 mz_pgrepr::Type::Text
251 } else {
252 match mz_pgrepr::Type::from_oid_and_typmod(c.type_oid, c.type_mod) {
253 Ok(t) => t,
254 Err(_) => {
255 let mut full_name = purified_export.external_reference.0.clone();
256 full_name.push(name);
257 unsupported_cols.push((
258 UnresolvedItemName(full_name).to_ast_string_simple(),
259 mz_repr::adt::system::Oid(c.type_oid),
260 ));
261 continue;
262 }
263 }
264 };
265
266 let data_type = scx.resolve_type(ty)?;
267 let mut options = vec![];
268
269 if !c.nullable {
270 options.push(mz_sql_parser::ast::ColumnOptionDef {
271 name: None,
272 option: mz_sql_parser::ast::ColumnOption::NotNull,
273 });
274 }
275
276 columns.push(ColumnDef {
277 name,
278 data_type,
279 collation: None,
280 options,
281 });
282 }
283
284 let mut constraints = vec![];
285 for key in table.keys.clone() {
286 let mut key_columns = vec![];
287 let mut all_key_cols_included = true;
288
289 for col_num in key.cols {
290 match table.columns.iter().find(|col| col.col_num == col_num) {
291 Some(col) => {
292 let ident = Ident::new(col.name.clone())?;
293 key_columns.push(ident);
294 }
295 None => {
296 all_key_cols_included = false;
297 break;
298 }
299 }
300 }
301 if !all_key_cols_included {
302 continue;
303 }
304
305 let constraint = mz_sql_parser::ast::TableConstraint::Unique {
306 name: Some(Ident::new(key.name)?),
307 columns: key_columns,
308 is_primary: key.is_primary,
309 nulls_not_distinct: key.nulls_not_distinct,
310 };
311
312 if key.is_primary {
314 constraints.insert(0, constraint);
315 } else {
316 constraints.push(constraint);
317 }
318 }
319 let details = SourceExportStatementDetails::Postgres { table };
320
321 let text_columns = text_columns.map(|mut columns| {
322 columns.sort();
323 columns
324 .into_iter()
325 .map(WithOptionValue::Ident::<Aug>)
326 .collect()
327 });
328
329 let exclude_columns = exclude_columns.map(|mut columns| {
330 columns.sort();
331 columns
332 .into_iter()
333 .map(WithOptionValue::Ident::<Aug>)
334 .collect()
335 });
336
337 Ok(PostgresExportStatementValues {
338 columns,
339 constraints,
340 text_columns,
341 exclude_columns,
342 details,
343 external_reference: purified_export.external_reference,
344 })
345}
346
347pub(super) struct PurifiedSourceExports {
348 pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
349 pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
356}
357
358pub(super) async fn purify_source_exports(
362 client: &Client,
363 retrieved_references: &RetrievedSourceReferences,
364 requested_references: &Option<ExternalReferences>,
365 mut text_columns: Vec<UnresolvedItemName>,
366 mut exclude_columns: Vec<UnresolvedItemName>,
367 unresolved_source_name: &UnresolvedItemName,
368 reference_policy: &SourceReferencePolicy,
369) -> Result<PurifiedSourceExports, PlanError> {
370 let requested_exports = match requested_references.as_ref() {
371 Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
372 Err(PlanError::UseTablesForSources(requested.to_string()))?
373 }
374 Some(requested) => retrieved_references
375 .requested_source_exports(Some(requested), unresolved_source_name)?,
376 None => {
377 if matches!(reference_policy, SourceReferencePolicy::Required) {
378 Err(PgSourcePurificationError::RequiresExternalReferences)?
379 }
380
381 if !text_columns.is_empty() {
384 Err(
385 PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
386 "TEXT COLUMNS".to_string(),
387 ),
388 )?
389 }
390
391 if !exclude_columns.is_empty() {
394 Err(
395 PgSourcePurificationError::UnnecessaryOptionsWithoutReferences(
396 "EXCLUDE COLUMNS".to_string(),
397 ),
398 )?
399 }
400
401 return Ok(PurifiedSourceExports {
402 source_exports: BTreeMap::new(),
403 normalized_text_columns: vec![],
404 });
405 }
406 };
407
408 if requested_exports.is_empty() {
409 sql_bail!(
410 "[internal error]: Postgres reference {} did not match any tables",
411 requested_references
412 .as_ref()
413 .unwrap()
414 .to_ast_string_simple()
415 );
416 }
417
418 super::validate_source_export_names(&requested_exports)?;
419
420 let table_oids: Vec<_> = requested_exports
421 .iter()
422 .map(|r| r.meta.postgres_desc().expect("is postgres").oid)
423 .collect();
424
425 validate_requested_references_privileges(client, &table_oids).await?;
426
427 let mut text_column_map = map_column_refs(
428 retrieved_references,
429 &mut text_columns,
430 PgConfigOptionName::TextColumns,
431 )?;
432 let mut exclude_column_map = map_column_refs(
433 retrieved_references,
434 &mut exclude_columns,
435 PgConfigOptionName::ExcludeColumns,
436 )?;
437
438 text_columns.sort();
440 text_columns.dedup();
441 let normalized_text_columns: Vec<_> = text_columns
442 .into_iter()
443 .map(WithOptionValue::UnresolvedItemName)
444 .collect();
445
446 let source_exports = requested_exports
447 .into_iter()
448 .map(|r| {
449 let mut desc = r.meta.postgres_desc().expect("known postgres").clone();
450 let text_columns = text_column_map.remove(&desc.oid);
451 let exclude_columns = exclude_column_map.remove(&desc.oid);
452
453 if let Some(exclude_cols) = &exclude_columns {
454 desc.columns.retain(|c| !exclude_cols.contains(&c.name));
455 }
456
457 if let (Some(text_cols), Some(exclude_cols)) = (&text_columns, &exclude_columns) {
458 let intersection: Vec<_> = text_cols.intersection(exclude_cols).collect();
459 if !intersection.is_empty() {
460 return Err(PgSourcePurificationError::DuplicatedColumnNames(
461 intersection.iter().map(|s| (*s).to_string()).collect(),
462 ));
463 }
464 }
465 Ok((
466 r.name,
467 PurifiedSourceExport {
468 external_reference: r.external_reference,
469 details: PurifiedExportDetails::Postgres {
470 text_columns: text_columns.map(|v| {
471 v.into_iter()
472 .map(|s| Ident::new(s).expect("validated above"))
473 .collect()
474 }),
475 exclude_columns: exclude_columns.map(|v| {
476 v.into_iter()
477 .map(|s| Ident::new(s).expect("validated above"))
478 .collect()
479 }),
480 table: desc,
481 },
482 },
483 ))
484 })
485 .collect::<Result<BTreeMap<_, _>, _>>()?;
486
487 if !text_column_map.is_empty() {
488 let mut dangling_text_column_refs = vec![];
491 let all_references = retrieved_references.all_references();
492
493 for id in text_column_map.keys() {
494 let desc = all_references
495 .iter()
496 .find_map(|reference| {
497 let desc = reference.postgres_desc().expect("is postgres");
498 if desc.oid == *id { Some(desc) } else { None }
499 })
500 .expect("validated when generating text columns");
501
502 dangling_text_column_refs.push(PartialItemName {
503 database: None,
504 schema: Some(desc.namespace.clone()),
505 item: desc.name.clone(),
506 });
507 }
508
509 dangling_text_column_refs.sort();
510 return Err(PlanError::from(
511 PgSourcePurificationError::DanglingTextColumns {
512 items: dangling_text_column_refs,
513 },
514 ));
515 }
516
517 if !exclude_column_map.is_empty() {
518 let mut dangling_exclude_column_refs = vec![];
521 let all_references = retrieved_references.all_references();
522
523 for id in exclude_column_map.keys() {
524 let desc = all_references
525 .iter()
526 .find_map(|reference| {
527 let desc = reference.postgres_desc().expect("is postgres");
528 if desc.oid == *id { Some(desc) } else { None }
529 })
530 .expect("validated when generating exclude columns");
531
532 dangling_exclude_column_refs.push(PartialItemName {
533 database: None,
534 schema: Some(desc.namespace.clone()),
535 item: desc.name.clone(),
536 });
537 }
538
539 dangling_exclude_column_refs.sort();
540 return Err(PlanError::from(
541 PgSourcePurificationError::DanglingExcludeColumns {
542 items: dangling_exclude_column_refs,
543 },
544 ));
545 }
546
547 Ok(PurifiedSourceExports {
548 source_exports,
549 normalized_text_columns,
550 })
551}
552
553pub(crate) fn generate_column_casts(
554 scx: &StatementContext,
555 table: &PostgresTableDesc,
556 text_columns: &Vec<Ident>,
557) -> Result<Vec<(CastType, StorageScalarExpr)>, PlanError> {
558 let text_columns = BTreeSet::from_iter(text_columns.iter().map(Ident::as_str));
563
564 let mut table_cast = vec![];
565 for (i, column) in table.columns.iter().enumerate() {
566 let (cast_type, ty) = if text_columns.contains(column.name.as_str()) {
567 (CastType::Text, mz_pgrepr::Type::Text)
573 } else {
574 match mz_pgrepr::Type::from_oid_and_typmod(column.type_oid, column.type_mod) {
575 Ok(t) => (CastType::Natural, t),
576 Err(_) => {
581 table_cast.push((
582 CastType::Natural,
583 StorageScalarExpr::ErrorIfNull(
584 Box::new(StorageScalarExpr::Literal(
585 Row::pack_slice(&[Datum::Null]),
586 ReprColumnType {
587 nullable: true,
588 scalar_type: ReprScalarType::String,
589 },
590 )),
591 format!("Unsupported type with OID {}", column.type_oid),
592 ),
593 ));
594 continue;
595 }
596 }
597 };
598
599 let cast_expr = match pg_type_to_cast_func(scx, &ty) {
600 Ok(None) => {
601 StorageScalarExpr::Column(i)
603 }
604 Ok(Some(cast_func)) => {
605 StorageScalarExpr::CallUnary(cast_func, Box::new(StorageScalarExpr::Column(i)))
606 }
607 Err(PlanError::TableContainsUningestableTypes { type_, .. }) => {
608 return Err(PlanError::TableContainsUningestableTypes {
614 name: table.name.to_string(),
615 type_,
616 column: column.name.to_string(),
617 });
618 }
619 Err(e) => return Err(e),
620 };
621
622 let cast = if column.nullable {
623 cast_expr
624 } else {
625 let message = format!(
631 "PG column {}.{}.{} contained NULL data, despite having NOT NULL constraint",
632 table.namespace, table.name, column.name
633 );
634 StorageScalarExpr::ErrorIfNull(Box::new(cast_expr), message)
635 };
636
637 table_cast.push((cast_type, cast));
638 }
639 Ok(table_cast)
640}
641
642fn resolve_pg_type_to_scalar_type(
644 scx: &StatementContext,
645 ty: &mz_pgrepr::Type,
646) -> Result<SqlScalarType, PlanError> {
647 let data_type = scx.resolve_type(ty.clone())?;
648 crate::plan::query::scalar_type_from_sql(scx, &data_type)
649}
650
651fn pg_type_to_cast_func(
659 scx: &StatementContext,
660 ty: &mz_pgrepr::Type,
661) -> Result<Option<CastFunc>, PlanError> {
662 use mz_pgrepr::Type;
663
664 let cast_func = match ty {
665 Type::Bool => CastFunc::CastStringToBool,
666 Type::Bytea => CastFunc::CastStringToBytes,
667 Type::Char => CastFunc::CastStringToPgLegacyChar,
668 Type::Date => CastFunc::CastStringToDate,
669 Type::Float4 => CastFunc::CastStringToFloat32,
670 Type::Float8 => CastFunc::CastStringToFloat64,
671 Type::Int2 => CastFunc::CastStringToInt16,
672 Type::Int4 => CastFunc::CastStringToInt32,
673 Type::Int8 => CastFunc::CastStringToInt64,
674 Type::UInt2 => CastFunc::CastStringToUint16,
675 Type::UInt4 => CastFunc::CastStringToUint32,
676 Type::UInt8 => CastFunc::CastStringToUint64,
677 Type::Interval { .. } => CastFunc::CastStringToInterval,
678 Type::Jsonb => CastFunc::CastStringToJsonb,
679 Type::Name => CastFunc::CastStringToPgLegacyName,
680 Type::Numeric { .. } => {
681 let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
683 match scalar_type {
684 SqlScalarType::Numeric { max_scale } => CastFunc::CastStringToNumeric(max_scale),
685 _ => unreachable!("Numeric must resolve to Numeric"),
686 }
687 }
688 Type::Oid => CastFunc::CastStringToOid,
689 Type::Text => return Ok(None),
690 Type::BpChar { .. } => {
691 let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
693 match scalar_type {
694 SqlScalarType::Char { length } => CastFunc::CastStringToChar {
695 length,
696 fail_on_len: true,
697 },
698 _ => unreachable!("BpChar must resolve to Char"),
699 }
700 }
701 Type::VarChar { .. } => {
702 let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
704 match scalar_type {
705 SqlScalarType::VarChar { max_length } => CastFunc::CastStringToVarChar {
706 length: max_length,
707 fail_on_len: true,
708 },
709 _ => unreachable!("VarChar must resolve to VarChar"),
710 }
711 }
712 Type::Time { .. } => {
713 CastFunc::CastStringToTime
715 }
716 Type::Timestamp { .. } => {
717 let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
719 match scalar_type {
720 SqlScalarType::Timestamp { precision } => {
721 CastFunc::CastStringToTimestamp(precision)
722 }
723 _ => unreachable!("Timestamp must resolve to Timestamp"),
724 }
725 }
726 Type::TimestampTz { .. } => {
727 let scalar_type = resolve_pg_type_to_scalar_type(scx, ty)?;
729 match scalar_type {
730 SqlScalarType::TimestampTz { precision } => {
731 CastFunc::CastStringToTimestampTz(precision)
732 }
733 _ => unreachable!("TimestampTz must resolve to TimestampTz"),
734 }
735 }
736 Type::Uuid => CastFunc::CastStringToUuid,
737 Type::Int2Vector => CastFunc::CastStringToInt2Vector,
738 Type::MzTimestamp => CastFunc::CastStringToMzTimestamp,
739 Type::Json => CastFunc::CastStringToJsonb,
741 Type::Array(elem) => {
742 let return_ty = resolve_pg_type_to_scalar_type(scx, ty)?;
743 let elem_cast = build_element_cast_expr(scx, elem)?;
744 CastFunc::CastStringToArray {
745 return_ty,
746 cast_expr: Box::new(elem_cast),
747 }
748 }
749 Type::List(elem) => {
750 let return_ty = resolve_pg_type_to_scalar_type(scx, ty)?;
751 let elem_cast = build_element_cast_expr(scx, elem)?;
752 CastFunc::CastStringToList {
753 return_ty,
754 cast_expr: Box::new(elem_cast),
755 }
756 }
757 Type::Map { value_type } => {
758 let return_ty = resolve_pg_type_to_scalar_type(scx, ty)?;
759 let value_cast = build_element_cast_expr(scx, value_type)?;
760 CastFunc::CastStringToMap {
761 return_ty,
762 cast_expr: Box::new(value_cast),
763 }
764 }
765 Type::Range { element_type } => {
766 let return_ty = resolve_pg_type_to_scalar_type(scx, ty)?;
767 let elem_cast = build_element_cast_expr(scx, element_type)?;
768 CastFunc::CastStringToRange {
769 return_ty,
770 cast_expr: Box::new(elem_cast),
771 }
772 }
773 Type::RegType | Type::RegClass | Type::RegProc => {
776 return Err(PlanError::TableContainsUningestableTypes {
777 name: String::new(),
778 type_: ty.name().to_string(),
779 column: String::new(),
780 });
781 }
782 other => {
783 return Err(PlanError::TableContainsUningestableTypes {
784 name: String::new(),
785 type_: other.name().to_string(),
786 column: String::new(),
787 });
788 }
789 };
790 Ok(Some(cast_func))
791}
792
793fn build_element_cast_expr(
797 scx: &StatementContext,
798 elem_ty: &mz_pgrepr::Type,
799) -> Result<StorageScalarExpr, PlanError> {
800 match pg_type_to_cast_func(scx, elem_ty)? {
801 None => Ok(StorageScalarExpr::Column(0)),
802 Some(cast_func) => Ok(StorageScalarExpr::CallUnary(
803 cast_func,
804 Box::new(StorageScalarExpr::Column(0)),
805 )),
806 }
807}
808
809mod privileges {
810 use mz_postgres_util::{PostgresError, query, sql};
811
812 use super::*;
813 use crate::plan::PlanError;
814 use crate::pure::PgSourcePurificationError;
815
816 async fn check_schema_privileges(client: &Client, table_oids: &[Oid]) -> Result<(), PlanError> {
817 let invalid_schema_privileges_rows = query(
818 client,
819 sql!(
820 "
821 WITH distinct_namespace AS (
822 SELECT
823 DISTINCT n.oid, n.nspname AS schema_name
824 FROM unnest($1::OID[]) AS oids (oid)
825 JOIN pg_class AS c ON c.oid = oids.oid
826 JOIN pg_namespace AS n ON c.relnamespace = n.oid
827 )
828 SELECT d.schema_name
829 FROM distinct_namespace AS d
830 WHERE
831 NOT has_schema_privilege(CURRENT_USER::TEXT, d.oid, 'usage')"
832 ),
833 &[&table_oids],
834 )
835 .await?;
836
837 let mut invalid_schema_privileges = invalid_schema_privileges_rows
838 .into_iter()
839 .map(|row| row.get("schema_name"))
840 .collect::<Vec<String>>();
841
842 if invalid_schema_privileges.is_empty() {
843 Ok(())
844 } else {
845 invalid_schema_privileges.sort();
846 Err(PgSourcePurificationError::UserLacksUsageOnSchemas {
847 schemas: invalid_schema_privileges,
848 })?
849 }
850 }
851
852 pub async fn check_table_privileges(
863 client: &Client,
864 table_oids: &[Oid],
865 ) -> Result<(), PlanError> {
866 check_schema_privileges(client, table_oids).await?;
867
868 let invalid_table_privileges_rows = query(
869 client,
870 sql!(
871 "
872 SELECT
873 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
874 FROM unnest($1::oid[]) AS oids (oid)
875 JOIN
876 pg_class c ON c.oid = oids.oid
877 JOIN
878 pg_namespace n ON c.relnamespace = n.oid
879 WHERE NOT has_table_privilege(CURRENT_USER::text, c.oid, 'select')"
880 ),
881 &[&table_oids],
882 )
883 .await?;
884
885 let mut invalid_table_privileges = invalid_table_privileges_rows
886 .into_iter()
887 .map(|row| row.get("schema_qualified_table_name"))
888 .collect::<Vec<String>>();
889
890 if invalid_table_privileges.is_empty() {
891 Ok(())
892 } else {
893 invalid_table_privileges.sort();
894 Err(PgSourcePurificationError::UserLacksSelectOnTables {
895 tables: invalid_table_privileges,
896 })?
897 }
898 }
899
900 pub async fn check_rls_privileges(
905 client: &Client,
906 table_oids: &[Oid],
907 ) -> Result<(), PlanError> {
908 match mz_postgres_util::validate_no_rls_policies(client, table_oids).await {
909 Ok(_) => Ok(()),
910 Err(err) => match err {
911 PostgresError::BypassRLSRequired(tables) => {
915 Err(PgSourcePurificationError::BypassRLSRequired { tables })?
916 }
917 _ => Err(err)?,
918 },
919 }
920 }
921}
922
923mod replica_identity {
924 use mz_postgres_util::{query, sql};
925
926 use super::*;
927 use crate::plan::PlanError;
928 use crate::pure::PgSourcePurificationError;
929
930 pub async fn check_replica_identity_full(
932 client: &Client,
933 table_oids: &[Oid],
934 ) -> Result<(), PlanError> {
935 let invalid_replica_identity_rows = query(
936 client,
937 sql!(
938 "
939 SELECT
940 format('%I.%I', n.nspname, c.relname) AS schema_qualified_table_name
941 FROM unnest($1::oid[]) AS oids (oid)
942 JOIN
943 pg_class c ON c.oid = oids.oid
944 JOIN
945 pg_namespace n ON c.relnamespace = n.oid
946 WHERE relreplident != 'f' OR relreplident IS NULL;"
947 ),
948 &[&table_oids],
949 )
950 .await?;
951
952 let mut invalid_replica_identity = invalid_replica_identity_rows
953 .into_iter()
954 .map(|row| row.get("schema_qualified_table_name"))
955 .collect::<Vec<String>>();
956
957 if invalid_replica_identity.is_empty() {
958 Ok(())
959 } else {
960 invalid_replica_identity.sort();
961 Err(PgSourcePurificationError::NotTablesWReplicaIdentityFull {
962 items: invalid_replica_identity,
963 })?
964 }
965 }
966}