1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::{Add, AddAssign, Deref, DerefMut};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
21use arrow::datatypes::{Field, Fields};
22use bytes::{BufMut, Bytes};
23use columnation::Columnation;
24use itertools::EitherOrBoth::Both;
25use itertools::Itertools;
26use kafka::KafkaSourceExportDetails;
27use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
28use mz_ore::assert_none;
29use mz_persist_types::Codec;
30use mz_persist_types::arrow::ArrayOrd;
31use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
32use mz_persist_types::stats::{
33 ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
34 StructStats,
35};
36use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
37use mz_repr::{
38 CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
39 RowColumnarDecoder, RowColumnarEncoder, arb_row_for_relation,
40};
41use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
42use proptest::prelude::any;
43use proptest::strategy::Strategy;
44use prost::Message;
45use serde::{Deserialize, Serialize};
46use timely::order::{PartialOrder, TotalOrder};
47use timely::progress::timestamp::Refines;
48use timely::progress::{PathSummary, Timestamp};
49
50use crate::AlterCompatible;
51use crate::connections::inline::{
52 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
53 ReferencedConnection,
54};
55use crate::controller::AlterError;
56use crate::errors::{DataflowError, ProtoDataflowError};
57use crate::instances::StorageInstanceId;
58use crate::sources::sql_server::SqlServerSourceExportDetails;
59
60pub mod encoding;
61pub mod envelope;
62pub mod kafka;
63pub mod load_generator;
64pub mod mysql;
65pub mod postgres;
66pub mod sql_server;
67
68pub use crate::sources::envelope::SourceEnvelope;
69pub use crate::sources::kafka::KafkaSourceConnection;
70pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
71pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
72pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
73pub use crate::sources::sql_server::{SqlServerSourceConnection, SqlServerSourceExtras};
74
75include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));
76
/// A description of a data ingestion: the source to read from, the
/// collections it exports to, and where/how it runs.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The descriptor of the source to ingest from.
    pub desc: SourceDesc<C>,
    /// The collections this ingestion exports data to, keyed by collection id.
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The storage instance this ingestion runs on.
    pub instance_id: StorageInstanceId,
    /// The id of the collection holding the remap (reclocking) data.
    pub remap_collection_id: GlobalId,
    /// Storage metadata for the remap collection; `()` until it is known.
    pub remap_metadata: S,
}
103
104impl IngestionDescription {
105 pub fn new(
106 desc: SourceDesc,
107 instance_id: StorageInstanceId,
108 remap_collection_id: GlobalId,
109 ) -> Self {
110 Self {
111 desc,
112 remap_metadata: (),
113 source_exports: BTreeMap::new(),
114 instance_id,
115 remap_collection_id,
116 }
117 }
118}
119
120impl<S> IngestionDescription<S> {
121 pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
126 let IngestionDescription {
129 desc: _,
130 remap_metadata: _,
131 source_exports,
132 instance_id: _,
133 remap_collection_id,
134 } = &self;
135
136 source_exports
137 .keys()
138 .copied()
139 .chain(std::iter::once(*remap_collection_id))
140 }
141}
142
impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
    /// Determines whether `other` may replace `self` without invalidating the
    /// running ingestion. Each field must be equal or (where supported)
    /// alter-compatible with its counterpart; the first mismatch is logged
    /// and reported as an [`AlterError`].
    fn alter_compatible(
        &self,
        id: GlobalId,
        other: &IngestionDescription<S>,
    ) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        // Exhaustive destructuring forces this function to be revisited when
        // fields are added to `IngestionDescription`.
        let IngestionDescription {
            desc,
            remap_metadata,
            source_exports,
            instance_id,
            remap_collection_id,
        } = self;

        let compatibility_checks = [
            (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
            (remap_metadata == &other.remap_metadata, "remap_metadata"),
            (
                // Exports present on both sides must be pairwise compatible;
                // exports present on only one side (added or dropped) do not
                // block the alter.
                source_exports
                    .iter()
                    .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
                        l_key.cmp(r_key)
                    })
                    .all(|r| match r {
                        Both(
                            (
                                _,
                                SourceExport {
                                    storage_metadata: l_metadata,
                                    details: l_details,
                                    data_config: l_data_config,
                                },
                            ),
                            (
                                _,
                                SourceExport {
                                    storage_metadata: r_metadata,
                                    details: r_details,
                                    data_config: r_data_config,
                                },
                            ),
                        ) => {
                            l_metadata.alter_compatible(id, r_metadata).is_ok()
                                && l_details.alter_compatible(id, r_details).is_ok()
                                && l_data_config.alter_compatible(id, r_data_config).is_ok()
                        }
                        _ => true,
                    }),
                "source_exports",
            ),
            (instance_id == &other.instance_id, "instance_id"),
            (
                remap_collection_id == &other.remap_collection_id,
                "remap_collection_id",
            ),
        ];
        for (compatible, field) in compatibility_checks {
            if !compatible {
                // Log which field failed; the returned error carries only `id`.
                tracing::warn!(
                    "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
217
218impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
219 for IngestionDescription<(), ReferencedConnection>
220{
221 fn into_inline_connection(self, r: R) -> IngestionDescription {
222 let IngestionDescription {
223 desc,
224 remap_metadata,
225 source_exports,
226 instance_id,
227 remap_collection_id,
228 } = self;
229
230 IngestionDescription {
231 desc: desc.into_inline_connection(r),
232 remap_metadata,
233 source_exports,
234 instance_id,
235 remap_collection_id,
236 }
237 }
238}
239
/// A single export of a source: one output collection plus the details
/// needed to produce and configure its data.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// Storage metadata for the collection backing this export.
    pub storage_metadata: S,
    /// Source-specific details describing how this export's data is produced.
    pub details: SourceExportDetails,
    /// The encoding and envelope configuration for this export's data.
    pub data_config: SourceExportDataConfig<C>,
}
249
/// A timestamp type usable as a source frontier that can be round-tripped
/// through a [`Row`], so progress information can be durably recorded.
pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    /// Encodes this timestamp into a [`Row`].
    fn encode_row(&self) -> Row;
    /// Decodes a timestamp previously produced by [`Self::encode_row`].
    /// Implementations may panic if the row has an unexpected shape.
    fn decode_row(row: &Row) -> Self;
}
256
257impl SourceTimestamp for MzOffset {
258 fn encode_row(&self) -> Row {
259 Row::pack([Datum::UInt64(self.offset)])
260 }
261
262 fn decode_row(row: &Row) -> Self {
263 let mut datums = row.iter();
264 match (datums.next(), datums.next()) {
265 (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
266 _ => panic!("invalid row {row:?}"),
267 }
268 }
269}
270
/// A totally ordered, `u64`-backed offset into an upstream source's stream
/// of data (e.g. a log position or sequence number).
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
pub struct MzOffset {
    // The raw offset value.
    pub offset: u64,
}
290
// Offsets accumulate by delegating addition to the underlying `u64`.
impl differential_dataflow::difference::Semigroup for MzOffset {
    fn plus_equals(&mut self, rhs: &Self) {
        self.offset.plus_equals(&rhs.offset)
    }
}
296
// An offset is "zero" exactly when its underlying `u64` is zero.
impl differential_dataflow::difference::IsZero for MzOffset {
    fn is_zero(&self) -> bool {
        self.offset.is_zero()
    }
}
302
// Fixed-width 8-byte persistence codec; delegates to the `u64` codec so the
// on-disk representation is that of the raw offset.
impl mz_persist_types::Codec64 for MzOffset {
    fn codec_name() -> String {
        "MzOffset".to_string()
    }

    fn encode(&self) -> [u8; 8] {
        mz_persist_types::Codec64::encode(&self.offset)
    }

    fn decode(buf: [u8; 8]) -> Self {
        Self {
            offset: mz_persist_types::Codec64::decode(buf),
        }
    }
}
318
// `MzOffset` is `Copy`, so the trivial copy region suffices for columnation.
impl columnation::Columnation for MzOffset {
    type InnerRegion = columnation::CopyRegion<MzOffset>;
}
322
323impl MzOffset {
324 pub fn checked_sub(self, other: Self) -> Option<Self> {
325 self.offset
326 .checked_sub(other.offset)
327 .map(|offset| Self { offset })
328 }
329}
330
// Convenience conversion from a raw offset value.
impl From<u64> for MzOffset {
    fn from(offset: u64) -> Self {
        Self { offset }
    }
}

// Displays as the bare numeric offset.
impl std::fmt::Display for MzOffset {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.offset)
    }
}
344
// Arithmetic delegates to the underlying `u64`; note these use plain `+`,
// which panics on overflow in debug builds and wraps in release builds.
impl Add<u64> for MzOffset {
    type Output = MzOffset;

