use std::fmt;
use crate::ast::display::{self, AstDisplay, AstFormatter, WithOptionName};
use crate::ast::{
AstInfo, ColumnName, Expr, Ident, OrderByExpr, UnresolvedItemName, WithOptionValue,
};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MaterializedViewOptionName {
AssertNotNull,
RetainHistory,
Refresh,
}
impl AstDisplay for MaterializedViewOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
MaterializedViewOptionName::AssertNotNull => f.write_str("ASSERT NOT NULL"),
MaterializedViewOptionName::RetainHistory => f.write_str("RETAIN HISTORY"),
MaterializedViewOptionName::Refresh => f.write_str("REFRESH"),
}
}
}
impl WithOptionName for MaterializedViewOptionName {
fn redact_value(&self) -> bool {
match self {
MaterializedViewOptionName::AssertNotNull
| MaterializedViewOptionName::RetainHistory
| MaterializedViewOptionName::Refresh => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MaterializedViewOption<T: AstInfo> {
pub name: MaterializedViewOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(MaterializedViewOption);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Schema {
pub schema: String,
}
impl AstDisplay for Schema {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("SCHEMA '");
f.write_node(&display::escape_single_quote_string(&self.schema));
f.write_str("'");
}
}
impl_display!(Schema);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AvroSchemaOptionName {
ConfluentWireFormat,
}
impl AstDisplay for AvroSchemaOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
AvroSchemaOptionName::ConfluentWireFormat => f.write_str("CONFLUENT WIRE FORMAT"),
}
}
}
impl WithOptionName for AvroSchemaOptionName {
fn redact_value(&self) -> bool {
match self {
Self::ConfluentWireFormat => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroSchemaOption<T: AstInfo> {
pub name: AvroSchemaOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(AvroSchemaOption);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AvroSchema<T: AstInfo> {
Csr {
csr_connection: CsrConnectionAvro<T>,
},
InlineSchema {
schema: Schema,
with_options: Vec<AvroSchemaOption<T>>,
},
}
impl<T: AstInfo> AstDisplay for AvroSchema<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Csr { csr_connection } => {
f.write_node(csr_connection);
}
Self::InlineSchema {
schema,
with_options,
} => {
f.write_str("USING ");
schema.fmt(f);
if !with_options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(with_options));
f.write_str(")");
}
}
}
}
}
impl_display_t!(AvroSchema);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtobufSchema<T: AstInfo> {
Csr {
csr_connection: CsrConnectionProtobuf<T>,
},
InlineSchema {
message_name: String,
schema: Schema,
},
}
impl<T: AstInfo> AstDisplay for ProtobufSchema<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Csr { csr_connection } => {
f.write_node(csr_connection);
}
Self::InlineSchema {
message_name,
schema,
} => {
f.write_str("MESSAGE '");
f.write_node(&display::escape_single_quote_string(message_name));
f.write_str("' USING ");
f.write_str(schema);
}
}
}
}
impl_display_t!(ProtobufSchema);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CsrConfigOptionName<T: AstInfo> {
AvroKeyFullname,
AvroValueFullname,
NullDefaults,
AvroDocOn(AvroDocOn<T>),
KeyCompatibilityLevel,
ValueCompatibilityLevel,
}
impl<T: AstInfo> WithOptionName for CsrConfigOptionName<T> {
fn redact_value(&self) -> bool {
match self {
Self::AvroKeyFullname
| Self::AvroValueFullname
| Self::NullDefaults
| Self::AvroDocOn(_)
| Self::KeyCompatibilityLevel
| Self::ValueCompatibilityLevel => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct AvroDocOn<T: AstInfo> {
pub identifier: DocOnIdentifier<T>,
pub for_schema: DocOnSchema,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnSchema {
KeyOnly,
ValueOnly,
All,
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DocOnIdentifier<T: AstInfo> {
Column(ColumnName<T>),
Type(T::ItemName),
}
impl<T: AstInfo> AstDisplay for AvroDocOn<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match &self.for_schema {
DocOnSchema::KeyOnly => f.write_str("KEY "),
DocOnSchema::ValueOnly => f.write_str("VALUE "),
DocOnSchema::All => {}
}
match &self.identifier {
DocOnIdentifier::Column(name) => {
f.write_str("DOC ON COLUMN ");
f.write_node(name);
}
DocOnIdentifier::Type(name) => {
f.write_str("DOC ON TYPE ");
f.write_node(name);
}
}
}
}
impl_display_t!(AvroDocOn);
impl<T: AstInfo> AstDisplay for CsrConfigOptionName<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CsrConfigOptionName::AvroKeyFullname => f.write_str("AVRO KEY FULLNAME"),
CsrConfigOptionName::AvroValueFullname => f.write_str("AVRO VALUE FULLNAME"),
CsrConfigOptionName::NullDefaults => f.write_str("NULL DEFAULTS"),
CsrConfigOptionName::AvroDocOn(doc_on) => f.write_node(doc_on),
CsrConfigOptionName::KeyCompatibilityLevel => f.write_str("KEY COMPATIBILITY LEVEL"),
CsrConfigOptionName::ValueCompatibilityLevel => {
f.write_str("VALUE COMPATIBILITY LEVEL")
}
}
}
}
impl_display_t!(CsrConfigOptionName);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CsrConfigOption<T: AstInfo> {
pub name: CsrConfigOptionName<T>,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CsrConfigOption);
impl_display_t!(CsrConfigOption);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnection<T: AstInfo> {
pub connection: T::ItemName,
pub options: Vec<CsrConfigOption<T>>,
}
impl<T: AstInfo> AstDisplay for CsrConnection<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("CONNECTION ");
f.write_node(&self.connection);
if !self.options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(&self.options));
f.write_str(")");
}
}
}
impl_display_t!(CsrConnection);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ReaderSchemaSelectionStrategy {
Latest,
Inline(String),
ById(i32),
}
impl Default for ReaderSchemaSelectionStrategy {
fn default() -> Self {
Self::Latest
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionAvro<T: AstInfo> {
pub connection: CsrConnection<T>,
pub key_strategy: Option<ReaderSchemaSelectionStrategy>,
pub value_strategy: Option<ReaderSchemaSelectionStrategy>,
pub seed: Option<CsrSeedAvro>,
}
impl<T: AstInfo> AstDisplay for CsrConnectionAvro<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
f.write_node(&self.connection);
if let Some(seed) = &self.seed {
f.write_str(" ");
f.write_node(seed);
}
}
}
impl_display_t!(CsrConnectionAvro);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrConnectionProtobuf<T: AstInfo> {
pub connection: CsrConnection<T>,
pub seed: Option<CsrSeedProtobuf>,
}
impl<T: AstInfo> AstDisplay for CsrConnectionProtobuf<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("USING CONFLUENT SCHEMA REGISTRY ");
f.write_node(&self.connection);
if let Some(seed) = &self.seed {
f.write_str(" ");
f.write_node(seed);
}
}
}
impl_display_t!(CsrConnectionProtobuf);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedAvro {
pub key_schema: Option<String>,
pub value_schema: String,
}
impl AstDisplay for CsrSeedAvro {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("SEED");
if let Some(key_schema) = &self.key_schema {
f.write_str(" KEY SCHEMA '");
f.write_node(&display::escape_single_quote_string(key_schema));
f.write_str("'");
}
f.write_str(" VALUE SCHEMA '");
f.write_node(&display::escape_single_quote_string(&self.value_schema));
f.write_str("'");
}
}
impl_display!(CsrSeedAvro);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobuf {
pub key: Option<CsrSeedProtobufSchema>,
pub value: CsrSeedProtobufSchema,
}
impl AstDisplay for CsrSeedProtobuf {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("SEED");
if let Some(key) = &self.key {
f.write_str(" KEY ");
f.write_node(key);
}
f.write_str(" VALUE ");
f.write_node(&self.value);
}
}
impl_display!(CsrSeedProtobuf);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CsrSeedProtobufSchema {
pub schema: String,
pub message_name: String,
}
impl AstDisplay for CsrSeedProtobufSchema {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str("SCHEMA '");
f.write_str(&display::escape_single_quote_string(&self.schema));
f.write_str("' MESSAGE '");
f.write_str(&self.message_name);
f.write_str("'");
}
}
impl_display!(CsrSeedProtobufSchema);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FormatSpecifier<T: AstInfo> {
Bare(Format<T>),
KeyValue { key: Format<T>, value: Format<T> },
}
impl<T: AstInfo> AstDisplay for FormatSpecifier<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
FormatSpecifier::Bare(format) => {
f.write_str(" FORMAT ");
f.write_node(format)
}
FormatSpecifier::KeyValue { key, value } => {
f.write_str(" KEY FORMAT ");
f.write_node(key);
f.write_str(" VALUE FORMAT ");
f.write_node(value);
}
}
}
}
impl_display_t!(FormatSpecifier);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Format<T: AstInfo> {
Bytes,
Avro(AvroSchema<T>),
Protobuf(ProtobufSchema<T>),
Regex(String),
Csv {
columns: CsvColumns,
delimiter: char,
},
Json {
array: bool,
},
Text,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CsvColumns {
Count(u64),
Header { names: Vec<Ident> },
}
impl AstDisplay for CsvColumns {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CsvColumns::Count(n) => {
f.write_str(n);
f.write_str(" COLUMNS")
}
CsvColumns::Header { names } => {
f.write_str("HEADER");
if !names.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(names));
f.write_str(")");
}
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceIncludeMetadata {
Key {
alias: Option<Ident>,
},
Timestamp {
alias: Option<Ident>,
},
Partition {
alias: Option<Ident>,
},
Offset {
alias: Option<Ident>,
},
Headers {
alias: Option<Ident>,
},
Header {
key: String,
alias: Ident,
use_bytes: bool,
},
}
impl AstDisplay for SourceIncludeMetadata {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
let print_alias = |f: &mut AstFormatter<W>, alias: &Option<Ident>| {
if let Some(alias) = alias {
f.write_str(" AS ");
f.write_node(alias);
}
};
match self {
SourceIncludeMetadata::Key { alias } => {
f.write_str("KEY");
print_alias(f, alias);
}
SourceIncludeMetadata::Timestamp { alias } => {
f.write_str("TIMESTAMP");
print_alias(f, alias);
}
SourceIncludeMetadata::Partition { alias } => {
f.write_str("PARTITION");
print_alias(f, alias);
}
SourceIncludeMetadata::Offset { alias } => {
f.write_str("OFFSET");
print_alias(f, alias);
}
SourceIncludeMetadata::Headers { alias } => {
f.write_str("HEADERS");
print_alias(f, alias);
}
SourceIncludeMetadata::Header {
alias,
key,
use_bytes,
} => {
f.write_str("HEADER '");
f.write_str(&display::escape_single_quote_string(key));
f.write_str("'");
print_alias(f, &Some(alias.clone()));
if *use_bytes {
f.write_str(" BYTES");
}
}
}
}
}
impl_display!(SourceIncludeMetadata);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceErrorPolicy {
Inline {
alias: Option<Ident>,
},
}
impl AstDisplay for SourceErrorPolicy {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Inline { alias } => {
f.write_str("INLINE");
if let Some(alias) = alias {
f.write_str(" AS ");
f.write_node(alias);
}
}
}
}
}
impl_display!(SourceErrorPolicy);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SourceEnvelope {
None,
Debezium,
Upsert {
value_decode_err_policy: Vec<SourceErrorPolicy>,
},
CdcV2,
}
impl SourceEnvelope {
pub fn requires_all_input(&self) -> bool {
match self {
SourceEnvelope::None => false,
SourceEnvelope::Debezium => false,
SourceEnvelope::Upsert { .. } => false,
SourceEnvelope::CdcV2 => true,
}
}
}
impl AstDisplay for SourceEnvelope {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::None => {
f.write_str("NONE");
}
Self::Debezium => {
f.write_str("DEBEZIUM");
}
Self::Upsert {
value_decode_err_policy,
} => {
if value_decode_err_policy.is_empty() {
f.write_str("UPSERT");
} else {
f.write_str("UPSERT (VALUE DECODING ERRORS = (");
f.write_node(&display::comma_separated(value_decode_err_policy));
f.write_str("))")
}
}
Self::CdcV2 => {
f.write_str("MATERIALIZE");
}
}
}
}
impl_display!(SourceEnvelope);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SinkEnvelope {
Debezium,
Upsert,
}
impl AstDisplay for SinkEnvelope {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Upsert => {
f.write_str("UPSERT");
}
Self::Debezium => {
f.write_str("DEBEZIUM");
}
}
}
}
impl_display!(SinkEnvelope);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SubscribeOutput<T: AstInfo> {
Diffs,
WithinTimestampOrderBy { order_by: Vec<OrderByExpr<T>> },
EnvelopeUpsert { key_columns: Vec<Ident> },
EnvelopeDebezium { key_columns: Vec<Ident> },
}
impl<T: AstInfo> AstDisplay for SubscribeOutput<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Diffs => {}
Self::WithinTimestampOrderBy { order_by } => {
f.write_str(" WITHIN TIMESTAMP ORDER BY ");
f.write_node(&display::comma_separated(order_by));
}
Self::EnvelopeUpsert { key_columns } => {
f.write_str(" ENVELOPE UPSERT (KEY (");
f.write_node(&display::comma_separated(key_columns));
f.write_str("))");
}
Self::EnvelopeDebezium { key_columns } => {
f.write_str(" ENVELOPE DEBEZIUM (KEY (");
f.write_node(&display::comma_separated(key_columns));
f.write_str("))");
}
}
}
}
impl_display_t!(SubscribeOutput);
impl<T: AstInfo> AstDisplay for Format<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Bytes => f.write_str("BYTES"),
Self::Avro(inner) => {
f.write_str("AVRO ");
f.write_node(inner);
}
Self::Protobuf(inner) => {
f.write_str("PROTOBUF ");
f.write_node(inner);
}
Self::Regex(regex) => {
f.write_str("REGEX '");
f.write_node(&display::escape_single_quote_string(regex));
f.write_str("'");
}
Self::Csv { columns, delimiter } => {
f.write_str("CSV WITH ");
f.write_node(columns);
if *delimiter != ',' {
f.write_str(" DELIMITED BY '");
f.write_node(&display::escape_single_quote_string(&delimiter.to_string()));
f.write_str("'");
}
}
Self::Json { array } => {
f.write_str("JSON");
if *array {
f.write_str(" ARRAY");
}
}
Self::Text => f.write_str("TEXT"),
}
}
}
impl_display_t!(Format);
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConnectionOptionName {
AccessKeyId,
AssumeRoleArn,
AssumeRoleSessionName,
AvailabilityZones,
AwsConnection,
AwsPrivatelink,
Broker,
Brokers,
Database,
Endpoint,
Host,
Password,
Port,
ProgressTopic,
ProgressTopicReplicationFactor,
Region,
SaslMechanisms,
SaslPassword,
SaslUsername,
SecretAccessKey,
SecurityProtocol,
ServiceName,
SshTunnel,
SslCertificate,
SslCertificateAuthority,
SslKey,
SslMode,
SessionToken,
Url,
User,
}
impl AstDisplay for ConnectionOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
ConnectionOptionName::AccessKeyId => "ACCESS KEY ID",
ConnectionOptionName::AvailabilityZones => "AVAILABILITY ZONES",
ConnectionOptionName::AwsConnection => "AWS CONNECTION",
ConnectionOptionName::AwsPrivatelink => "AWS PRIVATELINK",
ConnectionOptionName::Broker => "BROKER",
ConnectionOptionName::Brokers => "BROKERS",
ConnectionOptionName::Database => "DATABASE",
ConnectionOptionName::Endpoint => "ENDPOINT",
ConnectionOptionName::Host => "HOST",
ConnectionOptionName::Password => "PASSWORD",
ConnectionOptionName::Port => "PORT",
ConnectionOptionName::ProgressTopic => "PROGRESS TOPIC",
ConnectionOptionName::ProgressTopicReplicationFactor => {
"PROGRESS TOPIC REPLICATION FACTOR"
}
ConnectionOptionName::Region => "REGION",
ConnectionOptionName::AssumeRoleArn => "ASSUME ROLE ARN",
ConnectionOptionName::AssumeRoleSessionName => "ASSUME ROLE SESSION NAME",
ConnectionOptionName::SaslMechanisms => "SASL MECHANISMS",
ConnectionOptionName::SaslPassword => "SASL PASSWORD",
ConnectionOptionName::SaslUsername => "SASL USERNAME",
ConnectionOptionName::SecurityProtocol => "SECURITY PROTOCOL",
ConnectionOptionName::SecretAccessKey => "SECRET ACCESS KEY",
ConnectionOptionName::ServiceName => "SERVICE NAME",
ConnectionOptionName::SshTunnel => "SSH TUNNEL",
ConnectionOptionName::SslCertificate => "SSL CERTIFICATE",
ConnectionOptionName::SslCertificateAuthority => "SSL CERTIFICATE AUTHORITY",
ConnectionOptionName::SslKey => "SSL KEY",
ConnectionOptionName::SslMode => "SSL MODE",
ConnectionOptionName::SessionToken => "SESSION TOKEN",
ConnectionOptionName::Url => "URL",
ConnectionOptionName::User => "USER",
})
}
}
impl_display!(ConnectionOptionName);
impl WithOptionName for ConnectionOptionName {
fn redact_value(&self) -> bool {
match self {
ConnectionOptionName::AccessKeyId
| ConnectionOptionName::AvailabilityZones
| ConnectionOptionName::AwsConnection
| ConnectionOptionName::AwsPrivatelink
| ConnectionOptionName::Broker
| ConnectionOptionName::Brokers
| ConnectionOptionName::Database
| ConnectionOptionName::Endpoint
| ConnectionOptionName::Host
| ConnectionOptionName::Password
| ConnectionOptionName::Port
| ConnectionOptionName::ProgressTopic
| ConnectionOptionName::ProgressTopicReplicationFactor
| ConnectionOptionName::Region
| ConnectionOptionName::AssumeRoleArn
| ConnectionOptionName::AssumeRoleSessionName
| ConnectionOptionName::SaslMechanisms
| ConnectionOptionName::SaslPassword
| ConnectionOptionName::SaslUsername
| ConnectionOptionName::SecurityProtocol
| ConnectionOptionName::SecretAccessKey
| ConnectionOptionName::ServiceName
| ConnectionOptionName::SshTunnel
| ConnectionOptionName::SslCertificate
| ConnectionOptionName::SslCertificateAuthority
| ConnectionOptionName::SslKey
| ConnectionOptionName::SslMode
| ConnectionOptionName::SessionToken
| ConnectionOptionName::Url
| ConnectionOptionName::User => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionOption<T: AstInfo> {
pub name: ConnectionOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(ConnectionOption);
impl_display_t!(ConnectionOption);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum CreateConnectionType {
Aws,
AwsPrivatelink,
Kafka,
Csr,
Postgres,
Ssh,
MySql,
Yugabyte,
}
impl AstDisplay for CreateConnectionType {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Kafka => {
f.write_str("KAFKA");
}
Self::Csr => {
f.write_str("CONFLUENT SCHEMA REGISTRY");
}
Self::Postgres => {
f.write_str("POSTGRES");
}
Self::Aws => {
f.write_str("AWS");
}
Self::AwsPrivatelink => {
f.write_str("AWS PRIVATELINK");
}
Self::Ssh => {
f.write_str("SSH TUNNEL");
}
Self::MySql => {
f.write_str("MYSQL");
}
Self::Yugabyte => {
f.write_str("YUGABYTE");
}
}
}
}
impl_display!(CreateConnectionType);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateConnectionOptionName {
Validate,
}
impl AstDisplay for CreateConnectionOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
CreateConnectionOptionName::Validate => "VALIDATE",
})
}
}
impl_display!(CreateConnectionOptionName);
impl WithOptionName for CreateConnectionOptionName {
fn redact_value(&self) -> bool {
match self {
CreateConnectionOptionName::Validate => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateConnectionOption<T: AstInfo> {
pub name: CreateConnectionOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateConnectionOption);
impl_display_t!(CreateConnectionOption);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSourceConfigOptionName {
GroupIdPrefix,
Topic,
TopicMetadataRefreshInterval,
StartTimestamp,
StartOffset,
}
impl AstDisplay for KafkaSourceConfigOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
KafkaSourceConfigOptionName::GroupIdPrefix => "GROUP ID PREFIX",
KafkaSourceConfigOptionName::Topic => "TOPIC",
KafkaSourceConfigOptionName::TopicMetadataRefreshInterval => {
"TOPIC METADATA REFRESH INTERVAL"
}
KafkaSourceConfigOptionName::StartOffset => "START OFFSET",
KafkaSourceConfigOptionName::StartTimestamp => "START TIMESTAMP",
})
}
}
impl_display!(KafkaSourceConfigOptionName);
impl WithOptionName for KafkaSourceConfigOptionName {
fn redact_value(&self) -> bool {
match self {
KafkaSourceConfigOptionName::GroupIdPrefix
| KafkaSourceConfigOptionName::Topic
| KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
| KafkaSourceConfigOptionName::StartOffset
| KafkaSourceConfigOptionName::StartTimestamp => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSourceConfigOption<T: AstInfo> {
pub name: KafkaSourceConfigOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(KafkaSourceConfigOption);
impl_display_t!(KafkaSourceConfigOption);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum KafkaSinkConfigOptionName {
CompressionType,
PartitionBy,
ProgressGroupIdPrefix,
Topic,
TransactionalIdPrefix,
LegacyIds,
TopicConfig,
TopicMetadataRefreshInterval,
TopicPartitionCount,
TopicReplicationFactor,
}
impl AstDisplay for KafkaSinkConfigOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
KafkaSinkConfigOptionName::CompressionType => "COMPRESSION TYPE",
KafkaSinkConfigOptionName::PartitionBy => "PARTITION BY",
KafkaSinkConfigOptionName::ProgressGroupIdPrefix => "PROGRESS GROUP ID PREFIX",
KafkaSinkConfigOptionName::Topic => "TOPIC",
KafkaSinkConfigOptionName::TransactionalIdPrefix => "TRANSACTIONAL ID PREFIX",
KafkaSinkConfigOptionName::LegacyIds => "LEGACY IDS",
KafkaSinkConfigOptionName::TopicConfig => "TOPIC CONFIG",
KafkaSinkConfigOptionName::TopicMetadataRefreshInterval => {
"TOPIC METADATA REFRESH INTERVAL"
}
KafkaSinkConfigOptionName::TopicPartitionCount => "TOPIC PARTITION COUNT",
KafkaSinkConfigOptionName::TopicReplicationFactor => "TOPIC REPLICATION FACTOR",
})
}
}
impl_display!(KafkaSinkConfigOptionName);
impl WithOptionName for KafkaSinkConfigOptionName {
fn redact_value(&self) -> bool {
match self {
KafkaSinkConfigOptionName::CompressionType
| KafkaSinkConfigOptionName::ProgressGroupIdPrefix
| KafkaSinkConfigOptionName::Topic
| KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
| KafkaSinkConfigOptionName::TransactionalIdPrefix
| KafkaSinkConfigOptionName::LegacyIds
| KafkaSinkConfigOptionName::TopicConfig
| KafkaSinkConfigOptionName::TopicPartitionCount
| KafkaSinkConfigOptionName::TopicReplicationFactor => false,
KafkaSinkConfigOptionName::PartitionBy => true,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct KafkaSinkConfigOption<T: AstInfo> {
pub name: KafkaSinkConfigOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(KafkaSinkConfigOption);
impl_display_t!(KafkaSinkConfigOption);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PgConfigOptionName {
Details,
Publication,
TextColumns,
}
impl AstDisplay for PgConfigOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
PgConfigOptionName::Details => "DETAILS",
PgConfigOptionName::Publication => "PUBLICATION",
PgConfigOptionName::TextColumns => "TEXT COLUMNS",
})
}
}
impl_display!(PgConfigOptionName);
impl WithOptionName for PgConfigOptionName {
fn redact_value(&self) -> bool {
match self {
PgConfigOptionName::Details
| PgConfigOptionName::Publication
| PgConfigOptionName::TextColumns => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PgConfigOption<T: AstInfo> {
pub name: PgConfigOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(PgConfigOption);
impl_display_t!(PgConfigOption);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum MySqlConfigOptionName {
Details,
TextColumns,
ExcludeColumns,
}
impl AstDisplay for MySqlConfigOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
MySqlConfigOptionName::Details => "DETAILS",
MySqlConfigOptionName::TextColumns => "TEXT COLUMNS",
MySqlConfigOptionName::ExcludeColumns => "EXCLUDE COLUMNS",
})
}
}
impl_display!(MySqlConfigOptionName);
impl WithOptionName for MySqlConfigOptionName {
fn redact_value(&self) -> bool {
match self {
MySqlConfigOptionName::Details
| MySqlConfigOptionName::TextColumns
| MySqlConfigOptionName::ExcludeColumns => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct MySqlConfigOption<T: AstInfo> {
pub name: MySqlConfigOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(MySqlConfigOption);
impl_display_t!(MySqlConfigOption);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSourceConnection<T: AstInfo> {
Kafka {
connection: T::ItemName,
options: Vec<KafkaSourceConfigOption<T>>,
},
Postgres {
connection: T::ItemName,
options: Vec<PgConfigOption<T>>,
},
Yugabyte {
connection: T::ItemName,
options: Vec<PgConfigOption<T>>,
},
MySql {
connection: T::ItemName,
options: Vec<MySqlConfigOption<T>>,
},
LoadGenerator {
generator: LoadGenerator,
options: Vec<LoadGeneratorOption<T>>,
},
}
impl<T: AstInfo> AstDisplay for CreateSourceConnection<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CreateSourceConnection::Kafka {
connection,
options,
} => {
f.write_str("KAFKA CONNECTION ");
f.write_node(connection);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
}
CreateSourceConnection::Postgres {
connection,
options,
} => {
f.write_str("POSTGRES CONNECTION ");
f.write_node(connection);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
}
CreateSourceConnection::Yugabyte {
connection,
options,
} => {
f.write_str("YUGABYTE CONNECTION ");
f.write_node(connection);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
}
CreateSourceConnection::MySql {
connection,
options,
} => {
f.write_str("MYSQL CONNECTION ");
f.write_node(connection);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
}
CreateSourceConnection::LoadGenerator { generator, options } => {
f.write_str("LOAD GENERATOR ");
f.write_node(generator);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
}
}
}
}
impl_display_t!(CreateSourceConnection);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LoadGenerator {
Clock,
Counter,
Marketing,
Auction,
Datums,
Tpch,
KeyValue,
}
impl AstDisplay for LoadGenerator {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
Self::Counter => f.write_str("COUNTER"),
Self::Clock => f.write_str("CLOCK"),
Self::Marketing => f.write_str("MARKETING"),
Self::Auction => f.write_str("AUCTION"),
Self::Datums => f.write_str("DATUMS"),
Self::Tpch => f.write_str("TPCH"),
Self::KeyValue => f.write_str("KEY VALUE"),
}
}
}
impl_display!(LoadGenerator);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadGeneratorOptionName {
ScaleFactor,
TickInterval,
AsOf,
UpTo,
MaxCardinality,
Keys,
SnapshotRounds,
TransactionalSnapshot,
ValueSize,
Seed,
Partitions,
BatchSize,
}
impl AstDisplay for LoadGeneratorOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
LoadGeneratorOptionName::ScaleFactor => "SCALE FACTOR",
LoadGeneratorOptionName::TickInterval => "TICK INTERVAL",
LoadGeneratorOptionName::AsOf => "AS OF",
LoadGeneratorOptionName::UpTo => "UP TO",
LoadGeneratorOptionName::MaxCardinality => "MAX CARDINALITY",
LoadGeneratorOptionName::Keys => "KEYS",
LoadGeneratorOptionName::SnapshotRounds => "SNAPSHOT ROUNDS",
LoadGeneratorOptionName::TransactionalSnapshot => "TRANSACTIONAL SNAPSHOT",
LoadGeneratorOptionName::ValueSize => "VALUE SIZE",
LoadGeneratorOptionName::Seed => "SEED",
LoadGeneratorOptionName::Partitions => "PARTITIONS",
LoadGeneratorOptionName::BatchSize => "BATCH SIZE",
})
}
}
impl_display!(LoadGeneratorOptionName);
impl WithOptionName for LoadGeneratorOptionName {
fn redact_value(&self) -> bool {
match self {
LoadGeneratorOptionName::ScaleFactor
| LoadGeneratorOptionName::TickInterval
| LoadGeneratorOptionName::AsOf
| LoadGeneratorOptionName::UpTo
| LoadGeneratorOptionName::MaxCardinality
| LoadGeneratorOptionName::Keys
| LoadGeneratorOptionName::SnapshotRounds
| LoadGeneratorOptionName::TransactionalSnapshot
| LoadGeneratorOptionName::ValueSize
| LoadGeneratorOptionName::Partitions
| LoadGeneratorOptionName::BatchSize
| LoadGeneratorOptionName::Seed => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LoadGeneratorOption<T: AstInfo> {
pub name: LoadGeneratorOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(LoadGeneratorOption);
impl_display_t!(LoadGeneratorOption);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CreateSinkConnection<T: AstInfo> {
Kafka {
connection: T::ItemName,
options: Vec<KafkaSinkConfigOption<T>>,
key: Option<KafkaSinkKey>,
headers: Option<Ident>,
},
}
impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
CreateSinkConnection::Kafka {
connection,
options,
key,
headers,
} => {
f.write_str("KAFKA CONNECTION ");
f.write_node(connection);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
if let Some(key) = key.as_ref() {
f.write_node(key);
}
if let Some(headers) = headers {
f.write_str(" HEADERS ");
f.write_node(headers);
}
}
}
}
}
impl_display_t!(CreateSinkConnection);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct KafkaSinkKey {
pub key_columns: Vec<Ident>,
pub not_enforced: bool,
}
impl AstDisplay for KafkaSinkKey {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(" KEY (");
f.write_node(&display::comma_separated(&self.key_columns));
f.write_str(")");
if self.not_enforced {
f.write_str(" NOT ENFORCED");
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TableConstraint<T: AstInfo> {
Unique {
name: Option<Ident>,
columns: Vec<Ident>,
is_primary: bool,
nulls_not_distinct: bool,
},
ForeignKey {
name: Option<Ident>,
columns: Vec<Ident>,
foreign_table: T::ItemName,
referred_columns: Vec<Ident>,
},
Check {
name: Option<Ident>,
expr: Box<Expr<T>>,
},
}
impl<T: AstInfo> AstDisplay for TableConstraint<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
TableConstraint::Unique {
name,
columns,
is_primary,
nulls_not_distinct,
} => {
f.write_node(&display_constraint_name(name));
if *is_primary {
f.write_str("PRIMARY KEY ");
} else {
f.write_str("UNIQUE ");
if *nulls_not_distinct {
f.write_str("NULLS NOT DISTINCT ");
}
}
f.write_str("(");
f.write_node(&display::comma_separated(columns));
f.write_str(")");
}
TableConstraint::ForeignKey {
name,
columns,
foreign_table,
referred_columns,
} => {
f.write_node(&display_constraint_name(name));
f.write_str("FOREIGN KEY (");
f.write_node(&display::comma_separated(columns));
f.write_str(") REFERENCES ");
f.write_node(foreign_table);
f.write_str("(");
f.write_node(&display::comma_separated(referred_columns));
f.write_str(")");
}
TableConstraint::Check { name, expr } => {
f.write_node(&display_constraint_name(name));
f.write_str("CHECK (");
f.write_node(&expr);
f.write_str(")");
}
}
}
}
impl_display_t!(TableConstraint);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum KeyConstraint {
PrimaryKeyNotEnforced { columns: Vec<Ident> },
}
impl AstDisplay for KeyConstraint {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
match self {
KeyConstraint::PrimaryKeyNotEnforced { columns } => {
f.write_str("PRIMARY KEY ");
f.write_str("(");
f.write_node(&display::comma_separated(columns));
f.write_str(") ");
f.write_str("NOT ENFORCED");
}
}
}
}
impl_display!(KeyConstraint);
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CreateSourceOptionName {
IgnoreKeys,
Timeline,
TimestampInterval,
RetainHistory,
}
impl AstDisplay for CreateSourceOptionName {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_str(match self {
CreateSourceOptionName::IgnoreKeys => "IGNORE KEYS",
CreateSourceOptionName::Timeline => "TIMELINE",
CreateSourceOptionName::TimestampInterval => "TIMESTAMP INTERVAL",
CreateSourceOptionName::RetainHistory => "RETAIN HISTORY",
})
}
}
impl_display!(CreateSourceOptionName);
impl WithOptionName for CreateSourceOptionName {
fn redact_value(&self) -> bool {
match self {
CreateSourceOptionName::IgnoreKeys
| CreateSourceOptionName::Timeline
| CreateSourceOptionName::TimestampInterval
| CreateSourceOptionName::RetainHistory => false,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CreateSourceOption<T: AstInfo> {
pub name: CreateSourceOptionName,
pub value: Option<WithOptionValue<T>>,
}
impl_display_for_with_option!(CreateSourceOption);
impl_display_t!(CreateSourceOption);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnDef<T: AstInfo> {
pub name: Ident,
pub data_type: T::DataType,
pub collation: Option<UnresolvedItemName>,
pub options: Vec<ColumnOptionDef<T>>,
}
impl<T: AstInfo> AstDisplay for ColumnDef<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_node(&self.name);
f.write_str(" ");
f.write_node(&self.data_type);
if let Some(collation) = &self.collation {
f.write_str(" COLLATE ");
f.write_node(collation);
}
for option in &self.options {
f.write_str(" ");
f.write_node(option);
}
}
}
impl_display_t!(ColumnDef);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnOptionDef<T: AstInfo> {
pub name: Option<Ident>,
pub option: ColumnOption<T>,
}
impl<T: AstInfo> AstDisplay for ColumnOptionDef<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
f.write_node(&display_constraint_name(&self.name));
f.write_node(&self.option);
}
}
impl_display_t!(ColumnOptionDef);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ColumnOption<T: AstInfo> {
Null,
NotNull,
Default(Expr<T>),
Unique {
is_primary: bool,
},
ForeignKey {
foreign_table: UnresolvedItemName,
referred_columns: Vec<Ident>,
},
Check(Expr<T>),
}
impl<T: AstInfo> AstDisplay for ColumnOption<T> {
fn fmt<W: fmt::Write>(&self, f: &mut AstFormatter<W>) {
use ColumnOption::*;
match self {
Null => f.write_str("NULL"),
NotNull => f.write_str("NOT NULL"),
Default(expr) => {
f.write_str("DEFAULT ");
f.write_node(expr);
}
Unique { is_primary } => {
if *is_primary {
f.write_str("PRIMARY KEY");
} else {
f.write_str("UNIQUE");
}
}
ForeignKey {
foreign_table,
referred_columns,
} => {
f.write_str("REFERENCES ");
f.write_node(foreign_table);
f.write_str(" (");
f.write_node(&display::comma_separated(referred_columns));
f.write_str(")");
}
Check(expr) => {
f.write_str("CHECK (");
f.write_node(expr);
f.write_str(")");
}
}
}
}
impl_display_t!(ColumnOption);
fn display_constraint_name<'a>(name: &'a Option<Ident>) -> impl AstDisplay + 'a {
struct ConstraintName<'a>(&'a Option<Ident>);
impl<'a> AstDisplay for ConstraintName<'a> {
fn fmt<W>(&self, f: &mut AstFormatter<W>)
where
W: fmt::Write,
{
if let Some(name) = self.0 {
f.write_str("CONSTRAINT ");
f.write_node(name);
f.write_str(" ");
}
}
}
ConstraintName(name)
}