1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::{Add, AddAssign, Deref, DerefMut};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
21use arrow::datatypes::{Field, Fields};
22use bytes::{BufMut, Bytes};
23use columnation::Columnation;
24use itertools::EitherOrBoth::Both;
25use itertools::Itertools;
26use kafka::KafkaSourceExportDetails;
27use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
28use mz_ore::assert_none;
29use mz_persist_types::Codec;
30use mz_persist_types::arrow::ArrayOrd;
31use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
32use mz_persist_types::stats::{
33 ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
34 StructStats,
35};
36use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
37use mz_repr::{
38 CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
39 RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
40};
41use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
42use proptest::prelude::any;
43use proptest::strategy::Strategy;
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use timely::order::{PartialOrder, TotalOrder};
47use timely::progress::timestamp::Refines;
48use timely::progress::{PathSummary, Timestamp};
49
50use crate::AlterCompatible;
51use crate::connections::inline::{
52 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
53 ReferencedConnection,
54};
55use crate::controller::AlterError;
56use crate::errors::{DataflowError, ProtoDataflowError};
57use crate::instances::StorageInstanceId;
58use crate::sources::sql_server::SqlServerSourceExportDetails;
59
60pub mod casts;
61pub mod encoding;
62pub mod envelope;
63pub mod kafka;
64pub mod load_generator;
65pub mod mysql;
66pub mod postgres;
67pub mod sql_server;
68
69pub use crate::sources::envelope::SourceEnvelope;
70pub use crate::sources::kafka::KafkaSourceConnection;
71pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
72pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
73pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
74pub use crate::sources::sql_server::{SqlServerSourceConnection, SqlServerSourceExtras};
75
76include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));
77
/// A description of an ingestion: the source to read plus everything the
/// storage controller needs to run it.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The description of the source this ingestion reads from.
    pub desc: SourceDesc<C>,
    /// The collections this ingestion produces, keyed by their global id.
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The storage instance this ingestion runs on.
    pub instance_id: StorageInstanceId,
    /// The id of the collection holding this ingestion's remap/progress data.
    pub remap_collection_id: GlobalId,
    // Metadata attached to the remap collection; `()` until populated (see
    // `IngestionDescription::new`).
    pub remap_metadata: S,
}
104
105impl IngestionDescription {
106 pub fn new(
107 desc: SourceDesc,
108 instance_id: StorageInstanceId,
109 remap_collection_id: GlobalId,
110 ) -> Self {
111 Self {
112 desc,
113 remap_metadata: (),
114 source_exports: BTreeMap::new(),
115 instance_id,
116 remap_collection_id,
117 }
118 }
119}
120
121impl<S> IngestionDescription<S> {
122 pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
127 let IngestionDescription {
130 desc: _,
131 remap_metadata: _,
132 source_exports,
133 instance_id: _,
134 remap_collection_id,
135 } = &self;
136
137 source_exports
138 .keys()
139 .copied()
140 .chain(std::iter::once(*remap_collection_id))
141 }
142}
143
impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
    /// Determines whether `other` is a compatible alteration of `self`.
    ///
    /// Equal descriptions are trivially compatible. Otherwise each field is
    /// checked individually: `desc` and the per-export fields use their own
    /// `alter_compatible` notions, while the remaining fields must be equal.
    /// The first incompatible field is logged and yields `Err(AlterError)`.
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &IngestionDescription<S>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        // Exhaustive destructuring: adding a field to the struct forces this
        // check list to be updated.
        let IngestionDescription {
            desc,
            remap_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        let compatibility_checks = [
            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
            (remap_metadata == &other.remap_metadata, "remap_metadata"),
            (
                // Only exports present on both sides are compared; exports
                // that exist on just one side do not fail the check here.
                source_exports
                    .iter()
                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
                        l_key.cmp(r_key)
                    })
                    .all(|r| match r {
                        Both(
                            (
                                _,
                                SourceExport {
                                    storage_metadata: l_metadata,
                                    details: l_details,
                                    data_config: l_data_config,
                                },
                            ),
                            (
                                _,
                                SourceExport {
                                    storage_metadata: r_metadata,
                                    details: r_details,
                                    data_config: r_data_config,
                                },
                            ),
                        ) => {
                            l_metadata.alter_compatible(id, r_metadata).is_ok()
                                && l_details.alter_compatible(id, r_details).is_ok()
                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
                        }
                        _ => true,
                    }),
                "source_exports",
            ),
            (instance_id == &other.instance_id, "instance_id"),
            (
                remap_collection_id == &other.remap_collection_id,
                "remap_collection_id",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
218
219impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
220 for IngestionDescription<(), ReferencedConnection>
221{
222 fn into_inline_connection(self, r: R) -> IngestionDescription {
223 let IngestionDescription {
224 desc,
225 remap_metadata,
226 source_exports,
227 instance_id,
228 remap_collection_id,
229 } = self;
230
231 IngestionDescription {
232 desc: desc.into_inline_connection(r),
233 remap_metadata,
234 source_exports,
235 instance_id,
236 remap_collection_id,
237 }
238 }
239}
240
/// A single output collection produced by an ingestion.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// Storage metadata attached to this export; generic so it can be filled
    /// in at different stages (`()` until then).
    pub storage_metadata: S,
    /// Source-specific details describing what this export reads.
    pub details: SourceExportDetails,
    /// How the exported data is decoded and enveloped.
    pub data_config: SourceExportDataConfig<C>,
}
250
/// A timestamp type usable as a source's native timestamp, convertible to and
/// from a single `Row`.
pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    /// Encodes this timestamp as a `Row`.
    fn encode_row(&self) -> Row;
    /// Decodes a timestamp previously produced by `encode_row`.
    ///
    /// Implementations may panic if the row does not have the expected shape
    /// (see the `MzOffset` impl).
    fn decode_row(row: &Row) -> Self;
}
257
258impl SourceTimestamp for MzOffset {
259 fn encode_row(&self) -> Row {
260 Row::pack([Datum::UInt64(self.offset)])
261 }
262
263 fn decode_row(row: &Row) -> Self {
264 let mut datums = row.iter();
265 match (datums.next(), datums.next()) {
266 (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
267 _ => panic!("invalid row {row:?}"),
268 }
269 }
270}
271
/// A raw offset into an external source (e.g. a Kafka offset or, via the
/// `From<PgLsn>` impl below, a Postgres LSN), represented as a plain `u64`.
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
pub struct MzOffset {
    /// The underlying offset value.
    pub offset: u64,
}
291
/// Offsets accumulate by delegating to the `u64` semigroup of the inner value.
impl differential_dataflow::difference::Semigroup for MzOffset {
    fn plus_equals(&mut self, rhs: &Self) {
        self.offset.plus_equals(&rhs.offset)
    }
}
297
impl differential_dataflow::difference::IsZero for MzOffset {
    /// An offset is a zero diff iff its inner `u64` is.
    fn is_zero(&self) -> bool {
        self.offset.is_zero()
    }
}
303
/// Fixed-width persist encoding: an `MzOffset` round-trips through the
/// `Codec64` encoding of its inner `u64`.
impl mz_persist_types::Codec64 for MzOffset {
    fn codec_name() -> String {
        "MzOffset".to_string()
    }

    fn encode(&self) -> [u8; 8] {
        mz_persist_types::Codec64::encode(&self.offset)
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self {
            offset: mz_persist_types::Codec64::decode(buf),
        }
    }
}
319
/// `MzOffset` is `Copy`, so columnation can store it in a plain copy region.
impl columnation::Columnation for MzOffset {
    type InnerRegion = columnation::CopyRegion<MzOffset>;
}
323
324impl MzOffset {
325 pub fn checked_sub(self, other: Self) -> Option<Self> {
326 self.offset
327 .checked_sub(other.offset)
328 .map(|offset| Self { offset })
329 }
330}
331
/// Wraps a raw `u64` as an offset.
impl From<u64> for MzOffset {
    fn from(offset: u64) -> Self {
        Self { offset }
    }
}
339
340impl std::fmt::Display for MzOffset {
341 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342 write!(f, "{}", self.offset)
343 }
344}
345
346impl Add<u64> for MzOffset {
348 type Output = MzOffset;
349
350 fn add(self, x: u64) -> MzOffset {
351 MzOffset {
352 offset: self.offset + x,
353 }
354 }
355}
356impl Add<Self> for MzOffset {
357 type Output = Self;
358
359 fn add(self, x: Self) -> Self {
360 MzOffset {
361 offset: self.offset + x.offset,
362 }
363 }
364}
365impl AddAssign<u64> for MzOffset {
366 fn add_assign(&mut self, x: u64) {
367 self.offset += x;
368 }
369}
370impl AddAssign<Self> for MzOffset {
371 fn add_assign(&mut self, x: Self) {
372 self.offset += x.offset;
373 }
374}
375
376impl From<tokio_postgres::types::PgLsn> for MzOffset {
378 fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
379 MzOffset { offset: lsn.into() }
380 }
381}
382
impl Timestamp for MzOffset {
    // An offset advances by an offset delta, which is itself an offset.
    type Summary = MzOffset;