    fn add(self, x: u64) -> MzOffset {
        MzOffset {
            offset: self.offset + x,
        }
    }
}
impl Add<Self> for MzOffset {
    type Output = Self;

    fn add(self, x: Self) -> Self {
        MzOffset {
            offset: self.offset + x.offset,
        }
    }
}
impl AddAssign<u64> for MzOffset {
    fn add_assign(&mut self, x: u64) {
        self.offset += x;
    }
}
impl AddAssign<Self> for MzOffset {
    fn add_assign(&mut self, x: Self) {
        self.offset += x.offset;
    }
}
374
// Postgres LSNs are 64-bit positions, so they convert losslessly.
impl From<tokio_postgres::types::PgLsn> for MzOffset {
    fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
        MzOffset { offset: lsn.into() }
    }
}
381
// Timely timestamp implementation: the minimum offset is `u64`'s minimum (0)
// and path summaries are themselves offsets.
impl Timestamp for MzOffset {
    type Summary = MzOffset;

    fn minimum() -> Self {
        MzOffset {
            offset: Timestamp::minimum(),
        }
    }
}
391
// Path summaries delegate to `u64`'s summary; `None` signals overflow.
impl PathSummary<MzOffset> for MzOffset {
    fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
        Some(MzOffset {
            offset: self.offset.results_in(&src.offset)?,
        })
    }

    fn followed_by(&self, other: &Self) -> Option<Self> {
        Some(MzOffset {
            offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
        })
    }
}
405
// `MzOffset` refines the unit timestamp: entering the refined scope starts
// at the minimum offset, and leaving it discards the offset entirely.
impl Refines<()> for MzOffset {
    fn to_inner(_: ()) -> Self {
        MzOffset::minimum()
    }
    fn to_outer(self) {}
    fn summarize(_: Self::Summary) {}
}
413
// The timely partial order coincides with `u64`'s total numeric order.
impl PartialOrder for MzOffset {
    #[inline]
    fn less_equal(&self, other: &Self) -> bool {
        self.offset.less_equal(&other.offset)
    }
}

// Marker: the partial order above is in fact total.
impl TotalOrder for MzOffset {}
422
/// The timeline a collection's timestamps belong to; timestamps from
/// different timelines are not comparable.
#[derive(
    Clone,
    Debug,
    Ord,
    PartialOrd,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash
)]
pub enum Timeline {
    /// The default timeline (milliseconds since an epoch, per the name).
    EpochMilliseconds,
    /// A timeline driven by an external system, identified by name.
    External(String),
    /// A user-defined timeline, identified by name.
    User(String),
}
455
impl Timeline {
    // Single-character tags used in the string representation; these must
    // stay in sync with `Display`/`ToString` and `FromStr`.
    const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
    const EXTERNAL_ID_CHAR: char = 'E';
    const USER_ID_CHAR: char = 'U';

