use anyhow::Context;
use mz_interchange::{avro, protobuf};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::adt::regex::any_regex;
use mz_repr::{ColumnType, RelationDesc, ScalarType};
use proptest::prelude::{Arbitrary, BoxedStrategy, Strategy};
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use crate::types::connections::CsrConnection;
use crate::types::connections::inline::{
ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
ReferencedConnection,
};
// Splice in the build-time output for this module from `OUT_DIR`.
// NOTE(review): presumably this generated file defines the `Proto*` types
// (`ProtoSourceDataEncoding`, `ProtoDataEncodingInner`, ...) and the
// `proto_*` submodules used below — confirm against the build script.
include!(concat!(
    env!("OUT_DIR"),
    "/mz_storage_client.types.sources.encoding.rs"
));
/// A source's data encoding before each part has been wrapped in a
/// [`DataEncoding`] (i.e. before any `force_nullable_columns` flag is chosen).
///
/// Convert it with [`SourceDataEncodingInner::into_source_data_encoding`].
// NOTE(review): unlike `SourceDataEncoding`, this type derives no traits
// (not even `Debug`/`Clone`); confirm that is intentional.
pub enum SourceDataEncodingInner<C: ConnectionAccess = InlinedConnection> {
    /// A single encoding, which becomes the value encoding.
    Single(DataEncodingInner<C>),
    /// Independent encodings for a record's key and its value.
    KeyValue {
        key: DataEncodingInner<C>,
        value: DataEncodingInner<C>,
    },
}
impl<R: ConnectionResolver> IntoInlineConnection<SourceDataEncodingInner, R>
    for SourceDataEncodingInner<ReferencedConnection>
{
    /// Replaces every referenced connection in the key and/or value encoding
    /// with its inlined form, as resolved by `r`.
    fn into_inline_connection(self, r: R) -> SourceDataEncodingInner {
        match self {
            Self::KeyValue { key, value } => {
                // The key is resolved through a borrow so that the resolver
                // can still be consumed by the value afterwards.
                let key = key.into_inline_connection(&r);
                let value = value.into_inline_connection(r);
                SourceDataEncodingInner::KeyValue { key, value }
            }
            Self::Single(encoding) => {
                let encoding = encoding.into_inline_connection(&r);
                SourceDataEncodingInner::Single(encoding)
            }
        }
    }
}
impl<C: ConnectionAccess> SourceDataEncodingInner<C> {
    /// Wraps the parsed encoding(s) in [`DataEncoding`]s, producing a
    /// [`SourceDataEncoding`] with the same shape.
    ///
    /// `force_nullable_keys` becomes the key's `force_nullable_columns` flag
    /// and only applies to a `KeyValue` encoding. The value encoding, and the
    /// sole encoding of a `Single`, always get `force_nullable_columns: false`.
    pub fn into_source_data_encoding(self, force_nullable_keys: bool) -> SourceDataEncoding<C> {
        // Split into an optional key and a mandatory value first...
        let (maybe_key, value_inner) = match self {
            Self::Single(inner) => (None, inner),
            Self::KeyValue { key, value } => (Some(key), value),
        };
        // ...then wrap each part with its nullability setting.
        let value = DataEncoding {
            inner: value_inner,
            force_nullable_columns: false,
        };
        match maybe_key {
            None => SourceDataEncoding::Single(value),
            Some(key_inner) => SourceDataEncoding::KeyValue {
                key: DataEncoding {
                    inner: key_inner,
                    force_nullable_columns: force_nullable_keys,
                },
                value,
            },
        }
    }
}
/// How a source's records are decoded: either a single encoding (used as the
/// value), or independent encodings for each record's key and value.
///
/// `C` is either `InlinedConnection` (the default) or `ReferencedConnection`.
/// A referenced form is converted to the inlined form through its
/// `IntoInlineConnection` impl.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum SourceDataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Value-only encoding; `key_ref` returns `None` for this variant.
    Single(DataEncoding<C>),
    /// Separate key and value encodings.
    KeyValue {
        key: DataEncoding<C>,
        value: DataEncoding<C>,
    },
}
impl<R: ConnectionResolver> IntoInlineConnection<SourceDataEncoding, R>
    for SourceDataEncoding<ReferencedConnection>
{
    /// Resolves every connection referenced by the key and/or value encoding
    /// using `r`.
    fn into_inline_connection(self, r: R) -> SourceDataEncoding {
        match self {
            Self::KeyValue { key, value } => {
                // Borrow the resolver for the key so that it can be moved into
                // the value's conversion.
                let key = key.into_inline_connection(&r);
                let value = value.into_inline_connection(r);
                SourceDataEncoding::KeyValue { key, value }
            }
            Self::Single(encoding) => {
                SourceDataEncoding::Single(encoding.into_inline_connection(r))
            }
        }
    }
}
impl RustType<ProtoSourceDataEncoding> for SourceDataEncoding {
    /// Encodes the variant into the protobuf `kind` oneof.
    fn into_proto(&self) -> ProtoSourceDataEncoding {
        use proto_source_data_encoding::{Kind, ProtoKeyValue};
        let kind = match self {
            Self::Single(single) => Kind::Single(single.into_proto()),
            Self::KeyValue { key, value } => {
                let key = Some(key.into_proto());
                let value = Some(value.into_proto());
                Kind::KeyValue(ProtoKeyValue { key, value })
            }
        };
        ProtoSourceDataEncoding { kind: Some(kind) }
    }

