1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::hash::Hash;
15use std::ops::{Add, AddAssign, Deref, DerefMut};
16use std::str::FromStr;
17use std::sync::Arc;
18use std::time::Duration;
19
20use arrow::array::{Array, ArrayRef, BinaryArray, BinaryBuilder, NullArray, StructArray};
21use arrow::datatypes::{Field, Fields};
22use bytes::{BufMut, Bytes};
23use columnation::Columnation;
24use itertools::EitherOrBoth::Both;
25use itertools::Itertools;
26use kafka::KafkaSourceExportDetails;
27use load_generator::{LoadGeneratorOutput, LoadGeneratorSourceExportDetails};
28use mz_ore::assert_none;
29use mz_persist_types::Codec;
30use mz_persist_types::arrow::ArrayOrd;
31use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema};
32use mz_persist_types::stats::{
33 ColumnNullStats, ColumnStatKinds, ColumnarStats, ColumnarStatsBuilder, PrimitiveStats,
34 StructStats,
35};
36use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
37#[cfg(any(test, feature = "proptest"))]
38use mz_repr::arb_row_for_relation;
39use mz_repr::{
40 CatalogItemId, Datum, GlobalId, ProtoRelationDesc, ProtoRow, RelationDesc, Row,
41 RowColumnarDecoder, RowColumnarEncoder,
42};
43use mz_sql_parser::ast::{Ident, IdentError, UnresolvedItemName};
44#[cfg(any(test, feature = "proptest"))]
45use proptest::prelude::any;
46#[cfg(any(test, feature = "proptest"))]
47use proptest::strategy::Strategy;
48use prost::Message;
49use serde::{Deserialize, Serialize};
50use timely::order::{PartialOrder, TotalOrder};
51use timely::progress::timestamp::Refines;
52use timely::progress::{PathSummary, Timestamp};
53
54use crate::AlterCompatible;
55use crate::connections::inline::{
56 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
57 ReferencedConnection,
58};
59use crate::controller::AlterError;
60use crate::errors::{DataflowError, ProtoDataflowError};
61use crate::instances::StorageInstanceId;
62use crate::sources::sql_server::SqlServerSourceExportDetails;
63
64pub mod casts;
65pub mod encoding;
66pub mod envelope;
67pub mod kafka;
68pub mod load_generator;
69pub mod mysql;
70pub mod postgres;
71pub mod sql_server;
72
73pub use crate::sources::envelope::SourceEnvelope;
74pub use crate::sources::kafka::KafkaSourceConnection;
75pub use crate::sources::load_generator::LoadGeneratorSourceConnection;
76pub use crate::sources::mysql::{MySqlSourceConnection, MySqlSourceExportDetails};
77pub use crate::sources::postgres::{PostgresSourceConnection, PostgresSourceExportDetails};
78pub use crate::sources::sql_server::{SqlServerSourceConnection, SqlServerSourceExtras};
79
80include!(concat!(env!("OUT_DIR"), "/mz_storage_types.sources.rs"));
81
/// Describes one ingestion: the source being read, the collections it exports
/// into, the storage instance it is assigned to, and its remap collection.
///
/// `S` is per-collection metadata attached to the remap collection and to every
/// export (`()` when freshly built by [`IngestionDescription::new`]); `C` selects
/// how connections are represented (inlined by default).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct IngestionDescription<S: 'static = (), C: ConnectionAccess = InlinedConnection> {
    /// The source this ingestion reads from.
    pub desc: SourceDesc<C>,
    /// The collections this ingestion writes to, keyed by collection ID.
    pub source_exports: BTreeMap<GlobalId, SourceExport<S>>,
    /// The storage instance this ingestion is assigned to.
    /// NOTE(review): presumably the cluster it runs on; confirm.
    pub instance_id: StorageInstanceId,
    /// The ID of this ingestion's remap collection. It is always included in
    /// `collection_ids` alongside the exports.
    pub remap_collection_id: GlobalId,
    /// Metadata of type `S` for the remap collection.
    pub remap_metadata: S,
}
108
109impl IngestionDescription {
110 pub fn new(
111 desc: SourceDesc,
112 instance_id: StorageInstanceId,
113 remap_collection_id: GlobalId,
114 ) -> Self {
115 Self {
116 desc,
117 remap_metadata: (),
118 source_exports: BTreeMap::new(),
119 instance_id,
120 remap_collection_id,
121 }
122 }
123}
124
125impl<S> IngestionDescription<S> {
126 pub fn collection_ids(&self) -> impl Iterator<Item = GlobalId> + '_ {
131 let IngestionDescription {
134 desc: _,
135 remap_metadata: _,
136 source_exports,
137 instance_id: _,
138 remap_collection_id,
139 } = &self;
140
141 source_exports
142 .keys()
143 .copied()
144 .chain(std::iter::once(*remap_collection_id))
145 }
146}
147
148impl<S: Debug + Eq + PartialEq + AlterCompatible> AlterCompatible for IngestionDescription<S> {
149 fn alter_compatible(
150 &self,
151 id: GlobalId,
152 other: &IngestionDescription<S>,
153 ) -> Result<(), AlterError> {
154 if self == other {
155 return Ok(());
156 }
157 let IngestionDescription {
158 desc,
159 remap_metadata,
160 source_exports,
161 instance_id,
162 remap_collection_id,
163 } = self;
164
165 let compatibility_checks = [
166 (desc.alter_compatible(id, &other.desc).is_ok(), "desc"),
167 (remap_metadata == &other.remap_metadata, "remap_metadata"),
168 (
169 source_exports
170 .iter()
171 .merge_join_by(&other.source_exports, |(l_key, _), (r_key, _)| {
172 l_key.cmp(r_key)
173 })
174 .all(|r| match r {
175 Both(
176 (
177 _,
178 SourceExport {
179 storage_metadata: l_metadata,
180 details: l_details,
181 data_config: l_data_config,
182 },
183 ),
184 (
185 _,
186 SourceExport {
187 storage_metadata: r_metadata,
188 details: r_details,
189 data_config: r_data_config,
190 },
191 ),
192 ) => {
193 l_metadata.alter_compatible(id, r_metadata).is_ok()
194 && l_details.alter_compatible(id, r_details).is_ok()
195 && l_data_config.alter_compatible(id, r_data_config).is_ok()
196 }
197 _ => true,
198 }),
199 "source_exports",
200 ),
201 (instance_id == &other.instance_id, "instance_id"),
202 (
203 remap_collection_id == &other.remap_collection_id,
204 "remap_collection_id",
205 ),
206 ];
207 for (compatible, field) in compatibility_checks {
208 if !compatible {
209 tracing::warn!(
210 "IngestionDescription incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
211 self,
212 other
213 );
214
215 return Err(AlterError { id });
216 }
217 }
218
219 Ok(())
220 }
221}
222
223impl<R: ConnectionResolver> IntoInlineConnection<IngestionDescription, R>
224 for IngestionDescription<(), ReferencedConnection>
225{
226 fn into_inline_connection(self, r: R) -> IngestionDescription {
227 let IngestionDescription {
228 desc,
229 remap_metadata,
230 source_exports,
231 instance_id,
232 remap_collection_id,
233 } = self;
234
235 IngestionDescription {
236 desc: desc.into_inline_connection(r),
237 remap_metadata,
238 source_exports,
239 instance_id,
240 remap_collection_id,
241 }
242 }
243}
244
/// One collection that an ingestion exports data into.
///
/// `S` is the collection's storage metadata (`()` by default); `C` selects how
/// connections inside the data config are represented.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExport<S = (), C: ConnectionAccess = InlinedConnection> {
    /// Storage metadata for the export's collection.
    pub storage_metadata: S,
    /// Connection-specific details of this export.
    pub details: SourceExportDetails,
    /// How the export's data is decoded and enveloped.
    pub data_config: SourceExportDataConfig<C>,
}
254
/// A timestamp type usable by sources.
///
/// Beyond being a timely [`Timestamp`], it must live in a [`Columnation`]
/// region, refine the empty timestamp `()`, be printable and `Sync`, and round-trip
/// through a [`Row`].
pub trait SourceTimestamp:
    Timestamp + Columnation + Refines<()> + std::fmt::Display + Sync
{
    /// Encodes `self` as a [`Row`].
    fn encode_row(&self) -> Row;
    /// Decodes a value from a row produced by `encode_row`.
    /// Implementations may panic on rows they did not produce.
    fn decode_row(row: &Row) -> Self;
}
261
262impl SourceTimestamp for MzOffset {
263 fn encode_row(&self) -> Row {
264 Row::pack([Datum::UInt64(self.offset)])
265 }
266
267 fn decode_row(row: &Row) -> Self {
268 let mut datums = row.iter();
269 match (datums.next(), datums.next()) {
270 (Some(Datum::UInt64(offset)), None) => MzOffset::from(offset),
271 _ => panic!("invalid row {row:?}"),
272 }
273 }
274}
275
/// A `u64` offset used as a source timestamp (it implements [`SourceTimestamp`]).
///
/// NOTE(review): presumably a position in an upstream log; `From<PgLsn>` below
/// shows PostgreSQL LSNs are one producer. Confirm the others.
///
/// The derived orderings and hash are those of `offset`.
#[derive(
    Copy,
    Clone,
    Default,
    Debug,
    PartialEq,
    PartialOrd,
    Eq,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