    /// Returns the tag character identifying this variant.
    fn id_char(&self) -> char {
        match self {
            Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
            Self::External(_) => Self::EXTERNAL_ID_CHAR,
            Self::User(_) => Self::USER_ID_CHAR,
        }
    }
}
469
470impl ToString for Timeline {
471 fn to_string(&self) -> String {
472 match self {
473 Self::EpochMilliseconds => format!("{}", self.id_char()),
474 Self::External(id) => format!("{}.{id}", self.id_char()),
475 Self::User(id) => format!("{}.{id}", self.id_char()),
476 }
477 }
478}
479
480impl FromStr for Timeline {
481 type Err = String;
482
483 fn from_str(s: &str) -> Result<Self, Self::Err> {
484 if s.is_empty() {
485 return Err("empty timeline".to_string());
486 }
487 let mut chars = s.chars();
488 match chars.next().expect("non-empty string") {
489 Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
490 None => Ok(Self::EpochMilliseconds),
491 Some(_) => Err(format!("unknown timeline: {s}")),
492 },
493 Self::EXTERNAL_ID_CHAR => match chars.next() {
494 Some('.') => Ok(Self::External(chars.as_str().to_string())),
495 _ => Err(format!("unknown timeline: {s}")),
496 },
497 Self::USER_ID_CHAR => match chars.next() {
498 Some('.') => Ok(Self::User(chars.as_str().to_string())),
499 _ => Err(format!("unknown timeline: {s}")),
500 },
501 _ => Err(format!("unknown timeline: {s}")),
502 }
503 }
504}
505
/// Behavior common to all source connection types.
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// The name of this kind of source (a static string, e.g. for metrics/logs).
    fn name(&self) -> &'static str;

    /// The name of the referenced resource in the external system, if any.
    fn external_reference(&self) -> Option<&str>;

    /// The schema of this connection's key rows.
    fn default_key_desc(&self) -> RelationDesc;

    /// The schema of this connection's value rows.
    fn default_value_desc(&self) -> RelationDesc;

    /// The schema of this connection's timestamp (progress) rows.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The id of the catalog connection object backing this source, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// Whether this source can be run while the system is read-only.
    fn supports_read_only(&self) -> bool;

    /// Whether this source prefers to run on a single replica.
    fn prefers_single_replica(&self) -> bool;
}
538
/// Whether (and how) payloads are compressed.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    Gzip,
    None,
}
544
/// Per-export data configuration: how the export's bytes are decoded and
/// which envelope interprets the decoded records.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    /// The decoding configuration, if the source type requires one.
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    /// The envelope applied to decoded records.
    pub envelope: SourceEnvelope,
}
552
553impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
554 for SourceExportDataConfig<ReferencedConnection>
555{
556 fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
557 let SourceExportDataConfig { encoding, envelope } = self;
558
559 SourceExportDataConfig {
560 encoding: encoding.map(|e| e.into_inline_connection(r)),
561 envelope,
562 }
563 }
564}
565
566impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
567 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
568 if self == other {
569 return Ok(());
570 }
571 let Self { encoding, envelope } = &self;
572
573 let compatibility_checks = [
574 (
575 match (encoding, &other.encoding) {
576 (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
577 (s, o) => s == o,
578 },
579 "encoding",
580 ),
581 (envelope == &other.envelope, "envelope"),
582 ];
583
584 for (compatible, field) in compatibility_checks {
585 if !compatible {
586 tracing::warn!(
587 "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
588 self,
589 other
590 );
591
592 return Err(AlterError { id });
593 }
594 }
595 Ok(())
596 }
597}
598
599impl<C: ConnectionAccess> SourceExportDataConfig<C> {
600 pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
607 match &self.envelope {
608 SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
610 SourceEnvelope::None(_) => {
611 match connection {
612 GenericSourceConnection::Postgres(_) => false,
614 GenericSourceConnection::MySql(_) => false,
616 GenericSourceConnection::SqlServer(_) => false,
618 GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
620 GenericSourceConnection::Kafka(_) => true,
622 }
623 }
624 }
625 }
626}
627
/// A description of a source: the connection to read from and how often to
/// downgrade its timestamp.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    /// The connection to the external system.
    pub connection: GenericSourceConnection<C>,
    /// How frequently the source's timestamps advance.
    pub timestamp_interval: Duration,
}
634
635impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
636 for SourceDesc<ReferencedConnection>
637{
638 fn into_inline_connection(self, r: R) -> SourceDesc {
639 let SourceDesc {
640 connection,
641 timestamp_interval,
642 } = self;
643
644 SourceDesc {
645 connection: connection.into_inline_connection(&r),
646 timestamp_interval,
647 }
648 }
649}
650
impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
    /// Determines whether `other` may replace `self` for a running source.
    /// Only the connection is checked; `timestamp_interval` is deliberately
    /// allowed to change.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let Self {
            connection,
            // The timestamp interval may be freely altered.
            timestamp_interval: _,
        } = &self;

        let compatibility_checks = [(
            connection.alter_compatible(id, &other.connection).is_ok(),
            "connection",
        )];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}
685
/// The union of all supported source connection types.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSourceConnection<C>),
    LoadGenerator(LoadGeneratorSourceConnection),
}
694
// Allow each concrete connection type to be lifted into the generic enum.
impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: KafkaSourceConnection<C>) -> Self {
        Self::Kafka(conn)
    }
}

impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: PostgresSourceConnection<C>) -> Self {
        Self::Postgres(conn)
    }
}

impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: MySqlSourceConnection<C>) -> Self {
        Self::MySql(conn)
    }
}

impl<C: ConnectionAccess> From<SqlServerSourceConnection<C>> for GenericSourceConnection<C> {
    fn from(conn: SqlServerSourceConnection<C>) -> Self {
        Self::SqlServer(conn)
    }
}

impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
    fn from(conn: LoadGeneratorSourceConnection) -> Self {
        Self::LoadGenerator(conn)
    }
}
724
725impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
726 for GenericSourceConnection<ReferencedConnection>
727{
728 fn into_inline_connection(self, r: R) -> GenericSourceConnection {
729 match self {
730 GenericSourceConnection::Kafka(kafka) => {
731 GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
732 }
733 GenericSourceConnection::Postgres(pg) => {
734 GenericSourceConnection::Postgres(pg.into_inline_connection(r))
735 }
736 GenericSourceConnection::MySql(mysql) => {
737 GenericSourceConnection::MySql(mysql.into_inline_connection(r))
738 }
739 GenericSourceConnection::SqlServer(sql_server) => {
740 GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
741 }
742 GenericSourceConnection::LoadGenerator(lg) => {
743 GenericSourceConnection::LoadGenerator(lg)
744 }
745 }
746 }
747}
748
749impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
750 fn name(&self) -> &'static str {
751 match self {
752 Self::Kafka(conn) => conn.name(),
753 Self::Postgres(conn) => conn.name(),
754 Self::MySql(conn) => conn.name(),
755 Self::SqlServer(conn) => conn.name(),
756 Self::LoadGenerator(conn) => conn.name(),
757 }
758 }
759
760 fn external_reference(&self) -> Option<&str> {
761 match self {
762 Self::Kafka(conn) => conn.external_reference(),
763 Self::Postgres(conn) => conn.external_reference(),
764 Self::MySql(conn) => conn.external_reference(),
765 Self::SqlServer(conn) => conn.external_reference(),
766 Self::LoadGenerator(conn) => conn.external_reference(),
767 }
768 }
769
770 fn default_key_desc(&self) -> RelationDesc {
771 match self {
772 Self::Kafka(conn) => conn.default_key_desc(),
773 Self::Postgres(conn) => conn.default_key_desc(),
774 Self::MySql(conn) => conn.default_key_desc(),
775 Self::SqlServer(conn) => conn.default_key_desc(),
776 Self::LoadGenerator(conn) => conn.default_key_desc(),
777 }
778 }
779
780 fn default_value_desc(&self) -> RelationDesc {
781 match self {
782 Self::Kafka(conn) => conn.default_value_desc(),
783 Self::Postgres(conn) => conn.default_value_desc(),
784 Self::MySql(conn) => conn.default_value_desc(),
785 Self::SqlServer(conn) => conn.default_value_desc(),
786 Self::LoadGenerator(conn) => conn.default_value_desc(),
787 }
788 }
789
790 fn timestamp_desc(&self) -> RelationDesc {
791 match self {
792 Self::Kafka(conn) => conn.timestamp_desc(),
793 Self::Postgres(conn) => conn.timestamp_desc(),
794 Self::MySql(conn) => conn.timestamp_desc(),
795 Self::SqlServer(conn) => conn.timestamp_desc(),
796 Self::LoadGenerator(conn) => conn.timestamp_desc(),
797 }
798 }
799
800 fn connection_id(&self) -> Option<CatalogItemId> {
801 match self {
802 Self::Kafka(conn) => conn.connection_id(),
803 Self::Postgres(conn) => conn.connection_id(),
804 Self::MySql(conn) => conn.connection_id(),
805 Self::SqlServer(conn) => conn.connection_id(),
806 Self::LoadGenerator(conn) => conn.connection_id(),
807 }
808 }
809
810 fn supports_read_only(&self) -> bool {
811 match self {
812 GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
813 GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
814 GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
815 GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
816 GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
817 }
818 }
819
820 fn prefers_single_replica(&self) -> bool {
821 match self {
822 GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
823 GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
824 GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
825 GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
826 GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
827 }
828 }
829}
impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
    /// Connections are compatible only when they are of the same kind and the
    /// kind-specific connections are themselves alter-compatible; switching
    /// source kinds is always incompatible.
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
        if self == other {
            return Ok(());
        }
        let r = match (self, other) {
            (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
            (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
            (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
            (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
            (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
                conn.alter_compatible(id, other)
            }
            // Mismatched variants: never compatible.
            _ => Err(AlterError { id }),
        };

        if r.is_err() {
            tracing::warn!(
                "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
                self,
                other
            );
        }

        r
    }
}
857
/// Source-specific details of a single export.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// No source-specific details (e.g. for exports that need none).
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}
871
872impl crate::AlterCompatible for SourceExportDetails {
873 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
874 if self == other {
875 return Ok(());
876 }
877 let r = match (self, other) {
878 (Self::None, Self::None) => Ok(()),
879 (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
880 (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
881 (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
882 (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
883 _ => Err(AlterError { id }),
884 };
885
886 if r.is_err() {
887 tracing::warn!(
888 "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
889 self,
890 other
891 );
892 }
893
894 r
895 }
896}
897
/// Per-source-type details recorded on an export's SQL statement.
pub enum SourceExportStatementDetails {
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    MySql {
        table: mz_mysql_util::MySqlTableDesc,
        // The upstream GTID set at snapshot time.
        initial_gtid_set: String,
    },
    SqlServer {
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        // Name of the CDC capture instance for this table.
        capture_instance: Arc<str>,
        // The upstream LSN at snapshot time.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    // Kafka exports currently carry no statement details.
    Kafka {},
}
921
// Protobuf round-trip for `SourceExportStatementDetails`; each variant maps
// onto one arm of the proto `kind` oneof.
impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
    fn into_proto(&self) -> ProtoSourceExportStatementDetails {
        match self {
            SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Postgres(
                    postgres::ProtoPostgresSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                    },
                )),
            },
            SourceExportStatementDetails::MySql {
                table,
                initial_gtid_set,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Mysql(
                    mysql::ProtoMySqlSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        initial_gtid_set: initial_gtid_set.clone(),
                    },
                )),
            },
            SourceExportStatementDetails::SqlServer {
                table,
                capture_instance,
                initial_lsn,
            } => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::SqlServer(
                    sql_server::ProtoSqlServerSourceExportStatementDetails {
                        table: Some(table.into_proto()),
                        capture_instance: capture_instance.to_string(),
                        // LSNs travel as raw bytes on the wire.
                        initial_lsn: initial_lsn.as_bytes().to_vec(),
                    },
                )),
            },
            SourceExportStatementDetails::LoadGenerator { output } => {
                ProtoSourceExportStatementDetails {
                    kind: Some(proto_source_export_statement_details::Kind::Loadgen(
                        load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
                            output: output.into_proto().into(),
                        },
                    )),
                }
            }
            SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
                kind: Some(proto_source_export_statement_details::Kind::Kafka(
                    kafka::ProtoKafkaSourceExportStatementDetails {},
                )),
            },
        }
    }

    fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
        use proto_source_export_statement_details::Kind;
        Ok(match proto.kind {
            Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
                table: details
                    .table
                    .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
            },
            Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
                table: details
                    .table
                    .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,

                initial_gtid_set: details.initial_gtid_set,
            },
            Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
                table: details
                    .table
                    .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
                capture_instance: details.capture_instance.into(),
                // Reject LSN byte strings of the wrong length/shape.
                initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
                    .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
            },
            Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
                output: details
                    .output
                    .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
            },
            Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
            None => {
                return Err(TryFromProtoError::missing_field(
                    "ProtoSourceExportStatementDetails::kind",
                ));
            }
        })
    }
}
1010
/// The fundamental unit of data a source produces: either a valid row or a
/// dataflow error.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);
1014
// The default is an empty, valid row (not an error).
impl Default for SourceData {
    fn default() -> Self {
        SourceData(Ok(Row::default()))
    }
}

