1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::{Add, AddAssign, Deref, DerefMut};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
21use arrow::datatypes::{Field, Fields};
22use bytes::{BufMut, Bytes};
23use columnation::Columnation;
24use itertools::EitherOrBoth::Both;
25use itertools::Itertools;
26use kafka::KafkaSourceExportDetails;
27use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
28use mz_ore::assert_none;
29use mz_persist_types::Codec;
30use mz_persist_types::arrow::ArrayOrd;
31use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
32use mz_persist_types::stats::{
33 ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
34 StructStats,
35};
36use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
37use mz_repr::{
38 CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
39 RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
40};
41use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
42use proptest::prelude::any;
43use proptest::strategy::Strategy;
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use timely::order::{PartialOrder, TotalOrder};
47use timely::progress::timestamp::Refines;
48use timely::progress::{PathSummary, Timestamp};
49
50use crate::AlterCompatible;
51use crate::connections::inline::{
52 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
53 ReferencedConnection,
54};
55use crate::controller::AlterError;
56use crate::errors::{DataflowError, ProtoDataflowError};
57use crate::instances::StorageInstanceId;
58use crate::sources::sql_server::SqlServerSourceExportDetails;
59
60pub mod encoding;
61pub mod envelope;
62pub mod kafka;
63pub mod load_generator;
64pub mod mysql;
65pub mod postgres;
66pub mod sql_server;
67
68pub use crate::sources::envelope::SourceEnvelope;
69pub use crate::sources::kafka::KafkaSourceConnection;
70pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
71pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
72pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
73pub use crate::sources::sql_server::{SqlServerSourceConnection, SqlServerSourceExtras};
74
75include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));
76
/// A description of a source ingestion: what to ingest, the collections to export
/// it into, and where to run it.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The source description.
    pub desc: SourceDesc<C>,
    /// The source exports of this ingestion, keyed by the ID of the collection each
    /// export writes to.
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The ID of the storage instance (cluster) on which this ingestion runs.
    pub instance_id: StorageInstanceId,
    /// The ID of the collection that holds this ingestion's remap bindings.
    pub remap_collection_id: GlobalId,
    /// The storage metadata of the remap collection.
    pub remap_metadata: S,
}
103
104impl IngestionDescription {
105 pub fn new(
106 desc: SourceDesc,
107 instance_id: StorageInstanceId,
108 remap_collection_id: GlobalId,
109 ) -> Self {
110 Self {
111 desc,
112 remap_metadata: (),
113 source_exports: BTreeMap::new(),
114 instance_id,
115 remap_collection_id,
116 }
117 }
118}
119
120impl<S> IngestionDescription<S> {
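    /// Returns the IDs of all collections written to by this ingestion: one per
    /// source export, plus the remap collection.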
121 pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
126 let IngestionDescription {
129 desc: _,
130 remap_metadata: _,
131 source_exports,
132 instance_id: _,
133 remap_collection_id,
134 } = &self;
135
136 source_exports
137 .keys()
138 .copied()
139 .chain(std::iter::once(*remap_collection_id))
140 }
141}
142
143impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
144 fn alter_compatible(
145 &self,
146 id: GlobalId,
147 other: &IngestionDescription<S>,
148 ) -> Result<(), AlterError> {
149 if self == other {
150 return Ok(());
151 }
152 let IngestionDescription {
153 desc,
154 remap_metadata,
155 source_exports,
156 instance_id,
157 remap_collection_id,
158 } = self;
159
160 let compatibility_checks = [
161 (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
162 (remap_metadata == &other.remap_metadata, "remap_metadata"),
163 (
164 source_exports
165 .iter()
166 .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
167 l_key.cmp(r_key)
168 })
169 .all(|r| match r {
170 Both(
171 (
172 _,
173 SourceExport {
174 storage_metadata: l_metadata,
175 details: l_details,
176 data_config: l_data_config,
177 },
178 ),
179 (
180 _,
181 SourceExport {
182 storage_metadata: r_metadata,
183 details: r_details,
184 data_config: r_data_config,
185 },
186 ),
187 ) => {
188 l_metadata.alter_compatible(id, r_metadata).is_ok()
189 && l_details.alter_compatible(id, r_details).is_ok()
190 && l_data_config.alter_compatible(id, r_data_config).is_ok()
191 }
192 _ => true,
193 }),
194 "source_exports",
195 ),
196 (instance_id == &other.instance_id, "instance_id"),
197 (
198 remap_collection_id == &other.remap_collection_id,
199 "remap_collection_id",
200 ),
201 ];
202 for (compatible, field) in compatibility_checks {
203 if !compatible {
204 tracing::warn!(
205 "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
206 self,
207 other
208 );
209
210 return Err(AlterError { id });
211 }
212 }
213
214 Ok(())
215 }
216}
217
218impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
219 for IngestionDescription<(), ReferencedConnection>
220{
221 fn into_inline_connection(self, r: R) -> IngestionDescription {
222 let IngestionDescription {
223 desc,
224 remap_metadata,
225 source_exports,
226 instance_id,
227 remap_collection_id,
228 } = self;
229
230 IngestionDescription {
231 desc: desc.into_inline_connection(r),
232 remap_metadata,
233 source_exports,
234 instance_id,
235 remap_collection_id,
236 }
237 }
238}
239
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// The storage metadata of the collection this export writes to.
    pub storage_metadata: S,
    /// Details of how to extract this export from the source.
    pub details: SourceExportDetails,
    /// How the data of this export is decoded and enveloped.
    pub data_config: SourceExportDataConfig<C>,
}
249
250pub trait SourceTimestamp:
251 Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
252{
253 fn encode_row(&self) -> Row;
254 fn decode_row(row: &Row) -> Self;
255}
256
257impl SourceTimestamp for MzOffset {
258 fn encode_row(&self) -> Row {
259 Row::pack([Datum::UInt64(self.offset)])
260 }
261
262 fn decode_row(row: &Row) -> Self {
263 let mut datums = row.iter();
264 match (datums.next(), datums.next()) {
265 (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
266 _ => panic!("invalid row {row:?}"),
267 }
268 }
269}
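
// Illustrative sketch (added example, not from the original source): the
// `SourceTimestamp` row round-trip for `MzOffset` as implemented above. The module
// and test names are arbitrary.
#[cfg(test)]
mod mz_offset_source_timestamp_example {
    use super::*;

    #[mz_ore::test]
    fn encode_decode_row_roundtrip() {
        // An offset encodes as a single-datum `Row` and decodes back to itself.
        let ts = MzOffset::from(42u64);
        let row = ts.encode_row();
        assert_eq!(MzOffset::decode_row(&row), ts);
    }
}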
270
271#[derive(
275 Copy, Clone, Default, Debug, PartialEq, PartialOrd, Eq, Ord, Hash, Serialize, Deserialize,
276)]
277pub struct MzOffset {
278 pub offset: u64,
279}
280
281impl differential_dataflow::difference::Semigroup for MzOffset {
282 fn plus_equals(&mut self, rhs: &Self) {
283 self.offset.plus_equals(&rhs.offset)
284 }
285}
286
287impl differential_dataflow::difference::IsZero for MzOffset {
288 fn is_zero(&self) -> bool {
289 self.offset.is_zero()
290 }
291}
292
293impl mz_persist_types::Codec64 for MzOffset {
294 fn codec_name() -> String {
295 "MzOffset".to_string()
296 }
297
298 fn encode(&self) -> [u8; 8] {
299 mz_persist_types::Codec64::encode(&self.offset)
300 }
301
302 fn decode(buf: [u8; 8]) -> Self {
303 Self {
304 offset: mz_persist_types::Codec64::decode(buf),
305 }
306 }
307}
308
309impl columnation::Columnation for MzOffset {
310 type InnerRegion = columnation::CopyRegion<MzOffset>;
311}
312
313impl MzOffset {
314 pub fn checked_sub(self, other: Self) -> Option<Self> {
315 self.offset
316 .checked_sub(other.offset)
317 .map(|offset| Self { offset })
318 }
319}
320
321impl From<u64> for MzOffset {
324 fn from(offset: u64) -> Self {
325 Self { offset }
326 }
327}
328
329impl std::fmt::Display for MzOffset {
330 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 write!(f, "{}", self.offset)
332 }
333}
334
335impl Add<u64> for MzOffset {
337 type Output = MzOffset;
338
339 fn add(self, x: u64) -> MzOffset {
340 MzOffset {
341 offset: self.offset + x,
342 }
343 }
344}
345impl Add<Self> for MzOffset {
346 type Output = Self;
347
348 fn add(self, x: Self) -> Self {
349 MzOffset {
350 offset: self.offset + x.offset,
351 }
352 }
353}
354impl AddAssign<u64> for MzOffset {
355 fn add_assign(&mut self, x: u64) {
356 self.offset += x;
357 }
358}
359impl AddAssign<Self> for MzOffset {
360 fn add_assign(&mut self, x: Self) {
361 self.offset += x.offset;
362 }
363}
364
365impl From<tokio_postgres::types::PgLsn> for MzOffset {
367 fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
368 MzOffset { offset: lsn.into() }
369 }
370}
371
372impl Timestamp for MzOffset {
373 type Summary = MzOffset;
374
375 fn minimum() -> Self {
376 MzOffset {
377 offset: Timestamp::minimum(),
378 }
379 }
380}
381
382impl PathSummary<MzOffset> for MzOffset {
383 fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
384 Some(MzOffset {
385 offset: self.offset.results_in(&src.offset)?,
386 })
387 }
388
389 fn followed_by(&self, other: &Self) -> Option<Self> {
390 Some(MzOffset {
391 offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
392 })
393 }
394}
395
396impl Refines<()> for MzOffset {
397 fn to_inner(_: ()) -> Self {
398 MzOffset::minimum()
399 }
400 fn to_outer(self) {}
401 fn summarize(_: Self::Summary) {}
402}
403
404impl PartialOrder for MzOffset {
405 #[inline]
406 fn less_equal(&self, other: &Self) -> bool {
407 self.offset.less_equal(&other.offset)
408 }
409}
410
411impl TotalOrder for MzOffset {}
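
// Illustrative sketch (added example, not from the original source): `MzOffset`
// orders like its inner `u64`, adds by offset, and its path summary composes by
// addition. The module and test names are arbitrary.
#[cfg(test)]
mod mz_offset_timestamp_example {
    use super::*;

    #[mz_ore::test]
    fn order_add_and_summary() {
        let a = MzOffset::from(3u64);
        let b = MzOffset::from(5u64);
        // Timely's `PartialOrder` agrees with the ordering on the inner offset.
        assert!(a.less_equal(&b));
        // Addition and path-summary application both advance by the offset.
        assert_eq!(a + b, MzOffset::from(8u64));
        assert_eq!(a.results_in(&b), Some(MzOffset::from(8u64)));
    }
}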
412
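/// The timeline in which a collection's timestamps are meaningful: the wall-clock
/// `EpochMilliseconds` timeline, a timeline identified by an external system
/// (`External`), or a user-specified timeline (`User`). Timelines round-trip
/// through a compact string form (`M`, `E.<id>`, or `U.<id>`); see the impls below.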
413#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Hash)]
422pub enum Timeline {
423 EpochMilliseconds,
426 External(String),
430 User(String),
434}
435
436impl Timeline {
437 const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
438 const EXTERNAL_ID_CHAR: char = 'E';
439 const USER_ID_CHAR: char = 'U';
440
441 fn id_char(&self) -> char {
442 match self {
443 Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
444 Self::External(_) => Self::EXTERNAL_ID_CHAR,
445 Self::User(_) => Self::USER_ID_CHAR,
446 }
447 }
448}
449
impl std::fmt::Display for Timeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EpochMilliseconds => write!(f, "{}", self.id_char()),
            Self::External(id) => write!(f, "{}.{id}", self.id_char()),
            Self::User(id) => write!(f, "{}.{id}", self.id_char()),
        }
    }
}
459
460impl FromStr for Timeline {
461 type Err = String;
462
463 fn from_str(s: &str) -> Result<Self, Self::Err> {
464 if s.is_empty() {
465 return Err("empty timeline".to_string());
466 }
467 let mut chars = s.chars();
468 match chars.next().expect("non-empty string") {
469 Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
470 None => Ok(Self::EpochMilliseconds),
471 Some(_) => Err(format!("unknown timeline: {s}")),
472 },
473 Self::EXTERNAL_ID_CHAR => match chars.next() {
474 Some('.') => Ok(Self::External(chars.as_str().to_string())),
475 _ => Err(format!("unknown timeline: {s}")),
476 },
477 Self::USER_ID_CHAR => match chars.next() {
478 Some('.') => Ok(Self::User(chars.as_str().to_string())),
479 _ => Err(format!("unknown timeline: {s}")),
480 },
481 _ => Err(format!("unknown timeline: {s}")),
482 }
483 }
484}
485
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of this kind of source connection.
    fn name(&self) -> &'static str;

    /// The name of the object in the external system that this source reads from,
    /// if there is a meaningful one.
    fn external_reference(&self) -> Option<&str>;

    /// The relation description of the keys this connection produces by default.
    fn default_key_desc(&self) -> RelationDesc;

    /// The relation description of the values this connection produces by default.
    fn default_value_desc(&self) -> RelationDesc;

    /// The relation description of this connection's timestamp type.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The ID of the catalog connection object this source connection references,
    /// if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// Whether this connection can be ingested while the system is in read-only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether ingestions using this connection prefer to run on a single replica.
    fn prefers_single_replica(&self) -> bool;
}
518
519#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
520pub enum Compression {
521 Gzip,
522 None,
523}
524
525#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
528pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
529 pub encoding: Option<encoding::SourceDataEncoding<C>>,
530 pub envelope: SourceEnvelope,
531}
532
533impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
534 for SourceExportDataConfig<ReferencedConnection>
535{
536 fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
537 let SourceExportDataConfig { encoding, envelope } = self;
538
539 SourceExportDataConfig {
540 encoding: encoding.map(|e| e.into_inline_connection(r)),
541 envelope,
542 }
543 }
544}
545
546impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
547 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
548 if self == other {
549 return Ok(());
550 }
551 let Self { encoding, envelope } = &self;
552
553 let compatibility_checks = [
554 (
555 match (encoding, &other.encoding) {
556 (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
557 (s, o) => s == o,
558 },
559 "encoding",
560 ),
561 (envelope == &other.envelope, "envelope"),
562 ];
563
564 for (compatible, field) in compatibility_checks {
565 if !compatible {
566 tracing::warn!(
567 "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
568 self,
569 other
570 );
571
572 return Err(AlterError { id });
573 }
574 }
575 Ok(())
576 }
577}
578
579impl<C: ConnectionAccess> SourceExportDataConfig<C> {
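    /// Returns whether ingesting data with this configuration from the given
    /// connection produces a monotonic (append-only) collection: envelopes that can
    /// retract data (upsert, CDCv2) are never monotonic, and otherwise it depends
    /// on the connection type.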
580 pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
587 match &self.envelope {
588 SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
590 SourceEnvelope::None(_) => {
591 match connection {
592 GenericSourceConnection::Postgres(_) => false,
594 GenericSourceConnection::MySql(_) => false,
596 GenericSourceConnection::SqlServer(_) => false,
598 GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
600 GenericSourceConnection::Kafka(_) => true,
602 }
603 }
604 }
605 }
606}
607
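/// A description of how to ingest a source: the connection to read from and the
/// interval at which it emits new timestamps.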
608#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
610pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
611 pub connection: GenericSourceConnection<C>,
612 pub timestamp_interval: Duration,
613}
614
615impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
616 for SourceDesc<ReferencedConnection>
617{
618 fn into_inline_connection(self, r: R) -> SourceDesc {
619 let SourceDesc {
620 connection,
621 timestamp_interval,
622 } = self;
623
624 SourceDesc {
625 connection: connection.into_inline_connection(&r),
626 timestamp_interval,
627 }
628 }
629}
630
631impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
632 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
636 if self == other {
637 return Ok(());
638 }
639 let Self {
640 connection,
641 timestamp_interval,
642 } = &self;
643
644 let compatibility_checks = [
645 (
646 connection.alter_compatible(id, &other.connection).is_ok(),
647 "connection",
648 ),
649 (
650 timestamp_interval == &other.timestamp_interval,
651 "timestamp_interval",
652 ),
653 ];
654
655 for (compatible, field) in compatibility_checks {
656 if !compatible {
657 tracing::warn!(
658 "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
659 self,
660 other
661 );
662
663 return Err(AlterError { id });
664 }
665 }
666
667 Ok(())
668 }
669}
670
671#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
672pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
673 Kafka(KafkaSourceConnection<C>),
674 Postgres(PostgresSourceConnection<C>),
675 MySql(MySqlSourceConnection<C>),
676 SqlServer(SqlServerSourceConnection<C>),
677 LoadGenerator(LoadGeneratorSourceConnection),
678}
679
680impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
681 fn from(conn: KafkaSourceConnection<C>) -> Self {
682 Self::Kafka(conn)
683 }
684}
685
686impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
687 fn from(conn: PostgresSourceConnection<C>) -> Self {
688 Self::Postgres(conn)
689 }
690}
691
692impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
693 fn from(conn: MySqlSourceConnection<C>) -> Self {
694 Self::MySql(conn)
695 }
696}
697
698impl<C: ConnectionAccess> From<SqlServerSourceConnection<C>> for GenericSourceConnection<C> {
699 fn from(conn: SqlServerSourceConnection<C>) -> Self {
700 Self::SqlServer(conn)
701 }
702}
703
704impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
705 fn from(conn: LoadGeneratorSourceConnection) -> Self {
706 Self::LoadGenerator(conn)
707 }
708}
709
710impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
711 for GenericSourceConnection<ReferencedConnection>
712{
713 fn into_inline_connection(self, r: R) -> GenericSourceConnection {
714 match self {
715 GenericSourceConnection::Kafka(kafka) => {
716 GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
717 }
718 GenericSourceConnection::Postgres(pg) => {
719 GenericSourceConnection::Postgres(pg.into_inline_connection(r))
720 }
721 GenericSourceConnection::MySql(mysql) => {
722 GenericSourceConnection::MySql(mysql.into_inline_connection(r))
723 }
724 GenericSourceConnection::SqlServer(sql_server) => {
725 GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
726 }
727 GenericSourceConnection::LoadGenerator(lg) => {
728 GenericSourceConnection::LoadGenerator(lg)
729 }
730 }
731 }
732}
733
734impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
735 fn name(&self) -> &'static str {
736 match self {
737 Self::Kafka(conn) => conn.name(),
738 Self::Postgres(conn) => conn.name(),
739 Self::MySql(conn) => conn.name(),
740 Self::SqlServer(conn) => conn.name(),
741 Self::LoadGenerator(conn) => conn.name(),
742 }
743 }
744
745 fn external_reference(&self) -> Option<&str> {
746 match self {
747 Self::Kafka(conn) => conn.external_reference(),
748 Self::Postgres(conn) => conn.external_reference(),
749 Self::MySql(conn) => conn.external_reference(),
750 Self::SqlServer(conn) => conn.external_reference(),
751 Self::LoadGenerator(conn) => conn.external_reference(),
752 }
753 }
754
755 fn default_key_desc(&self) -> RelationDesc {
756 match self {
757 Self::Kafka(conn) => conn.default_key_desc(),
758 Self::Postgres(conn) => conn.default_key_desc(),
759 Self::MySql(conn) => conn.default_key_desc(),
760 Self::SqlServer(conn) => conn.default_key_desc(),
761 Self::LoadGenerator(conn) => conn.default_key_desc(),
762 }
763 }
764
765 fn default_value_desc(&self) -> RelationDesc {
766 match self {
767 Self::Kafka(conn) => conn.default_value_desc(),
768 Self::Postgres(conn) => conn.default_value_desc(),
769 Self::MySql(conn) => conn.default_value_desc(),
770 Self::SqlServer(conn) => conn.default_value_desc(),
771 Self::LoadGenerator(conn) => conn.default_value_desc(),
772 }
773 }
774
775 fn timestamp_desc(&self) -> RelationDesc {
776 match self {
777 Self::Kafka(conn) => conn.timestamp_desc(),
778 Self::Postgres(conn) => conn.timestamp_desc(),
779 Self::MySql(conn) => conn.timestamp_desc(),
780 Self::SqlServer(conn) => conn.timestamp_desc(),
781 Self::LoadGenerator(conn) => conn.timestamp_desc(),
782 }
783 }
784
785 fn connection_id(&self) -> Option<CatalogItemId> {
786 match self {
787 Self::Kafka(conn) => conn.connection_id(),
788 Self::Postgres(conn) => conn.connection_id(),
789 Self::MySql(conn) => conn.connection_id(),
790 Self::SqlServer(conn) => conn.connection_id(),
791 Self::LoadGenerator(conn) => conn.connection_id(),
792 }
793 }
794
795 fn supports_read_only(&self) -> bool {
796 match self {
797 GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
798 GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
799 GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
800 GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
801 GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
802 }
803 }
804
805 fn prefers_single_replica(&self) -> bool {
806 match self {
807 GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
808 GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
809 GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
810 GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
811 GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
812 }
813 }
814}
815impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
816 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
817 if self == other {
818 return Ok(());
819 }
820 let r = match (self, other) {
821 (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
822 (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
823 (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
824 (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
825 (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
826 conn.alter_compatible(id, other)
827 }
828 _ => Err(AlterError { id }),
829 };
830
831 if r.is_err() {
832 tracing::warn!(
833 "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
834 self,
835 other
836 );
837 }
838
839 r
840 }
841}
842
843#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
846pub enum SourceExportDetails {
847 None,
850 Kafka(KafkaSourceExportDetails),
851 Postgres(PostgresSourceExportDetails),
852 MySql(MySqlSourceExportDetails),
853 SqlServer(SqlServerSourceExportDetails),
854 LoadGenerator(LoadGeneratorSourceExportDetails),
855}
856
857impl crate::AlterCompatible for SourceExportDetails {
858 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
859 if self == other {
860 return Ok(());
861 }
862 let r = match (self, other) {
863 (Self::None, Self::None) => Ok(()),
864 (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
865 (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
            (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
            (Self::SqlServer(s), Self::SqlServer(o)) => s.alter_compatible(id, o),
            (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
868 _ => Err(AlterError { id }),
869 };
870
871 if r.is_err() {
872 tracing::warn!(
873 "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
874 self,
875 other
876 );
877 }
878
879 r
880 }
881}
882
883pub enum SourceExportStatementDetails {
889 Postgres {
890 table: mz_postgres_util::desc::PostgresTableDesc,
891 },
892 MySql {
893 table: mz_mysql_util::MySqlTableDesc,
894 initial_gtid_set: String,
895 },
896 SqlServer {
897 table: mz_sql_server_util::desc::SqlServerTableDesc,
898 capture_instance: Arc<str>,
899 initial_lsn: mz_sql_server_util::cdc::Lsn,
900 },
901 LoadGenerator {
902 output: LoadGeneratorOutput,
903 },
904 Kafka {},
905}
906
907impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
908 fn into_proto(&self) -> ProtoSourceExportStatementDetails {
909 match self {
910 SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
911 kind: Some(proto_source_export_statement_details::Kind::Postgres(
912 postgres::ProtoPostgresSourceExportStatementDetails {
913 table: Some(table.into_proto()),
914 },
915 )),
916 },
917 SourceExportStatementDetails::MySql {
918 table,
919 initial_gtid_set,
920 } => ProtoSourceExportStatementDetails {
921 kind: Some(proto_source_export_statement_details::Kind::Mysql(
922 mysql::ProtoMySqlSourceExportStatementDetails {
923 table: Some(table.into_proto()),
924 initial_gtid_set: initial_gtid_set.clone(),
925 },
926 )),
927 },
928 SourceExportStatementDetails::SqlServer {
929 table,
930 capture_instance,
931 initial_lsn,
932 } => ProtoSourceExportStatementDetails {
933 kind: Some(proto_source_export_statement_details::Kind::SqlServer(
934 sql_server::ProtoSqlServerSourceExportStatementDetails {
935 table: Some(table.into_proto()),
936 capture_instance: capture_instance.to_string(),
937 initial_lsn: initial_lsn.as_bytes().to_vec(),
938 },
939 )),
940 },
941 SourceExportStatementDetails::LoadGenerator { output } => {
942 ProtoSourceExportStatementDetails {
943 kind: Some(proto_source_export_statement_details::Kind::Loadgen(
944 load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
945 output: output.into_proto().into(),
946 },
947 )),
948 }
949 }
950 SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
951 kind: Some(proto_source_export_statement_details::Kind::Kafka(
952 kafka::ProtoKafkaSourceExportStatementDetails {},
953 )),
954 },
955 }
956 }
957
958 fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
959 use proto_source_export_statement_details::Kind;
960 Ok(match proto.kind {
961 Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
962 table: details
963 .table
964 .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
965 },
966 Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
967 table: details
968 .table
969 .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,
970
971 initial_gtid_set: details.initial_gtid_set,
972 },
973 Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
974 table: details
975 .table
976 .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
977 capture_instance: details.capture_instance.into(),
978 initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
979 .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
980 },
981 Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
982 output: details
983 .output
984 .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
985 },
986 Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
987 None => {
988 return Err(TryFromProtoError::missing_field(
989 "ProtoSourceExportStatementDetails::kind",
990 ));
991 }
992 })
993 }
994}
995
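/// The output of a source: either a decoded [`Row`] or a [`DataflowError`]
/// describing why the update could not be produced.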
996#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
997#[repr(transparent)]
998pub struct SourceData(pub Result<Row, DataflowError>);
999
1000impl Default for SourceData {
1001 fn default() -> Self {
1002 SourceData(Ok(Row::default()))
1003 }
1004}
1005
1006impl Deref for SourceData {
1007 type Target = Result<Row, DataflowError>;
1008
1009 fn deref(&self) -> &Self::Target {
1010 &self.0
1011 }
1012}
1013
1014impl DerefMut for SourceData {
1015 fn deref_mut(&mut self) -> &mut Self::Target {
1016 &mut self.0
1017 }
1018}
1019
1020impl RustType<ProtoSourceData> for SourceData {
1021 fn into_proto(&self) -> ProtoSourceData {
1022 use proto_source_data::Kind;
1023 ProtoSourceData {
1024 kind: Some(match &**self {
1025 Ok(row) => Kind::Ok(row.into_proto()),
1026 Err(err) => Kind::Err(err.into_proto()),
1027 }),
1028 }
1029 }
1030
1031 fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
1032 use proto_source_data::Kind;
1033 match proto.kind {
1034 Some(kind) => match kind {
1035 Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
1036 Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
1037 },
1038 None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
1039 }
1040 }
1041}
1042
1043impl Codec for SourceData {
1044 type Storage = ProtoRow;
1045 type Schema = RelationDesc;
1046
1047 fn codec_name() -> String {
1048 "protobuf[SourceData]".into()
1049 }
1050
1051 fn encode<B: BufMut>(&self, buf: &mut B) {
1052 self.into_proto()
1053 .encode(buf)
1054 .expect("no required fields means no initialization errors");
1055 }
1056
1057 fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
1058 let mut val = SourceData::default();
1059 <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
1060 Ok(val)
1061 }
1062
1063 fn decode_from<'a>(
1064 &mut self,
1065 buf: &'a [u8],
1066 storage: &mut Option<ProtoRow>,
1067 schema: &RelationDesc,
1068 ) -> Result<(), String> {
1069 let mut proto = storage.take().unwrap_or_default();
1073 proto.clear();
1074 let mut proto = ProtoSourceData {
1075 kind: Some(proto_source_data::Kind::Ok(proto)),
1076 };
1077 proto.merge(buf).map_err(|err| err.to_string())?;
1078 match (proto.kind, &mut self.0) {
1079 (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
1081 let ret = row.decode_from_proto(&proto, schema);
1082 storage.replace(proto);
1083 ret
1084 }
1085 (kind, _) => {
1087 let proto = ProtoSourceData { kind };
1088 *self = proto.into_rust().map_err(|err| err.to_string())?;
1089 Ok(())
1091 }
1092 }
1093 }
1094
1095 fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
1096 match &val.0 {
1097 Ok(row) => Row::validate(row, desc),
1098 Err(_) => Ok(()),
1099 }
1100 }
1101
1102 fn encode_schema(schema: &Self::Schema) -> Bytes {
1103 schema.into_proto().encode_to_vec().into()
1104 }
1105
1106 fn decode_schema(buf: &Bytes) -> Self::Schema {
1107 let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
1108 proto.into_rust().expect("valid schema")
1109 }
1110}
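
// Illustrative sketch (added example, not from the original source): a `Codec`
// encode/decode round trip for `SourceData` against its `RelationDesc` schema. The
// module and test names are arbitrary.
#[cfg(test)]
mod source_data_codec_example {
    use super::*;

    #[mz_ore::test]
    fn encode_decode_roundtrip() {
        let desc = RelationDesc::empty();
        let data = SourceData(Ok(Row::default()));

        // Encode into a byte buffer and decode it back against the same schema.
        let mut buf = Vec::new();
        data.encode(&mut buf);
        let decoded = SourceData::decode(&buf, &desc).expect("valid encoding");

        assert_eq!(decoded, data);
    }
}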
1111
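/// Returns a [`Strategy`] that generates [`SourceData`] whose rows conform to the
/// given [`RelationDesc`], with errors generated at a much lower weight than rows.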
1112pub fn arb_source_data_for_relation_desc(
1114 desc: &RelationDesc,
1115) -> impl Strategy<Value = SourceData> + use<> {
1116 let row_strat = arb_row_for_relation(desc).no_shrink();
1117
1118 proptest::strategy::Union::new_weighted(vec![
1119 (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
1120 (
1121 1,
1122 any::<DataflowError>()
1123 .prop_map(|err| SourceData(Err(err)))
1124 .no_shrink()
1125 .boxed(),
1126 ),
1127 ])
1128}
1129
1130pub trait ExternalCatalogReference {
1138 fn schema_name(&self) -> &str;
1140 fn item_name(&self) -> &str;
1142}
1143
1144impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
1145 fn schema_name(&self) -> &str {
1146 &self.schema_name
1147 }
1148
1149 fn item_name(&self) -> &str {
1150 &self.name
1151 }
1152}
1153
1154impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
1155 fn schema_name(&self) -> &str {
1156 &self.namespace
1157 }
1158
1159 fn item_name(&self) -> &str {
1160 &self.name
1161 }
1162}
1163
1164impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
1165 fn schema_name(&self) -> &str {
1166 &*self.schema_name
1167 }
1168
1169 fn item_name(&self) -> &str {
1170 &*self.name
1171 }
1172}
1173
1174impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
1177 fn schema_name(&self) -> &str {
1178 self.0
1179 }
1180
1181 fn item_name(&self) -> &str {
1182 self.1
1183 }
1184}
1185
1186#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
1194pub struct SourceReferenceResolver {
1195 inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
1196}
1197
1198#[derive(Debug, Clone, thiserror::Error)]
1199pub enum ExternalReferenceResolutionError {
1200 #[error("reference to {name} not found in source")]
1201 DoesNotExist { name: String },
1202 #[error(
1203 "reference {name} is ambiguous, consider specifying an additional \
1204 layer of qualification"
1205 )]
1206 Ambiguous { name: String },
1207 #[error("invalid identifier: {0}")]
1208 Ident(#[from] IdentError),
1209}
1210
1211impl<'a> SourceReferenceResolver {
1212 pub fn new<T: ExternalCatalogReference>(
1218 database: &str,
1219 referenceable_items: &'a [T],
1220 ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1221 let mut inner = BTreeMap::new();
1224
1225 let database = Ident::new(database)?;
1226
1227 for (reference_idx, item) in referenceable_items.iter().enumerate() {
1228 let item_name = Ident::new(item.item_name())?;
1229 let schema_name = Ident::new(item.schema_name())?;
1230
1231 inner
1232 .entry(item_name)
1233 .or_insert_with(BTreeMap::new)
1234 .entry(schema_name)
1235 .or_insert_with(BTreeMap::new)
1236 .entry(database.clone())
1237 .or_insert(reference_idx);
1238 }
1239
1240 Ok(SourceReferenceResolver { inner })
1241 }
1242
1243 pub fn resolve(
1260 &self,
1261 name: &[Ident],
1262 canonicalize_to_width: usize,
1263 ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1264 let (db, schema, idx) = self.resolve_inner(name)?;
1265
1266 let item = name.last().expect("must have provided at least 1 element");
1267
1268 let canonical_name = match canonicalize_to_width {
1269 1 => vec![item.clone()],
1270 2 => vec![schema.clone(), item.clone()],
1271 3 => vec![db.clone(), schema.clone(), item.clone()],
1272 o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1273 };
1274
1275 Ok((UnresolvedItemName(canonical_name), idx))
1276 }
1277
1278 pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1288 let (_db, _schema, idx) = self.resolve_inner(name)?;
1289 Ok(idx)
1290 }
1291
1292 fn resolve_inner<'name: 'a>(
1309 &'a self,
1310 name: &'name [Ident],
1311 ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1312 let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1313
1314 if !(1..=3).contains(&name.len()) {
1316 Err(ExternalReferenceResolutionError::DoesNotExist {
1317 name: get_provided_name(),
1318 })?;
1319 }
1320
1321 let mut names = std::iter::repeat(None)
1323 .take(3 - name.len())
1324 .chain(name.iter().map(Some));
1325
1326 let database = names.next().flatten();
1327 let schema = names.next().flatten();
1328 let item = names
1329 .next()
1330 .flatten()
1331 .expect("must have provided the item name");
1332
1333 assert_none!(names.next(), "expected a 3-element iterator");
1334
1335 let schemas =
1336 self.inner
1337 .get(item)
1338 .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1339 name: get_provided_name(),
1340 })?;
1341
1342 let schema = match schema {
1343 Some(schema) => schema,
1344 None => schemas.keys().exactly_one().map_err(|_e| {
1345 ExternalReferenceResolutionError::Ambiguous {
1346 name: get_provided_name(),
1347 }
1348 })?,
1349 };
1350
1351 let databases =
1352 schemas
1353 .get(schema)
1354 .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1355 name: get_provided_name(),
1356 })?;
1357
1358 let database = match database {
1359 Some(database) => database,
1360 None => databases.keys().exactly_one().map_err(|_e| {
1361 ExternalReferenceResolutionError::Ambiguous {
1362 name: get_provided_name(),
1363 }
1364 })?,
1365 };
1366
1367 let reference_idx = databases.get(database).ok_or_else(|| {
1368 ExternalReferenceResolutionError::DoesNotExist {
1369 name: get_provided_name(),
1370 }
1371 })?;
1372
1373 Ok((database, schema, *reference_idx))
1374 }
1375}
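
// Illustrative sketch (added example, not from the original source): building a
// `SourceReferenceResolver` from `(schema, item)` pairs via the
// `ExternalCatalogReference` impl for `(&str, &str)` above, then resolving a
// partially qualified name. The module, test names, and identifiers are arbitrary.
#[cfg(test)]
mod source_reference_resolver_example {
    use super::*;

    #[mz_ore::test]
    fn resolve_partially_qualified_name() {
        let items = [("public", "users"), ("public", "orders")];
        let resolver = SourceReferenceResolver::new("materialize", &items).unwrap();

        // A bare item name is unambiguous here, so it resolves to index 0 and is
        // canonicalized to a 2-part `schema.item` name.
        let name = [Ident::new("users").unwrap()];
        let (canonical, idx) = resolver.resolve(&name, 2).unwrap();
        assert_eq!(idx, 0);
        assert_eq!(
            canonical,
            UnresolvedItemName(vec![
                Ident::new("public").unwrap(),
                Ident::new("users").unwrap()
            ])
        );
    }
}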
1376
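/// Decoder for the `ok` column of a columnar-encoded [`SourceData`]: either a
/// [`RowColumnarDecoder`] for relations with columns, or a placeholder that decodes
/// every index as the empty [`Row`].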
1377#[derive(Debug)]
1383pub enum SourceDataRowColumnarDecoder {
1384 Row(RowColumnarDecoder),
1385 EmptyRow,
1386}
1387
1388impl SourceDataRowColumnarDecoder {
1389 pub fn decode(&self, idx: usize, row: &mut Row) {
1390 match self {
1391 SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1392 SourceDataRowColumnarDecoder::EmptyRow => {
1393 row.packer();
1395 }
1396 }
1397 }
1398
1399 pub fn goodbytes(&self) -> usize {
1400 match self {
1401 SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1402 SourceDataRowColumnarDecoder::EmptyRow => 0,
1403 }
1404 }
1405}
1406
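/// Decoder for [`SourceData`] that reads back the `ok`/`err` struct columns written
/// by [`SourceDataColumnarEncoder`].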
1407#[derive(Debug)]
1408pub struct SourceDataColumnarDecoder {
1409 row_decoder: SourceDataRowColumnarDecoder,
1410 err_decoder: BinaryArray,
1411}
1412
1413impl SourceDataColumnarDecoder {
1414 pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1415 let (_fields, arrays, nullability) = col.into_parts();
1417
1418 if nullability.is_some() {
1419 anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1420 }
1421 if arrays.len() != 2 {
1422 anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1423 }
1424
1425 let errs = arrays[1]
1426 .as_any()
1427 .downcast_ref::<BinaryArray>()
1428 .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1429
1430 let row_decoder = match arrays[0].data_type() {
1431 arrow::datatypes::DataType::Struct(_) => {
1432 let rows = arrays[0]
1433 .as_any()
1434 .downcast_ref::<StructArray>()
1435 .ok_or_else(|| {
1436 anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1437 })?;
1438 let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1439 SourceDataRowColumnarDecoder::Row(decoder)
1440 }
1441 arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1442 other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1443 };
1444
1445 Ok(SourceDataColumnarDecoder {
1446 row_decoder,
1447 err_decoder: errs.clone(),
1448 })
1449 }
1450}
1451
1452impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
1453 fn decode(&self, idx: usize, val: &mut SourceData) {
1454 let err_null = self.err_decoder.is_null(idx);
1455 let row_null = match &self.row_decoder {
1456 SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1457 SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1458 };
1459
1460 match (row_null, err_null) {
1461 (true, false) => {
1462 let err = self.err_decoder.value(idx);
1463 let err = ProtoDataflowError::decode(err)
1464 .expect("proto should be valid")
1465 .into_rust()
1466 .expect("error should be valid");
1467 val.0 = Err(err);
1468 }
1469 (false, true) => {
1470 let row = match val.0.as_mut() {
1471 Ok(row) => row,
1472 Err(_) => {
1473 val.0 = Ok(Row::default());
1474 val.0.as_mut().unwrap()
1475 }
1476 };
1477 self.row_decoder.decode(idx, row);
1478 }
1479 (true, true) => panic!("should have one of 'ok' or 'err'"),
1480 (false, false) => panic!("cannot have both 'ok' and 'err'"),
1481 }
1482 }
1483
1484 fn is_null(&self, idx: usize) -> bool {
1485 let err_null = self.err_decoder.is_null(idx);
1486 let row_null = match &self.row_decoder {
1487 SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1488 SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1489 };
1490 assert!(!err_null || !row_null, "SourceData should never be null!");
1491
1492 false
1493 }
1494
1495 fn goodbytes(&self) -> usize {
1496 self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
1497 }
1498
1499 fn stats(&self) -> StructStats {
1500 let len = self.err_decoder.len();
1501 let err_stats = ColumnarStats {
1502 nulls: Some(ColumnNullStats {
1503 count: self.err_decoder.null_count(),
1504 }),
1505 values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
1506 };
1507 let row_null_count = len - self.err_decoder.null_count();
1512 let row_stats = match &self.row_decoder {
1513 SourceDataRowColumnarDecoder::Row(encoder) => {
1514 assert_eq!(encoder.null_count(), row_null_count);
1518 encoder.stats()
1519 }
1520 SourceDataRowColumnarDecoder::EmptyRow => StructStats {
1521 len,
1522 cols: BTreeMap::default(),
1523 },
1524 };
1525 let row_stats = ColumnarStats {
1526 nulls: Some(ColumnNullStats {
1527 count: row_null_count,
1528 }),
1529 values: ColumnStatKinds::Struct(row_stats),
1530 };
1531
1532 let stats = [
1533 (
1534 SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
1535 row_stats,
1536 ),
1537 (
1538 SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
1539 err_stats,
1540 ),
1541 ];
1542 StructStats {
1543 len,
1544 cols: stats.into_iter().map(|(name, s)| (name, s)).collect(),
1545 }
1546 }
1547}
1548
1549#[derive(Debug)]
1556pub enum SourceDataRowColumnarEncoder {
1557 Row(RowColumnarEncoder),
1558 EmptyRow,
1559}
1560
1561impl SourceDataRowColumnarEncoder {
1562 pub(crate) fn goodbytes(&self) -> usize {
1563 match self {
1564 SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1565 SourceDataRowColumnarEncoder::EmptyRow => 0,
1566 }
1567 }
1568
1569 pub fn append(&mut self, row: &Row) {
1570 match self {
1571 SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1572 SourceDataRowColumnarEncoder::EmptyRow => {
1573 assert_eq!(row.iter().count(), 0)
1574 }
1575 }
1576 }
1577
1578 pub fn append_null(&mut self) {
1579 match self {
1580 SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1581 SourceDataRowColumnarEncoder::EmptyRow => (),
1582 }
1583 }
1584}
1585
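/// Encoder for [`SourceData`] that writes each value into two Arrow columns: an
/// `ok` column containing the columnar encoding of successful rows and an `err`
/// column containing protobuf-encoded [`DataflowError`]s. Exactly one of the two is
/// non-null for every value.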
1586#[derive(Debug)]
1587pub struct SourceDataColumnarEncoder {
1588 row_encoder: SourceDataRowColumnarEncoder,
1589 err_encoder: BinaryBuilder,
1590}
1591
1592impl SourceDataColumnarEncoder {
1593 const OK_COLUMN_NAME: &'static str = "ok";
1594 const ERR_COLUMN_NAME: &'static str = "err";
1595
1596 pub fn new(desc: &RelationDesc) -> Self {
1597 let row_encoder = match RowColumnarEncoder::new(desc) {
1598 Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1599 None => {
1600 assert!(desc.typ().columns().is_empty());
1601 SourceDataRowColumnarEncoder::EmptyRow
1602 }
1603 };
1604 let err_encoder = BinaryBuilder::new();
1605
1606 SourceDataColumnarEncoder {
1607 row_encoder,
1608 err_encoder,
1609 }
1610 }
1611}
1612
1613impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
1614 type FinishedColumn = StructArray;
1615
1616 fn goodbytes(&self) -> usize {
1617 self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
1618 }
1619
1620 #[inline]
1621 fn append(&mut self, val: &SourceData) {
1622 match val.0.as_ref() {
1623 Ok(row) => {
1624 self.row_encoder.append(row);
1625 self.err_encoder.append_null();
1626 }
1627 Err(err) => {
1628 self.row_encoder.append_null();
1629 self.err_encoder
1630 .append_value(err.into_proto().encode_to_vec());
1631 }
1632 }
1633 }
1634
1635 #[inline]
1636 fn append_null(&mut self) {
1637 panic!("appending a null into SourceDataColumnarEncoder is not supported");
1638 }
1639
1640 fn finish(self) -> Self::FinishedColumn {
1641 let SourceDataColumnarEncoder {
1642 row_encoder,
1643 mut err_encoder,
1644 } = self;
1645
1646 let err_column = BinaryBuilder::finish(&mut err_encoder);
1647 let row_column: ArrayRef = match row_encoder {
1648 SourceDataRowColumnarEncoder::Row(encoder) => {
1649 let column = encoder.finish();
1650 Arc::new(column)
1651 }
1652 SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
1653 };
1654
1655 assert_eq!(row_column.len(), err_column.len());
1656
1657 let fields = vec![
1658 Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
1659 Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
1660 ];
1661 let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
1662 StructArray::new(Fields::from(fields), arrays, None)
1663 }
1664}
1665
1666impl Schema<SourceData> for RelationDesc {
1667 type ArrowColumn = StructArray;
1668 type Statistics = StructStats;
1669
1670 type Decoder = SourceDataColumnarDecoder;
1671 type Encoder = SourceDataColumnarEncoder;
1672
1673 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1674 SourceDataColumnarDecoder::new(col, self)
1675 }
1676
1677 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1678 Ok(SourceDataColumnarEncoder::new(self))
1679 }
1680}
1681
1682#[cfg(test)]
1683mod tests {
1684 use arrow::array::{ArrayData, make_comparator};
1685 use base64::Engine;
1686 use bytes::Bytes;
1687 use mz_expr::EvalError;
1688 use mz_ore::assert_err;
1689 use mz_ore::metrics::MetricsRegistry;
1690 use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
1691 use mz_persist::metrics::ColumnarMetrics;
1692 use mz_persist_types::parquet::EncodingConfig;
1693 use mz_persist_types::schema::{Migration, backward_compatible};
1694 use mz_persist_types::stats::{PartStats, PartStatsMetrics};
1695 use mz_repr::{
1696 ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
1697 RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
1698 };
1699 use proptest::prelude::*;
1700 use proptest::strategy::{Union, ValueTree};
1701
1702 use crate::stats::RelationPartStats;
1703
1704 use super::*;
1705
1706 #[mz_ore::test]
1707 fn test_timeline_parsing() {
1708 assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
1709 assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
1710 assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());
1711
1712 assert_err!("Materialize".parse::<Timeline>());
1713 assert_err!("Ejoe".parse::<Timeline>());
1714 assert_err!("Umike".parse::<Timeline>());
1715 assert_err!("Dance".parse::<Timeline>());
1716 assert_err!("".parse::<Timeline>());
1717 }
1718
1719 #[track_caller]
1720 fn roundtrip_source_data(
1721 desc: &RelationDesc,
1722 datas: Vec<SourceData>,
1723 read_desc: &RelationDesc,
1724 config: &EncodingConfig,
1725 ) {
1726 let metrics = ColumnarMetrics::disconnected();
1727 let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
1728 for data in &datas {
1729 encoder.append(data);
1730 }
1731 let col = encoder.finish();
1732
1733 assert!(!col.is_nullable());
1735
1736 let col = realloc_array(&col, &metrics);
1738
1739 {
1741 let proto = col.to_data().into_proto();
1742 let bytes = proto.encode_to_vec();
1743 let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
1744 let array_data: ArrayData = proto.into_rust().unwrap();
1745
1746 let col_rnd = StructArray::from(array_data.clone());
1747 assert_eq!(col, col_rnd);
1748
1749 let col_dyn = arrow::array::make_array(array_data);
1750 let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
1751 assert_eq!(&col, col_dyn);
1752 }
1753
1754 let mut buf = Vec::new();
1756 let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
1757 let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
1758 mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();
1759
1760 let buf = Bytes::from(buf);
1762 let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
1763 let maybe_batch = reader.next();
1764
1765 let Some(record_batch) = maybe_batch else {
1767 assert!(datas.is_empty());
1768 return;
1769 };
1770 let record_batch = record_batch.unwrap();
1771
1772 assert_eq!(record_batch.columns().len(), 1);
1773 let rnd_col = &record_batch.columns()[0];
1774 let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
1775 let rnd_col = rnd_col
1776 .as_any()
1777 .downcast_ref::<StructArray>()
1778 .unwrap()
1779 .clone();
1780
1781 let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
1783 .expect("valid decoder")
1784 .stats();
1785
1786 let mut rnd_data = SourceData(Ok(Row::default()));
1788 let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
1789 for (idx, og_data) in datas.iter().enumerate() {
1790 decoder.decode(idx, &mut rnd_data);
1791 assert_eq!(og_data, &rnd_data);
1792 }
1793
1794 let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
1797 let stats = RelationPartStats {
1798 name: "test",
1799 metrics: &stats_metrics,
1800 stats: &PartStats { key: stats },
1801 desc: read_desc,
1802 };
1803 let mut datum_vec = DatumVec::new();
1804 let arena = RowArena::default();
1805 let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();
1806
1807 for (idx, og_data) in datas.iter().enumerate() {
1808 decoder.decode(idx, &mut rnd_data);
1809 match (&og_data.0, &rnd_data.0) {
1810 (Ok(og_row), Ok(rnd_row)) => {
1811 {
1813 let datums = datum_vec.borrow_with(og_row);
1814 let projected_datums =
1815 datums.iter().enumerate().filter_map(|(idx, datum)| {
1816 read_desc
1817 .contains_index(&ColumnIndex::from_raw(idx))
1818 .then_some(datum)
1819 });
1820 let og_projected_row = Row::pack(projected_datums);
1821 assert_eq!(&og_projected_row, rnd_row);
1822 }
1823
1824 {
1826 let proj_datums = datum_vec.borrow_with(rnd_row);
1827 for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
1828 let spec = stats.col_stats(idx, &arena);
1829 assert!(spec.may_contain(proj_datums[pos]));
1830 }
1831 }
1832 }
1833 (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
1834 (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
1835 }
1836 }
1837
1838 let encoded_schema = SourceData::encode_schema(desc);
1841 let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
1842 assert_eq!(desc, &roundtrip_desc);
1843
1844 let migration =
1847 mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
1848 let migration = migration.expect("should be backward compatible with self");
1849 let migrated = migration.migrate(Arc::new(col.clone()));
1851 assert_eq!(col.data_type(), migrated.data_type());
1852 }
1853
1854 #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn all_source_data_roundtrips() {
1857 let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
1858 if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
1859 weights.extend([
1860 (10, Just(32..128)),
1861 (5, Just(128..512)),
1862 (3, Just(512..2048)),
1863 (1, Just(2048..8192)),
1864 ]);
1865 }
1866 let num_rows = Union::new_weighted(weights);
1867
1868 let strat = (any::<RelationDesc>(), num_rows)
1870 .prop_flat_map(|(desc, num_rows)| {
1871 arb_relation_desc_projection(desc.clone())
1872 .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
1873 })
1874 .prop_flat_map(|(desc, read_desc, num_rows)| {
1875 proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
1876 .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
1877 });
1878
1879 proptest!(|((config, (desc, source_datas, read_desc)) in (any::<EncodingConfig>(), strat))| {
1880 roundtrip_source_data(&desc, source_datas, &read_desc, &config);
1881 });
1882 }
1883
1884 #[mz_ore::test]
1885 fn roundtrip_error_nulls() {
1886 let desc = RelationDescBuilder::default()
1887 .with_column(
1888 "ts",
1889 SqlScalarType::TimestampTz { precision: None }.nullable(false),
1890 )
1891 .finish();
1892 let source_datas = vec![SourceData(Err(DataflowError::EvalError(
1893 EvalError::DateOutOfRange.into(),
1894 )))];
1895 let config = EncodingConfig::default();
1896 roundtrip_source_data(&desc, source_datas, &desc, &config);
1897 }
1898
1899 fn is_sorted(array: &dyn Array) -> bool {
1900 let sort_options = arrow::compute::SortOptions::default();
1901 let Ok(cmp) = make_comparator(array, array, sort_options) else {
1902 return false;
1908 };
1909 (0..array.len())
1910 .tuple_windows()
1911 .all(|(i, j)| cmp(i, j).is_le())
1912 }
1913
1914 fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
1915 use mz_persist_types::columnar::ColumnEncoder;
1916 let array = Schema::encoder(schema).expect("valid schema").finish();
1917 Array::data_type(&array).clone()
1918 }
1919
1920 #[track_caller]
1921 fn backward_compatible_testcase(
1922 old: &RelationDesc,
1923 new: &RelationDesc,
1924 migration: Migration,
1925 datas: &[SourceData],
1926 ) {
1927 let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
1928 for data in datas {
1929 encoder.append(data);
1930 }
1931 let old = encoder.finish();
1932 let new = Schema::<SourceData>::encoder(new)
1933 .expect("valid schema")
1934 .finish();
1935 let old: Arc<dyn Array> = Arc::new(old);
1936 let new: Arc<dyn Array> = Arc::new(new);
1937 let migrated = migration.migrate(Arc::clone(&old));
1938 assert_eq!(migrated.data_type(), new.data_type());
1939
        // If the migration preserves ordering and the old data was sorted, the
        // migrated data must still be sorted.
        if migration.preserves_order() && is_sorted(&old) {
            assert!(is_sorted(&migrated))
        }
1944 }
1945
1946 #[mz_ore::test]
1947 fn backward_compatible_empty_add_column() {
1948 let old = RelationDesc::empty();
1949 let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
1950
1951 let old_data_type = get_data_type(&old);
1952 let new_data_type = get_data_type(&new);
1953
1954 let migration = backward_compatible(&old_data_type, &new_data_type);
1955 assert!(migration.is_some());
1956 }
1957
1958 #[mz_ore::test]
1959 fn backward_compatible_project_away_all() {
1960 let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
1961 let new = RelationDesc::empty();
1962
1963 let old_data_type = get_data_type(&old);
1964 let new_data_type = get_data_type(&new);
1965
1966 let migration = backward_compatible(&old_data_type, &new_data_type);
1967 assert!(migration.is_some());
1968 }
1969
1970 #[mz_ore::test]
1971 #[cfg_attr(miri, ignore)]
1972 fn backward_compatible_migrate() {
1973 let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
1974 proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
1975 .prop_map(move |datas| (old.clone(), new.clone(), datas))
1976 });
1977
1978 proptest!(|((old, new, datas) in strat)| {
1979 let old_data_type = get_data_type(&old);
1980 let new_data_type = get_data_type(&new);
1981
1982 if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
1983 backward_compatible_testcase(&old, &new, migration, &datas);
1984 };
1985 });
1986 }
1987
1988 #[mz_ore::test]
1989 #[cfg_attr(miri, ignore)]
1990 fn backward_compatible_migrate_from_common() {
1991 use mz_repr::SqlColumnType;
1992 fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
1993 let should_be_compatible = diffs.iter().all(|diff| match diff {
1995 PropRelationDescDiff::AddColumn {
1997 typ: SqlColumnType { nullable, .. },
1998 ..
1999 } => *nullable,
2000 PropRelationDescDiff::DropColumn { .. } => true,
2001 _ => false,
2002 });
2003
2004 let mut new = old.clone();
2005 for diff in diffs.into_iter() {
2006 diff.apply(&mut new)
2007 }
2008
2009 let old_data_type = get_data_type(&old);
2010 let new_data_type = get_data_type(&new);
2011
2012 if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
2013 backward_compatible_testcase(&old, &new, migration, &datas);
2014 } else if should_be_compatible {
2015 panic!("new DataType was not compatible when it should have been!");
2016 }
2017 }
2018
2019 let strat = any::<RelationDesc>()
2020 .prop_flat_map(|desc| {
2021 proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
2022 .no_shrink()
2023 .prop_map(move |datas| (desc.clone(), datas))
2024 })
2025 .prop_flat_map(|(desc, datas)| {
2026 arb_relation_desc_diff(&desc)
2027 .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
2028 });
2029
2030 proptest!(|((old, diffs, datas) in strat)| {
2031 test_case(old, diffs, datas);
2032 });
2033 }
2034
2035 #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn empty_relation_desc_roundtrips() {
2038 let empty = RelationDesc::empty();
2039 let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
2040 .prop_map(move |datas| (empty.clone(), datas));
2041
2042 proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
2045 roundtrip_source_data(&desc, source_datas, &desc, &config);
2046 });
2047 }
2048
2049 #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn arrow_datatype_consistent() {
2052 fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
2053 let half = datas.len() / 2;
2054
2055 let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2056 for data in &datas[..half] {
2057 encoder_a.append(data);
2058 }
2059 let col_a = encoder_a.finish();
2060
2061 let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2062 for data in &datas[half..] {
2063 encoder_b.append(data);
2064 }
2065 let col_b = encoder_b.finish();
2066
2067 assert_eq!(col_a.data_type(), col_b.data_type());
2070 }
2071
2072 let num_rows = 12;
2073 let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2074 proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
2075 .prop_map(move |datas| (desc.clone(), datas))
2076 });
2077
2078 proptest!(|((desc, data) in strat)| {
2079 test_case(desc, data);
2080 });
2081 }
2082
2083 #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn source_proto_serialization_stability() {
2086 let min_protos = 10;
2087 let encoded = include_str!("snapshots/source-datas.txt");
2088
2089 let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
2091 .lines()
2092 .map(|s| {
2093 let (desc, data) = s.split_once(',').expect("comma separated data");
2094 let desc = base64::engine::general_purpose::STANDARD
2095 .decode(desc)
2096 .expect("valid base64");
2097 let data = base64::engine::general_purpose::STANDARD
2098 .decode(data)
2099 .expect("valid base64");
2100 (desc, data)
2101 })
2102 .map(|(desc, data)| {
2103 let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
2104 let desc = desc.into_rust().expect("valid proto");
2105 let data = SourceData::decode(&data, &desc).expect("valid proto");
2106 (desc, data)
2107 })
2108 .collect();
2109
2110 let mut runner = proptest::test_runner::TestRunner::deterministic();
2112 let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
2113 arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
2114 });
2115 while decoded.len() < min_protos {
2116 let arbitrary_data = strategy
2117 .new_tree(&mut runner)
2118 .expect("source data")
2119 .current();
2120 decoded.push(arbitrary_data);
2121 }
2122
2123 let mut reencoded = String::new();
2125 let mut buf = vec![];
2126 for (desc, data) in decoded {
2127 buf.clear();
2128 desc.into_proto().encode(&mut buf).expect("success");
2129 base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
2130 reencoded.push(',');
2131
2132 buf.clear();
2133 data.encode(&mut buf);
2134 base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
2135 reencoded.push('\n');
2136 }
2137
2138 assert_eq!(
2149 encoded,
2150 reencoded.as_str(),
2151 "SourceData serde should be stable"
2152 )
2153 }
2154}