pub struct MzOffset {
    /// The raw offset value.
    pub offset: u64,
}
295
296impl differential_dataflow::difference::Semigroup for MzOffset {
297 fn plus_equals(&mut self, rhs: &Self) {
298 self.offset.plus_equals(&rhs.offset)
299 }
300}
301
302impl differential_dataflow::difference::IsZero for MzOffset {
303 fn is_zero(&self) -> bool {
304 self.offset.is_zero()
305 }
306}
307
308impl mz_persist_types::Codec64 for MzOffset {
309 fn codec_name() -> String {
310 "MzOffset".to_string()
311 }
312
313 fn encode(&self) -> [u8; 8] {
314 mz_persist_types::Codec64::encode(&self.offset)
315 }
316
317 fn decode(buf: [u8; 8]) -> Self {
318 Self {
319 offset: mz_persist_types::Codec64::decode(buf),
320 }
321 }
322}
323
324impl columnation::Columnation for MzOffset {
325 type InnerRegion = columnation::CopyRegion<MzOffset>;
326}
327
328impl MzOffset {
329 pub fn checked_sub(self, other: Self) -> Option<Self> {
330 self.offset
331 .checked_sub(other.offset)
332 .map(|offset| Self { offset })
333 }
334}
335
336impl From<u64> for MzOffset {
339 fn from(offset: u64) -> Self {
340 Self { offset }
341 }
342}
343
344impl std::fmt::Display for MzOffset {
345 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346 write!(f, "{}", self.offset)
347 }
348}
349
350impl Add<u64> for MzOffset {
352 type Output = MzOffset;
353
354 fn add(self, x: u64) -> MzOffset {
355 MzOffset {
356 offset: self.offset + x,
357 }
358 }
359}
360impl Add<Self> for MzOffset {
361 type Output = Self;
362
363 fn add(self, x: Self) -> Self {
364 MzOffset {
365 offset: self.offset + x.offset,
366 }
367 }
368}
369impl AddAssign<u64> for MzOffset {
370 fn add_assign(&mut self, x: u64) {
371 self.offset += x;
372 }
373}
374impl AddAssign<Self> for MzOffset {
375 fn add_assign(&mut self, x: Self) {
376 self.offset += x.offset;
377 }
378}
379
380impl From<tokio_postgres::types::PgLsn> for MzOffset {
382 fn from(lsn: tokio_postgres::types::PgLsn) -> Self {
383 MzOffset { offset: lsn.into() }
384 }
385}
386
387impl Timestamp for MzOffset {
388 type Summary = MzOffset;
389
390 fn minimum() -> Self {
391 MzOffset {
392 offset: Timestamp::minimum(),
393 }
394 }
395}
396
397impl PathSummary<MzOffset> for MzOffset {
398 fn results_in(&self, src: &MzOffset) -> Option<MzOffset> {
399 Some(MzOffset {
400 offset: self.offset.results_in(&src.offset)?,
401 })
402 }
403
404 fn followed_by(&self, other: &Self) -> Option<Self> {
405 Some(MzOffset {
406 offset: PathSummary::<u64>::followed_by(&self.offset, &other.offset)?,
407 })
408 }
409}
410
411impl Refines<()> for MzOffset {
412 fn to_inner(_: ()) -> Self {
413 MzOffset::minimum()
414 }
415 fn to_outer(self) {}
416 fn summarize(_: Self::Summary) {}
417}
418
419impl PartialOrder for MzOffset {
420 #[inline]
421 fn less_equal(&self, other: &Self) -> bool {
422 self.offset.less_equal(&other.offset)
423 }
424}
425
426impl TotalOrder for MzOffset {}
427
/// A timeline to which timestamps belong.
///
/// Its string form is `M`, `E.<name>` or `U.<name>`, produced by the string
/// conversion below and parsed back by the `FromStr` impl. The derived `Ord`
/// orders by variant declaration order first.
#[derive(
    Clone,
    Debug,
    Ord,
    PartialOrd,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    Hash
)]
pub enum Timeline {
    /// NOTE(review): presumably timestamps in milliseconds since the Unix epoch; confirm.
    EpochMilliseconds,
    /// A timeline identified by an external name.
    External(String),
    /// A timeline identified by a user-provided name.
    User(String),
}
460
461impl Timeline {
462 const EPOCH_MILLISECOND_ID_CHAR: char = 'M';
463 const EXTERNAL_ID_CHAR: char = 'E';
464 const USER_ID_CHAR: char = 'U';
465
466 fn id_char(&self) -> char {
467 match self {
468 Self::EpochMilliseconds => Self::EPOCH_MILLISECOND_ID_CHAR,
469 Self::External(_) => Self::EXTERNAL_ID_CHAR,
470 Self::User(_) => Self::USER_ID_CHAR,
471 }
472 }
473}
474
475impl ToString for Timeline {
476 fn to_string(&self) -> String {
477 match self {
478 Self::EpochMilliseconds => format!("{}", self.id_char()),
479 Self::External(id) => format!("{}.{id}", self.id_char()),
480 Self::User(id) => format!("{}.{id}", self.id_char()),
481 }
482 }
483}
484
485impl FromStr for Timeline {
486 type Err = String;
487
488 fn from_str(s: &str) -> Result<Self, Self::Err> {
489 if s.is_empty() {
490 return Err("empty timeline".to_string());
491 }
492 let mut chars = s.chars();
493 match chars.next().expect("non-empty string") {
494 Self::EPOCH_MILLISECOND_ID_CHAR => match chars.next() {
495 None => Ok(Self::EpochMilliseconds),
496 Some(_) => Err(format!("unknown timeline: {s}")),
497 },
498 Self::EXTERNAL_ID_CHAR => match chars.next() {
499 Some('.') => Ok(Self::External(chars.as_str().to_string())),
500 _ => Err(format!("unknown timeline: {s}")),
501 },
502 Self::USER_ID_CHAR => match chars.next() {
503 Some('.') => Ok(Self::User(chars.as_str().to_string())),
504 _ => Err(format!("unknown timeline: {s}")),
505 },
506 _ => Err(format!("unknown timeline: {s}")),
507 }
508 }
509}
510
/// Behavior shared by every kind of source connection.
pub trait SourceConnection: Debug + Clone + PartialEq + AlterCompatible {
    /// A static name for this kind of source.
    fn name(&self) -> &'static str;

    /// The upstream object this connection refers to, if it has one.
    fn external_reference(&self) -> Option<&str>;

    /// The key [`RelationDesc`] this connection produces by default.
    /// NOTE(review): presumably used when no explicit key format is given; confirm.
    fn default_key_desc(&self) -> RelationDesc;

    /// The value [`RelationDesc`] this connection produces by default.
    /// NOTE(review): presumably used when no explicit value format is given; confirm.
    fn default_value_desc(&self) -> RelationDesc;

    /// The [`RelationDesc`] describing this connection's timestamps.
    /// NOTE(review): presumably the schema of its progress/remap data; confirm.
    fn timestamp_desc(&self) -> RelationDesc;

    /// The catalog ID of the connection object this source uses, if any.
    fn connection_id(&self) -> Option<CatalogItemId>;

    /// Whether this source can run in read-only mode.
    fn supports_read_only(&self) -> bool;