    /// Decodes the protobuf form.
    ///
    /// Fails if `kind` is unset, if either half of a key/value pair is
    /// missing, or if a nested encoding fails to decode.
    fn from_proto(proto: ProtoSourceDataEncoding) -> Result<Self, TryFromProtoError> {
        use proto_source_data_encoding::{Kind, ProtoKeyValue};
        match proto.kind {
            None => Err(TryFromProtoError::missing_field(
                "ProtoSourceDataEncoding::kind",
            )),
            Some(Kind::Single(single)) => Ok(Self::Single(single.into_rust()?)),
            Some(Kind::KeyValue(ProtoKeyValue { key, value })) => Ok(Self::KeyValue {
                key: key.into_rust_if_some("ProtoKeyValue::key")?,
                value: value.into_rust_if_some("ProtoKeyValue::value")?,
            }),
        }
    }
}
impl<C: ConnectionAccess> SourceDataEncoding<C> {
    /// Borrows the key encoding, or returns `None` for a `Single` encoding.
    pub fn key_ref(&self) -> Option<&DataEncoding<C>> {
        if let Self::KeyValue { key, .. } = self {
            Some(key)
        } else {
            None
        }
    }

    /// Consumes the encoding and returns its value part (the only part of a
    /// `Single` encoding).
    pub fn value(self) -> DataEncoding<C> {
        match self {
            Self::Single(value) | Self::KeyValue { value, .. } => value,
        }
    }

    /// Borrows the value part (the only part of a `Single` encoding).
    pub fn value_ref(&self) -> &DataEncoding<C> {
        match self {
            Self::Single(value) | Self::KeyValue { value, .. } => value,
        }
    }