// Deref straight to the inner `Result` for ergonomic access.
impl Deref for SourceData {
    type Target = Result<Row, DataflowError>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for SourceData {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
1034
1035impl RustType<ProtoSourceData> for SourceData {
1036 fn into_proto(&self) -> ProtoSourceData {
1037 use proto_source_data::Kind;
1038 ProtoSourceData {
1039 kind: Some(match &**self {
1040 Ok(row) => Kind::Ok(row.into_proto()),
1041 Err(err) => Kind::Err(err.into_proto()),
1042 }),
1043 }
1044 }
1045
1046 fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
1047 use proto_source_data::Kind;
1048 match proto.kind {
1049 Some(kind) => match kind {
1050 Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
1051 Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
1052 },
1053 None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
1054 }
1055 }
1056}
1057
// Persist codec for `SourceData`, backed by the protobuf representation.
// `decode_from` is written to reuse the caller-provided `ProtoRow` allocation
// across calls on the hot (Ok-row) path.
impl Codec for SourceData {
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    fn encode<B: BufMut>(&self, buf: &mut B) {
        // ProtoSourceData has no required fields, so encoding cannot fail.
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        // Take (or create) the scratch ProtoRow and clear it, keeping its
        // internal allocations for reuse.
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        // Pre-seed the oneof with the Ok variant so that merging an Ok
        // payload decodes directly into the reused ProtoRow.
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                // Hot path: decode in place, then hand the ProtoRow back to
                // the caller for the next decode.
                let ret = row.decode_from_proto(&proto, schema);
                storage.replace(proto);
                ret
            }
            (kind, _) => {
                // Cold path (errors, or self previously held an error): fall
                // back to a full conversion.
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                Ok(())
            }
        }
    }

    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            // Only rows are validated against the schema; errors always pass.
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}
1126
/// Returns a proptest [`Strategy`] that generates [`SourceData`] valid for
/// the given [`RelationDesc`], weighted 50:1 toward valid rows over errors.
pub fn arb_source_data_for_relation_desc(
    desc: &RelationDesc,
) -> impl Strategy<Value = SourceData> + use<> {
    // `no_shrink` keeps failing cases representative rather than minimized.
    let row_strat = arb_row_for_relation(desc).no_shrink();

    proptest::strategy::Union::new_weighted(vec![
        (50, row_strat.prop_map(|row| SourceData(Ok(row))).boxed()),
        (
            1,
            any::<DataflowError>()
                .prop_map(|err| SourceData(Err(err)))
                .no_shrink()
                .boxed(),
        ),
    ])
}
1144
/// An item in an external catalog that can be referred to by schema and item
/// name (the database qualification is supplied separately).
pub trait ExternalCatalogReference {
    /// The name of the schema containing the item.
    fn schema_name(&self) -> &str;
    /// The item's own name.
    fn item_name(&self) -> &str;
}
1158
impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
    fn schema_name(&self) -> &str {
        &self.schema_name
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
    fn schema_name(&self) -> &str {
        // Postgres calls its schemas "namespaces".
        &self.namespace
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
    fn schema_name(&self) -> &str {
        &*self.schema_name
    }

    fn item_name(&self) -> &str {
        &*self.name
    }
}

// Plain `(schema, item)` string pairs also qualify as references.
impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
    fn schema_name(&self) -> &str {
        self.0
    }

    fn item_name(&self) -> &str {
        self.1
    }
}
1200
/// Resolves partially qualified names against the set of items available in
/// an external catalog.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceReferenceResolver {
    // item name -> schema name -> database name -> index into the original
    // list of referenceable items.
    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
}
1212
/// Errors that can occur while resolving an external reference.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ExternalReferenceResolutionError {
    #[error("reference to {name} not found in source")]
    DoesNotExist { name: String },
    #[error(
        "reference {name} is ambiguous, consider specifying an additional \
        layer of qualification"
    )]
    Ambiguous { name: String },
    #[error("invalid identifier: {0}")]
    Ident(#[from] IdentError),
}
1225
1226impl<'a> SourceReferenceResolver {
1227 pub fn new<T: ExternalCatalogReference>(
1233 database: &str,
1234 referenceable_items: &'a [T],
1235 ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1236 let mut inner = BTreeMap::new();
1239
1240 let database = Ident::new(database)?;
1241
1242 for (reference_idx, item) in referenceable_items.iter().enumerate() {
1243 let item_name = Ident::new(item.item_name())?;
1244 let schema_name = Ident::new(item.schema_name())?;
1245
1246 inner
1247 .entry(item_name)
1248 .or_insert_with(BTreeMap::new)
1249 .entry(schema_name)
1250 .or_insert_with(BTreeMap::new)
1251 .entry(database.clone())
1252 .or_insert(reference_idx);
1253 }
1254
1255 Ok(SourceReferenceResolver { inner })
1256 }
1257
1258 pub fn resolve(
1275 &self,
1276 name: &[Ident],
1277 canonicalize_to_width: usize,
1278 ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1279 let (db, schema, idx) = self.resolve_inner(name)?;
1280
1281 let item = name.last().expect("must have provided at least 1 element");
1282
1283 let canonical_name = match canonicalize_to_width {
1284 1 => vec![item.clone()],
1285 2 => vec![schema.clone(), item.clone()],
1286 3 => vec![db.clone(), schema.clone(), item.clone()],
1287 o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1288 };
1289
1290 Ok((UnresolvedItemName(canonical_name), idx))
1291 }
1292
1293 pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1303 let (_db, _schema, idx) = self.resolve_inner(name)?;
1304 Ok(idx)
1305 }
1306
1307 fn resolve_inner<'name: 'a>(
1324 &'a self,
1325 name: &'name [Ident],
1326 ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1327 let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1328
1329 if !(1..=3).contains(&name.len()) {
1331 Err(ExternalReferenceResolutionError::DoesNotExist {
1332 name: get_provided_name(),
1333 })?;
1334 }
1335
1336 let mut names = std::iter::repeat(None)
1338 .take(3 - name.len())
1339 .chain(name.iter().map(Some));
1340
1341 let database = names.next().flatten();
1342 let schema = names.next().flatten();
1343 let item = names
1344 .next()
1345 .flatten()
1346 .expect("must have provided the item name");
1347
1348 assert_none!(names.next(), "expected a 3-element iterator");
1349
1350 let schemas =
1351 self.inner
1352 .get(item)
1353 .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1354 name: get_provided_name(),
1355 })?;
1356
1357 let schema = match schema {
1358 Some(schema) => schema,
1359 None => schemas.keys().exactly_one().map_err(|_e| {
1360 ExternalReferenceResolutionError::Ambiguous {
1361 name: get_provided_name(),
1362 }
1363 })?,
1364 };
1365
1366 let databases =
1367 schemas
1368 .get(schema)
1369 .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1370 name: get_provided_name(),
1371 })?;
1372
1373 let database = match database {
1374 Some(database) => database,
1375 None => databases.keys().exactly_one().map_err(|_e| {
1376 ExternalReferenceResolutionError::Ambiguous {
1377 name: get_provided_name(),
1378 }
1379 })?,
1380 };
1381
1382 let reference_idx = databases.get(database).ok_or_else(|| {
1383 ExternalReferenceResolutionError::DoesNotExist {
1384 name: get_provided_name(),
1385 }
1386 })?;
1387
1388 Ok((database, schema, *reference_idx))
1389 }
1390}
1391
/// Decoder for the `Row` (i.e. "ok") portion of [`SourceData`].
#[derive(Debug)]
pub enum SourceDataRowColumnarDecoder {
    /// Decodes rows via a [`RowColumnarDecoder`].
    Row(RowColumnarDecoder),
    /// The relation has no columns, so there is no backing data; decoded
    /// rows are always empty.
    EmptyRow,
}
1402
1403impl SourceDataRowColumnarDecoder {
1404 pub fn decode(&self, idx: usize, row: &mut Row) {
1405 match self {
1406 SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1407 SourceDataRowColumnarDecoder::EmptyRow => {
1408 row.packer();
1410 }
1411 }
1412 }
1413
1414 pub fn goodbytes(&self) -> usize {
1415 match self {
1416 SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1417 SourceDataRowColumnarDecoder::EmptyRow => 0,
1418 }
1419 }
1420}
1421
/// Columnar decoder for [`SourceData`]: each entry is either a `Row` (the
/// "ok" column) or a `DataflowError` (the "err" column).
#[derive(Debug)]
pub struct SourceDataColumnarDecoder {
    /// Decoder for the "ok" (row) column.
    row_decoder: SourceDataRowColumnarDecoder,
    /// Binary column of protobuf-encoded `DataflowError`s.
    err_decoder: BinaryArray,
}
1427
1428impl SourceDataColumnarDecoder {
1429 pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1430 let (_fields, arrays, nullability) = col.into_parts();
1432
1433 if nullability.is_some() {
1434 anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1435 }
1436 if arrays.len() != 2 {
1437 anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1438 }
1439
1440 let errs = arrays[1]
1441 .as_any()
1442 .downcast_ref::<BinaryArray>()
1443 .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1444
1445 let row_decoder = match arrays[0].data_type() {
1446 arrow::datatypes::DataType::Struct(_) => {
1447 let rows = arrays[0]
1448 .as_any()
1449 .downcast_ref::<StructArray>()
1450 .ok_or_else(|| {
1451 anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1452 })?;
1453 let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1454 SourceDataRowColumnarDecoder::Row(decoder)
1455 }
1456 arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1457 other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1458 };
1459
1460 Ok(SourceDataColumnarDecoder {
1461 row_decoder,
1462 err_decoder: errs.clone(),
1463 })
1464 }
1465}
1466
impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
    fn decode(&self, idx: usize, val: &mut SourceData) {
        // Which variant an entry holds is determined by nullability: exactly
        // one of the "ok" and "err" columns is set per entry.
        let err_null = self.err_decoder.is_null(idx);
        let row_null = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
            // With no backing "ok" column, infer the row's nullability from
            // the "err" column.
            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
        };

        match (row_null, err_null) {
            (true, false) => {
                // Error entry: decode the protobuf-encoded DataflowError.
                let err = self.err_decoder.value(idx);
                let err = ProtoDataflowError::decode(err)
                    .expect("proto should be valid")
                    .into_rust()
                    .expect("error should be valid");
                val.0 = Err(err);
            }
            (false, true) => {
                // Row entry: reuse the existing Row allocation in `val` when
                // possible; otherwise replace the error with a fresh Row.
                let row = match val.0.as_mut() {
                    Ok(row) => row,
                    Err(_) => {
                        val.0 = Ok(Row::default());
                        val.0.as_mut().unwrap()
                    }
                };
                self.row_decoder.decode(idx, row);
            }
            (true, true) => panic!("should have one of 'ok' or 'err'"),
            (false, false) => panic!("cannot have both 'ok' and 'err'"),
        }
    }

    fn is_null(&self, idx: usize) -> bool {
        // SourceData itself is never null; this only sanity-checks the
        // "exactly one of ok/err" invariant for the entry.
        let err_null = self.err_decoder.is_null(idx);
        let row_null = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
            SourceDataRowColumnarDecoder::EmptyRow => !err_null,
        };
        assert!(!err_null || !row_null, "SourceData should never be null!");

        false
    }

    fn goodbytes(&self) -> usize {
        self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
    }

    fn stats(&self) -> StructStats {
        let len = self.err_decoder.len();
        let err_stats = ColumnarStats {
            nulls: Some(ColumnNullStats {
                count: self.err_decoder.null_count(),
            }),
            values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
        };
        // Every entry is either a row or an error, so the number of null rows
        // equals the number of entries with a non-null error.
        let row_null_count = len - self.err_decoder.null_count();
        let row_stats = match &self.row_decoder {
            SourceDataRowColumnarDecoder::Row(encoder) => {
                // Cross-check the invariant against the row column itself.
                assert_eq!(encoder.null_count(), row_null_count);
                encoder.stats()
            }
            SourceDataRowColumnarDecoder::EmptyRow => StructStats {
                len,
                cols: BTreeMap::default(),
            },
        };
        let row_stats = ColumnarStats {
            nulls: Some(ColumnNullStats {
                count: row_null_count,
            }),
            values: ColumnStatKinds::Struct(row_stats),
        };

        // Report stats keyed by the same column names the encoder writes.
        let stats = [
            (
                SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
                row_stats,
            ),
            (
                SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
                err_stats,
            ),
        ];
        StructStats {
            len,
            cols: stats.into_iter().map(|(name, s)| (name, s)).collect(),
        }
    }
}
1563
/// Encoder for the `Row` (i.e. "ok") portion of [`SourceData`].
#[derive(Debug)]
pub enum SourceDataRowColumnarEncoder {
    /// Encodes rows via a [`RowColumnarEncoder`].
    Row(RowColumnarEncoder),
    /// The relation has no columns, so there is no row data to encode.
    EmptyRow,
}
1575
1576impl SourceDataRowColumnarEncoder {
1577 pub(crate) fn goodbytes(&self) -> usize {
1578 match self {
1579 SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1580 SourceDataRowColumnarEncoder::EmptyRow => 0,
1581 }
1582 }
1583
1584 pub fn append(&mut self, row: &Row) {
1585 match self {
1586 SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1587 SourceDataRowColumnarEncoder::EmptyRow => {
1588 assert_eq!(row.iter().count(), 0)
1589 }
1590 }
1591 }
1592
1593 pub fn append_null(&mut self) {
1594 match self {
1595 SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1596 SourceDataRowColumnarEncoder::EmptyRow => (),
1597 }
1598 }
1599}
1600
/// Columnar encoder for [`SourceData`]: splits each value into an "ok" (row)
/// column and an "err" (encoded error) column.
#[derive(Debug)]
pub struct SourceDataColumnarEncoder {
    /// Encoder for the "ok" (row) half of each entry.
    row_encoder: SourceDataRowColumnarEncoder,
    /// Builder for the binary "err" column of protobuf-encoded errors.
    err_encoder: BinaryBuilder,
}
1606
1607impl SourceDataColumnarEncoder {
1608 const OK_COLUMN_NAME: &'static str = "ok";
1609 const ERR_COLUMN_NAME: &'static str = "err";
1610
1611 pub fn new(desc: &RelationDesc) -> Self {
1612 let row_encoder = match RowColumnarEncoder::new(desc) {
1613 Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1614 None => {
1615 assert!(desc.typ().columns().is_empty());
1616 SourceDataRowColumnarEncoder::EmptyRow
1617 }
1618 };
1619 let err_encoder = BinaryBuilder::new();
1620
1621 SourceDataColumnarEncoder {
1622 row_encoder,
1623 err_encoder,
1624 }
1625 }
1626}
1627
impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
    type FinishedColumn = StructArray;

    fn goodbytes(&self) -> usize {
        self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
    }

    #[inline]
    fn append(&mut self, val: &SourceData) {
        // Exactly one of the two columns gets a value; the other gets a null.
        match val.0.as_ref() {
            Ok(row) => {
                self.row_encoder.append(row);
                self.err_encoder.append_null();
            }
            Err(err) => {
                self.row_encoder.append_null();
                // Errors are stored as protobuf-encoded bytes.
                self.err_encoder
                    .append_value(err.into_proto().encode_to_vec());
            }
        }
    }

    #[inline]
    fn append_null(&mut self) {
        // A SourceData is always either a row or an error, never null.
        panic!("appending a null into SourceDataColumnarEncoder is not supported");
    }

    fn finish(self) -> Self::FinishedColumn {
        let SourceDataColumnarEncoder {
            row_encoder,
            mut err_encoder,
        } = self;

        let err_column = BinaryBuilder::finish(&mut err_encoder);
        let row_column: ArrayRef = match row_encoder {
            SourceDataRowColumnarEncoder::Row(encoder) => {
                let column = encoder.finish();
                Arc::new(column)
            }
            // Empty relations have no row data; a NullArray of matching
            // length keeps both children the same size.
            SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
        };

        assert_eq!(row_column.len(), err_column.len());

        // Each child is individually nullable, but the outer struct carries
        // no validity buffer (`None`): SourceData entries are never null.
        let fields = vec![
            Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
            Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
        ];
        let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
        StructArray::new(Fields::from(fields), arrays, None)
    }
}
1680
1681impl Schema<SourceData> for RelationDesc {
1682 type ArrowColumn = StructArray;
1683 type Statistics = StructStats;
1684
1685 type Decoder = SourceDataColumnarDecoder;
1686 type Encoder = SourceDataColumnarEncoder;
1687
1688 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1689 SourceDataColumnarDecoder::new(col, self)
1690 }
1691
1692 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1693 Ok(SourceDataColumnarEncoder::new(self))
1694 }
1695}
1696
1697#[cfg(test)]
1698mod tests {
1699 use arrow::array::{ArrayData, make_comparator};
1700 use base64::Engine;
1701 use bytes::Bytes;
1702 use mz_expr::EvalError;
1703 use mz_ore::assert_err;
1704 use mz_ore::metrics::MetricsRegistry;
1705 use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
1706 use mz_persist::metrics::ColumnarMetrics;
1707 use mz_persist_types::parquet::EncodingConfig;
1708 use mz_persist_types::schema::{Migration, backward_compatible};
1709 use mz_persist_types::stats::{PartStats, PartStatsMetrics};
1710 use mz_repr::{
1711 ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
1712 RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
1713 };
1714 use proptest::prelude::*;
1715 use proptest::strategy::{Union, ValueTree};
1716
1717 use crate::stats::RelationPartStats;
1718
1719 use super::*;
1720
1721 #[mz_ore::test]
1722 fn test_timeline_parsing() {
1723 assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
1724 assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
1725 assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());
1726
1727 assert_err!("Materialize".parse::<Timeline>());
1728 assert_err!("Ejoe".parse::<Timeline>());
1729 assert_err!("Umike".parse::<Timeline>());
1730 assert_err!("Dance".parse::<Timeline>());
1731 assert_err!("".parse::<Timeline>());
1732 }
1733
    /// Encodes `datas` with `desc` and checks the whole persistence pipeline:
    /// the column round-trips through protobuf and parquet, decoding (with
    /// both `desc` and the projected `read_desc`) reproduces the input, stats
    /// admit every decoded datum, and the schema/migration machinery
    /// round-trips as well.
    #[track_caller]
    fn roundtrip_source_data(
        desc: &RelationDesc,
        datas: Vec<SourceData>,
        read_desc: &RelationDesc,
        config: &EncodingConfig,
    ) {
        let metrics = ColumnarMetrics::disconnected();
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        // The outer SourceData struct must never be nullable.
        assert!(!col.is_nullable());

        let col = realloc_array(&col, &metrics);

        // Round-trip the raw ArrayData through its protobuf representation.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        // Round-trip the column through parquet.
        let mut buf = Vec::new();
        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();

        let buf = Bytes::from(buf);
        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
        let maybe_batch = reader.next();

        // Empty input encodes to no record batches at all.
        let Some(record_batch) = maybe_batch else {
            assert!(datas.is_empty());
            return;
        };
        let record_batch = record_batch.unwrap();

        assert_eq!(record_batch.columns().len(), 1);
        let rnd_col = &record_batch.columns()[0];
        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
        let rnd_col = rnd_col
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap()
            .clone();

        // Collect stats from the decoded column for the checks below.
        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
            .expect("valid decoder")
            .stats();

        // Decoding with the original schema must reproduce the input exactly.
        let mut rnd_data = SourceData(Ok(Row::default()));
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            assert_eq!(og_data, &rnd_data);
        }

        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &stats_metrics,
            stats: &PartStats { key: stats },
            desc: read_desc,
        };
        let mut datum_vec = DatumVec::new();
        let arena = RowArena::default();
        // Decode again with the (possibly projected) read schema.
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();

        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            match (&og_data.0, &rnd_data.0) {
                (Ok(og_row), Ok(rnd_row)) => {
                    // The decoded row must equal the original projected onto
                    // the columns present in `read_desc`.
                    {
                        let datums = datum_vec.borrow_with(og_row);
                        let projected_datums =
                            datums.iter().enumerate().filter_map(|(idx, datum)| {
                                read_desc
                                    .contains_index(&ColumnIndex::from_raw(idx))
                                    .then_some(datum)
                            });
                        let og_projected_row = Row::pack(projected_datums);
                        assert_eq!(&og_projected_row, rnd_row);
                    }

                    // The column stats must admit every decoded datum.
                    {
                        let proj_datums = datum_vec.borrow_with(rnd_row);
                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
                            let spec = stats.col_stats(idx, &arena);
                            assert!(spec.may_contain(proj_datums[pos]));
                        }
                    }
                }
                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
            }
        }

        // The RelationDesc round-trips through its encoded schema form.
        let encoded_schema = SourceData::encode_schema(desc);
        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
        assert_eq!(desc, &roundtrip_desc);

        // A schema is backward compatible with itself, and the resulting
        // (identity) migration preserves the DataType.
        let migration =
            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
        let migration = migration.expect("should be backward compatible with self");
        let migrated = migration.migrate(Arc::new(col.clone()));
        assert_eq!(col.data_type(), migrated.data_type());
    }