    /// Whether this source prefers running on a single replica.
    fn prefers_single_replica(&self) -> bool;
}
543
/// A compression setting: gzip or no compression.
/// NOTE(review): not referenced in this section; confirm where it is used.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum Compression {
    Gzip,
    None,
}
549
/// How a source export turns upstream data into rows.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExportDataConfig<C: ConnectionAccess = InlinedConnection> {
    /// The data encoding, if the export has one.
    pub encoding: Option<encoding::SourceDataEncoding<C>>,
    /// The envelope applied to the decoded data.
    pub envelope: SourceEnvelope,
}
557
558impl<R: ConnectionResolver> IntoInlineConnection<SourceExportDataConfig, R>
559 for SourceExportDataConfig<ReferencedConnection>
560{
561 fn into_inline_connection(self, r: R) -> SourceExportDataConfig {
562 let SourceExportDataConfig { encoding, envelope } = self;
563
564 SourceExportDataConfig {
565 encoding: encoding.map(|e| e.into_inline_connection(r)),
566 envelope,
567 }
568 }
569}
570
571impl<C: ConnectionAccess> AlterCompatible for SourceExportDataConfig<C> {
572 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
573 if self == other {
574 return Ok(());
575 }
576 let Self { encoding, envelope } = &self;
577
578 let compatibility_checks = [
579 (
580 match (encoding, &other.encoding) {
581 (Some(s), Some(o)) => s.alter_compatible(id, o).is_ok(),
582 (s, o) => s == o,
583 },
584 "encoding",
585 ),
586 (envelope == &other.envelope, "envelope"),
587 ];
588
589 for (compatible, field) in compatibility_checks {
590 if !compatible {
591 tracing::warn!(
592 "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
593 self,
594 other
595 );
596
597 return Err(AlterError { id });
598 }
599 }
600 Ok(())
601 }
602}
603
604impl<C: ConnectionAccess> SourceExportDataConfig<C> {
605 pub fn monotonic(&self, connection: &GenericSourceConnection<C>) -> bool {
612 match &self.envelope {
613 SourceEnvelope::Upsert(_) | SourceEnvelope::CdcV2 => false,
615 SourceEnvelope::None(_) => {
616 match connection {
617 GenericSourceConnection::Postgres(_) => false,
619 GenericSourceConnection::MySql(_) => false,
621 GenericSourceConnection::SqlServer(_) => false,
623 GenericSourceConnection::LoadGenerator(g) => g.load_generator.is_monotonic(),
625 GenericSourceConnection::Kafka(_) => true,
627 }
628 }
629 }
630 }
631}
632
/// Describes a source: its connection and timestamping interval.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceDesc<C: ConnectionAccess = InlinedConnection> {
    /// The connection to the upstream system.
    pub connection: GenericSourceConnection<C>,
    /// NOTE(review): presumably how often new timestamps are minted; confirm.
    /// `alter_compatible` ignores this field, so it may change on alter.
    pub timestamp_interval: Duration,
}
639
640impl<R: ConnectionResolver> IntoInlineConnection<SourceDesc, R>
641 for SourceDesc<ReferencedConnection>
642{
643 fn into_inline_connection(self, r: R) -> SourceDesc {
644 let SourceDesc {
645 connection,
646 timestamp_interval,
647 } = self;
648
649 SourceDesc {
650 connection: connection.into_inline_connection(&r),
651 timestamp_interval,
652 }
653 }
654}
655
656impl<C: ConnectionAccess> AlterCompatible for SourceDesc<C> {
657 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
661 if self == other {
662 return Ok(());
663 }
664 let Self {
665 connection,
666 timestamp_interval: _,
668 } = &self;
669
670 let compatibility_checks = [(
671 connection.alter_compatible(id, &other.connection).is_ok(),
672 "connection",
673 )];
674
675 for (compatible, field) in compatibility_checks {
676 if !compatible {
677 tracing::warn!(
678 "SourceDesc incompatible {field}:\nself:\n{:#?}\n\nother\n{:#?}",
679 self,
680 other
681 );
682
683 return Err(AlterError { id });
684 }
685 }
686
687 Ok(())
688 }
689}
690
/// A connection to one of the supported kinds of upstream source.
///
/// Every variant except `LoadGenerator` is generic over `C`, i.e. may hold
/// connection references that need inlining.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum GenericSourceConnection<C: ConnectionAccess = InlinedConnection> {
    Kafka(KafkaSourceConnection<C>),
    Postgres(PostgresSourceConnection<C>),
    MySql(MySqlSourceConnection<C>),
    SqlServer(SqlServerSourceConnection<C>),
    LoadGenerator(LoadGeneratorSourceConnection),
}
699
700impl<C: ConnectionAccess> From<KafkaSourceConnection<C>> for GenericSourceConnection<C> {
701 fn from(conn: KafkaSourceConnection<C>) -> Self {
702 Self::Kafka(conn)
703 }
704}
705
706impl<C: ConnectionAccess> From<PostgresSourceConnection<C>> for GenericSourceConnection<C> {
707 fn from(conn: PostgresSourceConnection<C>) -> Self {
708 Self::Postgres(conn)
709 }
710}
711
712impl<C: ConnectionAccess> From<MySqlSourceConnection<C>> for GenericSourceConnection<C> {
713 fn from(conn: MySqlSourceConnection<C>) -> Self {
714 Self::MySql(conn)
715 }
716}
717
718impl<C: ConnectionAccess> From<SqlServerSourceConnection<C>> for GenericSourceConnection<C> {
719 fn from(conn: SqlServerSourceConnection<C>) -> Self {
720 Self::SqlServer(conn)
721 }
722}
723
724impl<C: ConnectionAccess> From<LoadGeneratorSourceConnection> for GenericSourceConnection<C> {
725 fn from(conn: LoadGeneratorSourceConnection) -> Self {
726 Self::LoadGenerator(conn)
727 }
728}
729
730impl<R: ConnectionResolver> IntoInlineConnection<GenericSourceConnection, R>
731 for GenericSourceConnection<ReferencedConnection>
732{
733 fn into_inline_connection(self, r: R) -> GenericSourceConnection {
734 match self {
735 GenericSourceConnection::Kafka(kafka) => {
736 GenericSourceConnection::Kafka(kafka.into_inline_connection(r))
737 }
738 GenericSourceConnection::Postgres(pg) => {
739 GenericSourceConnection::Postgres(pg.into_inline_connection(r))
740 }
741 GenericSourceConnection::MySql(mysql) => {
742 GenericSourceConnection::MySql(mysql.into_inline_connection(r))
743 }
744 GenericSourceConnection::SqlServer(sql_server) => {
745 GenericSourceConnection::SqlServer(sql_server.into_inline_connection(r))
746 }
747 GenericSourceConnection::LoadGenerator(lg) => {
748 GenericSourceConnection::LoadGenerator(lg)
749 }
750 }
751 }
752}
753
754impl<C: ConnectionAccess> SourceConnection for GenericSourceConnection<C> {
755 fn name(&self) -> &'static str {
756 match self {
757 Self::Kafka(conn) => conn.name(),
758 Self::Postgres(conn) => conn.name(),
759 Self::MySql(conn) => conn.name(),
760 Self::SqlServer(conn) => conn.name(),
761 Self::LoadGenerator(conn) => conn.name(),
762 }
763 }
764
765 fn external_reference(&self) -> Option<&str> {
766 match self {
767 Self::Kafka(conn) => conn.external_reference(),
768 Self::Postgres(conn) => conn.external_reference(),
769 Self::MySql(conn) => conn.external_reference(),
770 Self::SqlServer(conn) => conn.external_reference(),
771 Self::LoadGenerator(conn) => conn.external_reference(),
772 }
773 }
774
775 fn default_key_desc(&self) -> RelationDesc {
776 match self {
777 Self::Kafka(conn) => conn.default_key_desc(),
778 Self::Postgres(conn) => conn.default_key_desc(),
779 Self::MySql(conn) => conn.default_key_desc(),
780 Self::SqlServer(conn) => conn.default_key_desc(),
781 Self::LoadGenerator(conn) => conn.default_key_desc(),
782 }
783 }
784
785 fn default_value_desc(&self) -> RelationDesc {
786 match self {
787 Self::Kafka(conn) => conn.default_value_desc(),
788 Self::Postgres(conn) => conn.default_value_desc(),
789 Self::MySql(conn) => conn.default_value_desc(),
790 Self::SqlServer(conn) => conn.default_value_desc(),
791 Self::LoadGenerator(conn) => conn.default_value_desc(),
792 }
793 }
794
795 fn timestamp_desc(&self) -> RelationDesc {
796 match self {
797 Self::Kafka(conn) => conn.timestamp_desc(),
798 Self::Postgres(conn) => conn.timestamp_desc(),
799 Self::MySql(conn) => conn.timestamp_desc(),
800 Self::SqlServer(conn) => conn.timestamp_desc(),
801 Self::LoadGenerator(conn) => conn.timestamp_desc(),
802 }
803 }
804
805 fn connection_id(&self) -> Option<CatalogItemId> {
806 match self {
807 Self::Kafka(conn) => conn.connection_id(),
808 Self::Postgres(conn) => conn.connection_id(),
809 Self::MySql(conn) => conn.connection_id(),
810 Self::SqlServer(conn) => conn.connection_id(),
811 Self::LoadGenerator(conn) => conn.connection_id(),
812 }
813 }
814
815 fn supports_read_only(&self) -> bool {
816 match self {
817 GenericSourceConnection::Kafka(conn) => conn.supports_read_only(),
818 GenericSourceConnection::Postgres(conn) => conn.supports_read_only(),
819 GenericSourceConnection::MySql(conn) => conn.supports_read_only(),
820 GenericSourceConnection::SqlServer(conn) => conn.supports_read_only(),
821 GenericSourceConnection::LoadGenerator(conn) => conn.supports_read_only(),
822 }
823 }
824
825 fn prefers_single_replica(&self) -> bool {
826 match self {
827 GenericSourceConnection::Kafka(conn) => conn.prefers_single_replica(),
828 GenericSourceConnection::Postgres(conn) => conn.prefers_single_replica(),
829 GenericSourceConnection::MySql(conn) => conn.prefers_single_replica(),
830 GenericSourceConnection::SqlServer(conn) => conn.prefers_single_replica(),
831 GenericSourceConnection::LoadGenerator(conn) => conn.prefers_single_replica(),
832 }
833 }
834}
835impl<C: ConnectionAccess> crate::AlterCompatible for GenericSourceConnection<C> {
836 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
837 if self == other {
838 return Ok(());
839 }
840 let r = match (self, other) {
841 (Self::Kafka(conn), Self::Kafka(other)) => conn.alter_compatible(id, other),
842 (Self::Postgres(conn), Self::Postgres(other)) => conn.alter_compatible(id, other),
843 (Self::MySql(conn), Self::MySql(other)) => conn.alter_compatible(id, other),
844 (Self::SqlServer(conn), Self::SqlServer(other)) => conn.alter_compatible(id, other),
845 (Self::LoadGenerator(conn), Self::LoadGenerator(other)) => {
846 conn.alter_compatible(id, other)
847 }
848 _ => Err(AlterError { id }),
849 };
850
851 if r.is_err() {
852 tracing::warn!(
853 "GenericSourceConnection incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
854 self,
855 other
856 );
857 }
858
859 r
860 }
861}
862
/// Connection-specific details of a single source export.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SourceExportDetails {
    /// The export carries no connection-specific details.
    None,
    Kafka(KafkaSourceExportDetails),
    Postgres(PostgresSourceExportDetails),
    MySql(MySqlSourceExportDetails),
    SqlServer(SqlServerSourceExportDetails),
    LoadGenerator(LoadGeneratorSourceExportDetails),
}
876
877impl crate::AlterCompatible for SourceExportDetails {
878 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
879 if self == other {
880 return Ok(());
881 }
882 let r = match (self, other) {
883 (Self::None, Self::None) => Ok(()),
884 (Self::Kafka(s), Self::Kafka(o)) => s.alter_compatible(id, o),
885 (Self::Postgres(s), Self::Postgres(o)) => s.alter_compatible(id, o),
886 (Self::MySql(s), Self::MySql(o)) => s.alter_compatible(id, o),
887 (Self::LoadGenerator(s), Self::LoadGenerator(o)) => s.alter_compatible(id, o),
888 _ => Err(AlterError { id }),
889 };
890
891 if r.is_err() {
892 tracing::warn!(
893 "SourceExportDetails incompatible:\nself:\n{:#?}\n\nother\n{:#?}",
894 self,
895 other
896 );
897 }
898
899 r
900 }
901}
902
/// Per-connection details attached to a source export's statement.
/// NOTE(review): meaning inferred from the name; confirm where these are
/// produced and consumed.
///
/// Converts to and from `ProtoSourceExportStatementDetails` via [`RustType`].
pub enum SourceExportStatementDetails {
    /// A PostgreSQL export, described by its upstream table.
    Postgres {
        table: mz_postgres_util::desc::PostgresTableDesc,
    },
    /// A MySQL export.
    MySql {
        /// The upstream table.
        table: mz_mysql_util::MySqlTableDesc,
        /// NOTE(review): presumably the GTID set the export starts from; confirm.
        initial_gtid_set: String,
        /// NOTE(review): presumably whether full binlog row metadata is
        /// available upstream; confirm.
        binlog_full_metadata: bool,
    },
    /// A SQL Server export.
    SqlServer {
        /// The upstream table.
        table: mz_sql_server_util::desc::SqlServerTableDesc,
        /// The name of the capture instance.
        capture_instance: Arc<str>,
        /// NOTE(review): presumably the LSN the export starts from; confirm.
        initial_lsn: mz_sql_server_util::cdc::Lsn,
    },
    /// A load generator export, identified by which output it reads.
    LoadGenerator {
        output: LoadGeneratorOutput,
    },
    /// A Kafka export; carries no extra details.
    Kafka {},
}
927
928impl RustType<ProtoSourceExportStatementDetails> for SourceExportStatementDetails {
929 fn into_proto(&self) -> ProtoSourceExportStatementDetails {
930 match self {
931 SourceExportStatementDetails::Postgres { table } => ProtoSourceExportStatementDetails {
932 kind: Some(proto_source_export_statement_details::Kind::Postgres(
933 postgres::ProtoPostgresSourceExportStatementDetails {
934 table: Some(table.into_proto()),
935 },
936 )),
937 },
938 SourceExportStatementDetails::MySql {
939 table,
940 initial_gtid_set,
941 binlog_full_metadata,
942 } => ProtoSourceExportStatementDetails {
943 kind: Some(proto_source_export_statement_details::Kind::Mysql(
944 mysql::ProtoMySqlSourceExportStatementDetails {
945 table: Some(table.into_proto()),
946 initial_gtid_set: initial_gtid_set.clone(),
947 binlog_full_metadata: *binlog_full_metadata,
948 },
949 )),
950 },
951 SourceExportStatementDetails::SqlServer {
952 table,
953 capture_instance,
954 initial_lsn,
955 } => ProtoSourceExportStatementDetails {
956 kind: Some(proto_source_export_statement_details::Kind::SqlServer(
957 sql_server::ProtoSqlServerSourceExportStatementDetails {
958 table: Some(table.into_proto()),
959 capture_instance: capture_instance.to_string(),
960 initial_lsn: initial_lsn.as_bytes().to_vec(),
961 },
962 )),
963 },
964 SourceExportStatementDetails::LoadGenerator { output } => {
965 ProtoSourceExportStatementDetails {
966 kind: Some(proto_source_export_statement_details::Kind::Loadgen(
967 load_generator::ProtoLoadGeneratorSourceExportStatementDetails {
968 output: output.into_proto().into(),
969 },
970 )),
971 }
972 }
973 SourceExportStatementDetails::Kafka {} => ProtoSourceExportStatementDetails {
974 kind: Some(proto_source_export_statement_details::Kind::Kafka(
975 kafka::ProtoKafkaSourceExportStatementDetails {},
976 )),
977 },
978 }
979 }
980
981 fn from_proto(proto: ProtoSourceExportStatementDetails) -> Result<Self, TryFromProtoError> {
982 use proto_source_export_statement_details::Kind;
983 Ok(match proto.kind {
984 Some(Kind::Postgres(details)) => SourceExportStatementDetails::Postgres {
985 table: details
986 .table
987 .into_rust_if_some("ProtoPostgresSourceExportStatementDetails::table")?,
988 },
989 Some(Kind::Mysql(details)) => SourceExportStatementDetails::MySql {
990 table: details
991 .table
992 .into_rust_if_some("ProtoMySqlSourceExportStatementDetails::table")?,
993
994 initial_gtid_set: details.initial_gtid_set,
995 binlog_full_metadata: details.binlog_full_metadata,
996 },
997 Some(Kind::SqlServer(details)) => SourceExportStatementDetails::SqlServer {
998 table: details
999 .table
1000 .into_rust_if_some("ProtoSqlServerSourceExportStatementDetails::table")?,
1001 capture_instance: details.capture_instance.into(),
1002 initial_lsn: mz_sql_server_util::cdc::Lsn::try_from(details.initial_lsn.as_slice())
1003 .map_err(|e| TryFromProtoError::InvalidFieldError(e.to_string()))?,
1004 },
1005 Some(Kind::Loadgen(details)) => SourceExportStatementDetails::LoadGenerator {
1006 output: details
1007 .output
1008 .into_rust_if_some("ProtoLoadGeneratorSourceExportStatementDetails::output")?,
1009 },
1010 Some(Kind::Kafka(_details)) => SourceExportStatementDetails::Kafka {},
1011 None => {
1012 return Err(TryFromProtoError::missing_field(
1013 "ProtoSourceExportStatementDetails::kind",
1014 ));
1015 }
1016 })
1017 }
1018}
1019
/// A single datum of a source collection: either a row or a dataflow error.
///
/// Dereferences to the inner `Result`, and is persisted through its [`Codec`] impl.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[repr(transparent)]
pub struct SourceData(pub Result<Row, DataflowError>);
1023
1024impl Default for SourceData {
1025 fn default() -> Self {
1026 SourceData(Ok(Row::default()))
1027 }
1028}
1029
1030impl Deref for SourceData {
1031 type Target = Result<Row, DataflowError>;
1032
1033 fn deref(&self) -> &Self::Target {
1034 &self.0
1035 }
1036}
1037
1038impl DerefMut for SourceData {
1039 fn deref_mut(&mut self) -> &mut Self::Target {
1040 &mut self.0
1041 }
1042}
1043
1044impl RustType<ProtoSourceData> for SourceData {
1045 fn into_proto(&self) -> ProtoSourceData {
1046 use proto_source_data::Kind;
1047 ProtoSourceData {
1048 kind: Some(match &**self {
1049 Ok(row) => Kind::Ok(row.into_proto()),
1050 Err(err) => Kind::Err(err.into_proto()),
1051 }),
1052 }
1053 }
1054
1055 fn from_proto(proto: ProtoSourceData) -> Result<Self, TryFromProtoError> {
1056 use proto_source_data::Kind;
1057 match proto.kind {
1058 Some(kind) => match kind {
1059 Kind::Ok(row) => Ok(SourceData(Ok(row.into_rust()?))),
1060 Kind::Err(err) => Ok(SourceData(Err(err.into_rust()?))),
1061 },
1062 None => Result::Err(TryFromProtoError::missing_field("ProtoSourceData::kind")),
1063 }
1064 }
1065}
1066
/// Persist [`Codec`] for [`SourceData`]: values are encoded as protobuf
/// `ProtoSourceData`, and the schema is a [`RelationDesc`].
impl Codec for SourceData {
    /// Scratch `ProtoRow` threaded through `decode_from` so its allocation can
    /// be reused across calls.
    type Storage = ProtoRow;
    type Schema = RelationDesc;