    fn minimum() -> Self {
        MzOffset {
            offset: Timestamp::minimum(),
        }
    }
}
392
393impl PathSummary<MzOffset> for MzOffset {
394 fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
395 Some(MzOffset {
396 offset: self.offset.results_in(&src.offset)?,
397 })
398 }
399
400 fn followed_by(&self, other: &Self) -> Option<Self> {
401 Some(MzOffset {
402 offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
403 })
404 }
405}
406
/// `MzOffset` refines the trivial (unit) timestamp.
impl Refines<()> for MzOffset {
    fn to_inner(_: ()) -> Self {
        MzOffset::minimum()
    }
    fn to_outer(self) {}
    fn summarize(_: Self::Summary) {}
}
414
impl PartialOrder for MzOffset {
    /// Delegates to the (total) order on the inner `u64`.
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        self.offset.less_equal(&other.offset)
    }
}
421
/// The order on offsets is total, being a newtype over `u64`.
impl TotalOrder for MzOffset {}
423
/// A timeline in which timestamps are measured. Variants are distinguished in
/// the string form by a one-character tag (see `id_char` and `FromStr`).
#[derive(
    Clone,
    Debug,
    Ord,
    PartialOrd,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash
)]
pub enum Timeline {
    /// Milliseconds since an epoch (presumably the Unix epoch — confirm at
    /// the timestamping sites).
    EpochMilliseconds,
    /// A timeline governed by an external system, identified by the string.
    External(String),
    /// A user-named timeline, identified by the string.
    User(String),
}
456
impl Timeline {
    /// Tag character for `EpochMilliseconds` in the string form.
    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
    /// Tag character for `External` in the string form.
    const EXTERNAL_ID_CHAR: char = 'E';
    /// Tag character for `User` in the string form.
    const USER_ID_CHAR: char = 'U';