    /// Computes the relation descriptions of the key (if any) and the value.
    ///
    /// The key is described before the value, so a key error is reported
    /// without the value being examined.
    pub fn desc(&self) -> Result<(Option<RelationDesc>, RelationDesc), anyhow::Error> {
        let key_desc = self.key_ref().map(|key| key.desc()).transpose()?;
        let value_desc = self.value_ref().desc()?;
        Ok((key_desc, value_desc))
    }
}
/// The format of one part (key or value) of a source record.
///
/// The relation schema that each variant produces is computed by
/// `DataEncoding::desc`.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum DataEncodingInner<C: ConnectionAccess = InlinedConnection> {
    /// Avro data, described by an Avro schema. This is the only variant that
    /// carries a connection (an optional `CsrConnection`).
    Avro(AvroEncoding<C>),
    /// Protobuf messages, described by serialized descriptors.
    Protobuf(ProtobufEncoding),
    /// CSV records.
    Csv(CsvEncoding),
    /// Data parsed by a regex. Each capture group after the whole-match
    /// group becomes a nullable string column.
    Regex(RegexEncoding),
    /// A single non-nullable `data` column of `ScalarType::Bytes`.
    Bytes,
    /// A single non-nullable `data` column of `ScalarType::Jsonb`.
    Json,
    /// A single non-nullable `text` column of `ScalarType::String`.
    Text,
    /// Data whose schema is exactly the carried `RelationDesc`.
    RowCodec(RelationDesc),
}
impl<R: ConnectionResolver> IntoInlineConnection<DataEncodingInner, R>
    for DataEncodingInner<ReferencedConnection>
{
    /// Resolves connection references using `r`.
    ///
    /// Only the Avro variant is generic over the connection type. Every other
    /// variant is moved across unchanged, and `r` is simply dropped.
    fn into_inline_connection(self, r: R) -> DataEncodingInner {
        match self {
            Self::Avro(avro) => DataEncodingInner::Avro(avro.into_inline_connection(r)),
            Self::Protobuf(protobuf) => DataEncodingInner::Protobuf(protobuf),
            Self::Csv(csv) => DataEncodingInner::Csv(csv),
            Self::Regex(regex) => DataEncodingInner::Regex(regex),
            Self::RowCodec(desc) => DataEncodingInner::RowCodec(desc),
            Self::Bytes => DataEncodingInner::Bytes,
            Self::Json => DataEncodingInner::Json,
            Self::Text => DataEncodingInner::Text,
        }
    }
}
impl RustType<ProtoDataEncodingInner> for DataEncodingInner {
    /// Encodes the variant into the protobuf `kind` oneof.
    fn into_proto(&self) -> ProtoDataEncodingInner {
        use proto_data_encoding_inner::Kind;
        let kind = match self {
            // Variants with a payload encode it recursively.
            Self::Avro(avro) => Kind::Avro(avro.into_proto()),
            Self::Protobuf(protobuf) => Kind::Protobuf(protobuf.into_proto()),
            Self::Csv(csv) => Kind::Csv(csv.into_proto()),
            Self::Regex(regex) => Kind::Regex(regex.into_proto()),
            Self::RowCodec(desc) => Kind::RowCodec(desc.into_proto()),
            // Payload-free variants are encoded as a unit value.
            Self::Bytes => Kind::Bytes(()),
            Self::Json => Kind::Json(()),
            Self::Text => Kind::Text(()),
        };
        ProtoDataEncodingInner { kind: Some(kind) }
    }

    /// Decodes the protobuf form. Fails if `kind` is unset or if a payload
    /// fails to decode.
    fn from_proto(proto: ProtoDataEncodingInner) -> Result<Self, TryFromProtoError> {
        use proto_data_encoding_inner::Kind;
        let decoded = match proto.kind {
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoDataEncodingInner::kind",
                ))
            }
            Some(Kind::Avro(avro)) => Self::Avro(avro.into_rust()?),
            Some(Kind::Protobuf(protobuf)) => Self::Protobuf(protobuf.into_rust()?),
            Some(Kind::Csv(csv)) => Self::Csv(csv.into_rust()?),
            Some(Kind::Regex(regex)) => Self::Regex(regex.into_rust()?),
            Some(Kind::RowCodec(desc)) => Self::RowCodec(desc.into_rust()?),
            Some(Kind::Bytes(())) => Self::Bytes,
            Some(Kind::Json(())) => Self::Json,
            Some(Kind::Text(())) => Self::Text,
        };
        Ok(decoded)
    }
}
/// The encoding of one part (key or value) of a source record, together with
/// whether the columns it produces are forced to be nullable.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DataEncoding<C: ConnectionAccess = InlinedConnection> {
    /// When `true`, `desc` marks every column nullable, whatever `inner`
    /// would produce on its own.
    pub force_nullable_columns: bool,
    /// The underlying format.
    pub inner: DataEncodingInner<C>,
}
impl<R: ConnectionResolver> IntoInlineConnection<DataEncoding, R>
    for DataEncoding<ReferencedConnection>
{
    /// Inlines the connections of the wrapped format and keeps the
    /// nullability flag as is.
    fn into_inline_connection(self, r: R) -> DataEncoding {
        DataEncoding {
            inner: self.inner.into_inline_connection(r),
            force_nullable_columns: self.force_nullable_columns,
        }
    }
}
impl RustType<ProtoDataEncoding> for DataEncoding {
    /// Encodes the nullability flag and the wrapped format.
    fn into_proto(&self) -> ProtoDataEncoding {
        let inner = self.inner.into_proto();
        ProtoDataEncoding {
            inner: Some(inner),
            force_nullable_columns: self.force_nullable_columns,
        }
    }