1868
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn all_source_data_roundtrips() {
        // Mostly-small row counts; larger batches only when explicitly
        // requested via the environment (they are slow).
        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (10, Just(32..128)),
                (5, Just(128..512)),
                (3, Just(512..2048)),
                (1, Just(2048..8192)),
            ]);
        }
        let num_rows = Union::new_weighted(weights);

        // Generate a desc, a projection of it to read back with, and data
        // matching the original desc.
        let strat = (any::<RelationDesc>(), num_rows)
            .prop_flat_map(|(desc, num_rows)| {
                arb_relation_desc_projection(desc.clone())
                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
            })
            .prop_flat_map(|(desc, read_desc, num_rows)| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
            });

        let combined_strat = (any::<EncodingConfig>(), strat);
        proptest!(|((config, (desc, source_datas, read_desc)) in combined_strat)| {
            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
        });
    }
1899
1900 #[mz_ore::test]
1901 fn roundtrip_error_nulls() {
1902 let desc = RelationDescBuilder::default()
1903 .with_column(
1904 "ts",
1905 SqlScalarType::TimestampTz { precision: None }.nullable(false),
1906 )
1907 .finish();
1908 let source_datas = vec![SourceData(Err(DataflowError::EvalError(
1909 EvalError::DateOutOfRange.into(),
1910 )))];
1911 let config = EncodingConfig::default();
1912 roundtrip_source_data(&desc, source_datas, &desc, &config);
1913 }
1914
1915 fn is_sorted(array: &dyn Array) -> bool {
1916 let sort_options = arrow::compute::SortOptions::default();
1917 let Ok(cmp) = make_comparator(array, array, sort_options) else {
1918 return false;
1924 };
1925 (0..array.len())
1926 .tuple_windows()
1927 .all(|(i, j)| cmp(i, j).is_le())
1928 }
1929
1930 fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
1931 use mz_persist_types::columnar::ColumnEncoder;
1932 let array = Schema::encoder(schema).expect("valid schema").finish();
1933 Array::data_type(&array).clone()
1934 }
1935
1936 #[track_caller]
1937 fn backward_compatible_testcase(
1938 old: &RelationDesc,
1939 new: &RelationDesc,
1940 migration: Migration,
1941 datas: &[SourceData],
1942 ) {
1943 let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
1944 for data in datas {
1945 encoder.append(data);
1946 }
1947 let old = encoder.finish();
1948 let new = Schema::<SourceData>::encoder(new)
1949 .expect("valid schema")
1950 .finish();
1951 let old: Arc<dyn Array> = Arc::new(old);
1952 let new: Arc<dyn Array> = Arc::new(new);
1953 let migrated = migration.migrate(Arc::clone(&old));
1954 assert_eq!(migrated.data_type(), new.data_type());
1955
1956 if migration.preserves_order() && is_sorted(&old) {
1958 assert!(is_sorted(&new))
1959 }
1960 }
1961
1962 #[mz_ore::test]
1963 fn backward_compatible_empty_add_column() {
1964 let old = RelationDesc::empty();
1965 let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
1966
1967 let old_data_type = get_data_type(&old);
1968 let new_data_type = get_data_type(&new);
1969
1970 let migration = backward_compatible(&old_data_type, &new_data_type);
1971 assert!(migration.is_some());
1972 }
1973
1974 #[mz_ore::test]
1975 fn backward_compatible_project_away_all() {
1976 let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
1977 let new = RelationDesc::empty();
1978
1979 let old_data_type = get_data_type(&old);
1980 let new_data_type = get_data_type(&new);
1981
1982 let migration = backward_compatible(&old_data_type, &new_data_type);
1983 assert!(migration.is_some());
1984 }
1985
1986 #[mz_ore::test]
1987 #[cfg_attr(miri, ignore)]
1988 fn backward_compatible_migrate() {
1989 let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
1990 proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
1991 .prop_map(move |datas| (old.clone(), new.clone(), datas))
1992 });
1993
1994 proptest!(|((old, new, datas) in strat)| {
1995 let old_data_type = get_data_type(&old);
1996 let new_data_type = get_data_type(&new);
1997
1998 if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
1999 backward_compatible_testcase(&old, &new, migration, &datas);
2000 };
2001 });
2002 }
2003
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate_from_common() {
        use mz_repr::SqlColumnType;
        // Starts from a common RelationDesc, applies arbitrary diffs to get a
        // "new" desc, and checks the backward-compatibility machinery.
        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
            // A migration is guaranteed to exist only when every diff is a
            // "compatible" change: adding a nullable column or dropping one.
            let should_be_compatible = diffs.iter().all(|diff| match diff {
                PropRelationDescDiff::AddColumn {
                    typ: SqlColumnType { nullable, .. },
                    ..
                } => *nullable,
                PropRelationDescDiff::DropColumn { .. } => true,
                _ => false,
            });

            let mut new = old.clone();
            for diff in diffs.into_iter() {
                diff.apply(&mut new)
            }

            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            } else if should_be_compatible {
                panic!("new DataType was not compatible when it should have been!");
            }
        }

        // Generate a desc, data for it (no_shrink: shrinking the data doesn't
        // help debug diff-compatibility failures), and diffs to apply.
        let strat = any::<RelationDesc>()
            .prop_flat_map(|desc| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
                    .no_shrink()
                    .prop_map(move |datas| (desc.clone(), datas))
            })
            .prop_flat_map(|(desc, datas)| {
                arb_relation_desc_diff(&desc)
                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
            });

        proptest!(|((old, diffs, datas) in strat)| {
            test_case(old, diffs, datas);
        });
    }