    /// Returns the tag character identifying this variant.
    fn id_char(&self) -> char {
        match self {
            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
            Self::External(_) => Self::EXTERNAL_ID_CHAR,
            Self::User(_) => Self::USER_ID_CHAR,
        }
    }
}
470
471impl ToString for Timeline {
472 fn to_string(&self) -> String {
473 match self {
474 Self::EpochMilliseconds => format!("{}", self.id_char()),
475 Self::External(id) => format!("{}.{id}", self.id_char()),
476 Self::User(id) => format!("{}.{id}", self.id_char()),
477 }
478 }
479}
480
481impl FromStr for Timeline {
482 type Err = String;
483
484 fn from_str(s: &str) -> Result<Self, Self::Err> {
485 if s.is_empty() {
486 return Err("empty timeline".to_string());
487 }
488 let mut chars = s.chars();
489 match chars.next().expect("non-empty string") {
490 Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
491 None => Ok(Self::EpochMilliseconds),
492 Some(_) => Err(format!("unknown timeline: {s}")),
493 },
494 Self::EXTERNAL_ID_CHAR => match chars.next() {
495 Some('.') => Ok(Self::External(chars.as_str().to_string())),
496 _ => Err(format!("unknown timeline: {s}")),
497 },
498 Self::USER_ID_CHAR => match chars.next() {
499 Some('.') => Ok(Self::User(chars.as_str().to_string())),
500 _ => Err(format!("unknown timeline: {s}")),
501 },
502 _ => Err(format!("unknown timeline: {s}")),
503 }
504 }
505}
506
/// Behavior common to every type of source connection.
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of the kind of external system, e.g. "kafka".
    fn name(&self) -> &'static str;

    /// The name of the referenced external object, if there is one.
    fn external_reference(&self) -> Option<&str>;

    /// The key relation description used when no explicit one is provided.
    fn default_key_desc(&self) -> RelationDesc;

    /// The value relation description used when no explicit one is provided.
    fn default_value_desc(&self) -> RelationDesc;

    /// The relation description of this source's native timestamps.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The catalog id of the connection object backing this source, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// Whether this source can run while in read-only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether this source prefers to run on a single replica.
    fn prefers_single_replica(&self) -> bool;
}
539
/// Whether a payload is compressed, and how.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    /// Gzip compression.
    Gzip,
    /// No compression.
    None,
}
545
/// How the data of a single source export is decoded and enveloped.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    /// How raw bytes are decoded, if decoding is required.
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    /// The envelope applied to the (decoded) data.
    pub envelope: SourceEnvelope,
}
553
554impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
555 for SourceExportDataConfig<ReferencedConnection>
556{
557 fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
558 let SourceExportDataConfig { encoding, envelope } = self;
559
560 SourceExportDataConfig {
561 encoding: encoding.map(|e| e.into_inline_connection(r)),
562 envelope,
563 }
564 }
565}
566
567impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
568 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
569 if self == other {
570 return Ok(());
571 }
572 let Self { encoding, envelope } = &self;
573
574 let compatibility_checks = [
575 (
576 match (encoding, &other.encoding) {
577 (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
578 (s, o) => s == o,
579 },
580 "encoding",
581 ),
582 (envelope == &other.envelope, "envelope"),
583 ];
584
585 for (compatible, field) in compatibility_checks {
586 if !compatible {
587 tracing::warn!(
588 "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
589 self,
590 other
591 );
592
593 return Err(AlterError { id });
594 }
595 }
596 Ok(())
597 }
598}
599
600impl<C: ConnectionAccess> SourceExportDataConfig<C> {
601 pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
608 match &self.envelope {
609 SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
611 SourceEnvelope::None(_) => {
612 match connection {
613 GenericSourceConnection::Postgres(_) => false,
615 GenericSourceConnection::MySql(_) => false,
617 GenericSourceConnection::SqlServer(_) => false,
619 GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
621 GenericSourceConnection::Kafka(_) => true,
623 }
624 }
625 }
626 }
627}
628
/// A description of a source: its connection plus timestamping configuration.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    /// The connection to the external system being ingested.
    pub connection: GenericSourceConnection<C>,
    // Interval used for timestamping the source's data; exact semantics are
    // determined by the ingestion implementation (not visible here).
    pub timestamp_interval: Duration,
}
635
636impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
637 for SourceDesc<ReferencedConnection>
638{
639 fn into_inline_connection(self, r: R) -> SourceDesc {
640 let SourceDesc {
641 connection,
642 timestamp_interval,
643 } = self;
644
645 SourceDesc {
646 connection: connection.into_inline_connection(&r),
647 timestamp_interval,
648 }
649 }
650}
651
impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
    /// Determines whether `other` is a compatible alteration of `self`: only
    /// the connections must be alter-compatible; `timestamp_interval` may
    /// change freely.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self {
            connection,
            timestamp_interval: _,
        } = &self;

        let compatibility_checks = [(
            connection.alter_compatible(id, &other.connection).is_ok(),
            "connection",
        )];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