    /// Decodes the protobuf form. Fails if `inner` is missing or fails to
    /// decode.
    fn from_proto(proto: ProtoDataEncoding) -> Result<Self, TryFromProtoError> {
        let force_nullable_columns = proto.force_nullable_columns.into_rust()?;
        let inner = proto.inner.into_rust_if_some("ProtoDataEncoding::inner")?;
        Ok(Self {
            force_nullable_columns,
            inner,
        })
    }
}
/// Builds a [`RelationDesc`] with one column per `(name, type)` pair, in the
/// order given.
pub fn included_column_desc(included_columns: Vec<(&str, ColumnType)>) -> RelationDesc {
    included_columns
        .into_iter()
        .fold(RelationDesc::empty(), |acc, (column_name, column_type)| {
            acc.with_column(column_name, column_type)
        })
}
impl<C: ConnectionAccess> DataEncoding<C> {
pub fn new(inner: DataEncodingInner<C>) -> DataEncoding<C> {
DataEncoding {
inner,
force_nullable_columns: false,
}
}
fn desc(&self) -> Result<RelationDesc, anyhow::Error> {
let desc = match &self.inner {
DataEncodingInner::Bytes => {
RelationDesc::empty().with_column("data", ScalarType::Bytes.nullable(false))
}
DataEncodingInner::Json => {
RelationDesc::empty().with_column("data", ScalarType::Jsonb.nullable(false))
}
DataEncodingInner::Avro(AvroEncoding { schema, .. }) => {
let parsed_schema = avro::parse_schema(schema).context("validating avro schema")?;
avro::schema_to_relationdesc(parsed_schema).context("validating avro schema")?
}
DataEncodingInner::Protobuf(ProtobufEncoding {
descriptors,
message_name,
confluent_wire_format: _,
}) => protobuf::DecodedDescriptors::from_bytes(descriptors, message_name.to_owned())?
.columns()
.iter()
.fold(RelationDesc::empty(), |desc, (name, ty)| {
desc.with_column(name, ty.clone())
}),
DataEncodingInner::Regex(RegexEncoding { regex }) => regex
.capture_names()
.enumerate()
.skip(1)
.fold(RelationDesc::empty(), |desc, (i, name)| {
let name = match name {
None => format!("column{}", i),
Some(name) => name.to_owned(),
};
let ty = ScalarType::String.nullable(true);
desc.with_column(name, ty)
}),
DataEncodingInner::Csv(CsvEncoding { columns, .. }) => match columns {
ColumnSpec::Count(n) => (1..=*n).fold(RelationDesc::empty(), |desc, i| {
desc.with_column(format!("column{}", i), ScalarType::String.nullable(false))
}),
ColumnSpec::Header { names } => names
.iter()
.map(|s| &**s)
.fold(RelationDesc::empty(), |desc, name| {
desc.with_column(name, ScalarType::String.nullable(false))
}),
},
DataEncodingInner::Text => {
RelationDesc::empty().with_column("text", ScalarType::String.nullable(false))
}
DataEncodingInner::RowCodec(desc) => desc.clone(),
};
if self.force_nullable_columns {
Ok(RelationDesc::from_names_and_types(
desc.into_iter()
.map(|(name, typ)| (name, typ.nullable(true))),
))
} else {
Ok(desc)
}
}
pub fn op_name(&self) -> &'static str {
match &self.inner {
DataEncodingInner::Bytes => "Bytes",
DataEncodingInner::Json => "Json",
DataEncodingInner::Avro(_) => "Avro",
DataEncodingInner::Protobuf(_) => "Protobuf",
DataEncodingInner::Regex { .. } => "Regex",
DataEncodingInner::Csv(_) => "Csv",
DataEncodingInner::Text => "Text",
DataEncodingInner::RowCodec(_) => "RowCodec",
}
}
}
/// Configuration for Avro-encoded data.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct AvroEncoding<C: ConnectionAccess = InlinedConnection> {
    /// Avro schema text. `DataEncoding::desc` parses it with
    /// `avro::parse_schema`.
    pub schema: String,
    /// Optional `CsrConnection`.
    // NOTE(review): presumably a Confluent Schema Registry connection — confirm.
    pub csr_connection: Option<CsrConnection<C>>,
    // NOTE(review): presumably indicates Confluent wire-format framing of
    // records; semantics are not visible in this file.
    pub confluent_wire_format: bool,
}
impl<R: ConnectionResolver> IntoInlineConnection<AvroEncoding, R>
    for AvroEncoding<ReferencedConnection>
{
    /// Inlines the schema-registry connection, if one is present. All other
    /// fields are moved across unchanged.
    fn into_inline_connection(self, r: R) -> AvroEncoding {
        AvroEncoding {
            csr_connection: self
                .csr_connection
                .map(|csr| csr.into_inline_connection(r)),
            schema: self.schema,
            confluent_wire_format: self.confluent_wire_format,
        }
    }
}
impl RustType<ProtoAvroEncoding> for AvroEncoding {
    /// Copies every field into its protobuf counterpart.
    fn into_proto(&self) -> ProtoAvroEncoding {
        let csr_connection = self.csr_connection.into_proto();
        ProtoAvroEncoding {
            confluent_wire_format: self.confluent_wire_format,
            schema: self.schema.clone(),
            csr_connection,
        }
    }