2050
2051 #[mz_ore::test]
2052 #[cfg_attr(miri, ignore)] fn empty_relation_desc_roundtrips() {
2054 let empty = RelationDesc::empty();
2055 let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
2056 .prop_map(move |datas| (empty.clone(), datas));
2057
2058 proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
2061 roundtrip_source_data(&desc, source_datas, &desc, &config);
2062 });
2063 }
2064
2065 #[mz_ore::test]
2066 #[cfg_attr(miri, ignore)] fn arrow_datatype_consistent() {
2068 fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
2069 let half = datas.len() / 2;
2070
2071 let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2072 for data in &datas[..half] {
2073 encoder_a.append(data);
2074 }
2075 let col_a = encoder_a.finish();
2076
2077 let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
2078 for data in &datas[half..] {
2079 encoder_b.append(data);
2080 }
2081 let col_b = encoder_b.finish();
2082
2083 assert_eq!(col_a.data_type(), col_b.data_type());
2086 }
2087
2088 let num_rows = 12;
2089 let strat = any::<RelationDesc>().prop_flat_map(|desc| {
2090 proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
2091 .prop_map(move |datas| (desc.clone(), datas))
2092 });
2093
2094 proptest!(|((desc, data) in strat)| {
2095 test_case(desc, data);
2096 });
2097 }
2098
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn source_proto_serialization_stability() {
        let min_protos = 10;
        // Checked-in snapshot of base64-encoded `(RelationDesc, SourceData)`
        // pairs, one comma-separated pair per line.
        let encoded = include_str!("snapshots/source-datas.txt");

        // Decode the existing snapshot entries.
        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
            .lines()
            .map(|s| {
                let (desc, data) = s.split_once(',').expect("comma separated data");
                let desc = base64::engine::general_purpose::STANDARD
                    .decode(desc)
                    .expect("valid base64");
                let data = base64::engine::general_purpose::STANDARD
                    .decode(data)
                    .expect("valid base64");
                (desc, data)
            })
            .map(|(desc, data)| {
                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
                let desc = desc.into_rust().expect("valid proto");
                let data = SourceData::decode(&data, &desc).expect("valid proto");
                (desc, data)
            })
            .collect();

        // Top up to the minimum number of examples with a deterministic RNG
        // so the generated entries are reproducible run-to-run.
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
        });
        while decoded.len() < min_protos {
            let arbitrary_data = strategy
                .new_tree(&mut runner)
                .expect("source data")
                .current();
            decoded.push(arbitrary_data);
        }

        // Re-encode everything and compare against the snapshot: any
        // difference means the proto serialization format has changed.
        let mut reencoded = String::new();
        let mut buf = vec![];
        for (desc, data) in decoded {
            buf.clear();
            desc.into_proto().encode(&mut buf).expect("success");
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push(',');

            buf.clear();
            data.encode(&mut buf);
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push('\n');
        }

        assert_eq!(
            encoded,
            reencoded.as_str(),
            "SourceData serde should be stable"
        )
    }
2170}