686
/// A connection to any one of the supported external source systems.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSourceConnection<C>),
    // Load generators are synthetic and carry no external connection, so the
    // variant is not parameterized by `C`.
    LoadGenerator(LoadGeneratorSourceConnection),
}
695
/// Wraps a Kafka connection in the generic enum.
impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: KafkaSourceConnection<C>) -> Self {
        Self::Kafka(conn)
    }
}
701
/// Wraps a Postgres connection in the generic enum.
impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: PostgresSourceConnection<C>) -> Self {
        Self::Postgres(conn)
    }
}
707
/// Wraps a MySQL connection in the generic enum.
impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: MySqlSourceConnection<C>) -> Self {
        Self::MySql(conn)
    }
}
713
/// Wraps a SQL Server connection in the generic enum.
impl<C: ConnectionAccess> From<SqlServerSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: SqlServerSourceConnection<C>) -> Self {
        Self::SqlServer(conn)
    }
}
719
/// Wraps a load generator connection in the generic enum.
impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
    fn from(conn: LoadGeneratorSourceConnection) -> Self {
        Self::LoadGenerator(conn)
    }
}
725
726impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
727 for GenericSourceConnection<ReferencedConnection>
728{
729 fn into_inline_connection(self, r: R) -> GenericSourceConnection {
730 match self {
731 GenericSourceConnection::Kafka(kafka) => {
732 GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
733 }
734 GenericSourceConnection::Postgres(pg) => {
735 GenericSourceConnection::Postgres(pg.into_inline_connection(r))
736 }
737 GenericSourceConnection::MySql(mysql) => {
738 GenericSourceConnection::MySql(mysql.into_inline_connection(r))
739 }
740 GenericSourceConnection::SqlServer(sql_server) => {
741 GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
742 }
743 GenericSourceConnection::LoadGenerator(lg) => {
744 GenericSourceConnection::LoadGenerator(lg)
745 }
746 }
747 }
748}
749
750impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
751 fn name(&self) -> &'static str {
752 match self {
753 Self::Kafka(conn) => conn.name(),
754 Self::Postgres(conn) => conn.name(),
755 Self::MySql(conn) => conn.name(),
756 Self::SqlServer(conn) => conn.name(),
757 Self::LoadGenerator(conn) => conn.name(),
758 }
759 }
760
761 fn external_reference(&self) -> Option<&str> {
762 match self {
763 Self::Kafka(conn) => conn.external_reference(),
764 Self::Postgres(conn) => conn.external_reference(),
765 Self::MySql(conn) => conn.external_reference(),
766 Self::SqlServer(conn) => conn.external_reference(),
767 Self::LoadGenerator(conn) => conn.external_reference(),
768 }
769 }
770
771 fn default_key_desc(&self) -> RelationDesc {
772 match self {
773 Self::Kafka(conn) => conn.default_key_desc(),
774 Self::Postgres(conn) => conn.default_key_desc(),
775 Self::MySql(conn) => conn.default_key_desc(),
776 Self::SqlServer(conn) => conn.default_key_desc(),
777 Self::LoadGenerator(conn) => conn.default_key_desc(),
778 }
779 }
780
781 fn default_value_desc(&self) -> RelationDesc {
782 match self {
783 Self::Kafka(conn) => conn.default_value_desc(),
784 Self::Postgres(conn) => conn.default_value_desc(),
785 Self::MySql(conn) => conn.default_value_desc(),
786 Self::SqlServer(conn) => conn.default_value_desc(),
787 Self::LoadGenerator(conn) => conn.default_value_desc(),
788 }
789 }
790
791 fn timestamp_desc(&self) -> RelationDesc {
792 match self {
793 Self::Kafka(conn) => conn.timestamp_desc(),
794 Self::Postgres(conn) => conn.timestamp_desc(),
795 Self::MySql(conn) => conn.timestamp_desc(),
796 Self::SqlServer(conn) => conn.timestamp_desc(),
797 Self::LoadGenerator(conn) => conn.timestamp_desc(),
798 }
799 }
800
801 fn connection_id(&self) -> Option<CatalogItemId> {
802 match self {
803 Self::Kafka(conn) => conn.connection_id(),
804 Self::Postgres(conn) => conn.connection_id(),
805 Self::MySql(conn) => conn.connection_id(),
806 Self::SqlServer(conn) => conn.connection_id(),
807 Self::LoadGenerator(conn) => conn.connection_id(),
808 }
809 }
810
811 fn supports_read_only(&self) -> bool {
812 match self {
813 GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
814 GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
815 GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
816 GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
817 GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
818 }
819 }
820
821 fn prefers_single_replica(&self) -> bool {
822 match self {
823 GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
824 GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
825 GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
826 GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
827 GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
828 }
829 }
830}
impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
    /// Determines whether `other` is a compatible alteration of `self`:
    /// matching variants delegate to the wrapped connection's notion of
    /// compatibility; mismatched variants are always incompatible.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
                conn.alter_compatible(id, other)
            }
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}
858
/// Source-specific details for a single source export.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// The export needs no source-specific details.
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}
872
873impl crate::AlterCompatible for SourceExportDetails {
874 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
875 if self == other {
876 return Ok(());
877 }
878 let r = match (self, other) {
879 (Self::None, Self::None) => Ok(()),
880 (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
881 (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
882 (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
883 (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
884 _ => Err(AlterError { id }),
885 };
886
887 if r.is_err() {
888 tracing::warn!(
889 "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
890 self,
891 other
892 );
893 }
894
895 r
896 }
897}
898
/// Per-source details of a source export as captured from a SQL statement
/// (convertible to/from proto via the `RustType` impl below).
pub enum SourceExportStatementDetails {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    MySql {
        table: mz_mysql_util::MySqlTableDesc,
        // NOTE(review): presumably the GTID set at which the export begins —
        // confirm against the MySQL ingestion code.
        initial_gtid_set: String,
    },
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        // Name of the SQL Server CDC capture instance for the table.
        capture_instance: Arc<str>,
        // NOTE(review): presumably the LSN at which the export begins —
        // confirm against the SQL Server ingestion code.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    Kafka {},
}
922
/// Proto round-trip for `SourceExportStatementDetails`; each variant maps to
/// the matching `kind` oneof in the proto.
impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
        match self {
            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Postgres(
                    postgres::ProtoPostgresSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                    },
                )),
            },
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Mysql(
                    mysql::ProtoMySqlSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        initial_gtid_set: initial_gtid_set.clone(),
                    },
                )),
            },
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
                    sql_server::ProtoSqlServerSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        capture_instance: capture_instance.to_string(),
                        // LSNs are carried over the wire as raw bytes.
                        initial_lsn: initial_lsn.as_bytes().to_vec(),
                    },
                )),
            },
            SourceExportStatementDetails::LoadGenerator { output } => {
                ProtoSourceExportStatementDetails {
                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
                            output: output.into_proto().into(),
                        },
                    )),
                }
            }
            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Kafka(
                    kafka::ProtoKafkaSourceExportStatementDetails {},
                )),
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_statement_details::Kind;
        Ok(match proto.kind {
            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
                table: details
                    .table
                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
            },
            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
                table: details
                    .table
                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,

                initial_gtid_set: details.initial_gtid_set,
            },
            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
                table: details
                    .table
                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
                capture_instance: details.capture_instance.into(),
                // Reconstruct the LSN from its raw byte representation.
                initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
                    .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
            },
            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
                output: details
                    .output
                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
            },
            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoSourceExportStatementDetails::kind",
                ));
            }
        })
    }
}
1011
/// The unit of data in a storage collection: either a decoded `Row` or a
/// `DataflowError`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);
1015
impl Default for SourceData {
    /// Defaults to an empty `Ok` row (used e.g. as the decode target in the
    /// `Codec` impl below).
    fn default() -> Self {
        SourceData(Ok(Row::default()))
    }
}
1021
/// `SourceData` derefs transparently to its inner `Result`.
impl Deref for SourceData {
    type Target = Result<Row, DataflowError>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
1029
impl DerefMut for SourceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
1035
1036impl RustType<ProtoSourceData> for SourceData {
1037 fn into_proto(&self) -> ProtoSourceData {
1038 use proto_source_data::Kind;
1039 ProtoSourceData {
1040 kind: Some(match &**self {
1041 Ok(row) => Kind::Ok(row.into_proto()),
1042 Err(err) => Kind::Err(err.into_proto()),
1043 }),
1044 }
1045 }
1046
1047 fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
1048 use proto_source_data::Kind;
1049 match proto.kind {
1050 Some(kind) => match kind {
1051 Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
1052 Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
1053 },
1054 None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
1055 }
1056 }
1057}
1058
/// Persist `Codec` for `SourceData`: values encode as `ProtoSourceData`
/// protobuf bytes; the schema encodes as a `ProtoRelationDesc`.
impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Take the caller-provided `ProtoRow` allocation (if any) so it can
        // be reused as the decode target, avoiding a fresh allocation on the
        // common `Ok(row)` path.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                // Fast path: decode in place into the existing `Row`, then
                // return the proto buffer to `storage` for the next call.
                let ret = row.decode_from_proto(&proto, schema);
                storage.replace(proto);
                ret
            }
            (kind, _) => {
                // Slow path (the data is an error, or `self` currently holds
                // one): rebuild `self` from scratch via the proto conversion.
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                Ok(())
            }
        }
    }

    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            // Only rows are validated against the schema; errors always pass.
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}
1127
/// A proptest strategy producing `SourceData` valid for `desc`: mostly `Ok`
/// rows shaped by the relation description, with a small weight (1 in 51) of
/// arbitrary `DataflowError`s. Shrinking is disabled for both cases.
pub fn arb_source_data_for_relation_desc(
    desc: &RelationDesc,
) -> impl Strategy<Value = SourceData> + use<> {
    let row_strat = arb_row_for_relation(desc).no_shrink();

    proptest::strategy::Union::new_weighted(vec![
        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
        (
            1,
            any::<DataflowError>()
                .prop_map(|err| SourceData(Err(err)))
                .no_shrink()
                .boxed(),
        ),
    ])
}
1145
/// A (schema, item) name for an object in an external system's catalog.
pub trait ExternalCatalogReference {
    /// The name of the schema containing the item.
    fn schema_name(&self) -> &str;
    /// The item's own name.
    fn item_name(&self) -> &str;
}
1159
impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
    fn schema_name(&self) -> &str {
        &self.schema_name
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}
1169
impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
    // Postgres calls the schema a "namespace".
    fn schema_name(&self) -> &str {
        &self.namespace
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}
1179
impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
    fn schema_name(&self) -> &str {
        &*self.schema_name
    }

    fn item_name(&self) -> &str {
        &*self.name
    }
}
1189
/// A plain `(schema, item)` pair of names.
impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
    fn schema_name(&self) -> &str {
        self.0
    }

    fn item_name(&self) -> &str {
        self.1
    }
}
1201
/// Resolves references to external objects against a set of referenceable
/// items. The map is nested `item -> schema -> database -> index` (the
/// reverse of `database.schema.item` order), so partially-qualified names can
/// be resolved from their trailing components.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceReferenceResolver {
    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
}
1213
/// Errors produced while resolving an external object reference.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ExternalReferenceResolutionError {
    /// No referenceable item matched the given name.
    #[error("reference to {name} not found in source")]
    DoesNotExist { name: String },
    /// More than one referenceable item matched the given name.
    #[error(
        "reference {name} is ambiguous, consider specifying an additional \
    layer of qualification"
    )]
    Ambiguous { name: String },
    /// A name component was not a valid identifier.
    #[error("invalid identifier: {0}")]
    Ident(#[from] IdentError),
}
1226
1227impl<'a> SourceReferenceResolver {
1228 pub fn new<T: ExternalCatalogReference>(
1234 database: &str,
1235 referenceable_items: &'a [T],
1236 ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1237 let mut inner = BTreeMap::new();
1240
1241 let database = Ident::new(database)?;
1242
1243 for (reference_idx, item) in referenceable_items.iter().enumerate() {
1244 let item_name = Ident::new(item.item_name())?;
1245 let schema_name = Ident::new(item.schema_name())?;
1246
1247 inner
1248 .entry(item_name)
1249 .or_insert_with(BTreeMap::new)
1250 .entry(schema_name)
1251 .or_insert_with(BTreeMap::new)
1252 .entry(database.clone())
1253 .or_insert(reference_idx);
1254 }
1255
1256 Ok(SourceReferenceResolver { inner })
1257 }
1258
1259 pub fn resolve(
1276 &self,
1277 name: &[Ident],
1278 canonicalize_to_width: usize,
1279 ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1280 let (db, schema, idx) = self.resolve_inner(name)?;
1281
1282 let item = name.last().expect("must have provided at least 1 element");
1283
1284 let canonical_name = match canonicalize_to_width {
1285 1 => vec![item.clone()],
1286 2 => vec![schema.clone(), item.clone()],
1287 3 => vec![db.clone(), schema.clone(), item.clone()],
1288 o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1289 };
1290
1291 Ok((UnresolvedItemName(canonical_name), idx))
1292 }
1293
1294 pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1304 let (_db, _schema, idx) = self.resolve_inner(name)?;
1305 Ok(idx)
1306 }
1307
1308 fn resolve_inner<'name: 'a>(
1325 &'a self,
1326 name: &'name [Ident],
1327 ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1328 let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1329
1330 if !(1..=3).contains(&name.len()) {
1332 Err(ExternalReferenceResolutionError::DoesNotExist {
1333 name: get_provided_name(),
1334 })?;
1335 }
1336
1337 let mut names = std::iter::repeat(None)
1339 .take(3 - name.len())
1340 .chain(name.iter().map(Some));
1341
1342 let database = names.next().flatten();
1343 let schema = names.next().flatten();
1344 let item = names
1345 .next()
1346 .flatten()
1347 .expect("must have provided the item name");
1348
1349 assert_none!(names.next(), "expected a 3-element iterator");
1350
1351 let schemas =
1352 self.inner
1353 .get(item)
1354 .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1355 name: get_provided_name(),
1356 })?;
1357
1358 let schema = match schema {
1359 Some(schema) => schema,
1360 None => schemas.keys().exactly_one().map_err(|_e| {
1361 ExternalReferenceResolutionError::Ambiguous {
1362 name: get_provided_name(),
1363 }
1364 })?,
1365 };
1366
1367 let databases =
1368 schemas
1369 .get(schema)
1370 .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1371 name: get_provided_name(),
1372 })?;
1373
1374 let database = match database {
1375 Some(database) => database,
1376 None => databases.keys().exactly_one().map_err(|_e| {
1377 ExternalReferenceResolutionError::Ambiguous {
1378 name: get_provided_name(),
1379 }
1380 })?,
1381 };
1382
1383 let reference_idx = databases.get(database).ok_or_else(|| {
1384 ExternalReferenceResolutionError::DoesNotExist {
1385 name: get_provided_name(),
1386 }
1387 })?;
1388
1389 Ok((database, schema, *reference_idx))
1390 }
1391}
1392
/// Decoder for the "ok" (row) half of a [`SourceData`] column.
#[derive(Debug)]
pub enum SourceDataRowColumnarDecoder {
    /// Decodes rows via an underlying [`RowColumnarDecoder`].
    Row(RowColumnarDecoder),
    /// The relation has no columns; the "ok" column is backed by a
    /// `NullArray` and every successfully decoded value is the empty row.
    EmptyRow,
}
1403
1404impl SourceDataRowColumnarDecoder {
1405 pub fn decode(&self, idx: usize, row: &mut Row) {
1406 match self {
1407 SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1408 SourceDataRowColumnarDecoder::EmptyRow => {
1409 row.packer();
1411 }
1412 }
1413 }
1414
1415 pub fn goodbytes(&self) -> usize {
1416 match self {
1417 SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1418 SourceDataRowColumnarDecoder::EmptyRow => 0,
1419 }
1420 }
1421}
1422
/// A [`ColumnDecoder`] for [`SourceData`], pairing a decoder for the "ok"
/// rows with a binary array of protobuf-encoded errors for the "err" column.
#[derive(Debug)]
pub struct SourceDataColumnarDecoder {
    // Decoder for the "ok" (row) column.
    row_decoder: SourceDataRowColumnarDecoder,
    // Binary column holding protobuf-encoded `DataflowError`s.
    err_decoder: BinaryArray,
}
1428
1429impl SourceDataColumnarDecoder {
1430 pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1431 let (_fields, arrays, nullability) = col.into_parts();
1433
1434 if nullability.is_some() {
1435 anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1436 }
1437 if arrays.len() != 2 {
1438 anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1439 }
1440
1441 let errs = arrays[1]
1442 .as_any()
1443 .downcast_ref::<BinaryArray>()
1444 .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1445
1446 let row_decoder = match arrays[0].data_type() {
1447 arrow::datatypes::DataType::Struct(_) => {
1448 let rows = arrays[0]
1449 .as_any()
1450 .downcast_ref::<StructArray>()
1451 .ok_or_else(|| {
1452 anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1453 })?;
1454 let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1455 SourceDataRowColumnarDecoder::Row(decoder)
1456 }
1457 arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1458 other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1459 };
1460
1461 Ok(SourceDataColumnarDecoder {
1462 row_decoder,
1463 err_decoder: errs.clone(),
1464 })
1465 }
1466}
1467
1468impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
1469 fn decode(&self, idx: usize, val: &mut SourceData) {
1470 let err_null = self.err_decoder.is_null(idx);
1471 let row_null = match &self.row_decoder {
1472 SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1473 SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1474 };
1475
1476 match (row_null, err_null) {
1477 (true, false) => {
1478 let err = self.err_decoder.value(idx);
1479 let err = ProtoDataflowError::decode(err)
1480 .expect("proto should be valid")
1481 .into_rust()
1482 .expect("error should be valid");
1483 val.0 = Err(err);
1484 }
1485 (false, true) => {
1486 let row = match val.0.as_mut() {
1487 Ok(row) => row,
1488 Err(_) => {
1489 val.0 = Ok(Row::default());
1490 val.0.as_mut().unwrap()
1491 }
1492 };
1493 self.row_decoder.decode(idx, row);
1494 }
1495 (true, true) => panic!("should have one of 'ok' or 'err'"),
1496 (false, false) => panic!("cannot have both 'ok' and 'err'"),
1497 }
1498 }
1499
1500 fn is_null(&self, idx: usize) -> bool {
1501 let err_null = self.err_decoder.is_null(idx);
1502 let row_null = match &self.row_decoder {
1503 SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1504 SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1505 };
1506 assert!(!err_null || !row_null, "SourceData should never be null!");
1507
1508 false
1509 }
1510
1511 fn goodbytes(&self) -> usize {
1512 self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
1513 }
1514
1515 fn stats(&self) -> StructStats {
1516 let len = self.err_decoder.len();
1517 let err_stats = ColumnarStats {
1518 nulls: Some(ColumnNullStats {
1519 count: self.err_decoder.null_count(),
1520 }),
1521 values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
1522 };
1523 let row_null_count = len - self.err_decoder.null_count();
1528 let row_stats = match &self.row_decoder {
1529 SourceDataRowColumnarDecoder::Row(encoder) => {
1530 assert_eq!(encoder.null_count(), row_null_count);
1534 encoder.stats()
1535 }
1536 SourceDataRowColumnarDecoder::EmptyRow => StructStats {
1537 len,
1538 cols: BTreeMap::default(),
1539 },
1540 };
1541 let row_stats = ColumnarStats {
1542 nulls: Some(ColumnNullStats {
1543 count: row_null_count,
1544 }),
1545 values: ColumnStatKinds::Struct(row_stats),
1546 };
1547
1548 let stats = [
1549 (
1550 SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
1551 row_stats,
1552 ),
1553 (
1554 SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
1555 err_stats,
1556 ),
1557 ];
1558 StructStats {
1559 len,
1560 cols: stats.into_iter().map(|(name, s)| (name, s)).collect(),
1561 }
1562 }
1563}
1564
/// Encoder for the "ok" (row) half of a [`SourceData`] column.
#[derive(Debug)]
pub enum SourceDataRowColumnarEncoder {
    /// Encodes rows via an underlying [`RowColumnarEncoder`].
    Row(RowColumnarEncoder),
    /// The relation has no columns; only empty rows may be appended, and the
    /// finished column is a `NullArray`.
    EmptyRow,
}
1576
1577impl SourceDataRowColumnarEncoder {
1578 pub(crate) fn goodbytes(&self) -> usize {
1579 match self {
1580 SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1581 SourceDataRowColumnarEncoder::EmptyRow => 0,
1582 }
1583 }
1584
1585 pub fn append(&mut self, row: &Row) {
1586 match self {
1587 SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1588 SourceDataRowColumnarEncoder::EmptyRow => {
1589 assert_eq!(row.iter().count(), 0)
1590 }
1591 }
1592 }
1593
1594 pub fn append_null(&mut self) {
1595 match self {
1596 SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1597 SourceDataRowColumnarEncoder::EmptyRow => (),
1598 }
1599 }
1600}
1601
/// A [`ColumnEncoder`] for [`SourceData`] that maintains parallel "ok" and
/// "err" columns; exactly one of the two is non-null for each appended value.
#[derive(Debug)]
pub struct SourceDataColumnarEncoder {
    // Encoder for the "ok" (row) column.
    row_encoder: SourceDataRowColumnarEncoder,
    // Builder for the "err" column of protobuf-encoded `DataflowError`s.
    err_encoder: BinaryBuilder,
}
1607
1608impl SourceDataColumnarEncoder {
1609 const OK_COLUMN_NAME: &'static str = "ok";
1610 const ERR_COLUMN_NAME: &'static str = "err";
1611
1612 pub fn new(desc: &RelationDesc) -> Self {
1613 let row_encoder = match RowColumnarEncoder::new(desc) {
1614 Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1615 None => {
1616 assert!(desc.typ().columns().is_empty());
1617 SourceDataRowColumnarEncoder::EmptyRow
1618 }
1619 };
1620 let err_encoder = BinaryBuilder::new();
1621
1622 SourceDataColumnarEncoder {
1623 row_encoder,
1624 err_encoder,
1625 }
1626 }
1627}
1628
1629impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
1630 type FinishedColumn = StructArray;
1631
1632 fn goodbytes(&self) -> usize {
1633 self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
1634 }
1635
1636 #[inline]
1637 fn append(&mut self, val: &SourceData) {
1638 match val.0.as_ref() {
1639 Ok(row) => {
1640 self.row_encoder.append(row);
1641 self.err_encoder.append_null();
1642 }
1643 Err(err) => {
1644 self.row_encoder.append_null();
1645 self.err_encoder
1646 .append_value(err.into_proto().encode_to_vec());
1647 }
1648 }
1649 }
1650
1651 #[inline]
1652 fn append_null(&mut self) {
1653 panic!("appending a null into SourceDataColumnarEncoder is not supported");
1654 }
1655
1656 fn finish(self) -> Self::FinishedColumn {
1657 let SourceDataColumnarEncoder {
1658 row_encoder,
1659 mut err_encoder,
1660 } = self;
1661
1662 let err_column = BinaryBuilder::finish(&mut err_encoder);
1663 let row_column: ArrayRef = match row_encoder {
1664 SourceDataRowColumnarEncoder::Row(encoder) => {
1665 let column = encoder.finish();
1666 Arc::new(column)
1667 }
1668 SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
1669 };
1670
1671 assert_eq!(row_column.len(), err_column.len());
1672
1673 let fields = vec![
1674 Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
1675 Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
1676 ];
1677 let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
1678 StructArray::new(Fields::from(fields), arrays, None)
1679 }
1680}
1681
1682impl Schema<SourceData> for RelationDesc {
1683 type ArrowColumn = StructArray;
1684 type Statistics = StructStats;
1685
1686 type Decoder = SourceDataColumnarDecoder;
1687 type Encoder = SourceDataColumnarEncoder;
1688
1689 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1690 SourceDataColumnarDecoder::new(col, self)
1691 }
1692
1693 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1694 Ok(SourceDataColumnarEncoder::new(self))
1695 }
1696}
1697
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayData, make_comparator};
    use base64::Engine;
    use bytes::Bytes;
    use mz_expr::EvalError;
    use mz_ore::assert_err;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
    use mz_persist::metrics::ColumnarMetrics;
    use mz_persist_types::parquet::EncodingConfig;
    use mz_persist_types::schema::{Migration, backward_compatible};
    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
    use mz_repr::{
        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
        RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
    };
    use proptest::prelude::*;
    use proptest::strategy::{Union, ValueTree};

    use crate::stats::RelationPartStats;

    use super::*;

    // Smoke test for `Timeline` string parsing: valid prefixes round-trip,
    // malformed strings are rejected.
    #[mz_ore::test]
    fn test_timeline_parsing() {
        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());

        assert_err!("Materialize".parse::<Timeline>());
        assert_err!("Ejoe".parse::<Timeline>());
        assert_err!("Umike".parse::<Timeline>());
        assert_err!("Dance".parse::<Timeline>());
        assert_err!("".parse::<Timeline>());
    }

    /// Encodes `datas` with `desc`'s schema, round-trips the column through
    /// proto and parquet, then checks that decoding (with `desc` and with the
    /// projected `read_desc`) reproduces the original data and that the
    /// collected stats are consistent with the decoded values.
    #[track_caller]
    fn roundtrip_source_data(
        desc: &RelationDesc,
        datas: Vec<SourceData>,
        read_desc: &RelationDesc,
        config: &EncodingConfig,
    ) {
        let metrics = ColumnarMetrics::disconnected();
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        assert!(!col.is_nullable());

        let col = realloc_array(&col, &metrics);

        {
            // Round-trip the array through our proto representation and make
            // sure it compares equal to the original.
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        // Encode the column to parquet and read it back.
        let mut buf = Vec::new();
        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();

        let buf = Bytes::from(buf);
        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
        let maybe_batch = reader.next();

        // If there's no batch to read, the input must have been empty.
        let Some(record_batch) = maybe_batch else {
            assert!(datas.is_empty());
            return;
        };
        let record_batch = record_batch.unwrap();

        assert_eq!(record_batch.columns().len(), 1);
        let rnd_col = &record_batch.columns()[0];
        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
        let rnd_col = rnd_col
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap()
            .clone();

        // Collect stats from the read-back column for the checks below.
        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
            .expect("valid decoder")
            .stats();

        // Decoding with the full `desc` must reproduce the input exactly.
        let mut rnd_data = SourceData(Ok(Row::default()));
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            assert_eq!(og_data, &rnd_data);
        }

        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &stats_metrics,
            stats: &PartStats { key: stats },
            desc: read_desc,
        };
        let mut datum_vec = DatumVec::new();
        let arena = RowArena::default();
        // Decode again with the projected `read_desc`.
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();

        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            match (&og_data.0, &rnd_data.0) {
                (Ok(og_row), Ok(rnd_row)) => {
                    {
                        // The decoded row must equal the original projected
                        // onto `read_desc`'s columns.
                        let datums = datum_vec.borrow_with(og_row);
                        let projected_datums =
                            datums.iter().enumerate().filter_map(|(idx, datum)| {
                                read_desc
                                    .contains_index(&ColumnIndex::from_raw(idx))
                                    .then_some(datum)
                            });
                        let og_projected_row = Row::pack(projected_datums);
                        assert_eq!(&og_projected_row, rnd_row);
                    }

                    {
                        // Every decoded datum must be admitted by the column
                        // stats we collected above.
                        let proj_datums = datum_vec.borrow_with(rnd_row);
                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
                            let spec = stats.col_stats(idx, &arena);
                            assert!(spec.may_contain(proj_datums[pos]));
                        }
                    }
                }
                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
            }
        }

        // The schema itself must also round-trip through its encoding.
        let encoded_schema = SourceData::encode_schema(desc);
        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
        assert_eq!(desc, &roundtrip_desc);

        // A schema must be backward compatible with itself, and migrating
        // with the identity migration must preserve the data type.
        let migration =
            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
        let migration = migration.expect("should be backward compatible with self");
        let migrated = migration.migrate(Arc::new(col.clone()));
        assert_eq!(col.data_type(), migrated.data_type());
    }

    // Property test: arbitrary relations and data round-trip through the
    // columnar encoding; larger row counts only when PROPTEST_LARGE_DATA is
    // set.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn all_source_data_roundtrips() {
        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (10, Just(32..128)),
                (5, Just(128..512)),
                (3, Just(512..2048)),
                (1, Just(2048..8192)),
            ]);
        }
        let num_rows = Union::new_weighted(weights);

        let strat = (any::<RelationDesc>(), num_rows)
            .prop_flat_map(|(desc, num_rows)| {
                arb_relation_desc_projection(desc.clone())
                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
            })
            .prop_flat_map(|(desc, read_desc, num_rows)| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
            });

        let combined_strat = (any::<EncodingConfig>(), strat);
        proptest!(|((config, (desc, source_datas, read_desc)) in combined_strat)| {
            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
        });
    }

    // Regression-style test: an error value round-trips even when the
    // relation's only column is non-nullable.
    #[mz_ore::test]
    fn roundtrip_error_nulls() {
        let desc = RelationDescBuilder::default()
            .with_column(
                "ts",
                SqlScalarType::TimestampTz { precision: None }.nullable(false),
            )
            .finish();
        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
            EvalError::DateOutOfRange.into(),
        )))];
        let config = EncodingConfig::default();
        roundtrip_source_data(&desc, source_datas, &desc, &config);
    }

    // Returns whether `array` is sorted ascending; arrays we can't build a
    // comparator for are treated as unsorted.
    fn is_sorted(array: &dyn Array) -> bool {
        let sort_options = arrow::compute::SortOptions::default();
        let Ok(cmp) = make_comparator(array, array, sort_options) else {
            return false;
        };
        (0..array.len())
            .tuple_windows()
            .all(|(i, j)| cmp(i, j).is_le())
    }

    // Returns the arrow DataType an empty encoding of `schema` produces.
    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
        use mz_persist_types::columnar::ColumnEncoder;
        let array = Schema::encoder(schema).expect("valid schema").finish();
        Array::data_type(&array).clone()
    }

    /// Encodes `datas` with `old`'s schema, applies `migration`, and checks
    /// the migrated column matches `new`'s data type (and stays sorted when
    /// the migration claims to preserve order).
    #[track_caller]
    fn backward_compatible_testcase(
        old: &RelationDesc,
        new: &RelationDesc,
        migration: Migration,
        datas: &[SourceData],
    ) {
        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
        for data in datas {
            encoder.append(data);
        }
        let old = encoder.finish();
        let new = Schema::<SourceData>::encoder(new)
            .expect("valid schema")
            .finish();
        let old: Arc<dyn Array> = Arc::new(old);
        let new: Arc<dyn Array> = Arc::new(new);
        let migrated = migration.migrate(Arc::clone(&old));
        assert_eq!(migrated.data_type(), new.data_type());

        if migration.preserves_order() && is_sorted(&old) {
            assert!(is_sorted(&new))
        }
    }

    // Adding a nullable column to an empty relation is backward compatible.
    #[mz_ore::test]
    fn backward_compatible_empty_add_column() {
        let old = RelationDesc::empty();
        let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    // Projecting away every column is backward compatible.
    #[mz_ore::test]
    fn backward_compatible_project_away_all() {
        let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
        let new = RelationDesc::empty();

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    // Property test: whenever two arbitrary schemas are deemed backward
    // compatible, the resulting migration must actually work on data.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate() {
        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
                .prop_map(move |datas| (old.clone(), new.clone(), datas))
        });

        proptest!(|((old, new, datas) in strat)| {
            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            };
        });
    }

    // Property test: schemas derived from a common ancestor by adding
    // nullable columns and/or dropping columns must be backward compatible.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate_from_common() {
        use mz_repr::SqlColumnType;
        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
            // Only "add nullable column" and "drop column" diffs are
            // guaranteed to be backward compatible.
            let should_be_compatible = diffs.iter().all(|diff| match diff {
                PropRelationDescDiff::AddColumn {
                    typ: SqlColumnType { nullable, .. },
                    ..
                } => *nullable,
                PropRelationDescDiff::DropColumn { .. } => true,
                _ => false,
            });

            let mut new = old.clone();
            for diff in diffs.into_iter() {
                diff.apply(&mut new)
            }

            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            } else if should_be_compatible {
                panic!("new DataType was not compatible when it should have been!");
            }
        }

        let strat = any::<RelationDesc>()
            .prop_flat_map(|desc| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
                    .no_shrink()
                    .prop_map(move |datas| (desc.clone(), datas))
            })
            .prop_flat_map(|(desc, datas)| {
                arb_relation_desc_diff(&desc)
                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
            });

        proptest!(|((old, diffs, datas) in strat)| {
            test_case(old, diffs, datas);
        });
    }

    // Property test: data for the zero-column relation round-trips.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn empty_relation_desc_roundtrips() {
        let empty = RelationDesc::empty();
        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
            .prop_map(move |datas| (empty.clone(), datas));

        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
            roundtrip_source_data(&desc, source_datas, &desc, &config);
        });
    }

    // Property test: two encoders for the same schema must produce columns
    // with identical arrow DataTypes, regardless of what data they saw.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn arrow_datatype_consistent() {
        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
            let half = datas.len() / 2;

            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[..half] {
                encoder_a.append(data);
            }
            let col_a = encoder_a.finish();

            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[half..] {
                encoder_b.append(data);
            }
            let col_b = encoder_b.finish();

            assert_eq!(col_a.data_type(), col_b.data_type());
        }

        let num_rows = 12;
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                .prop_map(move |datas| (desc.clone(), datas))
        });

        proptest!(|((desc, data) in strat)| {
            test_case(desc, data);
        });
    }

    // Golden test: decodes the checked-in snapshot of proto-encoded
    // (RelationDesc, SourceData) pairs, re-encodes them, and asserts the
    // bytes are unchanged, i.e. the serialization format is stable.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn source_proto_serialization_stability() {
        let min_protos = 10;
        let encoded = include_str!("snapshots/source-datas.txt");

        // Decode the base64 + proto lines from the snapshot file.
        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
            .lines()
            .map(|s| {
                let (desc, data) = s.split_once(',').expect("comma separated data");
                let desc = base64::engine::general_purpose::STANDARD
                    .decode(desc)
                    .expect("valid base64");
                let data = base64::engine::general_purpose::STANDARD
                    .decode(data)
                    .expect("valid base64");
                (desc, data)
            })
            .map(|(desc, data)| {
                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
                let desc = desc.into_rust().expect("valid proto");
                let data = SourceData::decode(&data, &desc).expect("valid proto");
                (desc, data)
            })
            .collect();

        // Top up with deterministically-generated data so the snapshot always
        // covers at least `min_protos` entries.
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
        });
        while decoded.len() < min_protos {
            let arbitrary_data = strategy
                .new_tree(&mut runner)
                .expect("source data")
                .current();
            decoded.push(arbitrary_data);
        }

        // Re-encode everything in the snapshot's on-disk format.
        let mut reencoded = String::new();
        let mut buf = vec![];
        for (desc, data) in decoded {
            buf.clear();
            desc.into_proto().encode(&mut buf).expect("success");
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push(',');

            buf.clear();
            data.encode(&mut buf);
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push('\n');
        }

        assert_eq!(
            encoded,
            reencoded.as_str(),
            "SourceData serde should be stable"
        )
    }
}