    fn codec_name() -> String {
        "protobuf[SourceData]".into()
    }

    /// Appends the protobuf encoding of `self` to `buf`.
    fn encode<B: BufMut>(&self, buf: &mut B) {
        // NOTE(review): prost's `Message::encode` can also fail when `buf` lacks
        // remaining capacity; the `expect` below assumes `B` always has room — confirm.
        self.into_proto()
            .encode(buf)
            .expect("no required fields means no initialization errors");
    }

    /// Decodes a new `SourceData` from `buf`, starting from the default value
    /// and a fresh (empty) scratch storage.
    fn decode(buf: &[u8], schema: &RelationDesc) -> Result<Self, String> {
        let mut val = SourceData::default();
        <Self as Codec>::decode_from(&mut val, buf, &mut None, schema)?;
        Ok(val)
    }

    /// Decodes `buf` into `self`, reusing existing allocations where possible.
    ///
    /// `storage` is taken (or defaulted) on entry. It is refilled with the
    /// scratch `ProtoRow` only on the fast path, where the decoded value is an
    /// `Ok` row and `self` already holds `Ok`; otherwise it is left `None`.
    ///
    /// NOTE(review): because `kind` is pre-seeded with `Ok` before `merge`, a
    /// `buf` that carries no `kind` at all decodes as an `Ok` (empty) row rather
    /// than a missing-field error. `into_proto` always sets `kind`, so this only
    /// matters for foreign bytes.
    fn decode_from<'a>(
        &mut self,
        buf: &'a [u8],
        storage: &mut Option<ProtoRow>,
        schema: &RelationDesc,
    ) -> Result<(), String> {
        let mut proto = storage.take().unwrap_or_default();
        proto.clear();
        // Pre-seed the oneof with the cleared scratch row. Presumably this lets
        // prost's `merge` decode an `Ok` payload into the existing allocation.
        let mut proto = ProtoSourceData {
            kind: Some(proto_source_data::Kind::Ok(proto)),
        };
        proto.merge(buf).map_err(|err| err.to_string())?;
        match (proto.kind, &mut self.0) {
            // Fast path: decode the row in place into `self`'s existing `Row`,
            // then hand the scratch `ProtoRow` back to `storage`.
            (Some(proto_source_data::Kind::Ok(proto)), Ok(row)) => {
                let ret = row.decode_from_proto(&proto, schema);
                storage.replace(proto);
                ret
            }
            // Slow path: an error payload, a missing kind, or `self` currently
            // holds an error. Rebuild `self` via the regular proto conversion.
            (kind, _) => {
                let proto = ProtoSourceData { kind };
                *self = proto.into_rust().map_err(|err| err.to_string())?;
                Ok(())
            }
        }
    }

    /// Checks rows against `desc`; error values are always accepted.
    fn validate(val: &Self, desc: &Self::Schema) -> Result<(), String> {
        match &val.0 {
            Ok(row) => Row::validate(row, desc),
            Err(_) => Ok(()),
        }
    }

    /// Encodes the schema as protobuf `ProtoRelationDesc` bytes.
    fn encode_schema(schema: &Self::Schema) -> Bytes {
        schema.into_proto().encode_to_vec().into()
    }

    /// Decodes a schema written by `encode_schema`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is not a valid encoded `ProtoRelationDesc`.
    fn decode_schema(buf: &Bytes) -> Self::Schema {
        let proto = ProtoRelationDesc::decode(buf.as_ref()).expect("valid schema");
        proto.into_rust().expect("valid schema")
    }
}
1135
#[cfg(any(test, feature = "proptest"))]
/// Returns a proptest strategy producing [`SourceData`] for `desc`.
///
/// Rows generated for `desc` are chosen 50 times as often as arbitrary
/// dataflow errors. Neither branch shrinks.
pub fn arb_source_data_for_relation_desc(
    desc: &RelationDesc,
) -> impl Strategy<Value = SourceData> + use<> {
    let rows = arb_row_for_relation(desc)
        .no_shrink()
        .prop_map(|row| SourceData(Ok(row)))
        .boxed();
    let errors = any::<DataflowError>()
        .prop_map(|err| SourceData(Err(err)))
        .no_shrink()
        .boxed();

    proptest::strategy::Union::new_weighted(vec![(50, rows), (1, errors)])
}
1154
/// An upstream object that can be referenced by schema and item name, as
/// indexed by `SourceReferenceResolver::new`.
pub trait ExternalCatalogReference {
    /// The name of the schema containing the item.
    fn schema_name(&self) -> &str;
    /// The item's own (unqualified) name.
    fn item_name(&self) -> &str;
}
1168
/// MySQL tables are referenced by schema and table name.
// NOTE(review): implemented for `&MySqlTableDesc` (and `&SqlServerTableDesc`
// below) but for `PostgresTableDesc` by value; presumably this matches how callers
// build their slices — confirm before unifying.
impl ExternalCatalogReference for &mz_mysql_util::MySqlTableDesc {
    fn schema_name(&self) -> &str {
        &self.schema_name
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

/// PostgreSQL tables use their namespace as the schema.
impl ExternalCatalogReference for mz_postgres_util::desc::PostgresTableDesc {
    fn schema_name(&self) -> &str {
        &self.namespace
    }

    fn item_name(&self) -> &str {
        &self.name
    }
}

/// SQL Server tables are referenced by schema and table name.
impl ExternalCatalogReference for &mz_sql_server_util::desc::SqlServerTableDesc {
    fn schema_name(&self) -> &str {
        &*self.schema_name
    }

    fn item_name(&self) -> &str {
        &*self.name
    }
}
1198
1199impl<'a> ExternalCatalogReference for (&'a str, &'a str) {
1202 fn schema_name(&self) -> &str {
1203 self.0
1204 }
1205
1206 fn item_name(&self) -> &str {
1207 self.1
1208 }
1209}
1210
/// Resolves partially-qualified names to upstream objects of a source.
///
/// `inner` maps item name -> schema name -> database name -> index of the
/// object in the slice the resolver was built from.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceReferenceResolver {
    inner: BTreeMap<Ident, BTreeMap<Ident, BTreeMap<Ident, usize>>>,
}
1222
/// Errors returned when building a [`SourceReferenceResolver`] or resolving a
/// name against it.
#[derive(Debug, Clone, thiserror::Error)]
pub enum ExternalReferenceResolutionError {
    /// No upstream object matches `name`. Also returned for names with fewer
    /// than 1 or more than 3 parts.
    #[error("reference to {name} not found in source")]
    DoesNotExist { name: String },
    /// More than one upstream object matches `name`.
    /// NOTE(review): produced in `resolve_inner`, which is outside this view.
    #[error(
        "reference {name} is ambiguous, consider specifying an additional \
         layer of qualification"
    )]
    Ambiguous { name: String },
    /// A database, schema or item name is not a valid identifier.
    #[error("invalid identifier: {0}")]
    Ident(#[from] IdentError),
}
1235
1236impl<'a> SourceReferenceResolver {
1237 pub fn new<T: ExternalCatalogReference>(
1243 database: &str,
1244 referenceable_items: &'a [T],
1245 ) -> Result<SourceReferenceResolver, ExternalReferenceResolutionError> {
1246 let mut inner = BTreeMap::new();
1249
1250 let database = Ident::new(database)?;
1251
1252 for (reference_idx, item) in referenceable_items.iter().enumerate() {
1253 let item_name = Ident::new(item.item_name())?;
1254 let schema_name = Ident::new(item.schema_name())?;
1255
1256 inner
1257 .entry(item_name)
1258 .or_insert_with(BTreeMap::new)
1259 .entry(schema_name)
1260 .or_insert_with(BTreeMap::new)
1261 .entry(database.clone())
1262 .or_insert(reference_idx);
1263 }
1264
1265 Ok(SourceReferenceResolver { inner })
1266 }
1267
1268 pub fn resolve(
1285 &self,
1286 name: &[Ident],
1287 canonicalize_to_width: usize,
1288 ) -> Result<(UnresolvedItemName, usize), ExternalReferenceResolutionError> {
1289 let (db, schema, idx) = self.resolve_inner(name)?;
1290
1291 let item = name.last().expect("must have provided at least 1 element");
1292
1293 let canonical_name = match canonicalize_to_width {
1294 1 => vec![item.clone()],
1295 2 => vec![schema.clone(), item.clone()],
1296 3 => vec![db.clone(), schema.clone(), item.clone()],
1297 o => panic!("canonicalize_to_width values must be 1..=3, but got {}", o),
1298 };
1299
1300 Ok((UnresolvedItemName(canonical_name), idx))
1301 }
1302
1303 pub fn resolve_idx(&self, name: &[Ident]) -> Result<usize, ExternalReferenceResolutionError> {
1313 let (_db, _schema, idx) = self.resolve_inner(name)?;
1314 Ok(idx)
1315 }
1316
1317 fn resolve_inner<'name: 'a>(
1334 &'a self,
1335 name: &'name [Ident],
1336 ) -> Result<(&'a Ident, &'a Ident, usize), ExternalReferenceResolutionError> {
1337 let get_provided_name = || UnresolvedItemName(name.to_vec()).to_string();
1338
1339 if !(1..=3).contains(&name.len()) {
1341 Err(ExternalReferenceResolutionError::DoesNotExist {
1342 name: get_provided_name(),
1343 })?;
1344 }
1345
1346 let mut names = std::iter::repeat(None)
1348 .take(3 - name.len())
1349 .chain(name.iter().map(Some));
1350
1351 let database = names.next().flatten();
1352 let schema = names.next().flatten();
1353 let item = names
1354 .next()
1355 .flatten()
1356 .expect("must have provided the item name");
1357
1358 assert_none!(names.next(), "expected a 3-element iterator");
1359
1360 let schemas =
1361 self.inner
1362 .get(item)
1363 .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1364 name: get_provided_name(),
1365 })?;
1366
1367 let schema = match schema {
1368 Some(schema) => schema,
1369 None => schemas.keys().exactly_one().map_err(|_e| {
1370 ExternalReferenceResolutionError::Ambiguous {
1371 name: get_provided_name(),
1372 }
1373 })?,
1374 };
1375
1376 let databases =
1377 schemas
1378 .get(schema)
1379 .ok_or_else(|| ExternalReferenceResolutionError::DoesNotExist {
1380 name: get_provided_name(),
1381 })?;
1382
1383 let database = match database {
1384 Some(database) => database,
1385 None => databases.keys().exactly_one().map_err(|_e| {
1386 ExternalReferenceResolutionError::Ambiguous {
1387 name: get_provided_name(),
1388 }
1389 })?,
1390 };
1391
1392 let reference_idx = databases.get(database).ok_or_else(|| {
1393 ExternalReferenceResolutionError::DoesNotExist {
1394 name: get_provided_name(),
1395 }
1396 })?;
1397
1398 Ok((database, schema, *reference_idx))
1399 }
1400}
1401
1402#[derive(Debug)]
1408pub enum SourceDataRowColumnarDecoder {
1409 Row(RowColumnarDecoder),
1410 EmptyRow,
1411}
1412
1413impl SourceDataRowColumnarDecoder {
1414 pub fn decode(&self, idx: usize, row: &mut Row) {
1415 match self {
1416 SourceDataRowColumnarDecoder::Row(decoder) => decoder.decode(idx, row),
1417 SourceDataRowColumnarDecoder::EmptyRow => {
1418 row.packer();
1420 }
1421 }
1422 }
1423
1424 pub fn goodbytes(&self) -> usize {
1425 match self {
1426 SourceDataRowColumnarDecoder::Row(decoder) => decoder.goodbytes(),
1427 SourceDataRowColumnarDecoder::EmptyRow => 0,
1428 }
1429 }
1430}
1431
1432#[derive(Debug)]
1433pub struct SourceDataColumnarDecoder {
1434 row_decoder: SourceDataRowColumnarDecoder,
1435 err_decoder: BinaryArray,
1436}
1437
1438impl SourceDataColumnarDecoder {
1439 pub fn new(col: StructArray, desc: &RelationDesc) -> Result<Self, anyhow::Error> {
1440 let (_fields, arrays, nullability) = col.into_parts();
1442
1443 if nullability.is_some() {
1444 anyhow::bail!("SourceData is not nullable, but found {nullability:?}");
1445 }
1446 if arrays.len() != 2 {
1447 anyhow::bail!("SourceData should only have two fields, found {arrays:?}");
1448 }
1449
1450 let errs = arrays[1]
1451 .as_any()
1452 .downcast_ref::<BinaryArray>()
1453 .ok_or_else(|| anyhow::anyhow!("expected BinaryArray, found {:?}", arrays[1]))?;
1454
1455 let row_decoder = match arrays[0].data_type() {
1456 arrow::datatypes::DataType::Struct(_) => {
1457 let rows = arrays[0]
1458 .as_any()
1459 .downcast_ref::<StructArray>()
1460 .ok_or_else(|| {
1461 anyhow::anyhow!("expected StructArray, found {:?}", arrays[0])
1462 })?;
1463 let decoder = RowColumnarDecoder::new(rows.clone(), desc)?;
1464 SourceDataRowColumnarDecoder::Row(decoder)
1465 }
1466 arrow::datatypes::DataType::Null => SourceDataRowColumnarDecoder::EmptyRow,
1467 other => anyhow::bail!("expected Struct or Null Array, found {other:?}"),
1468 };
1469
1470 Ok(SourceDataColumnarDecoder {
1471 row_decoder,
1472 err_decoder: errs.clone(),
1473 })
1474 }
1475}
1476
1477impl ColumnDecoder<SourceData> for SourceDataColumnarDecoder {
1478 fn decode(&self, idx: usize, val: &mut SourceData) {
1479 let err_null = self.err_decoder.is_null(idx);
1480 let row_null = match &self.row_decoder {
1481 SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1482 SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1483 };
1484
1485 match (row_null, err_null) {
1486 (true, false) => {
1487 let err = self.err_decoder.value(idx);
1488 let err = ProtoDataflowError::decode(err)
1489 .expect("proto should be valid")
1490 .into_rust()
1491 .expect("error should be valid");
1492 val.0 = Err(err);
1493 }
1494 (false, true) => {
1495 let row = match val.0.as_mut() {
1496 Ok(row) => row,
1497 Err(_) => {
1498 val.0 = Ok(Row::default());
1499 val.0.as_mut().unwrap()
1500 }
1501 };
1502 self.row_decoder.decode(idx, row);
1503 }
1504 (true, true) => panic!("should have one of 'ok' or 'err'"),
1505 (false, false) => panic!("cannot have both 'ok' and 'err'"),
1506 }
1507 }
1508
1509 fn is_null(&self, idx: usize) -> bool {
1510 let err_null = self.err_decoder.is_null(idx);
1511 let row_null = match &self.row_decoder {
1512 SourceDataRowColumnarDecoder::Row(decoder) => decoder.is_null(idx),
1513 SourceDataRowColumnarDecoder::EmptyRow => !err_null,
1514 };
1515 assert!(!err_null || !row_null, "SourceData should never be null!");
1516
1517 false
1518 }
1519
1520 fn goodbytes(&self) -> usize {
1521 self.row_decoder.goodbytes() + ArrayOrd::Binary(self.err_decoder.clone()).goodbytes()
1522 }
1523
1524 fn stats(&self) -> StructStats {
1525 let len = self.err_decoder.len();
1526 let err_stats = ColumnarStats {
1527 nulls: Some(ColumnNullStats {
1528 count: self.err_decoder.null_count(),
1529 }),
1530 values: PrimitiveStats::<Vec<u8>>::from_column(&self.err_decoder).into(),
1531 };
1532 let row_null_count = len - self.err_decoder.null_count();
1537 let row_stats = match &self.row_decoder {
1538 SourceDataRowColumnarDecoder::Row(encoder) => {
1539 assert_eq!(encoder.null_count(), row_null_count);
1543 encoder.stats()
1544 }
1545 SourceDataRowColumnarDecoder::EmptyRow => StructStats {
1546 len,
1547 cols: BTreeMap::default(),
1548 },
1549 };
1550 let row_stats = ColumnarStats {
1551 nulls: Some(ColumnNullStats {
1552 count: row_null_count,
1553 }),
1554 values: ColumnStatKinds::Struct(row_stats),
1555 };
1556
1557 let stats = [
1558 (
1559 SourceDataColumnarEncoder::OK_COLUMN_NAME.to_string(),
1560 row_stats,
1561 ),
1562 (
1563 SourceDataColumnarEncoder::ERR_COLUMN_NAME.to_string(),
1564 err_stats,
1565 ),
1566 ];
1567 StructStats {
1568 len,
1569 cols: stats.into_iter().map(|(name, s)| (name, s)).collect(),
1570 }
1571 }
1572}
1573
1574#[derive(Debug)]
1581pub enum SourceDataRowColumnarEncoder {
1582 Row(RowColumnarEncoder),
1583 EmptyRow,
1584}
1585
1586impl SourceDataRowColumnarEncoder {
1587 pub(crate) fn goodbytes(&self) -> usize {
1588 match self {
1589 SourceDataRowColumnarEncoder::Row(e) => e.goodbytes(),
1590 SourceDataRowColumnarEncoder::EmptyRow => 0,
1591 }
1592 }
1593
1594 pub fn append(&mut self, row: &Row) {
1595 match self {
1596 SourceDataRowColumnarEncoder::Row(encoder) => encoder.append(row),
1597 SourceDataRowColumnarEncoder::EmptyRow => {
1598 assert_eq!(row.iter().count(), 0)
1599 }
1600 }
1601 }
1602
1603 pub fn append_null(&mut self) {
1604 match self {
1605 SourceDataRowColumnarEncoder::Row(encoder) => encoder.append_null(),
1606 SourceDataRowColumnarEncoder::EmptyRow => (),
1607 }
1608 }
1609}
1610
1611#[derive(Debug)]
1612pub struct SourceDataColumnarEncoder {
1613 row_encoder: SourceDataRowColumnarEncoder,
1614 err_encoder: BinaryBuilder,
1615}
1616
1617impl SourceDataColumnarEncoder {
1618 const OK_COLUMN_NAME: &'static str = "ok";
1619 const ERR_COLUMN_NAME: &'static str = "err";
1620
1621 pub fn new(desc: &RelationDesc) -> Self {
1622 let row_encoder = match RowColumnarEncoder::new(desc) {
1623 Some(encoder) => SourceDataRowColumnarEncoder::Row(encoder),
1624 None => {
1625 assert!(desc.typ().columns().is_empty());
1626 SourceDataRowColumnarEncoder::EmptyRow
1627 }
1628 };
1629 let err_encoder = BinaryBuilder::new();
1630
1631 SourceDataColumnarEncoder {
1632 row_encoder,
1633 err_encoder,
1634 }
1635 }
1636}
1637
1638impl ColumnEncoder<SourceData> for SourceDataColumnarEncoder {
1639 type FinishedColumn = StructArray;
1640
1641 fn goodbytes(&self) -> usize {
1642 self.row_encoder.goodbytes() + self.err_encoder.values_slice().len()
1643 }
1644
1645 #[inline]
1646 fn append(&mut self, val: &SourceData) {
1647 match val.0.as_ref() {
1648 Ok(row) => {
1649 self.row_encoder.append(row);
1650 self.err_encoder.append_null();
1651 }
1652 Err(err) => {
1653 self.row_encoder.append_null();
1654 self.err_encoder
1655 .append_value(err.into_proto().encode_to_vec());
1656 }
1657 }
1658 }
1659
1660 #[inline]
1661 fn append_null(&mut self) {
1662 panic!("appending a null into SourceDataColumnarEncoder is not supported");
1663 }
1664
1665 fn finish(self) -> Self::FinishedColumn {
1666 let SourceDataColumnarEncoder {
1667 row_encoder,
1668 mut err_encoder,
1669 } = self;
1670
1671 let err_column = BinaryBuilder::finish(&mut err_encoder);
1672 let row_column: ArrayRef = match row_encoder {
1673 SourceDataRowColumnarEncoder::Row(encoder) => {
1674 let column = encoder.finish();
1675 Arc::new(column)
1676 }
1677 SourceDataRowColumnarEncoder::EmptyRow => Arc::new(NullArray::new(err_column.len())),
1678 };
1679
1680 assert_eq!(row_column.len(), err_column.len());
1681
1682 let fields = vec![
1683 Field::new(Self::OK_COLUMN_NAME, row_column.data_type().clone(), true),
1684 Field::new(Self::ERR_COLUMN_NAME, err_column.data_type().clone(), true),
1685 ];
1686 let arrays: Vec<Arc<dyn Array>> = vec![row_column, Arc::new(err_column)];
1687 StructArray::new(Fields::from(fields), arrays, None)
1688 }
1689}
1690
1691impl Schema<SourceData> for RelationDesc {
1692 type ArrowColumn = StructArray;
1693 type Statistics = StructStats;
1694
1695 type Decoder = SourceDataColumnarDecoder;
1696 type Encoder = SourceDataColumnarEncoder;
1697
1698 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
1699 SourceDataColumnarDecoder::new(col, self)
1700 }
1701
1702 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
1703 Ok(SourceDataColumnarEncoder::new(self))
1704 }
1705}
1706
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayData, make_comparator};
    use base64::Engine;
    use bytes::Bytes;
    use mz_expr::EvalError;
    use mz_ore::assert_err;
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
    use mz_persist::metrics::ColumnarMetrics;
    use mz_persist_types::parquet::EncodingConfig;
    use mz_persist_types::schema::{Migration, backward_compatible};
    use mz_persist_types::stats::{PartStats, PartStatsMetrics};
    use mz_repr::{
        ColumnIndex, DatumVec, PropRelationDescDiff, ProtoRelationDesc, RelationDescBuilder,
        RowArena, SqlScalarType, arb_relation_desc_diff, arb_relation_desc_projection,
    };
    use proptest::prelude::*;
    use proptest::strategy::{Union, ValueTree};

    use crate::stats::RelationPartStats;

    use super::*;

    #[mz_ore::test]
    fn test_timeline_parsing() {
        assert_eq!(Ok(Timeline::EpochMilliseconds), "M".parse());
        assert_eq!(Ok(Timeline::External("JOE".to_string())), "E.JOE".parse());
        assert_eq!(Ok(Timeline::User("MIKE".to_string())), "U.MIKE".parse());

        assert_err!("Materialize".parse::<Timeline>());
        assert_err!("Ejoe".parse::<Timeline>());
        assert_err!("Umike".parse::<Timeline>());
        assert_err!("Dance".parse::<Timeline>());
        assert_err!("".parse::<Timeline>());
    }

    /// Encodes `datas` with schema `desc` and pushes the resulting column through:
    /// - a protobuf round-trip of its `ArrayData`;
    /// - a Parquet round-trip;
    /// - decoding with `desc`, which must reproduce `datas`;
    /// - decoding with the projection `read_desc`, which must reproduce the projected rows and
    ///   agree with the collected stats;
    /// - a schema encode/decode round-trip and a self-migration.
    #[track_caller]
    fn roundtrip_source_data(
        desc: &RelationDesc,
        datas: Vec<SourceData>,
        read_desc: &RelationDesc,
        config: &EncodingConfig,
    ) {
        let metrics = ColumnarMetrics::disconnected();
        let mut encoder = <RelationDesc as Schema<SourceData>>::encoder(desc).unwrap();
        for data in &datas {
            encoder.append(data);
        }
        let col = encoder.finish();

        // The top-level SourceData column must never be nullable.
        assert!(!col.is_nullable());

        let col = realloc_array(&col, &metrics);

        // Round-trip the raw `ArrayData` through protobuf; both the typed and the dynamically
        // built arrays must equal the original.
        {
            let proto = col.to_data().into_proto();
            let bytes = proto.encode_to_vec();
            let proto = mz_persist_types::arrow::ProtoArrayData::decode(&bytes[..]).unwrap();
            let array_data: ArrayData = proto.into_rust().unwrap();

            let col_rnd = StructArray::from(array_data.clone());
            assert_eq!(col, col_rnd);

            let col_dyn = arrow::array::make_array(array_data);
            let col_dyn = col_dyn.as_any().downcast_ref::<StructArray>().unwrap();
            assert_eq!(&col, col_dyn);
        }

        // Write the column to Parquet as a single field named "k" and read it back.
        let mut buf = Vec::new();
        let fields = Fields::from(vec![Field::new("k", col.data_type().clone(), false)]);
        let arrays: Vec<Arc<dyn Array>> = vec![Arc::new(col.clone())];
        mz_persist_types::parquet::encode_arrays(&mut buf, fields, arrays, config).unwrap();

        let buf = Bytes::from(buf);
        let mut reader = mz_persist_types::parquet::decode_arrays(buf).unwrap();
        let maybe_batch = reader.next();

        // The reader may only come back without a batch when nothing was written.
        let Some(record_batch) = maybe_batch else {
            assert!(datas.is_empty());
            return;
        };
        let record_batch = record_batch.unwrap();

        assert_eq!(record_batch.columns().len(), 1);
        let rnd_col = &record_batch.columns()[0];
        let rnd_col = realloc_any(Arc::clone(rnd_col), &metrics);
        let rnd_col = rnd_col
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap()
            .clone();

        // Stats are collected with the full write schema and checked against projected reads
        // below.
        let stats = <RelationDesc as Schema<SourceData>>::decoder_any(desc, &rnd_col)
            .expect("valid decoder")
            .stats();

        // Decoding with the write schema must reproduce every value, reusing one output buffer.
        let mut rnd_data = SourceData(Ok(Row::default()));
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(desc, rnd_col.clone()).unwrap();
        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            assert_eq!(og_data, &rnd_data);
        }

        // Decode again with the projected `read_desc`.
        let stats_metrics = PartStatsMetrics::new(&MetricsRegistry::new());
        let stats = RelationPartStats {
            name: "test",
            metrics: &stats_metrics,
            stats: &PartStats { key: stats },
            desc: read_desc,
        };
        let mut datum_vec = DatumVec::new();
        let arena = RowArena::default();
        let decoder = <RelationDesc as Schema<SourceData>>::decoder(read_desc, rnd_col).unwrap();

        for (idx, og_data) in datas.iter().enumerate() {
            decoder.decode(idx, &mut rnd_data);
            match (&og_data.0, &rnd_data.0) {
                (Ok(og_row), Ok(rnd_row)) => {
                    // The decoded row must equal the original row restricted to the columns
                    // present in `read_desc`.
                    {
                        let datums = datum_vec.borrow_with(og_row);
                        let projected_datums =
                            datums.iter().enumerate().filter_map(|(idx, datum)| {
                                read_desc
                                    .contains_index(&ColumnIndex::from_raw(idx))
                                    .then_some(datum)
                            });
                        let og_projected_row = Row::pack(projected_datums);
                        assert_eq!(&og_projected_row, rnd_row);
                    }

                    // Every projected datum must be admitted by its column's stats.
                    {
                        let proj_datums = datum_vec.borrow_with(rnd_row);
                        for (pos, (idx, _, _)) in read_desc.iter_all().enumerate() {
                            let spec = stats.col_stats(idx, &arena);
                            assert!(spec.may_contain(proj_datums[pos]));
                        }
                    }
                }
                (Err(_), Err(_)) => assert_eq!(og_data, &rnd_data),
                (_, _) => panic!("decoded to a different type? {og_data:?} {rnd_data:?}"),
            }
        }

        // The relation description must survive `encode_schema` / `decode_schema`.
        let encoded_schema = SourceData::encode_schema(desc);
        let roundtrip_desc = SourceData::decode_schema(&encoded_schema);
        assert_eq!(desc, &roundtrip_desc);

        // A data type is always backward compatible with itself, and applying that migration
        // must leave the data type unchanged.
        let migration =
            mz_persist_types::schema::backward_compatible(col.data_type(), col.data_type());
        let migration = migration.expect("should be backward compatible with self");
        let migrated = migration.migrate(Arc::new(col.clone()));
        assert_eq!(col.data_type(), migrated.data_type());
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // proptest-heavy; skipped under Miri
    fn all_source_data_roundtrips() {
        // Mostly small batches; larger batches only when PROPTEST_LARGE_DATA is set.
        let mut weights = vec![(500, Just(0..8)), (50, Just(8..32))];
        if std::env::var("PROPTEST_LARGE_DATA").is_ok() {
            weights.extend([
                (10, Just(32..128)),
                (5, Just(128..512)),
                (3, Just(512..2048)),
                (1, Just(2048..8192)),
            ]);
        }
        let num_rows = Union::new_weighted(weights);

        // Pick a schema, a projection of it to read with, and data matching the schema.
        let strat = (any::<RelationDesc>(), num_rows)
            .prop_flat_map(|(desc, num_rows)| {
                arb_relation_desc_projection(desc.clone())
                    .prop_map(move |read_desc| (desc.clone(), read_desc, num_rows.clone()))
            })
            .prop_flat_map(|(desc, read_desc, num_rows)| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                    .prop_map(move |datas| (desc.clone(), datas, read_desc.clone()))
            });

        let combined_strat = (any::<EncodingConfig>(), strat);
        proptest!(|((config, (desc, source_datas, read_desc)) in combined_strat)| {
            roundtrip_source_data(&desc, source_datas, &read_desc, &config);
        });
    }

    /// An error value for a relation whose only column is non-nullable must still round-trip;
    /// the row half is null for that value.
    #[mz_ore::test]
    fn roundtrip_error_nulls() {
        let desc = RelationDescBuilder::default()
            .with_column(
                "ts",
                SqlScalarType::TimestampTz { precision: None }.nullable(false),
            )
            .finish();
        let source_datas = vec![SourceData(Err(DataflowError::EvalError(
            EvalError::DateOutOfRange.into(),
        )))];
        let config = EncodingConfig::default();
        roundtrip_source_data(&desc, source_datas, &desc, &config);
    }

    /// Returns whether `array` is sorted in arrow's default order. Returns `false` when arrow
    /// cannot build a comparator for the array's type.
    fn is_sorted(array: &dyn Array) -> bool {
        let sort_options = arrow::compute::SortOptions::default();
        let Ok(cmp) = make_comparator(array, array, sort_options) else {
            return false;
        };
        (0..array.len())
            .tuple_windows()
            .all(|(i, j)| cmp(i, j).is_le())
    }

    /// The arrow data type produced by `schema`'s encoder (taken from an empty column).
    fn get_data_type(schema: &impl Schema<SourceData>) -> arrow::datatypes::DataType {
        use mz_persist_types::columnar::ColumnEncoder;
        let array = Schema::encoder(schema).expect("valid schema").finish();
        Array::data_type(&array).clone()
    }

    /// Encodes `datas` with `old` and applies `migration` to the result. The migrated data must
    /// have `new`'s data type. When the migration claims to preserve order and the old data was
    /// sorted, the migrated data must also be sorted.
    #[track_caller]
    fn backward_compatible_testcase(
        old: &RelationDesc,
        new: &RelationDesc,
        migration: Migration,
        datas: &[SourceData],
    ) {
        let mut encoder = Schema::<SourceData>::encoder(old).expect("valid schema");
        for data in datas {
            encoder.append(data);
        }
        let old = encoder.finish();
        let new = Schema::<SourceData>::encoder(new)
            .expect("valid schema")
            .finish();
        let old: Arc<dyn Array> = Arc::new(old);
        let new: Arc<dyn Array> = Arc::new(new);
        let migrated = migration.migrate(Arc::clone(&old));
        assert_eq!(migrated.data_type(), new.data_type());

        // Check the sortedness of the *migrated* data. `new` is an empty column, so checking it
        // would pass vacuously and never exercise `preserves_order`.
        if migration.preserves_order() && is_sorted(&old) {
            assert!(is_sorted(&migrated))
        }
    }

    #[mz_ore::test]
    fn backward_compatible_empty_add_column() {
        let old = RelationDesc::empty();
        let new = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    #[mz_ore::test]
    fn backward_compatible_project_away_all() {
        let old = RelationDesc::from_names_and_types([("a", SqlScalarType::Bool.nullable(true))]);
        let new = RelationDesc::empty();

        let old_data_type = get_data_type(&old);
        let new_data_type = get_data_type(&new);

        let migration = backward_compatible(&old_data_type, &new_data_type);
        assert!(migration.is_some());
    }

    /// For arbitrary, unrelated schema pairs, any migration that `backward_compatible` offers
    /// must actually work.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate() {
        let strat = (any::<RelationDesc>(), any::<RelationDesc>()).prop_flat_map(|(old, new)| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&old), 2)
                .prop_map(move |datas| (old.clone(), new.clone(), datas))
        });

        proptest!(|((old, new, datas) in strat)| {
            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            };
        });
    }

    /// For schemas derived from a common ancestor via `PropRelationDescDiff`s, any offered
    /// migration must work. Schemas changed only by nullable-column additions and column drops
    /// must be compatible.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn backward_compatible_migrate_from_common() {
        use mz_repr::SqlColumnType;
        fn test_case(old: RelationDesc, diffs: Vec<PropRelationDescDiff>, datas: Vec<SourceData>) {
            let should_be_compatible = diffs.iter().all(|diff| match diff {
                // Adding a nullable column is expected to be compatible.
                PropRelationDescDiff::AddColumn {
                    typ: SqlColumnType { nullable, .. },
                    ..
                } => *nullable,
                // Dropping a column is expected to be compatible.
                PropRelationDescDiff::DropColumn { .. } => true,
                _ => false,
            });

            let mut new = old.clone();
            for diff in diffs.into_iter() {
                diff.apply(&mut new)
            }

            let old_data_type = get_data_type(&old);
            let new_data_type = get_data_type(&new);

            if let Some(migration) = backward_compatible(&old_data_type, &new_data_type) {
                backward_compatible_testcase(&old, &new, migration, &datas);
            } else if should_be_compatible {
                panic!("new DataType was not compatible when it should have been!");
            }
        }

        let strat = any::<RelationDesc>()
            .prop_flat_map(|desc| {
                proptest::collection::vec(arb_source_data_for_relation_desc(&desc), 2)
                    .no_shrink()
                    .prop_map(move |datas| (desc.clone(), datas))
            })
            .prop_flat_map(|(desc, datas)| {
                arb_relation_desc_diff(&desc)
                    .prop_map(move |diffs| (desc.clone(), diffs, datas.clone()))
            });

        proptest!(|((old, diffs, datas) in strat)| {
            test_case(old, diffs, datas);
        });
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // proptest-heavy; skipped under Miri
    fn empty_relation_desc_roundtrips() {
        let empty = RelationDesc::empty();
        let rows = proptest::collection::vec(arb_source_data_for_relation_desc(&empty), 0..8)
            .prop_map(move |datas| (empty.clone(), datas));

        proptest!(|((config, (desc, source_datas)) in (any::<EncodingConfig>(), rows))| {
            roundtrip_source_data(&desc, source_datas, &desc, &config);
        });
    }

    /// Two encoders for the same schema must produce the same arrow data type regardless of
    /// which values they were fed.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // proptest-heavy; skipped under Miri
    fn arrow_datatype_consistent() {
        fn test_case(desc: RelationDesc, datas: Vec<SourceData>) {
            let half = datas.len() / 2;

            let mut encoder_a = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[..half] {
                encoder_a.append(data);
            }
            let col_a = encoder_a.finish();

            let mut encoder_b = <RelationDesc as Schema<SourceData>>::encoder(&desc).unwrap();
            for data in &datas[half..] {
                encoder_b.append(data);
            }
            let col_b = encoder_b.finish();

            assert_eq!(col_a.data_type(), col_b.data_type());
        }

        let num_rows = 12;
        let strat = any::<RelationDesc>().prop_flat_map(|desc| {
            proptest::collection::vec(arb_source_data_for_relation_desc(&desc), num_rows)
                .prop_map(move |datas| (desc.clone(), datas))
        });

        proptest!(|((desc, data) in strat)| {
            test_case(desc, data);
        });
    }

    /// Decodes the `(RelationDesc, SourceData)` pairs in `snapshots/source-datas.txt`, one per
    /// line as `base64(desc),base64(data)`. Tops them up to `min_protos` with deterministic
    /// arbitrary values, re-encodes everything, and requires the result to match the snapshot
    /// byte for byte.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // proptest-heavy; skipped under Miri
    fn source_proto_serialization_stability() {
        let min_protos = 10;
        let encoded = include_str!("snapshots/source-datas.txt");

        let mut decoded: Vec<(RelationDesc, SourceData)> = encoded
            .lines()
            .map(|s| {
                let (desc, data) = s.split_once(',').expect("comma separated data");
                let desc = base64::engine::general_purpose::STANDARD
                    .decode(desc)
                    .expect("valid base64");
                let data = base64::engine::general_purpose::STANDARD
                    .decode(data)
                    .expect("valid base64");
                (desc, data)
            })
            .map(|(desc, data)| {
                let desc = ProtoRelationDesc::decode(&desc[..]).expect("valid proto");
                let desc = desc.into_rust().expect("valid proto");
                let data = SourceData::decode(&data, &desc).expect("valid proto");
                (desc, data)
            })
            .collect();

        // A deterministic runner keeps any generated entries identical across runs.
        let mut runner = proptest::test_runner::TestRunner::deterministic();
        let strategy = RelationDesc::arbitrary().prop_flat_map(|desc| {
            arb_source_data_for_relation_desc(&desc).prop_map(move |data| (desc.clone(), data))
        });
        while decoded.len() < min_protos {
            let arbitrary_data = strategy
                .new_tree(&mut runner)
                .expect("source data")
                .current();
            decoded.push(arbitrary_data);
        }

        let mut reencoded = String::new();
        let mut buf = vec![];
        for (desc, data) in decoded {
            buf.clear();
            desc.into_proto().encode(&mut buf).expect("success");
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push(',');

            buf.clear();
            data.encode(&mut buf);
            base64::engine::general_purpose::STANDARD.encode_string(buf.as_slice(), &mut reencoded);
            reencoded.push('\n');
        }

        // On failure, `reencoded` is what the snapshot would need to contain; only update the
        // snapshot if the encoding change is intentional and backward compatible.
        assert_eq!(
            encoded,
            reencoded.as_str(),
            "SourceData serde should be stable"
        )
    }
}