use std::collections::{BTreeMap, BTreeSet};
use std::iter;
use std::path::Path;
use std::sync::Arc;
use anyhow::anyhow;
use itertools::Itertools;
use mz_adapter_types::dyncfgs;
use mz_ccsr::{Client, GetByIdError, GetBySubjectError, Schema as CcsrSchema};
use mz_controller_types::ClusterId;
use mz_kafka_util::client::MzClientContext;
use mz_mysql_util::MySqlTableDesc;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_ore::str::StrExt;
use mz_ore::{assert_none, soft_panic_or_log};
use mz_postgres_util::desc::PostgresTableDesc;
use mz_postgres_util::replication::WalLevel;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_proto::RustType;
use mz_repr::{strconv, CatalogItemId, RelationDesc, RelationVersionSelector, Timestamp};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::visit::{visit_function, Visit};
use mz_sql_parser::ast::visit_mut::{visit_expr_mut, VisitMut};
use mz_sql_parser::ast::{
AlterSourceAction, AlterSourceAddSubsourceOptionName, AlterSourceStatement, AvroDocOn,
ColumnName, CreateMaterializedViewStatement, CreateSinkConnection, CreateSinkOption,
CreateSinkOptionName, CreateSinkStatement, CreateSubsourceOption, CreateSubsourceOptionName,
CreateTableFromSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection,
CsrSeedAvro, CsrSeedProtobuf, CsrSeedProtobufSchema, DeferredItemName, DocOnIdentifier,
DocOnSchema, Expr, Function, FunctionArgs, Ident, KafkaSourceConfigOption,
KafkaSourceConfigOptionName, LoadGenerator, LoadGeneratorOption, LoadGeneratorOptionName,
MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName,
PgConfigOption, PgConfigOptionName, RawItemName, ReaderSchemaSelectionStrategy,
RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, SourceEnvelope, Statement,
TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, UnresolvedItemName,
};
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::inline::IntoInlineConnection;
use mz_storage_types::connections::{Connection, PostgresConnection};
use mz_storage_types::errors::ContextCreationError;
use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
use mz_storage_types::sources::mysql::MySqlSourceDetails;
use mz_storage_types::sources::postgres::PostgresSourcePublicationDetails;
use mz_storage_types::sources::{
GenericSourceConnection, PostgresSourceConnection, SourceConnection, SourceDesc,
SourceExportStatementDetails,
};
use prost::Message;
use protobuf_native::compiler::{SourceTreeDescriptorDatabase, VirtualSourceTree};
use protobuf_native::MessageLite;
use rdkafka::admin::AdminClient;
use references::{RetrievedSourceReferences, SourceReferenceClient};
use uuid::Uuid;
use crate::ast::{
AlterSourceAddSubsourceOption, AvroSchema, CreateSourceConnection, CreateSourceStatement,
CreateSubsourceStatement, CsrConnectionAvro, CsrConnectionProtobuf, ExternalReferenceExport,
ExternalReferences, Format, FormatSpecifier, ProtobufSchema, Value, WithOptionValue,
};
use crate::catalog::{CatalogItemType, SessionCatalog};
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
use crate::names::{
Aug, FullItemName, PartialItemName, ResolvedColumnReference, ResolvedDataType, ResolvedIds,
ResolvedItemName,
};
use crate::plan::error::PlanError;
use crate::plan::statement::ddl::load_generator_ast_to_generator;
use crate::plan::{SourceReferences, StatementContext};
use crate::{kafka_util, normalize};
use self::error::{
CsrPurificationError, KafkaSinkPurificationError, KafkaSourcePurificationError,
LoadGeneratorSourcePurificationError, MySqlSourcePurificationError, PgSourcePurificationError,
};
pub(crate) mod error;
pub mod mysql;
pub mod postgres;
mod references;
/// A single export requested from a source, carrying caller-defined metadata
/// of type `T`.
pub(crate) struct RequestedSourceExport<T> {
/// The upstream object this export refers to.
external_reference: UnresolvedItemName,
/// The name under which the export is to be created.
name: UnresolvedItemName,
/// Extra per-export data; its type varies with the caller.
meta: T,
}
impl<T> RequestedSourceExport<T> {
    /// Consumes the export and returns one with the same `external_reference`
    /// and `name`, but with `new_meta` in place of the previous metadata.
    fn change_meta<F>(self, new_meta: F) -> RequestedSourceExport<F> {
        // The old metadata is intentionally left behind and dropped with `self`.
        let RequestedSourceExport {
            external_reference,
            name,
            meta: _,
        } = self;
        RequestedSourceExport {
            external_reference,
            name,
            meta: new_meta,
        }
    }
}
fn source_export_name_gen(
source_name: &UnresolvedItemName,
subsource_name: &str,
) -> Result<UnresolvedItemName, PlanError> {
let mut partial = normalize::unresolved_item_name(source_name.clone())?;
partial.item = subsource_name.to_string();
Ok(UnresolvedItemName::from(partial))
}
/// Checks that the requested exports are unambiguous.
///
/// # Errors
///
/// * `PlanError::SubsourceNameConflict` if two exports share the same `name`;
///   the error lists (sorted) every upstream reference that maps to it.
/// * `PlanError::SubsourceDuplicateReference` if the same upstream reference
///   is exported more than once; the error lists (sorted) every target name.
///
/// The name conflict is checked, and reported, first.
fn validate_source_export_names<T>(
    requested_source_exports: &[RequestedSourceExport<T>],
) -> Result<(), PlanError> {
    // First duplicated target name, if any.
    let conflicting_name = requested_source_exports
        .iter()
        .map(|export| &export.name)
        .duplicates()
        .next()
        .cloned();
    if let Some(name) = conflicting_name {
        let mut upstream_references = requested_source_exports
            .iter()
            .filter(|export| export.name == name)
            .map(|export| export.external_reference.clone())
            .collect::<Vec<_>>();
        upstream_references.sort();
        return Err(PlanError::SubsourceNameConflict {
            name,
            upstream_references,
        });
    }

    // First upstream reference that was requested more than once, if any.
    let repeated_reference = requested_source_exports
        .iter()
        .map(|export| &export.external_reference)
        .duplicates()
        .next()
        .cloned();
    if let Some(name) = repeated_reference {
        let mut target_names = requested_source_exports
            .iter()
            .filter(|export| export.external_reference == name)
            .map(|export| export.name.clone())
            .collect::<Vec<_>>();
        target_names.sort();
        return Err(PlanError::SubsourceDuplicateReference { name, target_names });
    }

    Ok(())
}
/// The result of purifying a statement; there is one variant per kind of
/// purified output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedStatement {
/// A purified `CREATE SOURCE`, as built by `purify_create_source`.
PurifiedCreateSource {
/// The generated `CREATE SUBSOURCE` statement for the source's progress
/// subsource.
create_progress_subsource_stmt: CreateSubsourceStatement<Aug>,
/// The `CREATE SOURCE` statement after purification rewrote it.
create_source_stmt: CreateSourceStatement<Aug>,
/// The source exports requested together with the source, keyed by the
/// export's name.
subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
/// The references that were retrieved for the source during purification.
available_source_references: SourceReferences,
},
/// An `ALTER SOURCE` statement handed back without further purification.
/// `purify_alter_source` returns this when the source cannot be resolved
/// and `if_exists` is set, and for actions other than adding subsources or
/// refreshing references.
PurifiedAlterSource {
alter_source_stmt: AlterSourceStatement<Aug>,
},
/// Result of purifying an `ALTER SOURCE` that adds subsources.
// NOTE(review): presumably built by `purify_alter_source_add_subsources`;
// its construction is not visible in this part of the file.
PurifiedAlterSourceAddSubsources {
/// The resolved name of the source being altered.
source_name: ResolvedItemName,
/// The options attached to the statement.
options: Vec<AlterSourceAddSubsourceOption<Aug>>,
/// The exports to add, keyed by name.
subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
},
/// Result of purifying an `ALTER SOURCE` that refreshes references.
// NOTE(review): presumably built by `purify_alter_source_refresh_references`,
// which is defined outside the visible part of the file.
PurifiedAlterSourceRefreshReferences {
/// The resolved name of the source being altered.
source_name: ResolvedItemName,
/// The freshly retrieved references.
available_source_references: SourceReferences,
},
/// A purified `CREATE SINK`, as returned by `purify_create_sink`.
PurifiedCreateSink(CreateSinkStatement<Aug>),
/// A purified `CREATE TABLE .. FROM SOURCE`.
// NOTE(review): presumably built by `purify_create_table_from_source`,
// which is defined outside the visible part of the file.
PurifiedCreateTableFromSource {
stmt: CreateTableFromSourceStatement<Aug>,
},
}
/// A source export whose details have been determined during purification.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PurifiedSourceExport {
/// The upstream object this export refers to.
pub external_reference: UnresolvedItemName,
/// Connection-type-specific details of the export.
pub details: PurifiedExportDetails,
}
/// Per-connection-type details of a purified source export.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PurifiedExportDetails {
/// Details of an export from a MySQL source.
MySql {
/// Description of the upstream table.
table: MySqlTableDesc,
/// Columns selected as text columns, if any.
text_columns: Option<Vec<Ident>>,
/// Columns selected for exclusion, if any.
exclude_columns: Option<Vec<Ident>>,
// NOTE(review): presumably the value of `global.gtid_executed` queried
// during purification — confirm against `mysql::purify_source_exports`.
initial_gtid_set: String,
},
/// Details of an export from a PostgreSQL source.
Postgres {
/// Description of the upstream table.
table: PostgresTableDesc,
/// Columns selected as text columns, if any.
text_columns: Option<Vec<Ident>>,
},
/// An export from a Kafka source; it carries no additional details.
Kafka {},
/// Details of an export from a load generator source.
LoadGenerator {
/// The relation description reported for the export, if any.
table: Option<RelationDesc>,
/// Which load generator output the export corresponds to.
output: LoadGeneratorOutput,
},
}
/// Purifies `stmt` by dispatching to the purification routine for its kind.
///
/// Returns the purification result together with the cluster ID taken from
/// the statement's `in_cluster` field. Only `CREATE SOURCE` and `CREATE SINK`
/// can yield a cluster ID; the other statement kinds always yield `None`.
///
/// # Panics
///
/// Panics if `stmt` is not one of `CreateSource`, `AlterSource`, `CreateSink`
/// or `CreateTableFromSource`.
pub async fn purify_statement(
    catalog: impl SessionCatalog,
    now: u64,
    stmt: Statement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> (Result<PurifiedStatement, PlanError>, Option<ClusterId>) {
    match stmt {
        Statement::CreateSource(create_source) => {
            // Grab the cluster before the statement is moved into purification.
            let in_cluster = create_source
                .in_cluster
                .as_ref()
                .map(|cluster| cluster.id.clone());
            let purified =
                purify_create_source(catalog, now, create_source, storage_configuration).await;
            (purified, in_cluster)
        }
        Statement::AlterSource(alter_source) => {
            let purified = purify_alter_source(catalog, alter_source, storage_configuration).await;
            (purified, None)
        }
        Statement::CreateSink(create_sink) => {
            let in_cluster = create_sink
                .in_cluster
                .as_ref()
                .map(|cluster| cluster.id.clone());
            let purified = purify_create_sink(catalog, create_sink, storage_configuration).await;
            (purified, in_cluster)
        }
        Statement::CreateTableFromSource(create_table) => {
            let purified =
                purify_create_table_from_source(catalog, create_table, storage_configuration)
                    .await;
            (purified, None)
        }
        other => unreachable!("{:?} does not need to be purified", other),
    }
}
/// Adds `AvroDocOn` options derived from catalog comments to every Avro
/// format in `format` that uses a schema registry connection.
///
/// The objects considered are everything the item `from_id` references,
/// followed by the item itself. For each such object that has comments, an
/// option is appended for the object's own comment and for each commented
/// column. An option is never added when the user already supplied one for
/// the same identifier and schema.
pub(crate) fn purify_create_sink_avro_doc_on_options(
    catalog: &dyn SessionCatalog,
    from_id: CatalogItemId,
    format: &mut Option<FormatSpecifier<Aug>>,
) -> Result<(), PlanError> {
    let from = catalog.get_item(&from_id);
    let object_ids: Vec<_> = from
        .references()
        .items()
        .copied()
        .chain_one(from.id())
        .collect();

    // Collect the option lists of the schema-registry-backed Avro formats,
    // each tagged with the schema (key, value, or both) it applies to.
    let mut csr_option_lists = Vec::new();
    for_each_format(format, |doc_on_schema, fmt| match fmt {
        Format::Avro(AvroSchema::Csr {
            csr_connection: CsrConnectionAvro { connection, .. },
        }) => csr_option_lists.push((doc_on_schema, &mut connection.options)),
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Csv { .. }
        | Format::Json { .. }
        | Format::Protobuf(..)
        | Format::Regex(..)
        | Format::Text => {}
    });

    // Wraps a comment as the string value of an option.
    let comment_value = |text: &String| {
        Some(mz_sql_parser::ast::WithOptionValue::Value(Value::String(
            text.clone(),
        )))
    };

    for (for_schema, options) in csr_option_lists {
        // Doc options the user wrote explicitly; these always win.
        let user_provided_comments: BTreeSet<_> = options
            .iter()
            .filter_map(|option| match &option.name {
                CsrConfigOptionName::AvroDocOn(doc_on) => Some(doc_on.clone()),
                _ => None,
            })
            .collect();

        for object_id in &object_ids {
            let item = catalog
                .get_item(object_id)
                .at_version(RelationVersionSelector::Latest);
            let full_name = catalog.resolve_full_name(item.name());
            let print_id = !matches!(
                item.item_type(),
                CatalogItemType::Func | CatalogItemType::Type
            );
            let full_resolved_name = ResolvedItemName::Item {
                id: *object_id,
                qualifiers: item.name().qualifiers.clone(),
                full_name: full_name.clone(),
                print_id,
                version: RelationVersionSelector::Latest,
            };

            let comments_map = match catalog.get_item_comments(object_id) {
                Some(comments) => comments,
                None => continue,
            };

            // The comment on the object itself is stored under the `None` key.
            let doc_on_item_key = AvroDocOn {
                identifier: DocOnIdentifier::Type(full_resolved_name.clone()),
                for_schema,
            };
            if let Some(root_comment) = comments_map.get(&None) {
                if !user_provided_comments.contains(&doc_on_item_key) {
                    options.push(CsrConfigOption {
                        name: CsrConfigOptionName::AvroDocOn(doc_on_item_key),
                        value: comment_value(root_comment),
                    });
                }
            }

            // Column comments only apply to objects that have a description.
            let desc = match item.desc(&full_name) {
                Ok(desc) => desc,
                Err(_) => continue,
            };
            for (pos, column_name) in desc.iter_names().enumerate() {
                // Column comments are keyed by 1-based position, while the
                // resolved column reference carries the 0-based index.
                let comment_str = match comments_map.get(&Some(pos + 1)) {
                    Some(comment) => comment,
                    None => continue,
                };
                let doc_on_column_key = AvroDocOn {
                    identifier: DocOnIdentifier::Column(ColumnName {
                        relation: full_resolved_name.clone(),
                        column: ResolvedColumnReference::Column {
                            name: column_name.to_owned(),
                            index: pos,
                        },
                    }),
                    for_schema,
                };
                if user_provided_comments.contains(&doc_on_column_key) {
                    continue;
                }
                options.push(CsrConfigOption {
                    name: CsrConfigOptionName::AvroDocOn(doc_on_column_key),
                    value: comment_value(comment_str),
                });
            }
        }
    }

    Ok(())
}
async fn purify_create_sink(
catalog: impl SessionCatalog,
mut create_sink_stmt: CreateSinkStatement<Aug>,
storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
let CreateSinkStatement {
connection,
format,
with_options,
name: _,
in_cluster: _,
if_not_exists: _,
from,
envelope: _,
} = &mut create_sink_stmt;
const USER_ALLOWED_WITH_OPTIONS: &[CreateSinkOptionName] = &[CreateSinkOptionName::Snapshot];
if let Some(op) = with_options
.iter()
.find(|op| !USER_ALLOWED_WITH_OPTIONS.contains(&op.name))
{
sql_bail!(
"CREATE SINK...WITH ({}..) is not allowed",
op.name.to_ast_string(),
)
}
let default_strategy =
dyncfgs::DEFAULT_SINK_PARTITION_STRATEGY.get(catalog.system_vars().dyncfgs());
with_options.push(CreateSinkOption {
name: CreateSinkOptionName::PartitionStrategy,
value: Some(WithOptionValue::Value(Value::String(default_strategy))),
});
match &connection {
CreateSinkConnection::Kafka {
connection,
options,
key: _,
headers: _,
} => {
let scx = StatementContext::new(None, &catalog);
let connection = {
let item = scx.get_item_by_resolved_name(connection)?;
match item.connection()? {
Connection::Kafka(connection) => {
connection.clone().into_inline_connection(scx.catalog)
}
_ => sql_bail!(
"{} is not a kafka connection",
scx.catalog.resolve_full_name(item.name())
),
}
};
let extracted_options: KafkaSinkConfigOptionExtracted = options.clone().try_into()?;
if extracted_options.legacy_ids == Some(true) {
sql_bail!("LEGACY IDs option is not supported");
}
let client: AdminClient<_> = connection
.create_with_context(
storage_configuration,
MzClientContext::default(),
&BTreeMap::new(),
InTask::No,
)
.await
.map_err(|e| {
KafkaSinkPurificationError::AdminClientError(Arc::new(e))
})?;
let metadata = client
.inner()
.fetch_metadata(
None,
storage_configuration
.parameters
.kafka_timeout_config
.fetch_metadata_timeout,
)
.map_err(|e| {
KafkaSinkPurificationError::AdminClientError(Arc::new(
ContextCreationError::KafkaError(e),
))
})?;
if metadata.brokers().len() == 0 {
Err(KafkaSinkPurificationError::ZeroBrokers)?;
}
}
}
let mut csr_connection_ids = BTreeSet::new();
for_each_format(format, |_, fmt| match fmt {
Format::Avro(AvroSchema::InlineSchema { .. })
| Format::Bytes
| Format::Csv { .. }
| Format::Json { .. }
| Format::Protobuf(ProtobufSchema::InlineSchema { .. })
| Format::Regex(..)
| Format::Text => (),
Format::Avro(AvroSchema::Csr {
csr_connection: CsrConnectionAvro { connection, .. },
})
| Format::Protobuf(ProtobufSchema::Csr {
csr_connection: CsrConnectionProtobuf { connection, .. },
}) => {
csr_connection_ids.insert(*connection.connection.item_id());
}
});
let scx = StatementContext::new(None, &catalog);
for csr_connection_id in csr_connection_ids {
let connection = {
let item = scx.get_item(&csr_connection_id);
match item.connection()? {
Connection::Csr(connection) => connection.clone().into_inline_connection(&catalog),
_ => Err(CsrPurificationError::NotCsrConnection(
scx.catalog.resolve_full_name(item.name()),
))?,
}
};
let client = connection
.connect(storage_configuration, InTask::No)
.await
.map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;
client
.list_subjects()
.await
.map_err(|e| CsrPurificationError::ListSubjectsError(Arc::new(e)))?;
}
purify_create_sink_avro_doc_on_options(&catalog, *from.item_id(), format)?;
Ok(PurifiedStatement::PurifiedCreateSink(create_sink_stmt))
}
/// Invokes `f` once for every `Format` contained in `format`.
///
/// A bare format is visited once with `DocOnSchema::All`. A key/value format
/// pair is visited twice: first the key with `DocOnSchema::KeyOnly`, then the
/// value with `DocOnSchema::ValueOnly`. Nothing happens when `format` is
/// `None`.
fn for_each_format<'a, F>(format: &'a mut Option<FormatSpecifier<Aug>>, mut f: F)
where
    F: FnMut(DocOnSchema, &'a mut Format<Aug>),
{
    if let Some(specifier) = format.as_mut() {
        match specifier {
            FormatSpecifier::Bare(fmt) => f(DocOnSchema::All, fmt),
            FormatSpecifier::KeyValue { key, value } => {
                f(DocOnSchema::KeyOnly, key);
                f(DocOnSchema::ValueOnly, value);
            }
        }
    }
}
/// Whether a statement may, or must, name external references.
///
/// `purify_create_source` selects the policy from two system variables:
/// `NotAllowed` when both `enable_create_table_from_source` and
/// `force_source_table_syntax` are set, `Optional` when only
/// `enable_create_table_from_source` is set, and `Required` otherwise.
pub(crate) enum SourceReferencePolicy {
/// External references must not be given.
NotAllowed,
/// External references may be given or omitted.
Optional,
/// External references are expected.
Required,
}
/// Purifies a `CREATE SOURCE` statement.
///
/// Depending on the connection type this resolves the connection from the
/// catalog, contacts the upstream system (Kafka, PostgreSQL/Yugabyte, MySQL)
/// or instantiates the load generator, validates the request, and rewrites
/// the statement's options in place with the values that were looked up. It
/// also generates the `CREATE SUBSOURCE` statement for the progress subsource
/// and purifies the source's format.
///
/// `now` is passed on to `kafka_util::lookup_start_offsets` when a Kafka
/// `START TIMESTAMP` is resolved into offsets.
///
/// # Errors
///
/// Returns an error if the progress subsource is ID-qualified, if the
/// referenced connection has the wrong type, if a required option is missing
/// or a reserved option was supplied by the user, if the upstream system
/// cannot be reached or is not configured as required, or if external
/// references are given where the reference policy forbids them.
async fn purify_create_source(
catalog: impl SessionCatalog,
now: u64,
mut create_source_stmt: CreateSourceStatement<Aug>,
storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
let CreateSourceStatement {
name: source_name,
connection: source_connection,
format,
envelope,
include_metadata,
external_references,
progress_subsource,
..
} = &mut create_source_stmt;
// Only a deferred progress subsource name is accepted. The `unreachable!`
// further down, where the name is chosen, relies on this check.
if let Some(DeferredItemName::Named(_)) = progress_subsource {
sql_bail!("Cannot manually ID qualify progress subsource")
}
// Exports to create together with the source, keyed by export name.
let mut requested_subsource_map = BTreeMap::new();
// Every connection type has its own fixed description of the progress
// relation.
let progress_desc = match &source_connection {
CreateSourceConnection::Kafka { .. } => {
&mz_storage_types::sources::kafka::KAFKA_PROGRESS_DESC
}
CreateSourceConnection::Postgres { .. } | CreateSourceConnection::Yugabyte { .. } => {
&mz_storage_types::sources::postgres::PG_PROGRESS_DESC
}
CreateSourceConnection::MySql { .. } => {
&mz_storage_types::sources::mysql::MYSQL_PROGRESS_DESC
}
CreateSourceConnection::LoadGenerator { .. } => {
&mz_storage_types::sources::load_generator::LOAD_GEN_PROGRESS_DESC
}
};
let scx = StatementContext::new(None, &catalog);
// Decide whether external references are forbidden, optional or required,
// based on the two system variables governing the source-table syntax.
let reference_policy = if scx.catalog.system_vars().enable_create_table_from_source()
&& scx.catalog.system_vars().force_source_table_syntax()
{
SourceReferencePolicy::NotAllowed
} else if scx.catalog.system_vars().enable_create_table_from_source() {
SourceReferencePolicy::Optional
} else {
SourceReferencePolicy::Required
};
// Only the Kafka arm below overrides the format options; every arm assigns
// `retrieved_source_references`.
let mut format_options = SourceFormatOptions::Default;
let retrieved_source_references: RetrievedSourceReferences;
match source_connection {
CreateSourceConnection::Kafka {
connection,
options: base_with_options,
..
} => {
// Kafka sources do not accept external references at all.
if let Some(external_references) = external_references {
Err(KafkaSourcePurificationError::ReferencedSubsources(
external_references.clone(),
))?;
}
let connection = {
let item = scx.get_item_by_resolved_name(connection)?;
match item.connection()? {
Connection::Kafka(connection) => {
connection.clone().into_inline_connection(&catalog)
}
_ => Err(KafkaSourcePurificationError::NotKafkaConnection(
scx.catalog.resolve_full_name(item.name()),
))?,
}
};
let extracted_options: KafkaSourceConfigOptionExtracted =
base_with_options.clone().try_into()?;
let topic = extracted_options
.topic
.ok_or(KafkaSourcePurificationError::ConnectionMissingTopic)?;
let consumer = connection
.create_with_context(
storage_configuration,
MzClientContext::default(),
&BTreeMap::new(),
InTask::No,
)
.await
.map_err(|e| {
KafkaSourcePurificationError::KafkaConsumerError(
e.display_with_causes().to_string(),
)
})?;
let consumer = Arc::new(consumer);
// Validate against the broker according to which start position, if
// any, the user specified.
match (
extracted_options.start_offset,
extracted_options.start_timestamp,
) {
(None, None) => {
// No start position given: just check that the topic exists.
kafka_util::ensure_topic_exists(
Arc::clone(&consumer),
&topic,
storage_configuration
.parameters
.kafka_timeout_config
.fetch_metadata_timeout,
)
.await?;
}
(Some(_), Some(_)) => {
sql_bail!("cannot specify START TIMESTAMP and START OFFSET at same time")
}
(Some(start_offsets), None) => {
kafka_util::validate_start_offsets(
Arc::clone(&consumer),
&topic,
start_offsets,
storage_configuration
.parameters
.kafka_timeout_config
.fetch_metadata_timeout,
)
.await?;
}
(None, Some(time_offset)) => {
let start_offsets = kafka_util::lookup_start_offsets(
Arc::clone(&consumer),
&topic,
time_offset,
now,
storage_configuration
.parameters
.kafka_timeout_config
.fetch_metadata_timeout,
)
.await?;
// Replace the START TIMESTAMP option with a START OFFSET option
// holding the offsets that were just looked up.
base_with_options.retain(|val| {
!matches!(val.name, KafkaSourceConfigOptionName::StartTimestamp)
});
base_with_options.push(KafkaSourceConfigOption {
name: KafkaSourceConfigOptionName::StartOffset,
value: Some(WithOptionValue::Sequence(
start_offsets
.iter()
.map(|offset| {
WithOptionValue::Value(Value::Number(offset.to_string()))
})
.collect(),
)),
});
}
}
let reference_client = SourceReferenceClient::Kafka { topic: &topic };
retrieved_source_references = reference_client.get_source_references().await?;
format_options = SourceFormatOptions::Kafka { topic };
}
// PostgreSQL and Yugabyte share one code path and differ only in the
// flavor the connection is required to have.
source_connection @ CreateSourceConnection::Postgres { .. }
| source_connection @ CreateSourceConnection::Yugabyte { .. } => {
let (source_flavor, connection, options) = match source_connection {
CreateSourceConnection::Postgres {
connection,
options,
} => (PostgresFlavor::Vanilla, connection, options),
CreateSourceConnection::Yugabyte {
connection,
options,
} => (PostgresFlavor::Yugabyte, connection, options),
_ => unreachable!(),
};
let connection = {
let item = scx.get_item_by_resolved_name(connection)?;
match item.connection().map_err(PlanError::from)? {
Connection::Postgres(connection) => {
let connection = connection.clone().into_inline_connection(&catalog);
// The connection's flavor has to match the source type used in
// the statement.
if connection.flavor != source_flavor {
match source_flavor {
PostgresFlavor::Vanilla => {
Err(PgSourcePurificationError::NotPgConnection(
scx.catalog.resolve_full_name(item.name()),
))
}
PostgresFlavor::Yugabyte => {
Err(PgSourcePurificationError::NotYugabyteConnection(
scx.catalog.resolve_full_name(item.name()),
))
}
}
} else {
Ok(connection)
}
}
_ => match source_flavor {
PostgresFlavor::Vanilla => Err(PgSourcePurificationError::NotPgConnection(
scx.catalog.resolve_full_name(item.name()),
)),
PostgresFlavor::Yugabyte => {
Err(PgSourcePurificationError::NotYugabyteConnection(
scx.catalog.resolve_full_name(item.name()),
))
}
},
}
}?;
let crate::plan::statement::PgConfigOptionExtracted {
publication,
text_columns,
details,
..
} = options.clone().try_into()?;
let publication =
publication.ok_or(PgSourcePurificationError::ConnectionMissingPublication)?;
// DETAILS is written by purification below; users may not supply it.
if details.is_some() {
Err(PgSourcePurificationError::UserSpecifiedDetails)?;
}
let config = connection
.config(
&storage_configuration.connection_context.secrets_reader,
storage_configuration,
InTask::No,
)
.await?;
let client = config
.connect(
"postgres_purification",
&storage_configuration.connection_context.ssh_tunnel_manager,
)
.await?;
// Check that the upstream server is configured for logical replication.
let wal_level = mz_postgres_util::get_wal_level(&client).await?;
if wal_level < WalLevel::Logical {
Err(PgSourcePurificationError::InsufficientWalLevel { wal_level })?;
}
let max_wal_senders = mz_postgres_util::get_max_wal_senders(&client).await?;
if max_wal_senders < 1 {
Err(PgSourcePurificationError::ReplicationDisabled)?;
}
// At least two replication slots must be available.
let available_replication_slots =
mz_postgres_util::available_replication_slots(&client).await?;
if available_replication_slots < 2 {
Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 2 })?;
}
let reference_client = SourceReferenceClient::Postgres {
client: &client,
publication: &publication,
database: &connection.database,
};
retrieved_source_references = reference_client.get_source_references().await?;
let postgres::PurifiedSourceExports {
source_exports: subsources,
normalized_text_columns,
} = postgres::purify_source_exports(
&client,
&config,
&retrieved_source_references,
external_references,
text_columns,
source_name,
&reference_policy,
)
.await?;
// If the user gave a TEXT COLUMNS option, overwrite its value with the
// normalized column list.
if let Some(text_cols_option) = options
.iter_mut()
.find(|option| option.name == PgConfigOptionName::TextColumns)
{
text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
}
requested_subsource_map.extend(subsources);
// The timeline ID is read over a separate replication connection.
let replication_client = config
.connect_replication(&storage_configuration.connection_context.ssh_tunnel_manager)
.await?;
let timeline_id = mz_postgres_util::get_timeline_id(&replication_client).await?;
// Replace any DETAILS option with one holding the hex-encoded, proto-encoded
// publication details.
options.retain(|PgConfigOption { name, .. }| name != &PgConfigOptionName::Details);
let details = PostgresSourcePublicationDetails {
// Slot name: `materialize_` followed by a fresh v4 UUID without dashes.
slot: format!(
"materialize_{}",
Uuid::new_v4().to_string().replace('-', "")
),
timeline_id: Some(timeline_id),
database: connection.database,
};
options.push(PgConfigOption {
name: PgConfigOptionName::Details,
value: Some(WithOptionValue::Value(Value::String(hex::encode(
details.into_proto().encode_to_vec(),
)))),
})
}
CreateSourceConnection::MySql {
connection,
options,
} => {
let connection_item = scx.get_item_by_resolved_name(connection)?;
let connection = match connection_item.connection()? {
Connection::MySql(connection) => {
connection.clone().into_inline_connection(&catalog)
}
_ => Err(MySqlSourcePurificationError::NotMySqlConnection(
scx.catalog.resolve_full_name(connection_item.name()),
))?,
};
let crate::plan::statement::ddl::MySqlConfigOptionExtracted {
details,
text_columns,
exclude_columns,
seen: _,
} = options.clone().try_into()?;
// DETAILS is written by purification below; users may not supply it.
if details.is_some() {
Err(MySqlSourcePurificationError::UserSpecifiedDetails)?;
}
let config = connection
.config(
&storage_configuration.connection_context.secrets_reader,
storage_configuration,
InTask::No,
)
.await?;
let mut conn = config
.connect(
"mysql purification",
&storage_configuration.connection_context.ssh_tunnel_manager,
)
.await?;
// All three checks run before the loop starts, because the array is fully
// built first. Invalid-setting failures are collected so they can be
// reported together; any other kind of error aborts purification.
let mut replication_errors = vec![];
for error in [
mz_mysql_util::ensure_gtid_consistency(&mut conn)
.await
.err(),
mz_mysql_util::ensure_full_row_binlog_format(&mut conn)
.await
.err(),
mz_mysql_util::ensure_replication_commit_order(&mut conn)
.await
.err(),
] {
match error {
Some(mz_mysql_util::MySqlError::InvalidSystemSetting {
setting,
expected,
actual,
}) => {
replication_errors.push((setting, expected, actual));
}
Some(err) => Err(err)?,
None => (),
}
}
if !replication_errors.is_empty() {
Err(MySqlSourcePurificationError::ReplicationSettingsError(
replication_errors,
))?;
}
// Read the upstream `global.gtid_executed` system variable; it is handed to
// the export purification below.
let initial_gtid_set =
mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
let reference_client = SourceReferenceClient::MySql {
conn: &mut conn,
include_system_schemas: mysql::references_system_schemas(external_references),
};
retrieved_source_references = reference_client.get_source_references().await?;
let mysql::PurifiedSourceExports {
source_exports: subsources,
normalized_text_columns,
normalized_exclude_columns,
} = mysql::purify_source_exports(
&mut conn,
&retrieved_source_references,
external_references,
text_columns,
exclude_columns,
source_name,
initial_gtid_set.clone(),
&reference_policy,
)
.await?;
requested_subsource_map.extend(subsources);
// Replace any DETAILS option with one holding the hex-encoded, proto-encoded
// source details.
let details = MySqlSourceDetails {};
options
.retain(|MySqlConfigOption { name, .. }| name != &MySqlConfigOptionName::Details);
options.push(MySqlConfigOption {
name: MySqlConfigOptionName::Details,
value: Some(WithOptionValue::Value(Value::String(hex::encode(
details.into_proto().encode_to_vec(),
)))),
});
// Overwrite user-supplied TEXT COLUMNS / EXCLUDE COLUMNS option values with
// their normalized forms.
if let Some(text_cols_option) = options
.iter_mut()
.find(|option| option.name == MySqlConfigOptionName::TextColumns)
{
text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
}
if let Some(ignore_cols_option) = options
.iter_mut()
.find(|option| option.name == MySqlConfigOptionName::ExcludeColumns)
{
ignore_cols_option.value =
Some(WithOptionValue::Sequence(normalized_exclude_columns));
}
}
CreateSourceConnection::LoadGenerator { generator, options } => {
let load_generator =
load_generator_ast_to_generator(&scx, generator, options, include_metadata)?;
let reference_client = SourceReferenceClient::LoadGenerator {
generator: &load_generator,
};
retrieved_source_references = reference_client.get_source_references().await?;
// References whose output is something other than the generator's default
// output.
let subsource_references = retrieved_source_references
.all_references()
.iter()
.filter(|r| {
r.load_generator_output().expect("is loadgen") != &LoadGeneratorOutput::Default
})
.collect::<Vec<_>>();
match external_references {
// References were requested although the policy forbids them.
Some(requested)
if matches!(reference_policy, SourceReferencePolicy::NotAllowed) =>
{
Err(PlanError::UseTablesForSources(requested.to_string()))?
}
Some(requested) => {
let requested_exports = retrieved_source_references
.requested_source_exports(Some(requested), source_name)?;
for export in requested_exports {
requested_subsource_map.insert(
export.name,
PurifiedSourceExport {
external_reference: export.external_reference,
details: PurifiedExportDetails::LoadGenerator {
table: export
.meta
.load_generator_desc()
.expect("is loadgen")
.clone(),
output: export
.meta
.load_generator_output()
.expect("is loadgen")
.clone(),
},
},
);
}
}
None => {
// When references are required, a generator that has non-default outputs
// cannot be created without naming them.
if matches!(reference_policy, SourceReferencePolicy::Required)
&& !subsource_references.is_empty()
{
Err(LoadGeneratorSourcePurificationError::MultiOutputRequiresForAllTables)?
}
}
}
// For the clock generator, add an AS OF option set to the catalog's current
// time unless the user already specified one.
if let LoadGenerator::Clock = generator {
if !options
.iter()
.any(|p| p.name == LoadGeneratorOptionName::AsOf)
{
let now = catalog.now();
options.push(LoadGeneratorOption {
name: LoadGeneratorOptionName::AsOf,
value: Some(WithOptionValue::Value(Value::Number(now.to_string()))),
});
}
}
}
}
// Remove the external references from the statement itself; the requested
// exports are returned separately in `subsources`.
*external_references = None;
// Use the given progress subsource name, or generate one from the source
// name.
let name = match progress_subsource {
Some(name) => match name {
DeferredItemName::Deferred(name) => name.clone(),
DeferredItemName::Named(_) => unreachable!("already checked for this value"),
},
None => {
// NOTE(review): `unwrap` panics if the source name has no components —
// presumably a parsed name is never empty; confirm.
let (item, prefix) = source_name.0.split_last().unwrap();
let item_name = Ident::try_generate_name(item.to_string(), "_progress", |candidate| {
let mut suggested_name = prefix.to_vec();
suggested_name.push(candidate.clone());
let partial = normalize::unresolved_item_name(UnresolvedItemName(suggested_name))?;
let qualified = scx.allocate_qualified_name(partial)?;
// A candidate is acceptable only if neither an item nor a type already
// exists under the qualified name.
let item_exists = scx.catalog.get_item_by_name(&qualified).is_some();
let type_exists = scx.catalog.get_type_by_name(&qualified).is_some();
Ok::<_, PlanError>(!item_exists && !type_exists)
})?;
let mut full_name = prefix.to_vec();
full_name.push(item_name);
let full_name = normalize::unresolved_item_name(UnresolvedItemName(full_name))?;
let qualified_name = scx.allocate_qualified_name(full_name)?;
let full_name = scx.catalog.resolve_full_name(&qualified_name);
UnresolvedItemName::from(full_name.clone())
}
};
// Derive the progress subsource's columns and constraints from the progress
// description chosen above.
let (columns, constraints) = scx.relation_desc_into_table_defs(progress_desc)?;
let create_progress_subsource_stmt = CreateSubsourceStatement {
name,
columns,
of_source: None,
constraints,
if_not_exists: false,
with_options: vec![CreateSubsourceOption {
name: CreateSubsourceOptionName::Progress,
value: Some(WithOptionValue::Value(Value::Boolean(true))),
}],
};
purify_source_format(
&catalog,
format,
&format_options,
envelope,
storage_configuration,
)
.await?;
Ok(PurifiedStatement::PurifiedCreateSource {
create_progress_subsource_stmt,
create_source_stmt,
subsources: requested_subsource_map,
available_source_references: retrieved_source_references.available_source_references(),
})
}
/// Purifies an `ALTER SOURCE` statement.
///
/// Adding subsources and refreshing references are delegated to their own
/// purification routines. Every other action is returned unchanged, as is the
/// whole statement when the source cannot be resolved and `if_exists` is set.
///
/// # Errors
///
/// Returns an error if the source cannot be resolved and `if_exists` is not
/// set, if the item has no source description, or if subsources are added
/// while the system variables require the source-table syntax.
async fn purify_alter_source(
    catalog: impl SessionCatalog,
    stmt: AlterSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let AlterSourceStatement {
        source_name: unresolved_source_name,
        action,
        if_exists,
    } = stmt;

    let item = match scx.resolve_item(RawItemName::Name(unresolved_source_name.clone())) {
        Ok(found) => found,
        Err(err) => {
            if !if_exists {
                return Err(err);
            }
            // The source is missing but that is tolerated: hand the statement
            // back as it was.
            return Ok(PurifiedStatement::PurifiedAlterSource {
                alter_source_stmt: AlterSourceStatement {
                    source_name: unresolved_source_name,
                    action,
                    if_exists,
                },
            });
        }
    };

    // Items without a source description cannot be altered here.
    let desc = match item.source_desc()? {
        None => sql_bail!("cannot ALTER this type of source"),
        Some(source_desc) => source_desc.clone().into_inline_connection(scx.catalog),
    };

    let source_name = item.name();
    let resolved_source_name = ResolvedItemName::Item {
        id: item.id(),
        qualifiers: source_name.qualifiers.clone(),
        full_name: scx.catalog.resolve_full_name(source_name),
        print_id: true,
        version: RelationVersionSelector::Latest,
    };
    let partial_name = scx.catalog.minimal_qualification(source_name);

    match action {
        AlterSourceAction::AddSubsources {
            external_references,
            options,
        } => {
            let must_use_tables = scx.catalog.system_vars().enable_create_table_from_source()
                && scx.catalog.system_vars().force_source_table_syntax();
            if must_use_tables {
                return Err(PlanError::UseTablesForSources(
                    "ALTER SOURCE .. ADD SUBSOURCES ..".to_string(),
                ));
            }

            purify_alter_source_add_subsources(
                external_references,
                options,
                desc,
                partial_name,
                unresolved_source_name,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        AlterSourceAction::RefreshReferences => {
            purify_alter_source_refresh_references(
                desc,
                resolved_source_name,
                storage_configuration,
            )
            .await
        }
        _ => {
            let alter_source_stmt = AlterSourceStatement {
                source_name: unresolved_source_name,
                action,
                if_exists,
            };
            Ok(PurifiedStatement::PurifiedAlterSource { alter_source_stmt })
        }
    }
}
/// Purifies an `ALTER SOURCE ... ADD SUBSOURCE` statement.
///
/// Only Postgres sources whose connection has the `Vanilla` flavor and MySQL sources are
/// accepted; every other connection type is rejected with a planning error. The function
/// connects to the upstream system, retrieves the references it exposes, purifies the
/// requested `external_references`, and writes the normalized `TEXT COLUMNS` (and, for MySQL,
/// `EXCLUDE COLUMNS`) values back into `options`.
///
/// Returns `PurifiedStatement::PurifiedAlterSourceAddSubsources` carrying
/// `resolved_source_name`, the rewritten `options` and the purified subsources.
///
/// # Errors
///
/// Returns an error if the source type is unsupported, if the options cannot be extracted,
/// if connecting to or querying the upstream system fails, if a Postgres upstream reports
/// fewer than one available replication slot, or if `EXCLUDE COLUMNS` is supplied for a
/// Postgres source.
async fn purify_alter_source_add_subsources(
external_references: Vec<ExternalReferenceExport>,
mut options: Vec<AlterSourceAddSubsourceOption<Aug>>,
desc: SourceDesc,
partial_source_name: PartialItemName,
unresolved_source_name: UnresolvedItemName,
resolved_source_name: ResolvedItemName,
storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
// Validate up front that this kind of source can have subsources added. The second `match`
// on `desc.connection` below relies on this check for its `unreachable!()` arm.
match desc.connection {
GenericSourceConnection::Postgres(PostgresSourceConnection {
connection:
PostgresConnection {
flavor: PostgresFlavor::Vanilla,
..
},
..
}) => {}
GenericSourceConnection::MySql(_) => {}
_ => sql_bail!(
"source {} does not support ALTER SOURCE.",
partial_source_name
),
};
// Captured before `desc.connection` is moved by the `match` below; only used in the
// Postgres `EXCLUDE COLUMNS` error message.
let connection_name = desc.connection.name();
// `options` is cloned because the original vector is rewritten and returned further down.
let crate::plan::statement::ddl::AlterSourceAddSubsourceOptionExtracted {
text_columns,
exclude_columns,
details,
seen: _,
} = options.clone().try_into()?;
// `details` is not expected to be provided on the incoming statement.
assert_none!(details, "details cannot be explicitly set");
let mut requested_subsource_map = BTreeMap::new();
match desc.connection {
GenericSourceConnection::Postgres(pg_source_connection) => {
// Open a connection to the upstream Postgres server.
let pg_connection = &pg_source_connection.connection;
let config = pg_connection
.config(
&storage_configuration.connection_context.secrets_reader,
storage_configuration,
InTask::No,
)
.await?;
let client = config
.connect(
"postgres_purification",
&storage_configuration.connection_context.ssh_tunnel_manager,
)
.await?;
// At least one replication slot must be available upstream.
let available_replication_slots =
mz_postgres_util::available_replication_slots(&client).await?;
if available_replication_slots < 1 {
Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
}
// `EXCLUDE COLUMNS` is rejected for Postgres sources.
if !exclude_columns.is_empty() {
sql_bail!(
"{} is a {} source, which does not support EXCLUDE COLUMNS.",
partial_source_name,
connection_name
)
}
// Fetch the references available through the source's publication.
let reference_client = SourceReferenceClient::Postgres {
client: &client,
publication: &pg_source_connection.publication,
database: &pg_connection.database,
};
let retrieved_source_references = reference_client.get_source_references().await?;
// Purify only the explicitly requested tables.
let postgres::PurifiedSourceExports {
source_exports: subsources,
normalized_text_columns,
} = postgres::purify_source_exports(
&client,
&config,
&retrieved_source_references,
&Some(ExternalReferences::SubsetTables(external_references)),
text_columns,
&unresolved_source_name,
&SourceReferencePolicy::Required,
)
.await?;
// If the statement carried a TEXT COLUMNS option, replace its value with the
// normalized column references.
if let Some(text_cols_option) = options
.iter_mut()
.find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
{
text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
}
requested_subsource_map.extend(subsources);
}
GenericSourceConnection::MySql(mysql_source_connection) => {
// Open a connection to the upstream MySQL server.
let mysql_connection = &mysql_source_connection.connection;
let config = mysql_connection
.config(
&storage_configuration.connection_context.secrets_reader,
storage_configuration,
InTask::No,
)
.await?;
let mut conn = config
.connect(
"mysql purification",
&storage_configuration.connection_context.ssh_tunnel_manager,
)
.await?;
// Read the server's `global.gtid_executed` value before retrieving references; it is
// handed to `mysql::purify_source_exports` as the initial GTID set.
let initial_gtid_set =
mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;
let requested_references = Some(ExternalReferences::SubsetTables(external_references));
let reference_client = SourceReferenceClient::MySql {
conn: &mut conn,
include_system_schemas: mysql::references_system_schemas(&requested_references),
};
let retrieved_source_references = reference_client.get_source_references().await?;
let mysql::PurifiedSourceExports {
source_exports: subsources,
normalized_text_columns,
normalized_exclude_columns,
} = mysql::purify_source_exports(
&mut conn,
&retrieved_source_references,
&requested_references,
text_columns,
exclude_columns,
&unresolved_source_name,
initial_gtid_set,
&SourceReferencePolicy::Required,
)
.await?;
requested_subsource_map.extend(subsources);
// Replace the TEXT COLUMNS / EXCLUDE COLUMNS option values, when present, with their
// normalized forms.
if let Some(text_cols_option) = options
.iter_mut()
.find(|option| option.name == AlterSourceAddSubsourceOptionName::TextColumns)
{
text_cols_option.value = Some(WithOptionValue::Sequence(normalized_text_columns));
}
if let Some(ignore_cols_option) = options
.iter_mut()
.find(|option| option.name == AlterSourceAddSubsourceOptionName::ExcludeColumns)
{
ignore_cols_option.value =
Some(WithOptionValue::Sequence(normalized_exclude_columns));
}
}
// Every other connection type was rejected by the validation `match` above.
_ => unreachable!(),
};
Ok(PurifiedStatement::PurifiedAlterSourceAddSubsources {
source_name: resolved_source_name,
options,
subsources: requested_subsource_map,
})
}
/// Purifies an `ALTER SOURCE ... REFRESH REFERENCES` statement.
///
/// Builds the reference client matching the source's connection type (connecting to the
/// upstream Postgres or MySQL server where required), retrieves the references it currently
/// exposes, and returns them in `PurifiedStatement::PurifiedAlterSourceRefreshReferences`
/// together with `resolved_source_name`.
///
/// # Errors
///
/// Returns an error if building the upstream configuration, connecting, or retrieving the
/// references fails.
async fn purify_alter_source_refresh_references(
    desc: SourceDesc,
    resolved_source_name: ResolvedItemName,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let connection_context = &storage_configuration.connection_context;

    let references = match desc.connection {
        GenericSourceConnection::Postgres(pg_source) => {
            let pg_connection = &pg_source.connection;
            let pg_config = pg_connection
                .config(
                    &connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let pg_client = pg_config
                .connect(
                    "postgres_purification",
                    &connection_context.ssh_tunnel_manager,
                )
                .await?;
            let references_client = SourceReferenceClient::Postgres {
                client: &pg_client,
                publication: &pg_source.publication,
                database: &pg_connection.database,
            };
            references_client.get_source_references().await?
        }
        GenericSourceConnection::MySql(mysql_source) => {
            let mysql_connection = &mysql_source.connection;
            let mysql_config = mysql_connection
                .config(
                    &connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut mysql_conn = mysql_config
                .connect(
                    "mysql purification",
                    &connection_context.ssh_tunnel_manager,
                )
                .await?;
            // A refresh never asks for system schemas.
            let references_client = SourceReferenceClient::MySql {
                conn: &mut mysql_conn,
                include_system_schemas: false,
            };
            references_client.get_source_references().await?
        }
        GenericSourceConnection::LoadGenerator(load_generator) => {
            let references_client = SourceReferenceClient::LoadGenerator {
                generator: &load_generator.load_generator,
            };
            references_client.get_source_references().await?
        }
        GenericSourceConnection::Kafka(kafka) => {
            let references_client = SourceReferenceClient::Kafka {
                topic: &kafka.topic,
            };
            references_client.get_source_references().await?
        }
    };

    let available_source_references = references.available_source_references();
    Ok(PurifiedStatement::PurifiedAlterSourceRefreshReferences {
        source_name: resolved_source_name,
        available_source_references,
    })
}
/// Purifies a `CREATE TABLE ... FROM SOURCE` statement.
///
/// The statement must not define columns or constraints itself. The function looks up the
/// referenced source, contacts the upstream system where the source type requires it
/// (Postgres, MySQL), purifies the single requested external reference, purifies the format
/// specifier, and then rewrites `stmt` in place:
///
/// * `external_reference` is replaced with the purified reference,
/// * `columns` / `constraints` are filled in from the upstream description when they were
///   not specified,
/// * `TEXT COLUMNS` / `EXCLUDE COLUMNS` option values are replaced with generated values,
/// * a hex-encoded `DETAILS` option describing the export is appended.
///
/// Returns `PurifiedStatement::PurifiedCreateTableFromSource` wrapping the rewritten statement.
///
/// # Errors
///
/// Returns an error if columns or constraints are given explicitly, if the source cannot be
/// resolved or has no source description, if upstream access fails, if `EXCLUDE COLUMNS` is
/// used with a Postgres source, if a Postgres table has columns of unrecognized types, or if
/// column names are supplied for Postgres, MySQL or multi-output load generator sources.
async fn purify_create_table_from_source(
    catalog: impl SessionCatalog,
    mut stmt: CreateTableFromSourceStatement<Aug>,
    storage_configuration: &StorageConfiguration,
) -> Result<PurifiedStatement, PlanError> {
    let scx = StatementContext::new(None, &catalog);
    let CreateTableFromSourceStatement {
        name: _,
        columns,
        constraints,
        source: source_name,
        if_not_exists: _,
        external_reference,
        format,
        envelope,
        include_metadata: _,
        with_options,
    } = &mut stmt;

    // Column and constraint definitions are generated during purification and may not be
    // supplied by the user.
    if matches!(columns, TableFromSourceColumns::Defined(_)) {
        sql_bail!("CREATE TABLE .. FROM SOURCE column definitions cannot be specified directly");
    }
    if !constraints.is_empty() {
        sql_bail!(
            "CREATE TABLE .. FROM SOURCE constraint definitions cannot be specified directly"
        );
    }

    // Look up the source item the table is created from.
    let item = scx.get_item_by_resolved_name(source_name)?;

    // NOTE(review): the message below mentions ALTER although this is a CREATE TABLE path;
    // it is left unchanged because callers/tests may depend on the exact text.
    let desc = match item.source_desc()? {
        Some(desc) => desc.clone().into_inline_connection(scx.catalog),
        None => {
            sql_bail!("cannot ALTER this type of source")
        }
    };
    let unresolved_source_name: UnresolvedItemName = source_name.full_item_name().clone().into();
    let qualified_source_name = item.name();
    let connection_name = desc.connection.name();

    // `with_options` is cloned because the original vector is rewritten further down.
    let crate::plan::statement::ddl::TableFromSourceOptionExtracted {
        text_columns,
        exclude_columns,
        details,
        ignore_keys: _,
        timeline: _,
        seen: _,
    } = with_options.clone().try_into()?;
    assert_none!(details, "details cannot be explicitly set");

    // Column names given in the options are prefixed with the external reference (when one
    // was given) before being handed to the per-source purification functions.
    let qualified_text_columns = text_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();
    let qualified_exclude_columns = exclude_columns
        .iter()
        .map(|col| {
            UnresolvedItemName(
                external_reference
                    .as_ref()
                    .map(|er| er.0.iter().chain_one(col).cloned().collect())
                    .unwrap_or_else(|| vec![col.clone()]),
            )
        })
        .collect_vec();

    // Only Kafka sources override the default format options (see the Kafka arm below).
    let mut format_options = SourceFormatOptions::Default;
    let retrieved_source_references: RetrievedSourceReferences;

    // At most one upstream reference is requested: the one named in the statement.
    let requested_references = external_reference.as_ref().map(|ref_name| {
        ExternalReferences::SubsetTables(vec![ExternalReferenceExport {
            reference: ref_name.clone(),
            alias: None,
        }])
    });

    let purified_export = match desc.connection {
        GenericSourceConnection::Postgres(pg_source_connection) => {
            let pg_connection = &pg_source_connection.connection;
            let config = pg_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let client = config
                .connect(
                    "postgres_purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // At least one replication slot must be available upstream.
            let available_replication_slots =
                mz_postgres_util::available_replication_slots(&client).await?;
            if available_replication_slots < 1 {
                Err(PgSourcePurificationError::InsufficientReplicationSlotsAvailable { count: 1 })?;
            }

            // `EXCLUDE COLUMNS` is rejected for Postgres sources.
            if !exclude_columns.is_empty() {
                sql_bail!(
                    "{} is a {} source, which does not support EXCLUDE COLUMNS.",
                    scx.catalog.minimal_qualification(qualified_source_name),
                    connection_name
                )
            }

            let reference_client = SourceReferenceClient::Postgres {
                client: &client,
                publication: &pg_source_connection.publication,
                database: &pg_connection.database,
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let postgres::PurifiedSourceExports {
                source_exports,
                normalized_text_columns: _,
            } = postgres::purify_source_exports(
                &client,
                &config,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                &unresolved_source_name,
                &SourceReferencePolicy::Required,
            )
            .await?;
            // Only the first returned export is used for this statement.
            let (_, purified_export) = source_exports.into_iter().next().unwrap();
            purified_export
        }
        GenericSourceConnection::MySql(mysql_source_connection) => {
            let mysql_connection = &mysql_source_connection.connection;
            let config = mysql_connection
                .config(
                    &storage_configuration.connection_context.secrets_reader,
                    storage_configuration,
                    InTask::No,
                )
                .await?;
            let mut conn = config
                .connect(
                    "mysql purification",
                    &storage_configuration.connection_context.ssh_tunnel_manager,
                )
                .await?;

            // Read `global.gtid_executed` before retrieving references; it is handed to
            // `mysql::purify_source_exports` as the initial GTID set.
            let initial_gtid_set =
                mz_mysql_util::query_sys_var(&mut conn, "global.gtid_executed").await?;

            let reference_client = SourceReferenceClient::MySql {
                conn: &mut conn,
                include_system_schemas: mysql::references_system_schemas(&requested_references),
            };
            retrieved_source_references = reference_client.get_source_references().await?;

            let mysql::PurifiedSourceExports {
                source_exports,
                normalized_text_columns: _,
                normalized_exclude_columns: _,
            } = mysql::purify_source_exports(
                &mut conn,
                &retrieved_source_references,
                &requested_references,
                qualified_text_columns,
                qualified_exclude_columns,
                &unresolved_source_name,
                initial_gtid_set,
                &SourceReferencePolicy::Required,
            )
            .await?;
            // Only the first returned export is used for this statement.
            let (_, purified_export) = source_exports.into_iter().next().unwrap();
            purified_export
        }
        GenericSourceConnection::LoadGenerator(load_gen_connection) => {
            let reference_client = SourceReferenceClient::LoadGenerator {
                generator: &load_gen_connection.load_generator,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            // Only the first requested export is used for this statement.
            let export = requested_exports.into_iter().next().unwrap();
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::LoadGenerator {
                    table: export
                        .meta
                        .load_generator_desc()
                        .expect("is loadgen")
                        .clone(),
                    output: export
                        .meta
                        .load_generator_output()
                        .expect("is loadgen")
                        .clone(),
                },
            }
        }
        GenericSourceConnection::Kafka(kafka_conn) => {
            let reference_client = SourceReferenceClient::Kafka {
                topic: &kafka_conn.topic,
            };
            retrieved_source_references = reference_client.get_source_references().await?;
            let requested_exports = retrieved_source_references
                .requested_source_exports(requested_references.as_ref(), &unresolved_source_name)?;
            // Only the first requested export is used for this statement.
            let export = requested_exports.into_iter().next().unwrap();
            // Format purification needs the topic to derive schema registry subjects.
            format_options = SourceFormatOptions::Kafka {
                topic: kafka_conn.topic.clone(),
            };
            PurifiedSourceExport {
                external_reference: export.external_reference,
                details: PurifiedExportDetails::Kafka {},
            }
        }
    };

    purify_source_format(
        &catalog,
        format,
        &format_options,
        envelope,
        storage_configuration,
    )
    .await?;

    // Record the purified external reference on the statement.
    *external_reference = Some(purified_export.external_reference.clone());

    // Fill in columns, constraints and options from the purified export.
    match &purified_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            let mut unsupported_cols = vec![];
            let postgres::PostgresExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                details: gen_details,
                external_reference: _,
            } = postgres::generate_source_export_statement_values(
                &scx,
                purified_export,
                &mut unsupported_cols,
            )?;
            if !unsupported_cols.is_empty() {
                // Sorted so the reported column list is deterministic.
                unsupported_cols.sort();
                Err(PgSourcePurificationError::UnrecognizedTypes {
                    cols: unsupported_cols,
                })?;
            }
            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                if let Some(gen_text_columns) = gen_text_columns {
                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
                } else {
                    soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    );
                }
            }
            match columns {
                // Rejected at the top of the function.
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for Postgres sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::MySql { .. } => {
            let mysql::MySqlExportStatementValues {
                columns: gen_columns,
                constraints: gen_constraints,
                text_columns: gen_text_columns,
                exclude_columns: gen_exclude_columns,
                details: gen_details,
                external_reference: _,
            } = mysql::generate_source_export_statement_values(&scx, purified_export)?;
            if let Some(text_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::TextColumns)
            {
                if let Some(gen_text_columns) = gen_text_columns {
                    text_cols_option.value = Some(WithOptionValue::Sequence(gen_text_columns));
                } else {
                    soft_panic_or_log!(
                        "text_columns should be Some if text_cols_option is present"
                    );
                }
            }
            if let Some(exclude_cols_option) = with_options
                .iter_mut()
                .find(|option| option.name == TableFromSourceOptionName::ExcludeColumns)
            {
                if let Some(gen_exclude_columns) = gen_exclude_columns {
                    exclude_cols_option.value =
                        Some(WithOptionValue::Sequence(gen_exclude_columns));
                } else {
                    // Previously this message named `text_columns`, which pointed at the
                    // wrong option when diagnosing the violated invariant.
                    soft_panic_or_log!(
                        "exclude_columns should be Some if exclude_cols_option is present"
                    );
                }
            }
            match columns {
                // Rejected at the top of the function.
                TableFromSourceColumns::Defined(_) => unreachable!(),
                TableFromSourceColumns::NotSpecified => {
                    *columns = TableFromSourceColumns::Defined(gen_columns);
                    *constraints = gen_constraints;
                }
                TableFromSourceColumns::Named(_) => {
                    sql_bail!("columns cannot be named for MySQL sources")
                }
            }
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    gen_details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::LoadGenerator { .. } => {
            let (desc, output) = match purified_export.details {
                PurifiedExportDetails::LoadGenerator { table, output } => (table, output),
                _ => unreachable!("purified export details must be load generator"),
            };
            // Columns are only generated when the export carries a relation description.
            if let Some(desc) = desc {
                let (gen_columns, gen_constraints) = scx.relation_desc_into_table_defs(&desc)?;
                match columns {
                    // Rejected at the top of the function.
                    TableFromSourceColumns::Defined(_) => unreachable!(),
                    TableFromSourceColumns::NotSpecified => {
                        *columns = TableFromSourceColumns::Defined(gen_columns);
                        *constraints = gen_constraints;
                    }
                    TableFromSourceColumns::Named(_) => {
                        sql_bail!("columns cannot be named for multi-output load generator sources")
                    }
                }
            }
            let details = SourceExportStatementDetails::LoadGenerator { output };
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
        PurifiedExportDetails::Kafka {} => {
            let details = SourceExportStatementDetails::Kafka {};
            with_options.push(TableFromSourceOption {
                name: TableFromSourceOptionName::Details,
                value: Some(WithOptionValue::Value(Value::String(hex::encode(
                    details.into_proto().encode_to_vec(),
                )))),
            })
        }
    };

    Ok(PurifiedStatement::PurifiedCreateTableFromSource { stmt })
}
/// Source-type-specific context consulted while purifying a format specifier.
enum SourceFormatOptions {
/// No source-specific context; KEY/VALUE formats and schema registry lookups are rejected.
Default,
/// Context for Kafka sources. `topic` is used to build the `<topic>-key` and
/// `<topic>-value` schema registry subject names.
Kafka { topic: String },
}
/// Purifies the optional format specifier of a source statement in place.
///
/// A `KEY ... VALUE ...` specifier is only accepted when `options` is the Kafka variant.
/// Each contained format is forwarded to `purify_source_format_single`; for key/value
/// specifiers the key is processed before the value. A missing specifier is a no-op.
///
/// # Errors
///
/// Returns an error for KEY/VALUE formats on non-Kafka sources, or if purifying any
/// contained format fails.
async fn purify_source_format(
    catalog: &dyn SessionCatalog,
    format: &mut Option<FormatSpecifier<Aug>>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let is_key_value = matches!(format, Some(FormatSpecifier::KeyValue { .. }));
    let is_kafka = matches!(options, SourceFormatOptions::Kafka { .. });
    if is_key_value && !is_kafka {
        sql_bail!("Kafka sources are the only source type that can provide KEY/VALUE formats")
    }

    // Nothing to purify when no format was given.
    let Some(specifier) = format.as_mut() else {
        return Ok(());
    };

    match specifier {
        FormatSpecifier::Bare(single) => {
            purify_source_format_single(catalog, single, options, envelope, storage_configuration)
                .await?;
        }
        FormatSpecifier::KeyValue {
            key: key_format,
            value: value_format,
        } => {
            purify_source_format_single(
                catalog,
                key_format,
                options,
                envelope,
                storage_configuration,
            )
            .await?;
            purify_source_format_single(
                catalog,
                value_format,
                options,
                envelope,
                storage_configuration,
            )
            .await?;
        }
    }
    Ok(())
}
/// Purifies a single format in place.
///
/// Only Avro and Protobuf formats that reference a Confluent Schema Registry connection
/// need work; they are delegated to `purify_csr_connection_avro` and
/// `purify_csr_connection_proto` respectively. Inline schemas and all other formats are
/// left untouched.
///
/// # Errors
///
/// Propagates any error from the schema registry purification functions.
async fn purify_source_format_single(
    catalog: &dyn SessionCatalog,
    format: &mut Format<Aug>,
    options: &SourceFormatOptions,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    match format {
        Format::Avro(AvroSchema::Csr { csr_connection }) => {
            purify_csr_connection_avro(
                catalog,
                options,
                csr_connection,
                envelope,
                storage_configuration,
            )
            .await?;
        }
        Format::Protobuf(ProtobufSchema::Csr { csr_connection }) => {
            purify_csr_connection_proto(
                catalog,
                options,
                csr_connection,
                envelope,
                storage_configuration,
            )
            .await?;
        }
        // Inline schemas and schema-less formats require no purification.
        Format::Avro(AvroSchema::InlineSchema { .. })
        | Format::Protobuf(ProtobufSchema::InlineSchema { .. })
        | Format::Bytes
        | Format::Regex(_)
        | Format::Json { .. }
        | Format::Text
        | Format::Csv { .. } => {}
    }
    Ok(())
}
/// Generates the `CREATE SUBSOURCE` statements for a set of purified source exports.
///
/// The kind of the first export decides how the whole map is handled: Postgres and MySQL
/// exports are delegated to their respective modules, load generator exports are converted
/// here, and Kafka exports yield no statements. An empty map yields an empty vector.
///
/// # Errors
///
/// Returns an error if the delegated generators fail or if a load generator relation
/// description cannot be converted into table definitions.
///
/// # Panics
///
/// Panics if a load generator export has no relation description, if an export in a load
/// generator map is not a load generator export, or if the Kafka branch is reached with a
/// non-empty map.
pub fn generate_subsource_statements(
    scx: &StatementContext,
    source_name: ResolvedItemName,
    subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
    // Peek at the first export to learn which kind of source we are dealing with.
    let Some(first_export) = subsources.values().next() else {
        return Ok(vec![]);
    };

    let generated = match &first_export.details {
        PurifiedExportDetails::Postgres { .. } => {
            crate::pure::postgres::generate_create_subsource_statements(
                scx,
                source_name,
                subsources,
            )?
        }
        PurifiedExportDetails::MySql { .. } => {
            crate::pure::mysql::generate_create_subsource_statements(scx, source_name, subsources)?
        }
        PurifiedExportDetails::LoadGenerator { .. } => subsources
            .into_iter()
            .map(
                |(subsource_name, export)| -> Result<CreateSubsourceStatement<Aug>, PlanError> {
                    let PurifiedExportDetails::LoadGenerator { table, output } = export.details
                    else {
                        unreachable!("purified export details must be load generator")
                    };
                    let relation_desc = table
                        .expect("subsources cannot be generated for single-output load generators");
                    let (columns, constraints) =
                        scx.relation_desc_into_table_defs(&relation_desc)?;
                    let encoded_details = hex::encode(
                        SourceExportStatementDetails::LoadGenerator { output }
                            .into_proto()
                            .encode_to_vec(),
                    );
                    let external_reference_option = CreateSubsourceOption {
                        name: CreateSubsourceOptionName::ExternalReference,
                        value: Some(WithOptionValue::UnresolvedItemName(
                            export.external_reference,
                        )),
                    };
                    let details_option = CreateSubsourceOption {
                        name: CreateSubsourceOptionName::Details,
                        value: Some(WithOptionValue::Value(Value::String(encoded_details))),
                    };
                    Ok(CreateSubsourceStatement {
                        name: subsource_name,
                        columns,
                        of_source: Some(source_name.clone()),
                        constraints,
                        if_not_exists: false,
                        with_options: vec![external_reference_option, details_option],
                    })
                },
            )
            .collect::<Result<Vec<_>, PlanError>>()?,
        PurifiedExportDetails::Kafka { .. } => {
            assert!(
                subsources.is_empty(),
                "Kafka sources do not produce data-bearing subsources"
            );
            vec![]
        }
    };
    Ok(generated)
}
/// Populates the seed of a Protobuf schema registry connection, if it has none yet.
///
/// Connects to the referenced schema registry and compiles the `<topic>-value` subject
/// (required) and the `<topic>-key` subject (optional; any failure fetching or compiling it
/// is treated as "no key schema"). A connection that already has a seed is left untouched.
///
/// # Errors
///
/// Returns an error if the source is not a Kafka source, if the referenced item is not a
/// schema registry connection, if the registry client cannot be created, if the value
/// schema cannot be compiled, or if `ENVELOPE DEBEZIUM` is used without a key schema.
async fn purify_csr_connection_proto(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionProtobuf<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };
    let CsrConnectionProtobuf {
        seed,
        connection:
            CsrConnection {
                connection: registry_name,
                options: _,
            },
    } = csr_connection;

    // An existing seed means there is nothing left to fetch.
    if seed.is_some() {
        return Ok(());
    }

    let scx = StatementContext::new(None, &*catalog);
    let registry = match scx
        .get_item_by_resolved_name(registry_name)?
        .connection()?
    {
        Connection::Csr(csr) => csr.clone().into_inline_connection(catalog),
        _ => sql_bail!("{} is not a schema registry connection", registry_name),
    };
    let registry_client = registry
        .connect(storage_configuration, InTask::No)
        .await
        .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

    // The value schema is mandatory; the key schema is best-effort.
    let value_subject = format!("{}-value", topic);
    let value = compile_proto(&value_subject, &registry_client).await?;
    let key_subject = format!("{}-key", topic);
    let key = compile_proto(&key_subject, &registry_client).await.ok();

    let is_debezium = matches!(envelope, Some(SourceEnvelope::Debezium));
    if is_debezium && key.is_none() {
        sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
    }

    *seed = Some(CsrSeedProtobuf { value, key });
    Ok(())
}
/// Populates the seed of an Avro schema registry connection, if it has none yet.
///
/// Connects to the referenced schema registry and fetches the key and value schemas for the
/// Kafka topic using the connection's reader schema selection strategies (falling back to
/// the strategy type's default when none is set). A connection that already has a seed is
/// left untouched.
///
/// # Errors
///
/// Returns an error if the source is not a Kafka source, if the referenced item is not a
/// schema registry connection, if the registry client cannot be created, if fetching the
/// schemas fails, or if `ENVELOPE DEBEZIUM` is used without a key schema.
async fn purify_csr_connection_avro(
    catalog: &dyn SessionCatalog,
    options: &SourceFormatOptions,
    csr_connection: &mut CsrConnectionAvro<Aug>,
    envelope: &Option<SourceEnvelope>,
    storage_configuration: &StorageConfiguration,
) -> Result<(), PlanError> {
    let SourceFormatOptions::Kafka { topic } = options else {
        sql_bail!("Confluent Schema Registry is only supported with Kafka sources")
    };
    let CsrConnectionAvro {
        connection: CsrConnection {
            connection: registry_name,
            ..
        },
        seed,
        key_strategy,
        value_strategy,
    } = csr_connection;

    // An existing seed means there is nothing left to fetch.
    if seed.is_some() {
        return Ok(());
    }

    let scx = StatementContext::new(None, &*catalog);
    let registry = match scx
        .get_item_by_resolved_name(registry_name)?
        .connection()?
    {
        Connection::Csr(csr) => csr.clone().into_inline_connection(catalog),
        _ => sql_bail!("{} is not a schema registry connection", registry_name),
    };
    let registry_client = registry
        .connect(storage_configuration, InTask::No)
        .await
        .map_err(|e| CsrPurificationError::ClientError(Arc::new(e)))?;

    let fetched = get_remote_csr_schema(
        &registry_client,
        key_strategy.clone().unwrap_or_default(),
        value_strategy.clone().unwrap_or_default(),
        topic,
    )
    .await?;
    let Schema {
        key_schema,
        value_schema,
    } = fetched;

    let is_debezium = matches!(envelope, Some(SourceEnvelope::Debezium));
    if is_debezium && key_schema.is_none() {
        sql_bail!("Key schema is required for ENVELOPE DEBEZIUM");
    }

    *seed = Some(CsrSeedAvro {
        key_schema,
        value_schema,
    });
    Ok(())
}
/// Key and value schemas fetched from a schema registry for one topic.
#[derive(Debug)]
pub struct Schema {
/// Raw key schema, or `None` when no key schema was found.
pub key_schema: Option<String>,
/// Raw value schema.
pub value_schema: String,
}
/// Resolves a raw schema according to a reader schema selection strategy.
///
/// * `Latest` looks the schema up by `subject`,
/// * `ById` looks it up by the given ID,
/// * `Inline` returns the embedded schema without contacting the registry.
///
/// Returns `Ok(None)` when the registry reports that the subject, version or schema does
/// not exist.
///
/// # Errors
///
/// Returns `PlanError::FetchingCsrSchemaFailed` for any other registry error.
async fn get_schema_with_strategy(
    client: &Client,
    strategy: ReaderSchemaSelectionStrategy,
    subject: &str,
) -> Result<Option<String>, PlanError> {
    let raw_schema = match strategy {
        ReaderSchemaSelectionStrategy::Inline(raw) => Some(raw),
        ReaderSchemaSelectionStrategy::Latest => {
            match client.get_schema_by_subject(subject).await {
                Ok(CcsrSchema { raw, .. }) => Some(raw),
                // A missing subject or version simply means "no schema".
                Err(
                    GetBySubjectError::SubjectNotFound | GetBySubjectError::VersionNotFound(_),
                ) => None,
                Err(e) => {
                    return Err(PlanError::FetchingCsrSchemaFailed {
                        schema_lookup: format!("subject {}", subject.quoted()),
                        cause: Arc::new(e),
                    })
                }
            }
        }
        ReaderSchemaSelectionStrategy::ById(id) => match client.get_schema_by_id(id).await {
            Ok(CcsrSchema { raw, .. }) => Some(raw),
            Err(GetByIdError::SchemaNotFound) => None,
            Err(e) => {
                return Err(PlanError::FetchingCsrSchemaFailed {
                    schema_lookup: format!("ID {}", id),
                    cause: Arc::new(e),
                })
            }
        },
    };
    Ok(raw_schema)
}
/// Fetches the value and key schemas for `topic` from a schema registry.
///
/// The value schema is resolved first, under the subject `<topic>-value`, and must exist.
/// The key schema is resolved afterwards, under `<topic>-key`, and may be absent.
///
/// # Errors
///
/// Returns an error if either lookup fails or if no value schema is found.
async fn get_remote_csr_schema(
    ccsr_client: &mz_ccsr::Client,
    key_strategy: ReaderSchemaSelectionStrategy,
    value_strategy: ReaderSchemaSelectionStrategy,
    topic: &str,
) -> Result<Schema, PlanError> {
    let value_subject = format!("{}-value", topic);
    let maybe_value = get_schema_with_strategy(ccsr_client, value_strategy, &value_subject).await?;
    let Some(value_schema) = maybe_value else {
        return Err(anyhow!("No value schema found").into());
    };

    let key_subject = format!("{}-key", topic);
    let key_schema = get_schema_with_strategy(ccsr_client, key_strategy, &key_subject).await?;

    Ok(Schema {
        key_schema,
        value_schema,
    })
}
async fn compile_proto(
subject_name: &String,
ccsr_client: &Client,
) -> Result<CsrSeedProtobufSchema, PlanError> {
let (primary_subject, dependency_subjects) = ccsr_client
.get_subject_and_references(subject_name)
.await
.map_err(|e| PlanError::FetchingCsrSchemaFailed {
schema_lookup: format!("subject {}", subject_name.quoted()),
cause: Arc::new(e),
})?;
let mut source_tree = VirtualSourceTree::new();
for subject in iter::once(&primary_subject).chain(dependency_subjects.iter()) {
source_tree.as_mut().add_file(
Path::new(&subject.name),
subject.schema.raw.as_bytes().to_vec(),
);
}
let mut db = SourceTreeDescriptorDatabase::new(source_tree.as_mut());
let fds = db
.as_mut()
.build_file_descriptor_set(&[Path::new(&primary_subject.name)])
.map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
let primary_fd = fds.file(0);
let message_name = match primary_fd.message_type_size() {
1 => String::from_utf8_lossy(primary_fd.message_type(0).name()).into_owned(),
0 => bail_unsupported!(29603, "Protobuf schemas with no messages"),
_ => bail_unsupported!(29603, "Protobuf schemas with multiple messages"),
};
let bytes = &fds
.serialize()
.map_err(|cause| PlanError::InvalidProtobufSchema { cause })?;
let mut schema = String::new();
strconv::format_bytes(&mut schema, bytes);
Ok(CsrSeedProtobufSchema {
schema,
message_name,
})
}
/// Unqualified name of the `mz_now` function; used both to resolve it in the catalog and to
/// recognize calls to it in expressions.
const MZ_NOW_NAME: &str = "mz_now";
/// Schema in which the `mz_now` function is looked up.
const MZ_NOW_SCHEMA: &str = "mz_catalog";
/// Purifies the `REFRESH` options of a `CREATE MATERIALIZED VIEW` statement in place.
///
/// The steps, in order:
///
/// 1. `REFRESH AT CREATION` is rewritten to `REFRESH AT mz_now()`.
/// 2. `REFRESH EVERY` without `ALIGNED TO` gets `ALIGNED TO mz_now()`.
/// 3. Every `mz_now()` call inside `REFRESH AT` / `ALIGNED TO` expressions is replaced by the
///    value of `mz_now` cast to `mz_timestamp` (see `MzNowPurifierVisitor`).
/// 4. If no option carries a `REFRESH` value, `REFRESH ON COMMIT` is appended.
/// 5. `resolved_ids` is updated: the `mz_timestamp` type is added if step 3 introduced it,
///    and the `mz_now` function is removed if the statement no longer calls it anywhere.
///
/// # Panics
///
/// Panics if `mz_now` cannot be resolved in `catalog`, or if an expression being purified
/// contains `mz_now()` while the `mz_now` argument is `None`.
pub fn purify_create_materialized_view_options(
catalog: impl SessionCatalog,
mz_now: Option<Timestamp>,
cmvs: &mut CreateMaterializedViewStatement<Aug>,
resolved_ids: &mut ResolvedIds,
) {
// Resolve `mz_catalog.mz_now` and build a zero-argument call expression to it, for use in
// steps 1 and 2.
let (mz_now_id, mz_now_expr) = {
let item = catalog
.resolve_function(&PartialItemName {
database: None,
schema: Some(MZ_NOW_SCHEMA.to_string()),
item: MZ_NOW_NAME.to_string(),
})
.expect("we should be able to resolve mz_now");
(
item.id(),
Expr::Function(Function {
name: ResolvedItemName::Item {
id: item.id(),
qualifiers: item.name().qualifiers.clone(),
full_name: catalog.resolve_full_name(item.name()),
print_id: false,
version: RelationVersionSelector::Latest,
},
args: FunctionArgs::Args {
args: Vec::new(),
order_by: Vec::new(),
},
filter: None,
over: None,
distinct: false,
}),
)
};
// Resolve the `mz_timestamp` system type; it is the cast target used in step 3.
let (mz_timestamp_id, mz_timestamp_type) = {
let item = catalog.get_system_type("mz_timestamp");
let full_name = catalog.resolve_full_name(item.name());
(
item.id(),
ResolvedDataType::Named {
id: item.id(),
qualifiers: item.name().qualifiers.clone(),
full_name,
modifiers: vec![],
print_id: true,
},
)
};
// Tracks whether step 3 inserted a cast to `mz_timestamp` anywhere.
let mut introduced_mz_timestamp = false;
for option in cmvs.with_options.iter_mut() {
// Step 1: `REFRESH AT CREATION` becomes `REFRESH AT mz_now()`.
if matches!(
option.value,
Some(WithOptionValue::Refresh(RefreshOptionValue::AtCreation))
) {
option.value = Some(WithOptionValue::Refresh(RefreshOptionValue::At(
RefreshAtOptionValue {
time: mz_now_expr.clone(),
},
)));
}
// Step 2: `REFRESH EVERY` without `ALIGNED TO` is aligned to `mz_now()`.
if let Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
RefreshEveryOptionValue { aligned_to, .. },
))) = &mut option.value
{
if aligned_to.is_none() {
*aligned_to = Some(mz_now_expr.clone());
}
}
// Step 3: replace `mz_now()` calls in the option's expression. This runs after steps 1
// and 2 so that the expressions they inserted are purified as well.
match &mut option.value {
Some(WithOptionValue::Refresh(RefreshOptionValue::At(RefreshAtOptionValue {
time,
}))) => {
let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
visitor.visit_expr_mut(time);
introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
}
Some(WithOptionValue::Refresh(RefreshOptionValue::Every(
RefreshEveryOptionValue {
interval: _,
aligned_to: Some(aligned_to),
},
))) => {
let mut visitor = MzNowPurifierVisitor::new(mz_now, mz_timestamp_type.clone());
visitor.visit_expr_mut(aligned_to);
introduced_mz_timestamp |= visitor.introduced_mz_timestamp;
}
_ => {}
}
}
// Step 4: default to `REFRESH ON COMMIT` when no option carries a refresh value.
if !cmvs.with_options.iter().any(|o| {
matches!(
o,
MaterializedViewOption {
value: Some(WithOptionValue::Refresh(..)),
..
}
)
}) {
cmvs.with_options.push(MaterializedViewOption {
name: MaterializedViewOptionName::Refresh,
value: Some(WithOptionValue::Refresh(RefreshOptionValue::OnCommit)),
})
}
// Step 5: keep `resolved_ids` in sync with the rewritten statement.
if introduced_mz_timestamp {
resolved_ids.add_item(mz_timestamp_id);
}
// The whole statement is visited (not just the options): `mz_now` is only dropped from
// `resolved_ids` when no call to it remains anywhere in the statement.
let mut visitor = ExprContainsTemporalVisitor::new();
visitor.visit_create_materialized_view_statement(cmvs);
if !visitor.contains_temporal {
resolved_ids.remove_item(&mz_now_id);
}
}
/// Reports whether a materialized view option involves `mz_now()`.
///
/// `REFRESH AT CREATION` and `REFRESH EVERY` without `ALIGNED TO` always count as temporal.
/// For `REFRESH AT <expr>` and `REFRESH EVERY ... ALIGNED TO <expr>` the expression is
/// searched for calls to `mz_now`. Every other option value is not temporal.
pub fn materialized_view_option_contains_temporal(mvo: &MaterializedViewOption<Aug>) -> bool {
    let Some(WithOptionValue::Refresh(refresh)) = &mvo.value else {
        return false;
    };

    // Pick the expression to inspect, or answer directly when none is needed.
    let expr = match refresh {
        RefreshOptionValue::At(RefreshAtOptionValue { time }) => time,
        RefreshOptionValue::Every(RefreshEveryOptionValue {
            aligned_to: Some(aligned_to),
            ..
        }) => aligned_to,
        RefreshOptionValue::Every(RefreshEveryOptionValue {
            aligned_to: None, ..
        })
        | RefreshOptionValue::AtCreation => return true,
        _ => return false,
    };

    let mut temporal_finder = ExprContainsTemporalVisitor::new();
    temporal_finder.visit_expr(expr);
    temporal_finder.contains_temporal
}
/// AST visitor that records whether any visited function call is named `mz_now`.
struct ExprContainsTemporalVisitor {
/// Set to `true` once a call to `mz_now` has been seen; never reset by the visitor.
pub contains_temporal: bool,
}
impl ExprContainsTemporalVisitor {
pub fn new() -> ExprContainsTemporalVisitor {
ExprContainsTemporalVisitor {
contains_temporal: false,
}
}
}
impl Visit<'_, Aug> for ExprContainsTemporalVisitor {
    /// Flags the visitor when `func` is named `mz_now`, then keeps descending into the
    /// function so nested calls are inspected too.
    fn visit_function(&mut self, func: &Function<Aug>) {
        if func.name.full_item_name().item == MZ_NOW_NAME {
            self.contains_temporal = true;
        }
        visit_function(self, func);
    }
}
/// Mutating AST visitor that replaces calls to `mz_now()` with a literal timestamp cast to
/// `mz_timestamp`.
struct MzNowPurifierVisitor {
/// Timestamp substituted for `mz_now()`; must be `Some` if the visited expression
/// contains `mz_now()`.
pub mz_now: Option<Timestamp>,
/// Resolved `mz_timestamp` type used as the cast target.
pub mz_timestamp_type: ResolvedDataType,
/// Set to `true` once at least one replacement has been made.
pub introduced_mz_timestamp: bool,
}
impl MzNowPurifierVisitor {
pub fn new(
mz_now: Option<Timestamp>,
mz_timestamp_type: ResolvedDataType,
) -> MzNowPurifierVisitor {
MzNowPurifierVisitor {
mz_now,
mz_timestamp_type,
introduced_mz_timestamp: false,
}
}
}
impl VisitMut<'_, Aug> for MzNowPurifierVisitor {
    /// Replaces a call to a function named `mz_now` with the chosen timestamp rendered as a
    /// number literal and cast to `mz_timestamp`; every other expression is traversed
    /// recursively.
    ///
    /// # Panics
    ///
    /// Panics if an `mz_now()` call is encountered while `self.mz_now` is `None`.
    fn visit_expr_mut(&mut self, expr: &'_ mut Expr<Aug>) {
        match expr {
            Expr::Function(Function {
                name:
                    ResolvedItemName::Item {
                        full_name: FullItemName { item, .. },
                        ..
                    },
                ..
            // Compare as `&str` so no `String` is allocated for every function expression
            // visited.
            }) if item.as_str() == MZ_NOW_NAME => {
                let mz_now = self.mz_now.expect(
                    "we should have chosen a timestamp if the expression contains mz_now()",
                );
                // The replacement is not visited further: it contains no `mz_now()` call.
                *expr = Expr::Cast {
                    expr: Box::new(Expr::Value(Value::Number(mz_now.to_string()))),
                    data_type: self.mz_timestamp_type.clone(),
                };
                self.introduced_mz_timestamp = true;
            }
            _ => visit_expr_mut(self, expr),
        }
    }
}