use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::ops::{Add, AddAssign, Deref, DerefMut};
use std::str::FromStr;
use std::time::Duration;
use anyhow::{anyhow, bail};
use bytes::BufMut;
use dec::OrderedDecimal;
use itertools::EitherOrBoth::Both;
use itertools::Itertools;
use mz_expr::{MirScalarExpr, PartitionId};
use mz_ore::now::NowFn;
use mz_persist_types::columnar::{
ColumnFormat, ColumnGet, ColumnPush, Data, DataType, PartDecoder, PartEncoder, Schema,
};
use mz_persist_types::dyn_struct::{DynStruct, DynStructCfg, ValidityMut, ValidityRef};
use mz_persist_types::stats::StatsFn;
use mz_persist_types::Codec;
use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
use mz_repr::adt::numeric::{Numeric, NumericMaxScale};
use mz_repr::{
ColumnType, Datum, DatumDecoderT, DatumEncoderT, GlobalId, RelationDesc, RelationType, Row,
RowDecoder, RowEncoder, ScalarType,
};
use mz_timely_util::order::{Interval, Partitioned, RangeBound};
use once_cell::sync::Lazy;
use proptest::prelude::{any, Arbitrary, BoxedStrategy, Strategy};
use proptest_derive::Arbitrary;
use prost::Message;
use serde::{Deserialize, Serialize};
use timely::dataflow::operators::to_stream::Event;
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{PathSummary, Timestamp};
use uuid::Uuid;
use crate::controller::{CollectionMetadata, StorageError};
use crate::types::connections::inline::{
ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
ReferencedConnection,
};
use crate::types::errors::{DataflowError, ProtoDataflowError};
use crate::types::instances::StorageInstanceId;
use crate::types::sources::encoding::{DataEncoding, DataEncodingInner, SourceDataEncoding};
use crate::types::sources::proto_ingestion_description::{ProtoSourceExport, ProtoSourceImport};
use crate::types::sources::proto_load_generator_source_connection::Generator as ProtoGenerator;
pub mod encoding;
include!(concat!(
env!("OUT_DIR"),
"/mz_storage_client.types.sources.rs"
));
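/// A description of a data ingestion: the source to ingest from, the
/// collections it imports and exports, the storage instance that will run it,
/// and the collection that holds its remap (progress) data.
///
/// The `S` parameter carries per-collection storage metadata; it is `()` until
/// the storage controller fills in a `CollectionMetadata`.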
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IngestionDescription<S = (), C: ConnectionAccess = InlinedConnection> {
pub desc: SourceDesc<C>,
pub source_imports: BTreeMap<GlobalId, S>,
pub ingestion_metadata: S,
pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
pub instance_id: StorageInstanceId,
pub remap_collection_id: GlobalId,
}
impl<S> IngestionDescription<S> {
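    /// Returns the IDs of all of this ingestion's subsources: every source
    /// export plus the remap collection.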
pub fn subsource_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
let IngestionDescription {
desc: _,
source_imports: _,
ingestion_metadata: _,
source_exports,
instance_id: _,
remap_collection_id,
} = &self;
source_exports
.keys()
.copied()
.chain(std::iter::once(*remap_collection_id))
}
}
impl<S: Debug + Eq + PartialEq> IngestionDescription<S> {
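    /// Checks that `other` is a compatible alteration of `self`: every field
    /// must either be equal or itself alter-compatible. Logs and returns
    /// `StorageError::InvalidAlterSource` otherwise.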
pub fn alter_compatible(
&self,
id: GlobalId,
other: &IngestionDescription<S>,
) -> Result<(), StorageError> {
if self == other {
return Ok(());
}
let IngestionDescription {
desc,
source_imports,
ingestion_metadata,
source_exports,
instance_id,
remap_collection_id,
} = self;
        desc.alter_compatible(id, &other.desc)?;
let compatibility_checks = [
source_imports == &other.source_imports,
ingestion_metadata == &other.ingestion_metadata,
source_exports
.iter()
.merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
l_key.cmp(r_key)
})
.all(|r| match r {
Both(
(
_,
SourceExport {
output_index: _,
storage_metadata: l_metadata,
},
),
(
_,
SourceExport {
output_index: _,
storage_metadata: r_metadata,
},
),
) => {
l_metadata == r_metadata
}
_ => true,
}),
instance_id == &other.instance_id,
remap_collection_id == &other.remap_collection_id,
];
for compatible in compatibility_checks {
if !compatible {
tracing::warn!(
"IngestionDescription incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
self,
other
);
return Err(StorageError::InvalidAlterSource { id });
}
}
Ok(())
}
}
impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
for IngestionDescription<(), ReferencedConnection>
{
fn into_inline_connection(self, r: R) -> IngestionDescription {
let IngestionDescription {
desc,
source_imports,
ingestion_metadata,
source_exports,
instance_id,
remap_collection_id,
} = self;
IngestionDescription {
desc: desc.into_inline_connection(r),
source_imports,
ingestion_metadata,
source_exports,
instance_id,
remap_collection_id,
}
}
}
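/// A single output of an ingestion: the source output index it reads from and
/// the storage metadata (`S`) for the collection it writes to.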
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExport<S = ()> {
pub output_index: usize,
pub storage_metadata: S,
}
impl<S> Arbitrary for IngestionDescription<S>
where
S: Arbitrary + 'static,
{
type Strategy = BoxedStrategy<Self>;
type Parameters = ();
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
any::<SourceDesc>().boxed(),
proptest::collection::btree_map(any::<GlobalId>(), any::<S>(), 1..4).boxed(),
proptest::collection::btree_map(any::<GlobalId>(), any::<SourceExport<S>>(), 1..4)
.boxed(),
any::<S>().boxed(),
any::<StorageInstanceId>().boxed(),
any::<GlobalId>(),
)
.prop_map(
|(
desc,
source_imports,
source_exports,
ingestion_metadata,
instance_id,
remap_collection_id,
)| Self {
desc,
source_imports,
source_exports,
ingestion_metadata,
instance_id,
remap_collection_id,
},
)
.boxed()
}
}
impl RustType<ProtoIngestionDescription> for IngestionDescription<CollectionMetadata> {
fn into_proto(&self) -> ProtoIngestionDescription {
ProtoIngestionDescription {
source_imports: self.source_imports.into_proto(),
source_exports: self.source_exports.into_proto(),
ingestion_metadata: Some(self.ingestion_metadata.into_proto()),
desc: Some(self.desc.into_proto()),
instance_id: Some(self.instance_id.into_proto()),
remap_collection_id: Some(self.remap_collection_id.into_proto()),
}
}
fn from_proto(proto: ProtoIngestionDescription) -> Result<Self, TryFromProtoError> {
Ok(IngestionDescription {
source_imports: proto.source_imports.into_rust()?,
source_exports: proto.source_exports.into_rust()?,
desc: proto
.desc
.into_rust_if_some("ProtoIngestionDescription::desc")?,
ingestion_metadata: proto
.ingestion_metadata
.into_rust_if_some("ProtoIngestionDescription::ingestion_metadata")?,
instance_id: proto
.instance_id
.into_rust_if_some("ProtoIngestionDescription::instance_id")?,
remap_collection_id: proto
.remap_collection_id
.into_rust_if_some("ProtoIngestionDescription::remap_collection_id")?,
})
}
}
impl ProtoMapEntry<GlobalId, CollectionMetadata> for ProtoSourceImport {
fn from_rust<'a>(entry: (&'a GlobalId, &'a CollectionMetadata)) -> Self {
ProtoSourceImport {
id: Some(entry.0.into_proto()),
storage_metadata: Some(entry.1.into_proto()),
}
}
fn into_rust(self) -> Result<(GlobalId, CollectionMetadata), TryFromProtoError> {
Ok((
self.id.into_rust_if_some("ProtoSourceImport::id")?,
self.storage_metadata
.into_rust_if_some("ProtoSourceImport::storage_metadata")?,
))
}
}
impl ProtoMapEntry<GlobalId, SourceExport<CollectionMetadata>> for ProtoSourceExport {
fn from_rust<'a>(entry: (&'a GlobalId, &'a SourceExport<CollectionMetadata>)) -> Self {
ProtoSourceExport {
id: Some(entry.0.into_proto()),
output_index: entry.1.output_index.into_proto(),
storage_metadata: Some(entry.1.storage_metadata.into_proto()),
}
}
fn into_rust(self) -> Result<(GlobalId, SourceExport<CollectionMetadata>), TryFromProtoError> {
Ok((
self.id.into_rust_if_some("ProtoSourceExport::id")?,
SourceExport {
output_index: self.output_index.into_rust()?,
storage_metadata: self
.storage_metadata
.into_rust_if_some("ProtoSourceExport::storage_metadata")?,
},
))
}
}
impl<S> Arbitrary for SourceExport<S>
where
S: Arbitrary + 'static,
{
type Strategy = BoxedStrategy<Self>;
type Parameters = ();
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(any::<usize>(), any::<S>())
.prop_map(|(output_index, storage_metadata)| Self {
output_index,
storage_metadata,
})
.boxed()
}
}
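/// A timestamp by which a source tracks its upstream progress.
///
/// Implementors can round-trip through the legacy `(PartitionId, MzOffset)`
/// pair and through a `Row`, the form in which timestamps are durably
/// recorded in a source's remap collection.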
pub trait SourceTimestamp: timely::progress::Timestamp + Refines<()> + std::fmt::Display {
    /// Converts a legacy `(PartitionId, MzOffset)` pair into this timestamp type.
    fn from_compat_ts(pid: PartitionId, offset: MzOffset) -> Self;
    /// Converts this timestamp back into a legacy `(PartitionId, MzOffset)`
    /// pair, if it is representable as one.
    fn try_into_compat_ts(&self) -> Option<(PartitionId, MzOffset)>;
    /// Encodes this timestamp as a `Row`, for durable storage.
    fn encode_row(&self) -> Row;
    /// Decodes a timestamp previously produced by `encode_row`; panics on
    /// invalid encodings.
    fn decode_row(row: &Row) -> Self;
}
impl SourceTimestamp for MzOffset {
fn from_compat_ts(pid: PartitionId, offset: MzOffset) -> Self {
assert_eq!(
pid,
PartitionId::None,
"invalid non-partitioned partition {pid}"
);
offset
}
fn try_into_compat_ts(&self) -> Option<(PartitionId, MzOffset)> {
Some((PartitionId::None, *self))
}
fn encode_row(&self) -> Row {
Row::pack([Datum::UInt64(self.offset)])
}
fn decode_row(row: &Row) -> Self {
let mut datums = row.iter();
match (datums.next(), datums.next()) {
(Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
_ => panic!("invalid row {row:?}"),
}
}
}
impl SourceTimestamp for Partitioned<i32, MzOffset> {
fn from_compat_ts(pid: PartitionId, offset: MzOffset) -> Self {
match pid {
PartitionId::Kafka(pid) => Partitioned::with_partition(pid, offset),
PartitionId::None => panic!("invalid partitioned partition {pid}"),
}
}
fn try_into_compat_ts(&self) -> Option<(PartitionId, MzOffset)> {
let pid = self.partition()?;
Some((PartitionId::Kafka(*pid), *self.timestamp()))
}
fn encode_row(&self) -> Row {
use mz_repr::adt::range;
let mut row = Row::with_capacity(2);
let mut packer = row.packer();
let to_numeric = |p: i32| Datum::from(OrderedDecimal(Numeric::from(p)));
let (lower, upper) = match self.interval() {
Interval::Range(l, u) => match (l, u) {
(RangeBound::Bottom, RangeBound::Top) => {
((Datum::Null, false), (Datum::Null, false))
}
(RangeBound::Bottom, RangeBound::Elem(pid)) => {
((Datum::Null, false), (to_numeric(*pid), false))
}
(RangeBound::Elem(pid), RangeBound::Top) => {
((to_numeric(*pid), false), (Datum::Null, false))
}
(RangeBound::Elem(l_pid), RangeBound::Elem(u_pid)) => {
((to_numeric(*l_pid), false), (to_numeric(*u_pid), false))
}
o => unreachable!("don't know how to handle this partition {o:?}"),
},
Interval::Point(pid) => ((to_numeric(*pid), true), (to_numeric(*pid), true)),
};
let offset = self.timestamp().offset;
packer
.push_range(range::Range::new(Some((
range::RangeBound::new(lower.0, lower.1),
range::RangeBound::new(upper.0, upper.1),
))))
.expect("pushing range must not generate errors");
packer.push(Datum::UInt64(offset));
row
}
fn decode_row(row: &Row) -> Self {
let mut datums = row.iter();
match (datums.next(), datums.next(), datums.next()) {
(Some(Datum::Range(range)), Some(Datum::UInt64(offset)), None) => {
let mut range = range.into_bounds(|b| b.datum());
range.canonicalize().expect("ranges must be valid");
let range = range.inner.expect("empty range");
                let lower = range.lower.bound.map(|bound| {
                    i32::try_from(bound.unwrap_numeric().0)
                        .expect("only i32 values converted to ranges")
                });
                let upper = range.upper.bound.map(|bound| {
                    i32::try_from(bound.unwrap_numeric().0)
                        .expect("only i32 values converted to ranges")
                });
match (range.lower.inclusive, range.upper.inclusive) {
(true, true) => {
assert_eq!(lower, upper);
Partitioned::with_partition(lower.unwrap(), MzOffset::from(offset))
}
(false, false) => Partitioned::with_range(lower, upper, MzOffset::from(offset)),
_ => panic!("invalid timestamp"),
}
}
invalid_binding => unreachable!("invalid binding {:?}", invalid_binding),
}
}
}
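/// A partition-agnostic representation of a position in an upstream source,
/// e.g. a Kafka offset or a Postgres LSN. Wrapping the raw `u64` lets us
/// implement timely's `Timestamp` and the related progress-tracking traits.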
#[derive(
Copy,
Clone,
Default,
Debug,
PartialEq,
PartialOrd,
Eq,
Ord,
Hash,
Serialize,
Deserialize,
Arbitrary,
)]
pub struct MzOffset {
pub offset: u64,
}
impl differential_dataflow::difference::Semigroup for MzOffset {
fn plus_equals(&mut self, rhs: &Self) {
self.offset.plus_equals(&rhs.offset)
}
fn is_zero(&self) -> bool {
self.offset.is_zero()
}
}
impl mz_persist_types::Codec64 for MzOffset {
fn codec_name() -> String {
"MzOffset".to_string()
}
fn encode(&self) -> [u8; 8] {
mz_persist_types::Codec64::encode(&self.offset)
}
fn decode(buf: [u8; 8]) -> Self {
Self {
offset: mz_persist_types::Codec64::decode(buf),
}
}
}
impl RustType<ProtoMzOffset> for MzOffset {
fn into_proto(&self) -> ProtoMzOffset {
ProtoMzOffset {
offset: self.offset,
}
}
fn from_proto(proto: ProtoMzOffset) -> Result<Self, TryFromProtoError> {
Ok(Self {
offset: proto.offset,
})
}
}
impl MzOffset {
pub fn checked_sub(self, other: Self) -> Option<Self> {
self.offset
.checked_sub(other.offset)
.map(|offset| Self { offset })
}
}
impl From<u64> for MzOffset {
fn from(offset: u64) -> Self {
Self { offset }
}
}
impl std::fmt::Display for MzOffset {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.offset)
}
}
impl Add<u64> for MzOffset {
type Output = MzOffset;
fn add(self, x: u64) -> MzOffset {
MzOffset {
offset: self.offset + x,
}
}
}
impl Add<Self> for MzOffset {
type Output = Self;
fn add(self, x: Self) -> Self {
MzOffset {
offset: self.offset + x.offset,
}
}
}
impl AddAssign<u64> for MzOffset {
fn add_assign(&mut self, x: u64) {
self.offset += x;
}
}
impl AddAssign<Self> for MzOffset {
fn add_assign(&mut self, x: Self) {
self.offset += x.offset;
}
}
impl From<tokio_postgres::types::PgLsn> for MzOffset {
fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
MzOffset { offset: lsn.into() }
}
}
impl Timestamp for MzOffset {
type Summary = MzOffset;
fn minimum() -> Self {
MzOffset {
offset: Timestamp::minimum(),
}
}
}
impl PathSummary<MzOffset> for MzOffset {
fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
Some(MzOffset {
offset: self.offset.results_in(&src.offset)?,
})
}
fn followed_by(&self, other: &Self) -> Option<Self> {
Some(MzOffset {
offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
})
}
}
impl Refines<()> for MzOffset {
fn to_inner(_: ()) -> Self {
MzOffset::minimum()
}
fn to_outer(self) {}
fn summarize(_: Self::Summary) {}
}
impl PartialOrder for MzOffset {
fn less_equal(&self, other: &Self) -> bool {
self.offset.less_equal(&other.offset)
}
}
impl TotalOrder for MzOffset {}
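/// The piece of upstream metadata that an included metadata column reflects.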
#[derive(Arbitrary, Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum IncludedColumnSource {
Partition,
Offset,
Timestamp,
Topic,
Headers,
}
impl RustType<ProtoIncludedColumnSource> for IncludedColumnSource {
fn into_proto(&self) -> ProtoIncludedColumnSource {
use proto_included_column_source::Kind;
ProtoIncludedColumnSource {
kind: Some(match self {
IncludedColumnSource::Partition => Kind::Partition(()),
IncludedColumnSource::Offset => Kind::Offset(()),
IncludedColumnSource::Timestamp => Kind::Timestamp(()),
IncludedColumnSource::Topic => Kind::Topic(()),
IncludedColumnSource::Headers => Kind::Headers(()),
}),
}
}
fn from_proto(proto: ProtoIncludedColumnSource) -> Result<Self, TryFromProtoError> {
use proto_included_column_source::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoIncludedColumnSource::kind"))?;
Ok(match kind {
Kind::Partition(()) => IncludedColumnSource::Partition,
Kind::Offset(()) => IncludedColumnSource::Offset,
Kind::Timestamp(()) => IncludedColumnSource::Timestamp,
Kind::Topic(()) => IncludedColumnSource::Topic,
Kind::Headers(()) => IncludedColumnSource::Headers,
})
}
}
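/// Whether and how a source's decoded key appears in its output relation:
/// omitted entirely, flattened into individual columns, or packed into a
/// single (possibly record-typed) column with the given name.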
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum KeyEnvelope {
None,
Flattened,
Named(String),
}
impl RustType<ProtoKeyEnvelope> for KeyEnvelope {
fn into_proto(&self) -> ProtoKeyEnvelope {
use proto_key_envelope::Kind;
ProtoKeyEnvelope {
kind: Some(match self {
KeyEnvelope::None => Kind::None(()),
KeyEnvelope::Flattened => Kind::Flattened(()),
KeyEnvelope::Named(name) => Kind::Named(name.clone()),
}),
}
}
fn from_proto(proto: ProtoKeyEnvelope) -> Result<Self, TryFromProtoError> {
use proto_key_envelope::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoKeyEnvelope::kind"))?;
Ok(match kind {
Kind::None(()) => KeyEnvelope::None,
Kind::Flattened(()) => KeyEnvelope::Flattened,
Kind::Named(name) => KeyEnvelope::Named(name),
})
}
}
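/// The requested name and column position of an included metadata column.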
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IncludedColumnPos {
pub name: String,
pub pos: usize,
}
impl RustType<ProtoIncludedColumnPos> for IncludedColumnPos {
fn into_proto(&self) -> ProtoIncludedColumnPos {
ProtoIncludedColumnPos {
name: self.name.clone(),
pos: self.pos.into_proto(),
}
}
fn from_proto(proto: ProtoIncludedColumnPos) -> Result<Self, TryFromProtoError> {
Ok(IncludedColumnPos {
name: proto.name,
pos: usize::from_proto(proto.pos)?,
})
}
}
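/// The timelines in which a timestamp is meaningful.
///
/// Timelines serialize to a single identifying character, optionally followed
/// by `.` and an identifier (see the `Display` and `FromStr` impls below):
///
/// ```text
/// M        EpochMilliseconds
/// E.<id>   External
/// U.<id>   User
/// ```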
#[derive(Arbitrary, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum Timeline {
EpochMilliseconds,
External(String),
User(String),
}
impl Timeline {
const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
const EXTERNAL_ID_CHAR: char = 'E';
const USER_ID_CHAR: char = 'U';
fn id_char(&self) -> char {
match self {
Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
Self::External(_) => Self::EXTERNAL_ID_CHAR,
Self::User(_) => Self::USER_ID_CHAR,
}
}
}
impl RustType<ProtoTimeline> for Timeline {
fn into_proto(&self) -> ProtoTimeline {
use proto_timeline::Kind;
ProtoTimeline {
kind: Some(match self {
Timeline::EpochMilliseconds => Kind::EpochMilliseconds(()),
Timeline::External(s) => Kind::External(s.clone()),
Timeline::User(s) => Kind::User(s.clone()),
}),
}
}
fn from_proto(proto: ProtoTimeline) -> Result<Self, TryFromProtoError> {
use proto_timeline::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoTimeline::kind"))?;
Ok(match kind {
Kind::EpochMilliseconds(()) => Timeline::EpochMilliseconds,
Kind::External(s) => Timeline::External(s),
Kind::User(s) => Timeline::User(s),
})
}
}
impl std::fmt::Display for Timeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EpochMilliseconds => write!(f, "{}", self.id_char()),
            Self::External(id) | Self::User(id) => write!(f, "{}.{id}", self.id_char()),
        }
    }
}
impl FromStr for Timeline {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.is_empty() {
return Err("empty timeline".to_string());
}
let mut chars = s.chars();
match chars.next().expect("non-empty string") {
Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
None => Ok(Self::EpochMilliseconds),
Some(_) => Err(format!("unknown timeline: {s}")),
},
Self::EXTERNAL_ID_CHAR => match chars.next() {
Some('.') => Ok(Self::External(chars.as_str().to_string())),
_ => Err(format!("unknown timeline: {s}")),
},
Self::USER_ID_CHAR => match chars.next() {
Some('.') => Ok(Self::User(chars.as_str().to_string())),
_ => Err(format!("unknown timeline: {s}")),
},
_ => Err(format!("unknown timeline: {s}")),
}
}
}
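/// How raw messages from a source become differential updates: appended
/// as-is (`None`), interpreted as Debezium change events, upserted by key, or
/// decoded via the CDCv2 (`MATERIALIZE`) protocol.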
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum SourceEnvelope {
None(NoneEnvelope),
Debezium(DebeziumEnvelope),
Upsert(UpsertEnvelope),
CdcV2,
}
impl RustType<ProtoSourceEnvelope> for SourceEnvelope {
fn into_proto(&self) -> ProtoSourceEnvelope {
use proto_source_envelope::Kind;
ProtoSourceEnvelope {
kind: Some(match self {
SourceEnvelope::None(e) => Kind::None(e.into_proto()),
SourceEnvelope::Debezium(e) => Kind::Debezium(e.into_proto()),
SourceEnvelope::Upsert(e) => Kind::Upsert(e.into_proto()),
SourceEnvelope::CdcV2 => Kind::CdcV2(()),
}),
}
}
fn from_proto(proto: ProtoSourceEnvelope) -> Result<Self, TryFromProtoError> {
use proto_source_envelope::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceEnvelope::kind"))?;
Ok(match kind {
Kind::None(e) => SourceEnvelope::None(e.into_rust()?),
Kind::Debezium(e) => SourceEnvelope::Debezium(e.into_rust()?),
Kind::Upsert(e) => SourceEnvelope::Upsert(e.into_rust()?),
Kind::CdcV2(()) => SourceEnvelope::CdcV2,
})
}
}
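/// A `SourceEnvelope` that is still missing the arity and key information
/// that only becomes available during planning; `desc` converts it into a
/// full `SourceEnvelope`.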
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UnplannedSourceEnvelope {
None(KeyEnvelope),
Debezium(DebeziumEnvelope),
Upsert { style: UpsertStyle },
CdcV2,
}
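/// The planned form of `SourceEnvelope::None`: how the key is included and
/// the key's arity.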
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct NoneEnvelope {
pub key_envelope: KeyEnvelope,
pub key_arity: usize,
}
impl RustType<ProtoNoneEnvelope> for NoneEnvelope {
fn into_proto(&self) -> ProtoNoneEnvelope {
ProtoNoneEnvelope {
key_envelope: Some(self.key_envelope.into_proto()),
key_arity: self.key_arity.into_proto(),
}
}
fn from_proto(proto: ProtoNoneEnvelope) -> Result<Self, TryFromProtoError> {
Ok(NoneEnvelope {
key_envelope: proto
.key_envelope
.into_rust_if_some("ProtoNoneEnvelope::key_envelope")?,
key_arity: proto.key_arity.into_rust()?,
})
}
}
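/// The planned form of the upsert envelope: the arity of the source output,
/// the upsert style, and the value-column indices that form the key.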
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct UpsertEnvelope {
pub source_arity: usize,
pub style: UpsertStyle,
pub key_indices: Vec<usize>,
}
impl Arbitrary for UpsertEnvelope {
type Strategy = BoxedStrategy<Self>;
type Parameters = ();
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
any::<usize>(),
any::<UpsertStyle>(),
proptest::collection::vec(any::<usize>(), 1..4),
)
.prop_map(|(source_arity, style, key_indices)| Self {
source_arity,
style,
key_indices,
})
.boxed()
}
}
impl RustType<ProtoUpsertEnvelope> for UpsertEnvelope {
fn into_proto(&self) -> ProtoUpsertEnvelope {
ProtoUpsertEnvelope {
source_arity: self.source_arity.into_proto(),
style: Some(self.style.into_proto()),
key_indices: self.key_indices.into_proto(),
}
}
fn from_proto(proto: ProtoUpsertEnvelope) -> Result<Self, TryFromProtoError> {
Ok(UpsertEnvelope {
source_arity: proto.source_arity.into_rust()?,
style: proto
.style
.into_rust_if_some("ProtoUpsertEnvelope::style")?,
key_indices: proto.key_indices.into_rust()?,
})
}
}
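/// How upsert values are interpreted: as plain values (`Default`, with the
/// given key envelope) or as Debezium change events whose state lives in the
/// `after` field at the given index.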
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum UpsertStyle {
Default(KeyEnvelope),
Debezium { after_idx: usize },
}
impl RustType<ProtoUpsertStyle> for UpsertStyle {
fn into_proto(&self) -> ProtoUpsertStyle {
use proto_upsert_style::{Kind, ProtoDebezium};
ProtoUpsertStyle {
kind: Some(match self {
UpsertStyle::Default(e) => Kind::Default(e.into_proto()),
UpsertStyle::Debezium { after_idx } => Kind::Debezium(ProtoDebezium {
after_idx: after_idx.into_proto(),
}),
}),
}
}
fn from_proto(proto: ProtoUpsertStyle) -> Result<Self, TryFromProtoError> {
use proto_upsert_style::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoUpsertStyle::kind"))?;
Ok(match kind {
Kind::Default(e) => UpsertStyle::Default(e.into_rust()?),
Kind::Debezium(d) => UpsertStyle::Debezium {
after_idx: d.after_idx.into_rust()?,
},
})
}
}
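/// The planned form of the Debezium envelope: the column indices of the
/// `before` and `after` fields plus the deduplication projection.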
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DebeziumEnvelope {
pub before_idx: usize,
pub after_idx: usize,
pub dedup: DebeziumDedupProjection,
}
impl RustType<ProtoDebeziumEnvelope> for DebeziumEnvelope {
fn into_proto(&self) -> ProtoDebeziumEnvelope {
ProtoDebeziumEnvelope {
before_idx: self.before_idx.into_proto(),
after_idx: self.after_idx.into_proto(),
dedup: Some(self.dedup.into_proto()),
}
}
fn from_proto(proto: ProtoDebeziumEnvelope) -> Result<Self, TryFromProtoError> {
Ok(DebeziumEnvelope {
before_idx: proto.before_idx.into_rust()?,
after_idx: proto.after_idx.into_rust()?,
dedup: proto
.dedup
.into_rust_if_some("ProtoDebeziumEnvelope::dedup")?,
})
}
}
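/// Column indices into a Debezium transaction-metadata collection and into
/// the fields of the data collection that reference it.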
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DebeziumTransactionMetadata {
pub tx_metadata_global_id: GlobalId,
pub tx_status_idx: usize,
pub tx_transaction_id_idx: usize,
pub tx_data_collections_idx: usize,
pub tx_data_collections_data_collection_idx: usize,
pub tx_data_collections_event_count_idx: usize,
pub tx_data_collection_name: String,
pub data_transaction_idx: usize,
pub data_transaction_id_idx: usize,
}
impl RustType<ProtoDebeziumTransactionMetadata> for DebeziumTransactionMetadata {
fn into_proto(&self) -> ProtoDebeziumTransactionMetadata {
ProtoDebeziumTransactionMetadata {
tx_metadata_global_id: Some(self.tx_metadata_global_id.into_proto()),
tx_status_idx: self.tx_status_idx.into_proto(),
tx_transaction_id_idx: self.tx_transaction_id_idx.into_proto(),
tx_data_collections_idx: self.tx_data_collections_idx.into_proto(),
tx_data_collections_data_collection_idx: self
.tx_data_collections_data_collection_idx
.into_proto(),
tx_data_collections_event_count_idx: self
.tx_data_collections_event_count_idx
.into_proto(),
tx_data_collection_name: self.tx_data_collection_name.clone(),
data_transaction_idx: self.data_transaction_idx.into_proto(),
data_transaction_id_idx: self.data_transaction_id_idx.into_proto(),
}
}
fn from_proto(proto: ProtoDebeziumTransactionMetadata) -> Result<Self, TryFromProtoError> {
Ok(DebeziumTransactionMetadata {
tx_metadata_global_id: proto
.tx_metadata_global_id
.into_rust_if_some("ProtoDebeziumTransactionMetadata::tx_metadata_global_id")?,
tx_status_idx: proto.tx_status_idx.into_rust()?,
tx_transaction_id_idx: proto.tx_transaction_id_idx.into_rust()?,
tx_data_collections_idx: proto.tx_data_collections_idx.into_rust()?,
tx_data_collections_data_collection_idx: proto
.tx_data_collections_data_collection_idx
.into_rust()?,
tx_data_collections_event_count_idx: proto
.tx_data_collections_event_count_idx
.into_rust()?,
tx_data_collection_name: proto.tx_data_collection_name,
data_transaction_idx: proto.data_transaction_idx.into_rust()?,
data_transaction_id_idx: proto.data_transaction_id_idx.into_rust()?,
})
}
}
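/// The column indices needed to deduplicate Debezium events: the `op`,
/// `source`, and `snapshot` fields, the connector-specific position columns,
/// and optional transaction metadata.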
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct DebeziumDedupProjection {
pub op_idx: usize,
pub source_idx: usize,
pub snapshot_idx: usize,
pub source_projection: DebeziumSourceProjection,
pub tx_metadata: Option<DebeziumTransactionMetadata>,
}
impl RustType<ProtoDebeziumDedupProjection> for DebeziumDedupProjection {
fn into_proto(&self) -> ProtoDebeziumDedupProjection {
ProtoDebeziumDedupProjection {
op_idx: self.op_idx.into_proto(),
source_idx: self.source_idx.into_proto(),
snapshot_idx: self.snapshot_idx.into_proto(),
source_projection: Some(self.source_projection.into_proto()),
tx_metadata: self.tx_metadata.into_proto(),
}
}
fn from_proto(proto: ProtoDebeziumDedupProjection) -> Result<Self, TryFromProtoError> {
Ok(DebeziumDedupProjection {
op_idx: proto.op_idx.into_rust()?,
source_idx: proto.source_idx.into_rust()?,
snapshot_idx: proto.snapshot_idx.into_rust()?,
source_projection: proto
.source_projection
.into_rust_if_some("ProtoDebeziumDedupProjection::source_projection")?,
tx_metadata: proto.tx_metadata.into_rust()?,
})
}
}
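/// The connector-specific columns that order Debezium change events: binlog
/// file/position/row for MySQL, sequence/LSN for Postgres, and change
/// LSN/event serial number for SQL Server.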
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum DebeziumSourceProjection {
MySql {
file: usize,
pos: usize,
row: usize,
},
Postgres {
sequence: usize,
lsn: usize,
},
SqlServer {
change_lsn: usize,
event_serial_no: usize,
},
}
impl RustType<ProtoDebeziumSourceProjection> for DebeziumSourceProjection {
fn into_proto(&self) -> ProtoDebeziumSourceProjection {
use proto_debezium_source_projection::{Kind, ProtoMySql, ProtoPostgres, ProtoSqlServer};
ProtoDebeziumSourceProjection {
kind: Some(match self {
DebeziumSourceProjection::MySql { file, pos, row } => Kind::MySql(ProtoMySql {
file: file.into_proto(),
pos: pos.into_proto(),
row: row.into_proto(),
}),
DebeziumSourceProjection::Postgres { sequence, lsn } => {
Kind::Postgres(ProtoPostgres {
sequence: sequence.into_proto(),
lsn: lsn.into_proto(),
})
}
DebeziumSourceProjection::SqlServer {
change_lsn,
event_serial_no,
} => Kind::SqlServer(ProtoSqlServer {
change_lsn: change_lsn.into_proto(),
event_serial_no: event_serial_no.into_proto(),
}),
}),
}
}
fn from_proto(proto: ProtoDebeziumSourceProjection) -> Result<Self, TryFromProtoError> {
use proto_debezium_source_projection::{Kind, ProtoMySql, ProtoPostgres, ProtoSqlServer};
let kind = proto.kind.ok_or_else(|| {
TryFromProtoError::missing_field("ProtoDebeziumSourceProjection::kind")
})?;
Ok(match kind {
Kind::MySql(ProtoMySql { file, pos, row }) => DebeziumSourceProjection::MySql {
file: file.into_rust()?,
pos: pos.into_rust()?,
row: row.into_rust()?,
},
Kind::Postgres(ProtoPostgres { sequence, lsn }) => DebeziumSourceProjection::Postgres {
sequence: sequence.into_rust()?,
lsn: lsn.into_rust()?,
},
Kind::SqlServer(ProtoSqlServer {
change_lsn,
event_serial_no,
}) => DebeziumSourceProjection::SqlServer {
change_lsn: change_lsn.into_rust()?,
event_serial_no: event_serial_no.into_rust()?,
},
})
}
}
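/// Computes, for each column of `key_desc`, the index of the value column
/// with the same name, erroring if a key column is missing from the value or
/// if the two column types disagree.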
fn match_key_indices(
key_desc: &RelationDesc,
value_desc: &RelationDesc,
) -> anyhow::Result<Vec<usize>> {
let mut indices = Vec::new();
for (name, key_type) in key_desc.iter() {
let (index, value_type) = value_desc
.get_by_name(name)
.ok_or_else(|| anyhow!("Value schema missing primary key column: {}", name))?;
if key_type == value_type {
indices.push(index);
} else {
bail!(
"key and value column types do not match: key {:?} vs. value {:?}",
key_type,
value_type
);
}
}
Ok(indices)
}
impl UnplannedSourceEnvelope {
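    /// Builds the corresponding `SourceEnvelope` from the key columns, key
    /// arity, and source arity determined during planning.
    ///
    /// Panics if a parameter required by the envelope variant (e.g. the key
    /// indices for upsert) is `None`.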
fn into_source_envelope(
self,
key: Option<Vec<usize>>,
key_arity: Option<usize>,
source_arity: Option<usize>,
) -> SourceEnvelope {
match self {
UnplannedSourceEnvelope::Upsert {
style: upsert_style,
} => SourceEnvelope::Upsert(UpsertEnvelope {
style: upsert_style,
key_indices: key.expect(
"into_source_envelope to be passed \
correct parameters for UnplannedSourceEnvelope::Upsert",
),
source_arity: source_arity.expect(
"into_source_envelope to be passed \
correct parameters for UnplannedSourceEnvelope::Upsert",
),
}),
UnplannedSourceEnvelope::Debezium(inner) => SourceEnvelope::Debezium(inner),
UnplannedSourceEnvelope::None(key_envelope) => SourceEnvelope::None(NoneEnvelope {
key_envelope,
key_arity: key_arity.unwrap_or(0),
}),
UnplannedSourceEnvelope::CdcV2 => SourceEnvelope::CdcV2,
}
}
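    /// Computes the `SourceEnvelope` and the output relation description that
    /// result from applying this envelope on top of the decoded key, value,
    /// and metadata relation descriptions.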
pub fn desc(
self,
key_desc: Option<RelationDesc>,
value_desc: RelationDesc,
metadata_desc: RelationDesc,
) -> anyhow::Result<(SourceEnvelope, RelationDesc)> {
Ok(match &self {
UnplannedSourceEnvelope::None(key_envelope)
| UnplannedSourceEnvelope::Upsert {
style: UpsertStyle::Default(key_envelope),
..
} => {
let key_desc = match key_desc {
Some(desc) => desc,
None => {
return Ok((
self.into_source_envelope(None, None, None),
value_desc.concat(metadata_desc),
))
}
};
let key_arity = key_desc.arity();
let (keyed, key) = match key_envelope {
KeyEnvelope::None => (value_desc, None),
KeyEnvelope::Flattened => {
let key_indices: Vec<usize> = (0..key_desc.arity()).collect();
let key_desc = key_desc.with_key(key_indices.clone());
(key_desc.concat(value_desc), Some(key_indices))
}
KeyEnvelope::Named(key_name) => {
let key_desc = {
if key_desc.arity() > 1 {
let key_type = key_desc.typ();
let key_as_record = RelationType::new(vec![ColumnType {
nullable: false,
scalar_type: ScalarType::Record {
fields: key_desc
.iter_names()
.zip(key_type.column_types.iter())
.map(|(name, ty)| (name.clone(), ty.clone()))
.collect(),
custom_id: None,
},
}]);
RelationDesc::new(key_as_record, [key_name.to_string()])
} else {
key_desc.with_names([key_name.to_string()])
}
};
let (key_desc, key) = match self {
UnplannedSourceEnvelope::None(_) => (key_desc, None),
UnplannedSourceEnvelope::Upsert { .. } => {
(key_desc.with_key(vec![0]), Some(vec![0]))
}
_ => unreachable!(),
};
(key_desc.concat(value_desc), key)
}
};
let desc = keyed.concat(metadata_desc);
(
self.into_source_envelope(key, Some(key_arity), Some(desc.arity())),
desc,
)
}
UnplannedSourceEnvelope::Debezium(DebeziumEnvelope { after_idx, .. })
| UnplannedSourceEnvelope::Upsert {
style: UpsertStyle::Debezium { after_idx },
..
} => match &value_desc.typ().column_types[*after_idx].scalar_type {
ScalarType::Record { fields, .. } => {
let mut desc = RelationDesc::from_names_and_types(fields.clone());
let key = key_desc.map(|k| match_key_indices(&k, &desc)).transpose()?;
if let Some(key) = key.clone() {
desc = desc.with_key(key);
}
let desc = match self {
UnplannedSourceEnvelope::Upsert { .. } => desc.concat(metadata_desc),
_ => desc,
};
(
self.into_source_envelope(key, None, Some(desc.arity())),
desc,
)
}
ty => bail!(
"Incorrect type for Debezium value, expected Record, got {:?}",
ty
),
},
UnplannedSourceEnvelope::CdcV2 => {
match &value_desc.typ().column_types[0].scalar_type {
ScalarType::List { element_type, .. } => match &**element_type {
ScalarType::Record { fields, .. } => {
match &fields[0].1.scalar_type {
ScalarType::Record { fields, .. } => (
self.into_source_envelope(None, None, None),
RelationDesc::from_names_and_types(fields.clone()),
),
ty => {
bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty)
}
}
}
ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
},
ty => bail!("Unexpected type for MATERIALIZE envelope: {:?}", ty),
}
}
})
}
}
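/// An external system from which Materialize ingests data, along with the
/// metadata columns the connection can expose.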
pub trait SourceConnection: Debug + Clone + PartialEq {
    /// The name of the external system (e.g. "kafka" or "postgres").
    fn name(&self) -> &'static str;
    /// The name of the resource in the external system (e.g. the Kafka topic), if any.
    fn upstream_name(&self) -> Option<&str>;
    /// The schema of this connection's progress (remap) collection.
    fn timestamp_desc(&self) -> RelationDesc;
    /// The ID of the connection object this source uses, if any.
    fn connection_id(&self) -> Option<GlobalId>;
    /// The name and type of each metadata column this connection appends to its output.
    fn metadata_columns(&self) -> Vec<(&str, ColumnType)>;
    /// The upstream metadata that each metadata column reflects.
    fn metadata_column_types(&self) -> Vec<IncludedColumnSource>;
fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), StorageError> {
if self == other {
Ok(())
} else {
tracing::warn!(
"SourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
self,
other
);
Err(StorageError::InvalidAlterSource { id })
}
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct KafkaSourceConnection<C: ConnectionAccess = InlinedConnection> {
pub connection: C::Kafka,
pub connection_id: GlobalId,
pub topic: String,
pub start_offsets: BTreeMap<i32, i64>,
pub group_id_prefix: Option<String>,
pub environment_id: String,
pub include_timestamp: Option<IncludedColumnPos>,
pub include_partition: Option<IncludedColumnPos>,
pub include_topic: Option<IncludedColumnPos>,
pub include_offset: Option<IncludedColumnPos>,
pub include_headers: Option<IncludedColumnPos>,
}
impl<R: ConnectionResolver> IntoInlineConnection<KafkaSourceConnection, R>
for KafkaSourceConnection<ReferencedConnection>
{
fn into_inline_connection(self, r: R) -> KafkaSourceConnection {
let KafkaSourceConnection {
connection,
connection_id,
topic,
start_offsets,
group_id_prefix,
environment_id,
include_timestamp,
include_partition,
include_topic,
include_offset,
include_headers,
} = self;
KafkaSourceConnection {
connection: r.resolve_connection(connection).unwrap_kafka(),
connection_id,
topic,
start_offsets,
group_id_prefix,
environment_id,
include_timestamp,
include_partition,
include_topic,
include_offset,
include_headers,
}
}
}
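/// The schema of the progress (remap) collection for Kafka sources: a
/// numeric range of partition IDs mapped to a nullable offset.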
pub static KAFKA_PROGRESS_DESC: Lazy<RelationDesc> = Lazy::new(|| {
RelationDesc::empty()
.with_column(
"partition",
ScalarType::Range {
element_type: Box::new(ScalarType::Numeric { max_scale: None }),
}
.nullable(false),
)
.with_column("offset", ScalarType::UInt64.nullable(true))
});
impl<C: ConnectionAccess> KafkaSourceConnection<C> {
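    /// Returns the Kafka consumer group ID this source uses:
    ///
    /// ```text
    /// <group_id_prefix>materialize-<environment_id>-<connection_id>-<source_id>
    /// ```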
pub fn group_id(&self, source_id: GlobalId) -> String {
format!(
"{}materialize-{}-{}-{}",
            self.group_id_prefix.clone().unwrap_or_default(),
self.environment_id,
self.connection_id,
source_id,
)
}
}
impl<C: ConnectionAccess> SourceConnection for KafkaSourceConnection<C> {
fn name(&self) -> &'static str {
"kafka"
}
fn upstream_name(&self) -> Option<&str> {
Some(self.topic.as_str())
}
fn timestamp_desc(&self) -> RelationDesc {
KAFKA_PROGRESS_DESC.clone()
}
fn connection_id(&self) -> Option<GlobalId> {
Some(self.connection_id)
}
fn metadata_columns(&self) -> Vec<(&str, ColumnType)> {
let mut items = BTreeMap::new();
let header_typ = ScalarType::List {
element_type: Box::new(ScalarType::Record {
fields: vec![
(
"key".into(),
ColumnType {
nullable: false,
scalar_type: ScalarType::String,
},
),
(
"value".into(),
ColumnType {
nullable: false,
scalar_type: ScalarType::Bytes,
},
),
],
custom_id: None,
}),
custom_id: None,
};
let metadata_columns = [
(&self.include_offset, ScalarType::UInt64),
(&self.include_partition, ScalarType::Int32),
(&self.include_timestamp, ScalarType::Timestamp),
(&self.include_topic, ScalarType::String),
(&self.include_headers, header_typ),
];
for (include, ty) in metadata_columns {
if let Some(include) = include {
items.insert(include.pos + 1, (&*include.name, ty.nullable(false)));
}
}
items.into_values().collect()
}
fn metadata_column_types(&self) -> Vec<IncludedColumnSource> {
let mut items = BTreeMap::new();
let metadata_columns = [
(&self.include_offset, IncludedColumnSource::Offset),
(&self.include_partition, IncludedColumnSource::Partition),
(&self.include_timestamp, IncludedColumnSource::Timestamp),
(&self.include_topic, IncludedColumnSource::Topic),
(&self.include_headers, IncludedColumnSource::Headers),
];
for (include, ty) in metadata_columns {
if let Some(include) = include {
items.insert(include.pos, ty);
}
}
items.into_values().collect()
}
}
impl<C: ConnectionAccess> Arbitrary for KafkaSourceConnection<C>
where
<<C as ConnectionAccess>::Kafka as Arbitrary>::Strategy: 'static,
{
type Strategy = BoxedStrategy<Self>;
type Parameters = ();
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
any::<C::Kafka>(),
any::<GlobalId>(),
any::<String>(),
proptest::collection::btree_map(any::<i32>(), any::<i64>(), 1..4),
any::<Option<String>>(),
any::<String>(),
any::<Option<IncludedColumnPos>>(),
any::<Option<IncludedColumnPos>>(),
any::<Option<IncludedColumnPos>>(),
any::<Option<IncludedColumnPos>>(),
any::<Option<IncludedColumnPos>>(),
)
.prop_map(
|(
connection,
connection_id,
topic,
start_offsets,
group_id_prefix,
environment_id,
include_timestamp,
include_partition,
include_topic,
include_offset,
include_headers,
)| KafkaSourceConnection {
connection,
connection_id,
topic,
start_offsets,
group_id_prefix,
environment_id,
include_timestamp,
include_partition,
include_topic,
include_offset,
include_headers,
},
)
.boxed()
}
}
impl RustType<ProtoKafkaSourceConnection> for KafkaSourceConnection<InlinedConnection> {
fn into_proto(&self) -> ProtoKafkaSourceConnection {
ProtoKafkaSourceConnection {
connection: Some(self.connection.into_proto()),
connection_id: Some(self.connection_id.into_proto()),
topic: self.topic.clone(),
start_offsets: self.start_offsets.clone(),
group_id_prefix: self.group_id_prefix.clone(),
environment_id: None,
environment_name: Some(self.environment_id.into_proto()),
include_timestamp: self.include_timestamp.into_proto(),
include_partition: self.include_partition.into_proto(),
include_topic: self.include_topic.into_proto(),
include_offset: self.include_offset.into_proto(),
include_headers: self.include_headers.into_proto(),
}
}
fn from_proto(proto: ProtoKafkaSourceConnection) -> Result<Self, TryFromProtoError> {
Ok(KafkaSourceConnection {
connection: proto
.connection
.into_rust_if_some("ProtoKafkaSourceConnection::connection")?,
connection_id: proto
.connection_id
.into_rust_if_some("ProtoKafkaSourceConnection::connection_id")?,
topic: proto.topic,
start_offsets: proto.start_offsets,
group_id_prefix: proto.group_id_prefix,
            environment_id: match (proto.environment_id, proto.environment_name) {
                (_, Some(name)) => name,
                // Fall back to the older encoding, in which the environment ID
                // was a UUID rather than a free-form name.
                (id, _) => {
                    let uuid: Uuid =
                        id.into_rust_if_some("ProtoKafkaSourceConnection::environment_id")?;
                    uuid.to_string()
                }
            },
include_timestamp: proto.include_timestamp.into_rust()?,
include_partition: proto.include_partition.into_rust()?,
include_topic: proto.include_topic.into_rust()?,
include_offset: proto.include_offset.into_rust()?,
include_headers: proto.include_headers.into_rust()?,
})
}
}
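/// The compression, if any, applied to ingested data.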
#[derive(Arbitrary, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
Gzip,
None,
}
impl RustType<ProtoCompression> for Compression {
fn into_proto(&self) -> ProtoCompression {
use proto_compression::Kind;
ProtoCompression {
kind: Some(match self {
Compression::Gzip => Kind::Gzip(()),
Compression::None => Kind::None(()),
}),
}
}
fn from_proto(proto: ProtoCompression) -> Result<Self, TryFromProtoError> {
use proto_compression::Kind;
Ok(match proto.kind {
Some(Kind::Gzip(())) => Compression::Gzip,
Some(Kind::None(())) => Compression::None,
None => {
return Err(TryFromProtoError::MissingField(
"ProtoCompression::kind".into(),
))
}
})
}
}
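/// A full description of a source: its connection, how its data is encoded
/// and enveloped, the metadata columns it exposes, and the interval at which
/// its timestamps advance.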
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
pub connection: GenericSourceConnection<C>,
pub encoding: encoding::SourceDataEncoding<C>,
pub envelope: SourceEnvelope,
pub metadata_columns: Vec<IncludedColumnSource>,
pub timestamp_interval: Duration,
}
impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
for SourceDesc<ReferencedConnection>
{
fn into_inline_connection(self, r: R) -> SourceDesc {
let SourceDesc {
connection,
encoding,
envelope,
metadata_columns,
timestamp_interval,
} = self;
SourceDesc {
connection: connection.into_inline_connection(&r),
encoding: encoding.into_inline_connection(r),
envelope,
metadata_columns,
timestamp_interval,
}
}
}
impl Arbitrary for SourceDesc {
type Strategy = BoxedStrategy<Self>;
type Parameters = ();
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
any::<GenericSourceConnection>(),
any::<encoding::SourceDataEncoding>(),
any::<SourceEnvelope>(),
any::<Vec<IncludedColumnSource>>(),
any::<Duration>(),
)
.prop_map(
|(connection, encoding, envelope, metadata_columns, timestamp_interval)| Self {
connection,
encoding,
envelope,
metadata_columns,
timestamp_interval,
},
)
.boxed()
}
}
impl RustType<ProtoSourceDesc> for SourceDesc {
fn into_proto(&self) -> ProtoSourceDesc {
ProtoSourceDesc {
connection: Some(self.connection.into_proto()),
encoding: Some(self.encoding.into_proto()),
envelope: Some(self.envelope.into_proto()),
metadata_columns: self.metadata_columns.into_proto(),
timestamp_interval: Some(self.timestamp_interval.into_proto()),
}
}
fn from_proto(proto: ProtoSourceDesc) -> Result<Self, TryFromProtoError> {
Ok(SourceDesc {
connection: proto
.connection
.into_rust_if_some("ProtoSourceDesc::connection")?,
encoding: proto
.encoding
.into_rust_if_some("ProtoSourceDesc::encoding")?,
envelope: proto
.envelope
.into_rust_if_some("ProtoSourceDesc::envelope")?,
metadata_columns: proto.metadata_columns.into_rust()?,
timestamp_interval: proto
.timestamp_interval
.into_rust_if_some("ProtoSourceDesc::timestamp_interval")?,
})
}
}
impl<C: ConnectionAccess> SourceDesc<C> {
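    /// Reports whether this source is monotonic, i.e. never retracts
    /// previously emitted data.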
pub fn monotonic(&self) -> bool {
match self {
SourceDesc {
connection: GenericSourceConnection::Postgres(_),
..
} => false,
SourceDesc {
connection: GenericSourceConnection::LoadGenerator(g),
..
} => g.load_generator.is_monotonic(),
SourceDesc {
envelope: SourceEnvelope::None(_),
..
} => true,
SourceDesc {
envelope:
SourceEnvelope::Debezium(_) | SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2,
connection:
GenericSourceConnection::Kafka(_) | GenericSourceConnection::TestScript(_),
..
} => false,
}
}
pub fn envelope(&self) -> &SourceEnvelope {
&self.envelope
}
pub fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), StorageError> {
if self == other {
return Ok(());
}
let Self {
connection,
encoding,
envelope,
metadata_columns,
timestamp_interval,
} = &self;
connection.alter_compatible(id, &other.connection)?;
let compatibility_checks = [
connection == &other.connection,
encoding == &other.encoding,
envelope == &other.envelope,
metadata_columns == &other.metadata_columns,
timestamp_interval == &other.timestamp_interval,
];
for compatible in compatibility_checks {
if !compatible {
tracing::warn!(
"SourceDesc incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
self,
other
);
return Err(StorageError::InvalidAlterSource { id });
}
}
Ok(())
}
}
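/// The union of all source connection types, dispatching `SourceConnection`
/// to the matching variant.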
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
Kafka(KafkaSourceConnection<C>),
Postgres(PostgresSourceConnection<C>),
LoadGenerator(LoadGeneratorSourceConnection),
TestScript(TestScriptSourceConnection),
}
impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
fn from(conn: KafkaSourceConnection<C>) -> Self {
Self::Kafka(conn)
}
}
impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
fn from(conn: PostgresSourceConnection<C>) -> Self {
Self::Postgres(conn)
}
}
impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
fn from(conn: LoadGeneratorSourceConnection) -> Self {
Self::LoadGenerator(conn)
}
}
impl<C: ConnectionAccess> From<TestScriptSourceConnection> for GenericSourceConnection<C> {
fn from(conn: TestScriptSourceConnection) -> Self {
Self::TestScript(conn)
}
}
impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
for GenericSourceConnection<ReferencedConnection>
{
fn into_inline_connection(self, r: R) -> GenericSourceConnection {
match self {
GenericSourceConnection::Kafka(kafka) => {
GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
}
GenericSourceConnection::Postgres(pg) => {
GenericSourceConnection::Postgres(pg.into_inline_connection(r))
}
GenericSourceConnection::LoadGenerator(lg) => {
GenericSourceConnection::LoadGenerator(lg)
}
GenericSourceConnection::TestScript(ts) => GenericSourceConnection::TestScript(ts),
}
}
}
impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
fn name(&self) -> &'static str {
match self {
Self::Kafka(conn) => conn.name(),
Self::Postgres(conn) => conn.name(),
Self::LoadGenerator(conn) => conn.name(),
Self::TestScript(conn) => conn.name(),
}
}
fn upstream_name(&self) -> Option<&str> {
match self {
Self::Kafka(conn) => conn.upstream_name(),
Self::Postgres(conn) => conn.upstream_name(),
Self::LoadGenerator(conn) => conn.upstream_name(),
Self::TestScript(conn) => conn.upstream_name(),
}
}
fn timestamp_desc(&self) -> RelationDesc {
match self {
Self::Kafka(conn) => conn.timestamp_desc(),
Self::Postgres(conn) => conn.timestamp_desc(),
Self::LoadGenerator(conn) => conn.timestamp_desc(),
Self::TestScript(conn) => conn.timestamp_desc(),
}
}
fn connection_id(&self) -> Option<GlobalId> {
match self {
Self::Kafka(conn) => conn.connection_id(),
Self::Postgres(conn) => conn.connection_id(),
Self::LoadGenerator(conn) => conn.connection_id(),
Self::TestScript(conn) => conn.connection_id(),
}
}
fn metadata_columns(&self) -> Vec<(&str, ColumnType)> {
match self {
Self::Kafka(conn) => conn.metadata_columns(),
Self::Postgres(conn) => conn.metadata_columns(),
Self::LoadGenerator(conn) => conn.metadata_columns(),
Self::TestScript(conn) => conn.metadata_columns(),
}
}
fn metadata_column_types(&self) -> Vec<IncludedColumnSource> {
match self {
Self::Kafka(conn) => conn.metadata_column_types(),
Self::Postgres(conn) => conn.metadata_column_types(),
Self::LoadGenerator(conn) => conn.metadata_column_types(),
Self::TestScript(conn) => conn.metadata_column_types(),
}
}
fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), StorageError> {
if self == other {
return Ok(());
}
match (self, other) {
(Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
(Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
(Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
conn.alter_compatible(id, other)
}
(Self::TestScript(conn), Self::TestScript(other)) => conn.alter_compatible(id, other),
_ => Err(StorageError::InvalidAlterSource { id }),
}
}
}
impl RustType<ProtoSourceConnection> for GenericSourceConnection<InlinedConnection> {
fn into_proto(&self) -> ProtoSourceConnection {
use proto_source_connection::Kind;
ProtoSourceConnection {
kind: Some(match self {
GenericSourceConnection::Kafka(kafka) => Kind::Kafka(kafka.into_proto()),
GenericSourceConnection::Postgres(postgres) => {
Kind::Postgres(postgres.into_proto())
}
GenericSourceConnection::LoadGenerator(loadgen) => {
Kind::Loadgen(loadgen.into_proto())
}
GenericSourceConnection::TestScript(testscript) => {
Kind::Testscript(testscript.into_proto())
}
}),
}
}
fn from_proto(proto: ProtoSourceConnection) -> Result<Self, TryFromProtoError> {
use proto_source_connection::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoSourceConnection::kind"))?;
Ok(match kind {
Kind::Kafka(kafka) => GenericSourceConnection::Kafka(kafka.into_rust()?),
Kind::Postgres(postgres) => GenericSourceConnection::Postgres(postgres.into_rust()?),
Kind::Loadgen(loadgen) => GenericSourceConnection::LoadGenerator(loadgen.into_rust()?),
Kind::Testscript(testscript) => {
GenericSourceConnection::TestScript(testscript.into_rust()?)
}
})
}
}
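/// A source that ingests data from a Postgres publication, applying the
/// configured per-table column casts to each replicated table.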
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct PostgresSourceConnection<C: ConnectionAccess = InlinedConnection> {
pub connection_id: GlobalId,
pub connection: C::Pg,
pub table_casts: BTreeMap<usize, Vec<MirScalarExpr>>,
pub publication: String,
pub publication_details: PostgresSourcePublicationDetails,
}
impl<R: ConnectionResolver> IntoInlineConnection<PostgresSourceConnection, R>
for PostgresSourceConnection<ReferencedConnection>
{
fn into_inline_connection(self, r: R) -> PostgresSourceConnection {
let PostgresSourceConnection {
connection_id,
connection,
table_casts,
publication,
publication_details,
} = self;
PostgresSourceConnection {
connection_id,
connection: r.resolve_connection(connection).unwrap_pg(),
table_casts,
publication,
publication_details,
}
}
}
impl<C: ConnectionAccess> Arbitrary for PostgresSourceConnection<C> {
type Strategy = BoxedStrategy<Self>;
type Parameters = ();
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
any::<C::Pg>(),
any::<GlobalId>(),
proptest::collection::btree_map(
any::<usize>(),
proptest::collection::vec(any::<MirScalarExpr>(), 1..4),
1..4,
),
any::<String>(),
any::<PostgresSourcePublicationDetails>(),
)
.prop_map(
|(connection, connection_id, table_casts, publication, details)| Self {
connection,
connection_id,
table_casts,
publication,
publication_details: details,
},
)
.boxed()
}
}
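/// The schema of the progress (remap) collection for Postgres sources: a
/// single nullable `lsn` column.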
pub static PG_PROGRESS_DESC: Lazy<RelationDesc> =
Lazy::new(|| RelationDesc::empty().with_column("lsn", ScalarType::UInt64.nullable(true)));
impl<C: ConnectionAccess> SourceConnection for PostgresSourceConnection<C> {
fn name(&self) -> &'static str {
"postgres"
}
fn upstream_name(&self) -> Option<&str> {
None
}
fn timestamp_desc(&self) -> RelationDesc {
PG_PROGRESS_DESC.clone()
}
fn connection_id(&self) -> Option<GlobalId> {
Some(self.connection_id)
}
fn metadata_columns(&self) -> Vec<(&str, ColumnType)> {
vec![]
}
fn metadata_column_types(&self) -> Vec<IncludedColumnSource> {
vec![]
}
fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), StorageError> {
if self == other {
return Ok(());
}
let PostgresSourceConnection {
connection_id,
connection,
table_casts,
publication,
publication_details,
} = self;
let compatibility_checks = [
connection_id == &other.connection_id,
connection == &other.connection,
table_casts
.iter()
.merge_join_by(&other.table_casts, |(l_key, _), (r_key, _)| {
l_key.cmp(r_key)
})
.all(|r| match r {
Both((_, l_val), (_, r_val)) => l_val == r_val,
_ => true,
}),
publication == &other.publication,
publication_details == &other.publication_details,
];
for compatible in compatibility_checks {
if !compatible {
tracing::warn!(
"PostgresSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
self,
other
);
return Err(StorageError::InvalidAlterSource { id });
}
}
Ok(())
}
}
impl RustType<ProtoPostgresSourceConnection> for PostgresSourceConnection {
fn into_proto(&self) -> ProtoPostgresSourceConnection {
use proto_postgres_source_connection::ProtoPostgresTableCast;
let mut table_casts = Vec::with_capacity(self.table_casts.len());
let mut table_cast_pos = Vec::with_capacity(self.table_casts.len());
for (pos, table_cast_cols) in self.table_casts.iter() {
table_casts.push(ProtoPostgresTableCast {
column_casts: table_cast_cols
.iter()
.cloned()
.map(|cast| cast.into_proto())
.collect(),
});
table_cast_pos.push(mz_ore::cast::usize_to_u64(*pos));
}
ProtoPostgresSourceConnection {
connection: Some(self.connection.into_proto()),
connection_id: Some(self.connection_id.into_proto()),
publication: self.publication.clone(),
details: Some(self.publication_details.into_proto()),
table_casts,
table_cast_pos,
}
}
fn from_proto(proto: ProtoPostgresSourceConnection) -> Result<Self, TryFromProtoError> {
        let table_cast_pos = if proto.table_casts.len() == proto.table_cast_pos.len() {
            proto.table_cast_pos
        } else {
            // Fall back to assuming the table casts apply to consecutive
            // positions starting at 1.
            (1..=proto.table_casts.len())
                .map(mz_ore::cast::usize_to_u64)
                .collect()
        };
let mut table_casts = BTreeMap::new();
for (pos, cast) in table_cast_pos
.into_iter()
.zip_eq(proto.table_casts.into_iter())
{
let mut column_casts = vec![];
for cast in cast.column_casts {
column_casts.push(cast.into_rust()?);
}
table_casts.insert(mz_ore::cast::u64_to_usize(pos), column_casts);
}
Ok(PostgresSourceConnection {
connection: proto
.connection
.into_rust_if_some("ProtoPostgresSourceConnection::connection")?,
connection_id: proto
.connection_id
.into_rust_if_some("ProtoPostgresSourceConnection::connection_id")?,
publication: proto.publication,
publication_details: proto
.details
.into_rust_if_some("ProtoPostgresSourceConnection::details")?,
table_casts,
})
}
}
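/// Details about the upstream publication: the descriptions of its tables
/// and the replication slot to read from.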
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct PostgresSourcePublicationDetails {
pub tables: Vec<mz_postgres_util::desc::PostgresTableDesc>,
pub slot: String,
}
impl RustType<ProtoPostgresSourcePublicationDetails> for PostgresSourcePublicationDetails {
fn into_proto(&self) -> ProtoPostgresSourcePublicationDetails {
ProtoPostgresSourcePublicationDetails {
tables: self.tables.iter().map(|t| t.into_proto()).collect(),
slot: self.slot.clone(),
}
}
fn from_proto(proto: ProtoPostgresSourcePublicationDetails) -> Result<Self, TryFromProtoError> {
Ok(PostgresSourcePublicationDetails {
tables: proto
.tables
.into_iter()
.map(mz_postgres_util::desc::PostgresTableDesc::from_proto)
.collect::<Result<_, _>>()?,
slot: proto.slot,
})
}
}
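/// A source backed by one of the built-in load generators, optionally
/// ticking at a fixed interval in microseconds.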
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct LoadGeneratorSourceConnection {
pub load_generator: LoadGenerator,
pub tick_micros: Option<u64>,
}
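/// The schema of the progress (remap) collection for load-generator sources:
/// a single nullable `offset` column.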
pub static LOAD_GEN_PROGRESS_DESC: Lazy<RelationDesc> =
Lazy::new(|| RelationDesc::empty().with_column("offset", ScalarType::UInt64.nullable(true)));
impl SourceConnection for LoadGeneratorSourceConnection {
fn name(&self) -> &'static str {
"load-generator"
}
fn upstream_name(&self) -> Option<&str> {
None
}
fn timestamp_desc(&self) -> RelationDesc {
LOAD_GEN_PROGRESS_DESC.clone()
}
fn connection_id(&self) -> Option<GlobalId> {
None
}
fn metadata_columns(&self) -> Vec<(&str, ColumnType)> {
vec![]
}
fn metadata_column_types(&self) -> Vec<IncludedColumnSource> {
vec![]
}
}
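/// The available built-in load generators, each of which synthesizes data
/// with a fixed schema (see `views`).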
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum LoadGenerator {
Auction,
Counter {
max_cardinality: Option<u64>,
},
Datums,
Marketing,
Tpch {
count_supplier: i64,
count_part: i64,
count_customer: i64,
count_orders: i64,
count_clerk: i64,
},
}
impl LoadGenerator {
fn data_encoding_inner<C: ConnectionAccess>(&self) -> DataEncodingInner<C> {
match self {
LoadGenerator::Auction => DataEncodingInner::RowCodec(RelationDesc::empty()),
LoadGenerator::Datums => {
let mut desc =
RelationDesc::empty().with_column("rowid", ScalarType::Int64.nullable(false));
let typs = ScalarType::enumerate();
let mut names = BTreeSet::new();
for typ in typs {
let mut name = format!("_{:?}", typ)
.split(' ')
.next()
.unwrap()
.to_lowercase();
while names.contains(&name) {
name.push('_');
}
names.insert(name.clone());
desc = desc.with_column(name, typ.clone().nullable(true));
}
DataEncodingInner::RowCodec(desc)
}
LoadGenerator::Counter { .. } => DataEncodingInner::RowCodec(
RelationDesc::empty().with_column("counter", ScalarType::Int64.nullable(false)),
),
LoadGenerator::Marketing => DataEncodingInner::RowCodec(RelationDesc::empty()),
LoadGenerator::Tpch { .. } => DataEncodingInner::RowCodec(RelationDesc::empty()),
}
}
pub fn data_encoding<C: ConnectionAccess>(&self) -> SourceDataEncoding<C> {
SourceDataEncoding::Single(DataEncoding::new(self.data_encoding_inner()))
}
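    /// Returns the name and schema of each subsource view this generator
    /// produces; generators whose data all lands in the primary collection
    /// (`Counter`, `Datums`) return an empty list.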
pub fn views(&self) -> Vec<(&str, RelationDesc)> {
match self {
LoadGenerator::Auction => vec![
(
"organizations",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("name", ScalarType::String.nullable(false))
.with_key(vec![0]),
),
(
"users",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("org_id", ScalarType::Int64.nullable(false))
.with_column("name", ScalarType::String.nullable(false))
.with_key(vec![0]),
),
(
"accounts",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("org_id", ScalarType::Int64.nullable(false))
.with_column("balance", ScalarType::Int64.nullable(false))
.with_key(vec![0]),
),
(
"auctions",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("seller", ScalarType::Int64.nullable(false))
.with_column("item", ScalarType::String.nullable(false))
.with_column("end_time", ScalarType::TimestampTz.nullable(false))
.with_key(vec![0]),
),
(
"bids",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("buyer", ScalarType::Int64.nullable(false))
.with_column("auction_id", ScalarType::Int64.nullable(false))
.with_column("amount", ScalarType::Int32.nullable(false))
.with_column("bid_time", ScalarType::TimestampTz.nullable(false))
.with_key(vec![0]),
),
],
LoadGenerator::Counter { max_cardinality: _ } => vec![],
LoadGenerator::Marketing => {
vec![
(
"customers",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("email", ScalarType::String.nullable(false))
.with_column("income", ScalarType::Int64.nullable(false))
.with_key(vec![0]),
),
(
"impressions",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("customer_id", ScalarType::Int64.nullable(false))
.with_column("campaign_id", ScalarType::Int64.nullable(false))
.with_column("impression_time", ScalarType::TimestampTz.nullable(false))
.with_key(vec![0]),
),
(
"clicks",
RelationDesc::empty()
.with_column("impression_id", ScalarType::Int64.nullable(false))
.with_column("click_time", ScalarType::TimestampTz.nullable(false))
.without_keys(),
),
(
"leads",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("customer_id", ScalarType::Int64.nullable(false))
.with_column("created_at", ScalarType::TimestampTz.nullable(false))
.with_column("converted_at", ScalarType::TimestampTz.nullable(true))
.with_column("conversion_amount", ScalarType::Int64.nullable(true))
.with_key(vec![0]),
),
(
"coupons",
RelationDesc::empty()
.with_column("id", ScalarType::Int64.nullable(false))
.with_column("lead_id", ScalarType::Int64.nullable(false))
.with_column("created_at", ScalarType::TimestampTz.nullable(false))
.with_column("amount", ScalarType::Int64.nullable(false))
.with_key(vec![0]),
),
(
"conversion_predictions",
RelationDesc::empty()
.with_column("lead_id", ScalarType::Int64.nullable(false))
.with_column("experiment_bucket", ScalarType::String.nullable(false))
.with_column("predicted_at", ScalarType::TimestampTz.nullable(false))
.with_column("score", ScalarType::Float64.nullable(false))
.without_keys(),
),
]
}
LoadGenerator::Datums => vec![],
LoadGenerator::Tpch { .. } => {
let identifier = ScalarType::Int64.nullable(false);
let decimal = ScalarType::Numeric {
max_scale: Some(NumericMaxScale::try_from(2i64).unwrap()),
}
.nullable(false);
vec![
(
"supplier",
RelationDesc::empty()
.with_column("s_suppkey", identifier.clone())
.with_column("s_name", ScalarType::String.nullable(false))
.with_column("s_address", ScalarType::String.nullable(false))
.with_column("s_nationkey", identifier.clone())
.with_column("s_phone", ScalarType::String.nullable(false))
.with_column("s_acctbal", decimal.clone())
.with_column("s_comment", ScalarType::String.nullable(false))
.with_key(vec![0]),
),
(
"part",
RelationDesc::empty()
.with_column("p_partkey", identifier.clone())
.with_column("p_name", ScalarType::String.nullable(false))
.with_column("p_mfgr", ScalarType::String.nullable(false))
.with_column("p_brand", ScalarType::String.nullable(false))
.with_column("p_type", ScalarType::String.nullable(false))
.with_column("p_size", ScalarType::Int32.nullable(false))
.with_column("p_container", ScalarType::String.nullable(false))
.with_column("p_retailprice", decimal.clone())
.with_column("p_comment", ScalarType::String.nullable(false))
.with_key(vec![0]),
),
(
"partsupp",
RelationDesc::empty()
.with_column("ps_partkey", identifier.clone())
.with_column("ps_suppkey", identifier.clone())
.with_column("ps_availqty", ScalarType::Int32.nullable(false))
.with_column("ps_supplycost", decimal.clone())
.with_column("ps_comment", ScalarType::String.nullable(false))
.with_key(vec![0, 1]),
),
(
"customer",
RelationDesc::empty()
.with_column("c_custkey", identifier.clone())
.with_column("c_name", ScalarType::String.nullable(false))
.with_column("c_address", ScalarType::String.nullable(false))
.with_column("c_nationkey", identifier.clone())
.with_column("c_phone", ScalarType::String.nullable(false))
.with_column("c_acctbal", decimal.clone())
.with_column("c_mktsegment", ScalarType::String.nullable(false))
.with_column("c_comment", ScalarType::String.nullable(false))
.with_key(vec![0]),
),
(
"orders",
RelationDesc::empty()
.with_column("o_orderkey", identifier.clone())
.with_column("o_custkey", identifier.clone())
.with_column("o_orderstatus", ScalarType::String.nullable(false))
.with_column("o_totalprice", decimal.clone())
.with_column("o_orderdate", ScalarType::Date.nullable(false))
.with_column("o_orderpriority", ScalarType::String.nullable(false))
.with_column("o_clerk", ScalarType::String.nullable(false))
.with_column("o_shippriority", ScalarType::Int32.nullable(false))
.with_column("o_comment", ScalarType::String.nullable(false))
.with_key(vec![0]),
),
(
"lineitem",
RelationDesc::empty()
.with_column("l_orderkey", identifier.clone())
.with_column("l_partkey", identifier.clone())
.with_column("l_suppkey", identifier.clone())
.with_column("l_linenumber", ScalarType::Int32.nullable(false))
.with_column("l_quantity", decimal.clone())
.with_column("l_extendedprice", decimal.clone())
.with_column("l_discount", decimal.clone())
.with_column("l_tax", decimal)
.with_column("l_returnflag", ScalarType::String.nullable(false))
.with_column("l_linestatus", ScalarType::String.nullable(false))
.with_column("l_shipdate", ScalarType::Date.nullable(false))
.with_column("l_commitdate", ScalarType::Date.nullable(false))
.with_column("l_receiptdate", ScalarType::Date.nullable(false))
.with_column("l_shipinstruct", ScalarType::String.nullable(false))
.with_column("l_shipmode", ScalarType::String.nullable(false))
.with_column("l_comment", ScalarType::String.nullable(false))
.with_key(vec![0, 3]),
),
(
"nation",
RelationDesc::empty()
.with_column("n_nationkey", identifier.clone())
.with_column("n_name", ScalarType::String.nullable(false))
.with_column("n_regionkey", identifier.clone())
.with_column("n_comment", ScalarType::String.nullable(false))
.with_key(vec![0]),
),
(
"region",
RelationDesc::empty()
.with_column("r_regionkey", identifier)
.with_column("r_name", ScalarType::String.nullable(false))
.with_column("r_comment", ScalarType::String.nullable(false))
.with_key(vec![0]),
),
]
}
}
}
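/// Whether this generator's output is append-only (never retracts data). A
/// `Counter` with a `max_cardinality` retracts old values, so it is not
/// monotonic.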
pub fn is_monotonic(&self) -> bool {
match self {
LoadGenerator::Auction => true,
LoadGenerator::Counter {
max_cardinality: None,
} => true,
LoadGenerator::Counter { .. } => false,
LoadGenerator::Marketing => false,
LoadGenerator::Datums => true,
LoadGenerator::Tpch { .. } => false,
}
}
}
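/// A load generator that can deterministically produce its stream of events
/// from a seed, resuming from `resume_offset`. Each item pairs an output
/// index with a timely [`Event`] carrying a row and its diff.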
pub trait Generator {
fn by_seed(
&self,
now: NowFn,
seed: Option<u64>,
resume_offset: MzOffset,
) -> Box<dyn Iterator<Item = (usize, Event<Option<MzOffset>, (Row, i64)>)>>;
}
impl RustType<ProtoLoadGeneratorSourceConnection> for LoadGeneratorSourceConnection {
fn into_proto(&self) -> ProtoLoadGeneratorSourceConnection {
ProtoLoadGeneratorSourceConnection {
generator: Some(match &self.load_generator {
LoadGenerator::Auction => ProtoGenerator::Auction(()),
LoadGenerator::Counter { max_cardinality } => {
ProtoGenerator::Counter(ProtoCounterLoadGenerator {
max_cardinality: *max_cardinality,
})
}
LoadGenerator::Marketing => ProtoGenerator::Marketing(()),
LoadGenerator::Tpch {
count_supplier,
count_part,
count_customer,
count_orders,
count_clerk,
} => ProtoGenerator::Tpch(ProtoTpchLoadGenerator {
count_supplier: *count_supplier,
count_part: *count_part,
count_customer: *count_customer,
count_orders: *count_orders,
count_clerk: *count_clerk,
}),
LoadGenerator::Datums => ProtoGenerator::Datums(()),
}),
tick_micros: self.tick_micros,
}
}
fn from_proto(proto: ProtoLoadGeneratorSourceConnection) -> Result<Self, TryFromProtoError> {
let generator = proto.generator.ok_or_else(|| {
TryFromProtoError::missing_field("ProtoLoadGeneratorSourceConnection::generator")
})?;
Ok(LoadGeneratorSourceConnection {
load_generator: match generator {
ProtoGenerator::Auction(()) => LoadGenerator::Auction,
ProtoGenerator::Counter(ProtoCounterLoadGenerator { max_cardinality }) => {
LoadGenerator::Counter { max_cardinality }
}
ProtoGenerator::Marketing(()) => LoadGenerator::Marketing,
ProtoGenerator::Tpch(ProtoTpchLoadGenerator {
count_supplier,
count_part,
count_customer,
count_orders,
count_clerk,
}) => LoadGenerator::Tpch {
count_supplier,
count_part,
count_customer,
count_orders,
count_clerk,
},
ProtoGenerator::Datums(()) => LoadGenerator::Datums,
},
tick_micros: proto.tick_micros,
})
}
}
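/// A source connection for use in tests, whose behavior is described entirely
/// by the embedded JSON script.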
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct TestScriptSourceConnection {
pub desc_json: String,
}
pub static TEST_SCRIPT_PROGRESS_DESC: Lazy<RelationDesc> =
Lazy::new(|| RelationDesc::empty().with_column("offset", ScalarType::UInt64.nullable(true)));
impl SourceConnection for TestScriptSourceConnection {
fn name(&self) -> &'static str {
"testscript"
}
fn upstream_name(&self) -> Option<&str> {
None
}
fn timestamp_desc(&self) -> RelationDesc {
TEST_SCRIPT_PROGRESS_DESC.clone()
}
fn connection_id(&self) -> Option<GlobalId> {
None
}
fn metadata_columns(&self) -> Vec<(&str, ColumnType)> {
vec![]
}
fn metadata_column_types(&self) -> Vec<IncludedColumnSource> {
vec![]
}
}
impl RustType<ProtoTestScriptSourceConnection> for TestScriptSourceConnection {
fn into_proto(&self) -> ProtoTestScriptSourceConnection {
ProtoTestScriptSourceConnection {
desc_json: self.desc_json.clone(),
}
}
fn from_proto(proto: ProtoTestScriptSourceConnection) -> Result<Self, TryFromProtoError> {
Ok(TestScriptSourceConnection {
desc_json: proto.desc_json,
})
}
}
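/// The raw data that a source produces: either a correctly decoded `Row` or
/// the [`DataflowError`] that was hit while producing it.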
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);
#[cfg(test)]
impl Default for SourceData {
fn default() -> Self {
SourceData(Ok(Row::default()))
}
}
impl Deref for SourceData {
type Target = Result<Row, DataflowError>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for SourceData {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl RustType<ProtoSourceData> for SourceData {
fn into_proto(&self) -> ProtoSourceData {
use proto_source_data::Kind;
ProtoSourceData {
kind: Some(match &**self {
Ok(row) => Kind::Ok(row.into_proto()),
Err(err) => Kind::Err(err.into_proto()),
}),
}
}
fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
use proto_source_data::Kind;
match proto.kind {
Some(kind) => match kind {
Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
},
None => Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
}
}
}
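// `SourceData` is durably persisted via its protobuf representation. A
// minimal sketch of the round trip (hypothetical usage, not from this file):
//
//     let data = SourceData(Ok(Row::default()));
//     let mut buf = Vec::new();
//     data.encode(&mut buf);
//     assert_eq!(SourceData::decode(&buf), Ok(data));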
impl Codec for SourceData {
type Schema = RelationDesc;
fn codec_name() -> String {
"protobuf[SourceData]".into()
}
fn encode<B: BufMut>(&self, buf: &mut B) {
self.into_proto()
.encode(buf)
.expect("no required fields means no initialization errors");
}
fn decode(buf: &[u8]) -> Result<Self, String> {
let proto = ProtoSourceData::decode(buf).map_err(|err| err.to_string())?;
proto.into_rust().map_err(|err| err.to_string())
}
}
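/// A [`PartEncoder`] for [`SourceData`] that writes `Ok` rows into the `ok`
/// struct column and protobuf-encoded errors into the `err` bytes column.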
#[derive(Debug)]
pub struct SourceDataEncoder<'a> {
len: &'a mut usize,
ok_validity: ValidityMut<'a>,
ok: RowEncoder<'a>,
err: &'a mut <Option<Vec<u8>> as Data>::Mut,
}
impl<'a> PartEncoder<'a, SourceData> for SourceDataEncoder<'a> {
fn encode(&mut self, val: &SourceData) {
*self.len += 1;
match val.as_ref() {
Ok(row) => {
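// Mark the `ok` struct as present and push each datum into its column.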
self.ok_validity.push(true);
self.ok.inc_len();
for (encoder, datum) in self.ok.col_encoders().iter_mut().zip(row.iter()) {
encoder.encode(datum);
}
ColumnPush::<Option<Vec<u8>>>::push(self.err, None);
}
Err(err) => {
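// Even for errors, push a default value onto every `ok` column so all
// columns in the part stay the same length; the validity bitmap records
// that the struct is logically absent.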
self.ok_validity.push(false);
self.ok.inc_len();
for encoder in self.ok.col_encoders() {
encoder.encode_default();
}
let err = err.into_proto().encode_to_vec();
ColumnPush::<Option<Vec<u8>>>::push(self.err, Some(err.as_slice()));
}
}
}
}
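/// A [`PartDecoder`] for [`SourceData`] that reconstructs either the `Ok` row
/// or the decoded error, using the validity bitmap to tell which is present.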
#[derive(Debug)]
pub struct SourceDataDecoder<'a> {
ok_validity: ValidityRef<'a>,
ok: RowDecoder<'a>,
err: &'a <Option<Vec<u8>> as Data>::Col,
}
impl<'a> PartDecoder<'a, SourceData> for SourceDataDecoder<'a> {
fn decode(&self, idx: usize, val: &mut SourceData) {
let err = ColumnGet::<Option<Vec<u8>>>::get(self.err, idx);
match (self.ok_validity.get(idx), err) {
(true, None) => {
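// Reuse the existing `Row` allocation if we already have one.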
let mut packer = match val.0.as_mut() {
Ok(x) => x.packer(),
Err(_) => {
val.0 = Ok(Row::default());
val.0.as_mut().unwrap().packer()
}
};
for decoder in self.ok.col_decoders() {
decoder.decode(idx, &mut packer);
}
}
(false, Some(err)) => {
let err = ProtoDataflowError::decode(err)
.expect("proto should be valid")
.into_rust()
.expect("error should be valid");
val.0 = Err(err);
}
(true, Some(_)) | (false, None) => {
panic!("SourceData should have exactly one of ok or err")
}
};
}
}
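// The persist schema for `SourceData`: an optional `ok` struct column whose
// fields mirror the relation's columns, plus an optional `err` bytes column
// holding a protobuf-encoded `DataflowError`. Exactly one of the two is set
// for each entry.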
impl Schema<SourceData> for RelationDesc {
type Encoder<'a> = SourceDataEncoder<'a>;
type Decoder<'a> = SourceDataDecoder<'a>;
fn columns(&self) -> DynStructCfg {
let ok_schema = Schema::<Row>::columns(self);
let cols = vec![
(
"ok".to_owned(),
DataType {
optional: true,
format: ColumnFormat::Struct(ok_schema),
},
StatsFn::Default,
),
(
"err".to_owned(),
DataType {
optional: true,
format: ColumnFormat::Bytes,
},
StatsFn::Default,
),
];
DynStructCfg::from(cols)
}
fn decoder<'a>(
&self,
mut cols: mz_persist_types::dyn_struct::ColumnsRef<'a>,
) -> Result<Self::Decoder<'a>, String> {
let ok = cols.col::<Option<DynStruct>>("ok")?;
let err = cols.col::<Option<Vec<u8>>>("err")?;
let () = cols.finish()?;
let (ok_validity, ok) = RelationDesc::decoder(self, ok.as_opt_ref())?;
Ok(SourceDataDecoder {
ok_validity,
ok,
err,
})
}
fn encoder<'a>(
&self,
mut cols: mz_persist_types::dyn_struct::ColumnsMut<'a>,
) -> Result<Self::Encoder<'a>, String> {
let ok = cols.col::<Option<DynStruct>>("ok")?;
let err = cols.col::<Option<Vec<u8>>>("err")?;
let (len, ()) = cols.finish()?;
let (ok_validity, ok) = RelationDesc::encoder(self, ok.as_opt_mut())?;
Ok(SourceDataEncoder {
len,
ok_validity,
ok,
err,
})
}
}
#[cfg(test)]
mod tests {
use mz_repr::is_no_stats_type;
use proptest::prelude::*;
use crate::types::errors::EnvelopeError;
use super::*;
#[mz_ore::test]
fn test_timeline_parsing() {
assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());
assert!("Materialize".parse::<Timeline>().is_err());
assert!("Ejoe".parse::<Timeline>().is_err());
assert!("Umike".parse::<Timeline>().is_err());
assert!("Dance".parse::<Timeline>().is_err());
assert!("".parse::<Timeline>().is_err());
}
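/// Verifies that interesting datums of `scalar_type` (plus an error value)
/// survive a columnar encode/decode round trip, both as non-nullable and
/// nullable columns.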
fn scalar_type_columnar_roundtrip(scalar_type: ScalarType) {
use mz_persist_types::columnar::validate_roundtrip;
let skip_decode = is_no_stats_type(&scalar_type);
let mut rows = Vec::new();
for datum in scalar_type.interesting_datums() {
rows.push(SourceData(Ok(Row::pack(std::iter::once(datum)))));
}
rows.push(SourceData(
Err(EnvelopeError::Debezium("foo".into()).into()),
));
let schema = RelationDesc::empty().with_column("col", scalar_type.clone().nullable(false));
for row in rows.iter() {
assert_eq!(validate_roundtrip(&schema, row, skip_decode), Ok(()));
}
let schema = RelationDesc::empty().with_column("col", scalar_type.nullable(true));
rows.push(SourceData(Ok(Row::pack(std::iter::once(Datum::Null)))));
for row in rows.iter() {
assert_eq!(validate_roundtrip(&schema, row, skip_decode), Ok(()));
}
}
#[mz_ore::test]
#[cfg_attr(miri, ignore)]
fn all_scalar_types_columnar_roundtrip() {
proptest!(|(scalar_type in any::<ScalarType>())| {
scalar_type_columnar_roundtrip(scalar_type)
});
}
}