    /// Decodes the protobuf form. Fails only if the connection fails to
    /// decode.
    fn from_proto(proto: ProtoAvroEncoding) -> Result<Self, TryFromProtoError> {
        let ProtoAvroEncoding {
            schema,
            csr_connection,
            confluent_wire_format,
        } = proto;
        Ok(Self {
            schema,
            csr_connection: csr_connection.into_rust()?,
            confluent_wire_format,
        })
    }
}
/// Configuration for Protobuf-encoded data.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct ProtobufEncoding {
    /// Serialized descriptors, decoded by `DecodedDescriptors::from_bytes`.
    pub descriptors: Vec<u8>,
    /// Name of the message to decode, passed alongside `descriptors`.
    pub message_name: String,
    // NOTE(review): presumably indicates Confluent wire-format framing;
    // `DataEncoding::desc` ignores it.
    pub confluent_wire_format: bool,
}
impl RustType<ProtoProtobufEncoding> for ProtobufEncoding {
    /// Field-by-field copy into the protobuf form.
    fn into_proto(&self) -> ProtoProtobufEncoding {
        let ProtobufEncoding {
            descriptors,
            message_name,
            confluent_wire_format,
        } = self;
        ProtoProtobufEncoding {
            descriptors: descriptors.clone(),
            message_name: message_name.clone(),
            confluent_wire_format: *confluent_wire_format,
        }
    }

    /// Field-by-field move out of the protobuf form. This never fails.
    fn from_proto(proto: ProtoProtobufEncoding) -> Result<Self, TryFromProtoError> {
        let ProtoProtobufEncoding {
            descriptors,
            message_name,
            confluent_wire_format,
        } = proto;
        Ok(Self {
            descriptors,
            message_name,
            confluent_wire_format,
        })
    }
}
/// Configuration for CSV-encoded data.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct CsvEncoding {
    /// How many columns there are and how they are named.
    pub columns: ColumnSpec,
    /// Delimiter byte (presumably the field separator).
    pub delimiter: u8,
}
impl RustType<ProtoCsvEncoding> for CsvEncoding {
    /// Encodes the column spec and the delimiter.
    fn into_proto(&self) -> ProtoCsvEncoding {
        let columns = self.columns.into_proto();
        ProtoCsvEncoding {
            columns: Some(columns),
            delimiter: self.delimiter.into_proto(),
        }
    }

    /// Decodes the protobuf form. Fails if `columns` is missing, or if the
    /// column spec or the delimiter fails to convert.
    fn from_proto(proto: ProtoCsvEncoding) -> Result<Self, TryFromProtoError> {
        let columns = proto
            .columns
            .into_rust_if_some("ProtoCsvEncoding::columns")?;
        let delimiter = proto.delimiter.into_rust()?;
        Ok(CsvEncoding { columns, delimiter })
    }
}
/// Describes the columns of a CSV encoding.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum ColumnSpec {
    /// `n` columns, which `DataEncoding::desc` names `column1` through
    /// `column{n}`.
    Count(usize),
    /// Columns named by `names`, in order.
    Header { names: Vec<String> },
}
impl RustType<ProtoColumnSpec> for ColumnSpec {
    /// Encodes the variant into the protobuf `kind` oneof.
    fn into_proto(&self) -> ProtoColumnSpec {
        use proto_column_spec::{Kind, ProtoHeader};
        let kind = match self {
            Self::Count(count) => Kind::Count(count.into_proto()),
            Self::Header { names } => Kind::Header(ProtoHeader {
                names: names.clone(),
            }),
        };
        ProtoColumnSpec { kind: Some(kind) }
    }

    /// Decodes the protobuf form. Fails if `kind` is unset or if the count
    /// fails to convert.
    fn from_proto(proto: ProtoColumnSpec) -> Result<Self, TryFromProtoError> {
        use proto_column_spec::{Kind, ProtoHeader};
        match proto.kind {
            Some(Kind::Count(count)) => Ok(Self::Count(count.into_rust()?)),
            Some(Kind::Header(ProtoHeader { names })) => Ok(Self::Header { names }),
            None => Err(TryFromProtoError::missing_field("ProtoColumnSpec::kind")),
        }
    }
}
impl ColumnSpec {
pub fn arity(&self) -> usize {
match self {
ColumnSpec::Count(n) => *n,
ColumnSpec::Header { names } => names.len(),
}
}
pub fn into_header_names(self) -> Option<Vec<String>> {
match self {
ColumnSpec::Count(_) => None,
ColumnSpec::Header { names } => Some(names),
}
}
}
/// Configuration for regex-parsed data.
///
/// `Arbitrary` is not derived. A manual impl generates values with
/// `any_regex`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct RegexEncoding {
    /// The pattern. Each capture group after group 0 becomes a nullable
    /// string column in `DataEncoding::desc`.
    pub regex: mz_repr::adt::regex::Regex,
}
impl Arbitrary for RegexEncoding {
    type Strategy = BoxedStrategy<Self>;
    type Parameters = ();

    /// Wraps each regex produced by `any_regex` in a `RegexEncoding`.
    fn arbitrary_with(_params: Self::Parameters) -> Self::Strategy {
        let regexes = any_regex();
        regexes.prop_map(|regex| Self { regex }).boxed()
    }
}
impl RustType<ProtoRegexEncoding> for RegexEncoding {
    /// Encodes the wrapped regex.
    fn into_proto(&self) -> ProtoRegexEncoding {
        let regex = self.regex.into_proto();
        ProtoRegexEncoding { regex: Some(regex) }
    }

    /// Decodes the protobuf form. Fails if `regex` is missing or fails to
    /// decode.
    fn from_proto(proto: ProtoRegexEncoding) -> Result<Self, TryFromProtoError> {
        let regex = proto
            .regex
            .into_rust_if_some("ProtoRegexEncoding::regex")?;
        Ok(Self { regex })
    }
}