1use base64::Engine;
26use chrono::{NaiveDateTime, SubsecRound};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_proto::{IntoRustIfSome, ProtoType, RustType};
30use mz_repr::adt::char::CharLength;
31use mz_repr::adt::numeric::{Numeric, NumericMaxScale};
32use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampPrecision};
33use mz_repr::adt::varchar::VarCharMaxLength;
34use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlColumnType, SqlScalarType};
35use proptest_derive::Arbitrary;
36use serde::{Deserialize, Serialize};
37
38use std::collections::BTreeSet;
39use std::sync::Arc;
40
41use crate::desc::proto_sql_server_table_constraint::ConstraintType;
42use crate::{SqlServerDecodeError, SqlServerError};
43
44include!(concat!(env!("OUT_DIR"), "/mz_sql_server_util.rs"));
45
/// Description of a SQL Server table: its schema-qualified name, its columns, and its
/// constraints.
///
/// Built from raw upstream metadata with [`SqlServerTableDesc::new`] and convertible to and
/// from `ProtoSqlServerTableDesc`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
pub struct SqlServerTableDesc {
    /// Name of the schema the table belongs to.
    pub schema_name: Arc<str>,
    /// Name of the table.
    pub name: Arc<str>,
    /// Descriptions of the table's columns.
    pub columns: Box<[SqlServerColumnDesc]>,
    /// Constraints (primary key or unique) declared on the table.
    pub constraints: Vec<SqlServerTableConstraint>,
}
66
67impl SqlServerTableDesc {
68 pub fn new(
73 raw: SqlServerTableRaw,
74 raw_constraints: Vec<SqlServerTableConstraintRaw>,
75 ) -> Result<Self, SqlServerError> {
76 let columns: Box<[_]> = raw
77 .columns
78 .into_iter()
79 .map(SqlServerColumnDesc::new)
80 .collect();
81 let constraints = raw_constraints
82 .into_iter()
83 .map(SqlServerTableConstraint::try_from)
84 .collect::<Result<Vec<_>, _>>()?;
85 Ok(SqlServerTableDesc {
86 schema_name: raw.schema_name,
87 name: raw.name,
88 columns,
89 constraints,
90 })
91 }
92
93 pub fn qualified_name(&self) -> SqlServerQualifiedTableName {
95 SqlServerQualifiedTableName {
96 schema_name: Arc::clone(&self.schema_name),
97 table_name: Arc::clone(&self.name),
98 }
99 }
100
101 pub fn apply_text_columns(&mut self, text_columns: &BTreeSet<&str>) {
104 for column in &mut self.columns {
105 if text_columns.contains(column.name.as_ref()) {
106 column.represent_as_text();
107 }
108 }
109 }
110
111 pub fn apply_excl_columns(&mut self, excl_columns: &BTreeSet<&str>) {
114 for column in &mut self.columns {
115 if excl_columns.contains(column.name.as_ref()) {
116 column.exclude();
117 }
118 }
119 }
120
121 pub fn decoder(&self, desc: &RelationDesc) -> Result<SqlServerRowDecoder, SqlServerError> {
124 let decoder = SqlServerRowDecoder::try_new(self, desc)?;
125 Ok(decoder)
126 }
127}
128
129impl RustType<ProtoSqlServerTableDesc> for SqlServerTableDesc {
130 fn into_proto(&self) -> ProtoSqlServerTableDesc {
131 ProtoSqlServerTableDesc {
132 name: self.name.to_string(),
133 schema_name: self.schema_name.to_string(),
134 columns: self.columns.iter().map(|c| c.into_proto()).collect(),
135 constraints: self.constraints.iter().map(|c| c.into_proto()).collect(),
136 }
137 }
138
139 fn from_proto(proto: ProtoSqlServerTableDesc) -> Result<Self, mz_proto::TryFromProtoError> {
140 let columns = proto
141 .columns
142 .into_iter()
143 .map(|c| c.into_rust())
144 .collect::<Result<_, _>>()?;
145 let constraints = proto
146 .constraints
147 .into_iter()
148 .map(|c| c.into_rust())
149 .collect::<Result<_, _>>()?;
150 Ok(SqlServerTableDesc {
151 schema_name: proto.schema_name.into(),
152 name: proto.name.into(),
153 columns,
154 constraints,
155 })
156 }
157}
158
/// The kind of a table constraint that is tracked for a SQL Server table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
pub enum SqlServerTableConstraintType {
    /// A `PRIMARY KEY` constraint.
    PrimaryKey,
    /// A `UNIQUE` constraint.
    Unique,
}
166
167impl TryFrom<String> for SqlServerTableConstraintType {
168 type Error = SqlServerError;
169
170 fn try_from(value: String) -> Result<Self, Self::Error> {
171 match value.as_str() {
172 "PRIMARY KEY" => Ok(Self::PrimaryKey),
173 "UNIQUE" => Ok(Self::Unique),
174 name => Err(SqlServerError::InvalidData {
175 column_name: "constraint_type".into(),
176 error: format!("Unknown constraint type: {name}"),
177 }),
178 }
179 }
180}
181
182impl RustType<proto_sql_server_table_constraint::ConstraintType> for SqlServerTableConstraintType {
183 fn into_proto(&self) -> proto_sql_server_table_constraint::ConstraintType {
184 match self {
185 SqlServerTableConstraintType::PrimaryKey => ConstraintType::PrimaryKey(()),
186 SqlServerTableConstraintType::Unique => ConstraintType::Unique(()),
187 }
188 }
189
190 fn from_proto(
191 proto: proto_sql_server_table_constraint::ConstraintType,
192 ) -> Result<Self, mz_proto::TryFromProtoError> {
193 Ok(match proto {
194 ConstraintType::PrimaryKey(_) => SqlServerTableConstraintType::PrimaryKey,
195 ConstraintType::Unique(_) => SqlServerTableConstraintType::Unique,
196 })
197 }
198}
199
/// A constraint on a SQL Server table, with its type already validated.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
pub struct SqlServerTableConstraint {
    /// Name of the constraint.
    pub constraint_name: String,
    /// Kind of the constraint.
    pub constraint_type: SqlServerTableConstraintType,
    /// Names of the columns the constraint applies to.
    pub column_names: Vec<String>,
}
207
208impl TryFrom<SqlServerTableConstraintRaw> for SqlServerTableConstraint {
209 type Error = SqlServerError;
210
211 fn try_from(value: SqlServerTableConstraintRaw) -> Result<Self, Self::Error> {
212 Ok(SqlServerTableConstraint {
213 constraint_name: value.constraint_name,
214 constraint_type: value.constraint_type.try_into()?,
215 column_names: value.columns,
216 })
217 }
218}
219
220impl RustType<ProtoSqlServerTableConstraint> for SqlServerTableConstraint {
221 fn into_proto(&self) -> ProtoSqlServerTableConstraint {
222 ProtoSqlServerTableConstraint {
223 constraint_name: self.constraint_name.clone(),
224 constraint_type: Some(self.constraint_type.into_proto()),
225 column_names: self.column_names.clone(),
226 }
227 }
228
229 fn from_proto(
230 proto: ProtoSqlServerTableConstraint,
231 ) -> Result<Self, mz_proto::TryFromProtoError> {
232 Ok(SqlServerTableConstraint {
233 constraint_name: proto.constraint_name,
234 constraint_type: proto
235 .constraint_type
236 .into_rust_if_some("ProtoSqlServerTableConstraint::constraint_type")?,
237 column_names: proto.column_names,
238 })
239 }
240}
241
/// Schema-qualified name of a SQL Server table.
///
/// Formats as `[schema_name].[table_name]`.
///
/// NOTE(review): the names are written between brackets verbatim; a `]` inside a name is
/// not escaped. Confirm whether callers splice this string into SQL text.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct SqlServerQualifiedTableName {
    /// Name of the schema the table belongs to.
    pub schema_name: Arc<str>,
    /// Name of the table.
    pub table_name: Arc<str>,
}

// Implementing `Display` (rather than `ToString` directly) keeps `.to_string()` available
// through the standard blanket impl and additionally allows use with `{}` in format strings.
impl std::fmt::Display for SqlServerQualifiedTableName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}].[{}]", self.schema_name, self.table_name)
    }
}
256
/// Raw, unvalidated metadata describing a SQL Server table.
///
/// Converted into a [`SqlServerTableDesc`] by [`SqlServerTableDesc::new`].
#[derive(Debug, Clone)]
pub struct SqlServerTableRaw {
    /// Name of the schema the table belongs to.
    pub schema_name: Arc<str>,
    /// Name of the table.
    pub name: Arc<str>,
    /// The capture instance associated with this table.
    // NOTE(review): presumably the CDC capture instance tracking this table — confirm.
    pub capture_instance: Arc<SqlServerCaptureInstanceRaw>,
    /// Raw metadata for the table's columns.
    pub columns: Arc<[SqlServerColumnRaw]>,
}
272
/// Raw metadata describing a capture instance.
#[derive(Debug, Clone)]
pub struct SqlServerCaptureInstanceRaw {
    /// Name of the capture instance.
    pub name: Arc<str>,
    /// Creation date of the capture instance.
    // NOTE(review): timezone is not encoded in `NaiveDateTime` — confirm what the upstream
    // value is relative to.
    pub create_date: Arc<NaiveDateTime>,
}
281
/// Description of a single column of a SQL Server table.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
pub struct SqlServerColumnDesc {
    /// Name of the column.
    pub name: Arc<str>,
    /// The Materialize type used to represent the column.
    ///
    /// `None` when the column is excluded (see [`SqlServerColumnDesc::exclude`]) or when its
    /// upstream type could not be parsed into a supported type.
    pub column_type: Option<SqlColumnType>,
    /// Name of a primary key constraint associated with this column, if any.
    // NOTE(review): `SqlServerColumnDesc::new` always sets this to `None`; confirm where
    // (and whether) it is populated.
    pub primary_key_constraint: Option<Arc<str>>,
    /// How values of this column are decoded from an upstream row.
    pub decode_type: SqlServerColumnDecodeType,
    /// The data type name of the column as reported in the raw column metadata.
    pub raw_type: Arc<str>,
}
303
304impl SqlServerColumnDesc {
305 pub fn new(raw: &SqlServerColumnRaw) -> Self {
307 let (column_type, decode_type) = match parse_data_type(raw) {
308 Ok((scalar_type, decode_type)) => {
309 let column_type = scalar_type.nullable(raw.is_nullable);
310 (Some(column_type), decode_type)
311 }
312 Err(err) => {
313 tracing::warn!(
314 ?err,
315 ?raw,
316 "found an unsupported data type when parsing raw data"
317 );
318 (
319 None,
320 SqlServerColumnDecodeType::Unsupported {
321 context: err.reason,
322 },
323 )
324 }
325 };
326 SqlServerColumnDesc {
327 name: Arc::clone(&raw.name),
328 primary_key_constraint: None,
329 column_type,
330 decode_type,
331 raw_type: Arc::clone(&raw.data_type),
332 }
333 }
334
335 pub fn represent_as_text(&mut self) {
337 self.column_type = self
338 .column_type
339 .as_ref()
340 .map(|ct| SqlScalarType::String.nullable(ct.nullable));
341 }
342
343 pub fn exclude(&mut self) {
345 self.column_type = None;
346 }
347
348 pub fn is_excluded(&self) -> bool {
350 self.column_type.is_none()
351 }
352}
353
354impl RustType<ProtoSqlServerColumnDesc> for SqlServerColumnDesc {
355 fn into_proto(&self) -> ProtoSqlServerColumnDesc {
356 ProtoSqlServerColumnDesc {
357 name: self.name.to_string(),
358 column_type: self.column_type.into_proto(),
359 primary_key_constraint: self.primary_key_constraint.as_ref().map(|v| v.to_string()),
360 decode_type: Some(self.decode_type.into_proto()),
361 raw_type: self.raw_type.to_string(),
362 }
363 }
364
365 fn from_proto(proto: ProtoSqlServerColumnDesc) -> Result<Self, mz_proto::TryFromProtoError> {
366 Ok(SqlServerColumnDesc {
367 name: proto.name.into(),
368 column_type: proto.column_type.into_rust()?,
369 primary_key_constraint: proto.primary_key_constraint.map(|v| v.into()),
370 decode_type: proto
371 .decode_type
372 .into_rust_if_some("ProtoSqlServerColumnDesc::decode_type")?,
373 raw_type: proto.raw_type.into(),
374 })
375 }
376}
377
/// Error describing why a column's SQL Server data type could not be mapped to a supported
/// type.
#[derive(Debug)]
// Not every field is read directly; `column_name` and `column_type` are only constructed in
// this chunk and presumably surface through the `Debug` output — TODO confirm.
#[allow(dead_code)]
pub struct UnsupportedDataType {
    /// Name of the offending column.
    column_name: String,
    /// Data type name of the offending column.
    column_type: String,
    /// Human-readable explanation of why the type is unsupported.
    reason: String,
}
386
387fn parse_data_type(
392 raw: &SqlServerColumnRaw,
393) -> Result<(SqlScalarType, SqlServerColumnDecodeType), UnsupportedDataType> {
394 if raw.is_computed {
398 return Err(UnsupportedDataType {
399 column_name: raw.name.to_string(),
400 column_type: format!("{} (computed)", raw.data_type.to_lowercase()),
401 reason: "column is computed".into(),
402 });
403 }
404
405 let scalar =
406 match raw.data_type.to_lowercase().as_str() {
407 "tinyint" => (SqlScalarType::Int16, SqlServerColumnDecodeType::U8),
408 "smallint" => (SqlScalarType::Int16, SqlServerColumnDecodeType::I16),
409 "int" => (SqlScalarType::Int32, SqlServerColumnDecodeType::I32),
410 "bigint" => (SqlScalarType::Int64, SqlServerColumnDecodeType::I64),
411 "bit" => (SqlScalarType::Bool, SqlServerColumnDecodeType::Bool),
412 "decimal" | "numeric" | "money" | "smallmoney" => {
413 if raw.precision > 38 || raw.scale > raw.precision {
420 tracing::warn!(
421 "unexpected value from SQL Server, precision of {} and scale of {}",
422 raw.precision,
423 raw.scale,
424 );
425 }
426 if raw.precision > 39 {
427 let reason = format!(
428 "precision of {} is greater than our maximum of 39",
429 raw.precision
430 );
431 return Err(UnsupportedDataType {
432 column_name: raw.name.to_string(),
433 column_type: raw.data_type.to_string(),
434 reason,
435 });
436 }
437
438 let raw_scale = usize::cast_from(raw.scale);
439 let max_scale =
440 NumericMaxScale::try_from(raw_scale).map_err(|_| UnsupportedDataType {
441 column_type: raw.data_type.to_string(),
442 column_name: raw.name.to_string(),
443 reason: format!("scale of {} is too large", raw.scale),
444 })?;
445 let column_type = SqlScalarType::Numeric {
446 max_scale: Some(max_scale),
447 };
448
449 (column_type, SqlServerColumnDecodeType::Numeric)
450 }
451 "real" | "float" | "double precision" => match raw.max_length {
459 4 => (SqlScalarType::Float32, SqlServerColumnDecodeType::F32),
462 8 => (SqlScalarType::Float64, SqlServerColumnDecodeType::F64),
463 _ => {
464 return Err(UnsupportedDataType {
465 column_name: raw.name.to_string(),
466 column_type: raw.data_type.to_string(),
467 reason: format!("unsupported length {}", raw.max_length),
468 });
469 }
470 },
471 dt @ ("char" | "nchar" | "sysname") => {
472 if raw.max_length == -1 {
475 return Err(UnsupportedDataType {
476 column_name: raw.name.to_string(),
477 column_type: raw.data_type.to_string(),
478 reason: "columns with unlimited size do not support CDC".to_string(),
479 });
480 }
481
482 let column_type = match dt {
483 "char" => {
484 let length =
485 if raw.max_length != -1 {
486 let length = CharLength::try_from(i64::from(raw.max_length))
487 .map_err(|e| UnsupportedDataType {
488 column_name: raw.name.to_string(),
489 column_type: raw.data_type.to_string(),
490 reason: e.to_string(),
491 })?;
492 Some(length)
493 } else {
494 None
495 };
496 SqlScalarType::Char { length }
497 }
498 "nchar" | "sysname" => SqlScalarType::String,
502 other => unreachable!("'{other}' checked above"),
503 };
504
505 (column_type, SqlServerColumnDecodeType::String)
506 }
507 "varchar" | "nvarchar" => {
508 let max_length =
519 if raw.max_length != -1 {
520 let length = VarCharMaxLength::try_from(i64::from(raw.max_length))
521 .map_err(|e| UnsupportedDataType {
522 column_name: raw.name.to_string(),
523 column_type: raw.data_type.to_string(),
524 reason: e.to_string(),
525 })?;
526 Some(length)
527 } else {
528 None
529 };
530 let column_type = SqlScalarType::VarChar { max_length };
531 (column_type, SqlServerColumnDecodeType::String)
532 }
533 "text" | "ntext" | "image" => {
534 mz_ore::soft_assert_eq_no_log!(raw.max_length, 16);
537
538 return Err(UnsupportedDataType {
540 column_name: raw.name.to_string(),
541 column_type: raw.data_type.to_string(),
542 reason: "columns with unlimited size do not support CDC".to_string(),
543 });
544 }
545 "xml" => {
546 if raw.max_length == -1 {
551 return Err(UnsupportedDataType {
552 column_name: raw.name.to_string(),
553 column_type: raw.data_type.to_string(),
554 reason: "columns with unlimited size do not support CDC".to_string(),
555 });
556 }
557 (SqlScalarType::String, SqlServerColumnDecodeType::Xml)
558 }
559 "binary" | "varbinary" => {
560 if raw.max_length == -1 {
565 return Err(UnsupportedDataType {
566 column_name: raw.name.to_string(),
567 column_type: raw.data_type.to_string(),
568 reason: "columns with unlimited size do not support CDC".to_string(),
569 });
570 }
571 (SqlScalarType::Bytes, SqlServerColumnDecodeType::Bytes)
572 }
573 "json" => (SqlScalarType::Jsonb, SqlServerColumnDecodeType::String),
574 "date" => (SqlScalarType::Date, SqlServerColumnDecodeType::NaiveDate),
575 "time" => (SqlScalarType::Time, SqlServerColumnDecodeType::NaiveTime),
588 dt @ ("smalldatetime" | "datetime" | "datetime2" | "datetimeoffset") => {
589 if raw.scale > 7 {
590 tracing::warn!("unexpected scale '{}' from SQL Server", raw.scale);
591 }
592 if raw.scale > mz_repr::adt::timestamp::MAX_PRECISION {
593 tracing::warn!("truncating scale of '{}' for '{}'", raw.scale, dt);
594 }
595 let precision = std::cmp::min(raw.scale, mz_repr::adt::timestamp::MAX_PRECISION);
596 let precision =
597 Some(TimestampPrecision::try_from(i64::from(precision)).expect("known to fit"));
598
599 match dt {
600 "smalldatetime" | "datetime" | "datetime2" => (
601 SqlScalarType::Timestamp { precision },
602 SqlServerColumnDecodeType::NaiveDateTime,
603 ),
604 "datetimeoffset" => (
605 SqlScalarType::TimestampTz { precision },
606 SqlServerColumnDecodeType::DateTime,
607 ),
608 other => unreachable!("'{other}' checked above"),
609 }
610 }
611 "uniqueidentifier" => (SqlScalarType::Uuid, SqlServerColumnDecodeType::Uuid),
612 other => {
625 return Err(UnsupportedDataType {
626 column_type: other.to_string(),
627 column_name: raw.name.to_string(),
628 reason: format!("'{other}' is unimplemented"),
629 });
630 }
631 };
632 Ok(scalar)
633}
634
/// Raw, unvalidated metadata describing a single column of a SQL Server table.
#[derive(Clone, Debug)]
pub struct SqlServerColumnRaw {
    /// Name of the column.
    pub name: Arc<str>,
    /// Name of the column's data type; compared case-insensitively when parsed.
    pub data_type: Arc<str>,
    /// Whether the column may contain `NULL`.
    pub is_nullable: bool,
    /// Maximum length of the column.
    ///
    /// A value of `-1` is treated as "unlimited size" when the type is parsed.
    // NOTE(review): presumably a byte count as reported by SQL Server — confirm.
    pub max_length: i16,
    /// Precision of the column; checked for numeric types.
    pub precision: u8,
    /// Scale of the column; used for numeric and date-time types.
    pub scale: u8,
    /// Whether the column is computed; computed columns are reported as unsupported.
    pub is_computed: bool,
}
663
/// Raw, unvalidated metadata describing a constraint on a SQL Server table.
///
/// Validated into a [`SqlServerTableConstraint`] via `TryFrom`.
#[derive(Clone, Debug)]
pub struct SqlServerTableConstraintRaw {
    /// Name of the constraint.
    pub constraint_name: String,
    /// Type of the constraint as a string, e.g. `"PRIMARY KEY"` or `"UNIQUE"`.
    pub constraint_type: String,
    /// Names of the columns the constraint applies to.
    pub columns: Vec<String>,
}
671
/// The type that a column's value is read as from an upstream `tiberius::Row`, before it is
/// converted into a [`Datum`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
pub enum SqlServerColumnDecodeType {
    /// Read as a `bool`.
    Bool,
    /// Read as a `u8`.
    U8,
    /// Read as an `i16`.
    I16,
    /// Read as an `i32`.
    I32,
    /// Read as an `i64`.
    I64,
    /// Read as an `f32`.
    F32,
    /// Read as an `f64`.
    F64,
    /// Read as a `&str`.
    String,
    /// Read as a `&[u8]`.
    Bytes,
    /// Read as a `uuid::Uuid`.
    Uuid,
    /// Read as a `tiberius::numeric::Numeric`.
    Numeric,
    /// Read as `tiberius::xml::XmlData`.
    Xml,
    /// Read as a `chrono::NaiveDate`.
    NaiveDate,
    /// Read as a `chrono::NaiveTime`.
    NaiveTime,
    /// Read as a `chrono::DateTime<chrono::Utc>`.
    DateTime,
    /// Read as a `chrono::NaiveDateTime`.
    NaiveDateTime,
    /// The column's type is unsupported and its values cannot be decoded.
    Unsupported {
        /// Explanation of why the type is unsupported.
        context: String,
    },
}
704
705impl SqlServerColumnDecodeType {
706 pub fn decode<'a>(
708 &self,
709 data: &'a tiberius::Row,
710 name: &'a str,
711 column: &'a SqlColumnType,
712 arena: &'a RowArena,
713 ) -> Result<Datum<'a>, SqlServerDecodeError> {
714 let maybe_datum = match (&column.scalar_type, self) {
715 (SqlScalarType::Bool, SqlServerColumnDecodeType::Bool) => data
716 .try_get(name)
717 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bool"))?
718 .map(|val: bool| if val { Datum::True } else { Datum::False }),
719 (SqlScalarType::Int16, SqlServerColumnDecodeType::U8) => data
720 .try_get(name)
721 .map_err(|_| SqlServerDecodeError::invalid_column(name, "u8"))?
722 .map(|val: u8| Datum::Int16(i16::cast_from(val))),
723 (SqlScalarType::Int16, SqlServerColumnDecodeType::I16) => data
724 .try_get(name)
725 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i16"))?
726 .map(Datum::Int16),
727 (SqlScalarType::Int32, SqlServerColumnDecodeType::I32) => data
728 .try_get(name)
729 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i32"))?
730 .map(Datum::Int32),
731 (SqlScalarType::Int64, SqlServerColumnDecodeType::I64) => data
732 .try_get(name)
733 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i64"))?
734 .map(Datum::Int64),
735 (SqlScalarType::Float32, SqlServerColumnDecodeType::F32) => data
736 .try_get(name)
737 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f32"))?
738 .map(|val: f32| Datum::Float32(ordered_float::OrderedFloat(val))),
739 (SqlScalarType::Float64, SqlServerColumnDecodeType::F64) => data
740 .try_get(name)
741 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f64"))?
742 .map(|val: f64| Datum::Float64(ordered_float::OrderedFloat(val))),
743 (SqlScalarType::String, SqlServerColumnDecodeType::String) => data
744 .try_get(name)
745 .map_err(|_| SqlServerDecodeError::invalid_column(name, "string"))?
746 .map(Datum::String),
747 (SqlScalarType::Char { length }, SqlServerColumnDecodeType::String) => data
748 .try_get(name)
749 .map_err(|_| SqlServerDecodeError::invalid_column(name, "char"))?
750 .map(|val: &str| match length {
751 Some(expected) => {
752 let found_chars = val.chars().count();
753 let expct_chars = usize::cast_from(expected.into_u32());
754 if found_chars != expct_chars {
755 Err(SqlServerDecodeError::invalid_char(
756 name,
757 expct_chars,
758 found_chars,
759 ))
760 } else {
761 Ok(Datum::String(val))
762 }
763 }
764 None => Ok(Datum::String(val)),
765 })
766 .transpose()?,
767 (SqlScalarType::VarChar { max_length }, SqlServerColumnDecodeType::String) => data
768 .try_get(name)
769 .map_err(|_| SqlServerDecodeError::invalid_column(name, "varchar"))?
770 .map(|val: &str| match max_length {
771 Some(max) => {
772 let found_chars = val.chars().count();
773 let max_chars = usize::cast_from(max.into_u32());
774 if found_chars > max_chars {
775 Err(SqlServerDecodeError::invalid_varchar(
776 name,
777 max_chars,
778 found_chars,
779 ))
780 } else {
781 Ok(Datum::String(val))
782 }
783 }
784 None => Ok(Datum::String(val)),
785 })
786 .transpose()?,
787 (SqlScalarType::Bytes, SqlServerColumnDecodeType::Bytes) => data
788 .try_get(name)
789 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bytes"))?
790 .map(Datum::Bytes),
791 (SqlScalarType::Uuid, SqlServerColumnDecodeType::Uuid) => data
792 .try_get(name)
793 .map_err(|_| SqlServerDecodeError::invalid_column(name, "uuid"))?
794 .map(Datum::Uuid),
795 (SqlScalarType::Numeric { .. }, SqlServerColumnDecodeType::Numeric) => data
796 .try_get(name)
797 .map_err(|_| SqlServerDecodeError::invalid_column(name, "numeric"))?
798 .map(|val: tiberius::numeric::Numeric| {
799 let numeric = tiberius_numeric_to_mz_numeric(val);
800 Datum::Numeric(OrderedDecimal(numeric))
801 }),
802 (SqlScalarType::String, SqlServerColumnDecodeType::Xml) => data
803 .try_get(name)
804 .map_err(|_| SqlServerDecodeError::invalid_column(name, "xml"))?
805 .map(|val: &tiberius::xml::XmlData| Datum::String(val.as_ref())),
806 (SqlScalarType::Date, SqlServerColumnDecodeType::NaiveDate) => data
807 .try_get(name)
808 .map_err(|_| SqlServerDecodeError::invalid_column(name, "date"))?
809 .map(|val: chrono::NaiveDate| {
810 let date = val
811 .try_into()
812 .map_err(|e| SqlServerDecodeError::invalid_date(name, e))?;
813 Ok::<_, SqlServerDecodeError>(Datum::Date(date))
814 })
815 .transpose()?,
816 (SqlScalarType::Time, SqlServerColumnDecodeType::NaiveTime) => data
817 .try_get(name)
818 .map_err(|_| SqlServerDecodeError::invalid_column(name, "time"))?
819 .map(|val: chrono::NaiveTime| {
820 let rounded = val.round_subsecs(6);
825 let val = if rounded < val {
827 val.trunc_subsecs(6)
828 } else {
829 val
830 };
831 Datum::Time(val)
832 }),
833 (SqlScalarType::Timestamp { precision }, SqlServerColumnDecodeType::NaiveDateTime) => {
834 data.try_get(name)
835 .map_err(|_| SqlServerDecodeError::invalid_column(name, "timestamp"))?
836 .map(|val: chrono::NaiveDateTime| {
837 let ts: CheckedTimestamp<chrono::NaiveDateTime> = val
838 .try_into()
839 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
840 let rounded = ts
841 .round_to_precision(*precision)
842 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
843 Ok::<_, SqlServerDecodeError>(Datum::Timestamp(rounded))
844 })
845 .transpose()?
846 }
847 (SqlScalarType::TimestampTz { precision }, SqlServerColumnDecodeType::DateTime) => data
848 .try_get(name)
849 .map_err(|_| SqlServerDecodeError::invalid_column(name, "timestamptz"))?
850 .map(|val: chrono::DateTime<chrono::Utc>| {
851 let ts: CheckedTimestamp<chrono::DateTime<chrono::Utc>> = val
852 .try_into()
853 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
854 let rounded = ts
855 .round_to_precision(*precision)
856 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
857 Ok::<_, SqlServerDecodeError>(Datum::TimestampTz(rounded))
858 })
859 .transpose()?,
860 (SqlScalarType::String, SqlServerColumnDecodeType::Bool) => data
862 .try_get(name)
863 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bool-text"))?
864 .map(|val: bool| {
865 if val {
866 Datum::String("true")
867 } else {
868 Datum::String("false")
869 }
870 }),
871 (SqlScalarType::String, SqlServerColumnDecodeType::U8) => data
872 .try_get(name)
873 .map_err(|_| SqlServerDecodeError::invalid_column(name, "u8-text"))?
874 .map(|val: u8| {
875 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
876 }),
877 (SqlScalarType::String, SqlServerColumnDecodeType::I16) => data
878 .try_get(name)
879 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i16-text"))?
880 .map(|val: i16| {
881 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
882 }),
883 (SqlScalarType::String, SqlServerColumnDecodeType::I32) => data
884 .try_get(name)
885 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i32-text"))?
886 .map(|val: i32| {
887 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
888 }),
889 (SqlScalarType::String, SqlServerColumnDecodeType::I64) => data
890 .try_get(name)
891 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i64-text"))?
892 .map(|val: i64| {
893 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
894 }),
895 (SqlScalarType::String, SqlServerColumnDecodeType::F32) => data
896 .try_get(name)
897 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f32-text"))?
898 .map(|val: f32| {
899 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
900 }),
901 (SqlScalarType::String, SqlServerColumnDecodeType::F64) => data
902 .try_get(name)
903 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f64-text"))?
904 .map(|val: f64| {
905 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
906 }),
907 (SqlScalarType::String, SqlServerColumnDecodeType::Uuid) => data
908 .try_get(name)
909 .map_err(|_| SqlServerDecodeError::invalid_column(name, "uuid-text"))?
910 .map(|val: uuid::Uuid| {
911 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
912 }),
913 (SqlScalarType::String, SqlServerColumnDecodeType::Bytes) => data
914 .try_get(name)
915 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bytes-text"))?
916 .map(|val: &[u8]| {
917 let encoded = base64::engine::general_purpose::STANDARD.encode(val);
918 arena.make_datum(|packer| packer.push(Datum::String(&encoded)))
919 }),
920 (SqlScalarType::String, SqlServerColumnDecodeType::Numeric) => data
921 .try_get(name)
922 .map_err(|_| SqlServerDecodeError::invalid_column(name, "numeric-text"))?
923 .map(|val: tiberius::numeric::Numeric| {
924 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
925 }),
926 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveDate) => data
927 .try_get(name)
928 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivedate-text"))?
929 .map(|val: chrono::NaiveDate| {
930 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
931 }),
932 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveTime) => data
933 .try_get(name)
934 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivetime-text"))?
935 .map(|val: chrono::NaiveTime| {
936 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
937 }),
938 (SqlScalarType::String, SqlServerColumnDecodeType::DateTime) => data
939 .try_get(name)
940 .map_err(|_| SqlServerDecodeError::invalid_column(name, "datetime-text"))?
941 .map(|val: chrono::DateTime<chrono::Utc>| {
942 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
943 }),
944 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveDateTime) => data
945 .try_get(name)
946 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivedatetime-text"))?
947 .map(|val: chrono::NaiveDateTime| {
948 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
949 }),
950 (column_type, decode_type) => {
951 return Err(SqlServerDecodeError::Unsupported {
952 sql_server_type: decode_type.clone(),
953 mz_type: column_type.clone(),
954 });
955 }
956 };
957
958 match (maybe_datum, column.nullable) {
959 (Some(datum), _) => Ok(datum),
960 (None, true) => Ok(Datum::Null),
961 (None, false) => Err(SqlServerDecodeError::InvalidData {
962 column_name: name.to_string(),
963 error: "found Null in non-nullable column".to_string(),
965 }),
966 }
967 }
968}
969
970impl RustType<proto_sql_server_column_desc::DecodeType> for SqlServerColumnDecodeType {
971 fn into_proto(&self) -> proto_sql_server_column_desc::DecodeType {
972 match self {
973 SqlServerColumnDecodeType::Bool => proto_sql_server_column_desc::DecodeType::Bool(()),
974 SqlServerColumnDecodeType::U8 => proto_sql_server_column_desc::DecodeType::U8(()),
975 SqlServerColumnDecodeType::I16 => proto_sql_server_column_desc::DecodeType::I16(()),
976 SqlServerColumnDecodeType::I32 => proto_sql_server_column_desc::DecodeType::I32(()),
977 SqlServerColumnDecodeType::I64 => proto_sql_server_column_desc::DecodeType::I64(()),
978 SqlServerColumnDecodeType::F32 => proto_sql_server_column_desc::DecodeType::F32(()),
979 SqlServerColumnDecodeType::F64 => proto_sql_server_column_desc::DecodeType::F64(()),
980 SqlServerColumnDecodeType::String => {
981 proto_sql_server_column_desc::DecodeType::String(())
982 }
983 SqlServerColumnDecodeType::Bytes => proto_sql_server_column_desc::DecodeType::Bytes(()),
984 SqlServerColumnDecodeType::Uuid => proto_sql_server_column_desc::DecodeType::Uuid(()),
985 SqlServerColumnDecodeType::Numeric => {
986 proto_sql_server_column_desc::DecodeType::Numeric(())
987 }
988 SqlServerColumnDecodeType::Xml => proto_sql_server_column_desc::DecodeType::Xml(()),
989 SqlServerColumnDecodeType::NaiveDate => {
990 proto_sql_server_column_desc::DecodeType::NaiveDate(())
991 }
992 SqlServerColumnDecodeType::NaiveTime => {
993 proto_sql_server_column_desc::DecodeType::NaiveTime(())
994 }
995 SqlServerColumnDecodeType::DateTime => {
996 proto_sql_server_column_desc::DecodeType::DateTime(())
997 }
998 SqlServerColumnDecodeType::NaiveDateTime => {
999 proto_sql_server_column_desc::DecodeType::NaiveDateTime(())
1000 }
1001 SqlServerColumnDecodeType::Unsupported { context } => {
1002 proto_sql_server_column_desc::DecodeType::Unsupported(context.clone())
1003 }
1004 }
1005 }
1006
1007 fn from_proto(
1008 proto: proto_sql_server_column_desc::DecodeType,
1009 ) -> Result<Self, mz_proto::TryFromProtoError> {
1010 let val = match proto {
1011 proto_sql_server_column_desc::DecodeType::Bool(()) => SqlServerColumnDecodeType::Bool,
1012 proto_sql_server_column_desc::DecodeType::U8(()) => SqlServerColumnDecodeType::U8,
1013 proto_sql_server_column_desc::DecodeType::I16(()) => SqlServerColumnDecodeType::I16,
1014 proto_sql_server_column_desc::DecodeType::I32(()) => SqlServerColumnDecodeType::I32,
1015 proto_sql_server_column_desc::DecodeType::I64(()) => SqlServerColumnDecodeType::I64,
1016 proto_sql_server_column_desc::DecodeType::F32(()) => SqlServerColumnDecodeType::F32,
1017 proto_sql_server_column_desc::DecodeType::F64(()) => SqlServerColumnDecodeType::F64,
1018 proto_sql_server_column_desc::DecodeType::String(()) => {
1019 SqlServerColumnDecodeType::String
1020 }
1021 proto_sql_server_column_desc::DecodeType::Bytes(()) => SqlServerColumnDecodeType::Bytes,
1022 proto_sql_server_column_desc::DecodeType::Uuid(()) => SqlServerColumnDecodeType::Uuid,
1023 proto_sql_server_column_desc::DecodeType::Numeric(()) => {
1024 SqlServerColumnDecodeType::Numeric
1025 }
1026 proto_sql_server_column_desc::DecodeType::Xml(()) => SqlServerColumnDecodeType::Xml,
1027 proto_sql_server_column_desc::DecodeType::NaiveDate(()) => {
1028 SqlServerColumnDecodeType::NaiveDate
1029 }
1030 proto_sql_server_column_desc::DecodeType::NaiveTime(()) => {
1031 SqlServerColumnDecodeType::NaiveTime
1032 }
1033 proto_sql_server_column_desc::DecodeType::DateTime(()) => {
1034 SqlServerColumnDecodeType::DateTime
1035 }
1036 proto_sql_server_column_desc::DecodeType::NaiveDateTime(()) => {
1037 SqlServerColumnDecodeType::NaiveDateTime
1038 }
1039 proto_sql_server_column_desc::DecodeType::Unsupported(context) => {
1040 SqlServerColumnDecodeType::Unsupported { context }
1041 }
1042 };
1043 Ok(val)
1044 }
1045}
1046
1047fn tiberius_numeric_to_mz_numeric(val: tiberius::numeric::Numeric) -> Numeric {
1050 let mut numeric = mz_repr::adt::numeric::cx_datum().from_i128(val.value());
1051 mz_repr::adt::numeric::cx_datum().scaleb(&mut numeric, &Numeric::from(-i32::from(val.scale())));
1054 numeric
1055}
1056
/// Bitmask read from the `__$update_mask` column of a row, with one bit per data column.
///
/// See [`UpdateMask::data_col_updated`] for how bits are addressed.
// NOTE(review): presumably a set bit means the column was changed by an update — confirm
// against the SQL Server CDC documentation.
#[derive(Debug)]
pub struct UpdateMask {
    /// Raw bytes of the mask, in the order they were read from the row.
    mask: Vec<u8>,
}
1065
1066impl TryFrom<&tiberius::Row> for UpdateMask {
1067 type Error = SqlServerDecodeError;
1068
1069 fn try_from(row: &tiberius::Row) -> Result<Self, Self::Error> {
1070 static UPDATE_MASK: &str = "__$update_mask";
1071
1072 let mask: Vec<u8> = row
1073 .try_get::<&[u8], _>(UPDATE_MASK)
1074 .inspect_err(|e| tracing::warn!("Failed extracting update mask: {e:?}"))
1075 .map_err(|_| SqlServerDecodeError::InvalidColumn {
1076 column_name: UPDATE_MASK.to_string(),
1077 as_type: "bytes",
1078 })?
1079 .ok_or_else(|| SqlServerDecodeError::InvalidData {
1080 column_name: UPDATE_MASK.to_string(),
1081 error: "column cannot be null".to_string(),
1082 })?
1083 .into();
1084 Ok(UpdateMask { mask })
1085 }
1086}
1087
1088impl UpdateMask {
1089 pub fn data_col_updated(&self, col_index: usize) -> bool {
1102 const CDC_METADATA_COL_COUNT: usize = 4;
1103
1104 if col_index < CDC_METADATA_COL_COUNT {
1105 return false;
1106 }
1107 let adj_col_index = col_index - CDC_METADATA_COL_COUNT;
1108 let byte_offset = adj_col_index / usize::cast_from(u8::BITS);
1109 assert!(
1110 byte_offset < self.mask.len(),
1111 "byte_offset = {byte_offset} mask_len = {}",
1112 self.mask.len()
1113 );
1114 let bit_offset = adj_col_index % usize::cast_from(u8::BITS);
1115 (self.mask[self.mask.len() - byte_offset - 1] >> bit_offset) & 1 == 1
1116 }
1117}
1118
/// Decodes rows received from SQL Server into Materialize rows.
///
/// Created with [`SqlServerRowDecoder::try_new`].
#[derive(Debug)]
pub struct SqlServerRowDecoder {
    /// For each output column, in output order: the upstream column name, the Materialize
    /// column type, and how to decode the value from the upstream row.
    decoders: Vec<(Arc<str>, SqlColumnType, SqlServerColumnDecodeType)>,
}
1127
1128impl SqlServerRowDecoder {
1129 pub fn try_new(
1133 table: &SqlServerTableDesc,
1134 desc: &RelationDesc,
1135 ) -> Result<Self, SqlServerError> {
1136 let decoders = desc
1137 .iter()
1138 .map(|(col_name, col_type)| {
1139 let sql_server_col = table
1140 .columns
1141 .iter()
1142 .find(|col| col.name.as_ref() == col_name.as_str())
1143 .ok_or_else(|| {
1144 anyhow::anyhow!("no SQL Server column with name {col_name} found")
1146 })?;
1147 let Some(sql_server_col_typ) = sql_server_col.column_type.as_ref() else {
1148 return Err(SqlServerError::ProgrammingError(format!(
1149 "programming error, {col_name} should have been exluded",
1150 )));
1151 };
1152
1153 let matches = match (&sql_server_col_typ.scalar_type, &col_type.scalar_type) {
1161 (SqlScalarType::Timestamp { .. }, SqlScalarType::Timestamp { .. })
1162 | (SqlScalarType::TimestampTz { .. }, SqlScalarType::TimestampTz { .. }) => {
1163 sql_server_col_typ.nullable == col_type.nullable
1165 }
1166 (_, _) => sql_server_col_typ == col_type,
1167 };
1168 if !matches {
1169 return Err(SqlServerError::ProgrammingError(format!(
1170 "programming error, {col_name} has mismatched type {:?} vs {:?}",
1171 sql_server_col.column_type, col_type
1172 )));
1173 }
1174
1175 let name = Arc::clone(&sql_server_col.name);
1176 let decoder = sql_server_col.decode_type.clone();
1177 let col_typ = sql_server_col_typ.clone();
1182
1183 Ok::<_, SqlServerError>((name, col_typ, decoder))
1184 })
1185 .collect::<Result<_, _>>()?;
1186
1187 Ok(SqlServerRowDecoder { decoders })
1188 }
1189
1190 pub fn decode(
1197 &self,
1198 data: &tiberius::Row,
1199 row: &mut Row,
1200 arena: &RowArena,
1201 new_data: Option<&tiberius::Row>,
1202 ) -> Result<(), SqlServerDecodeError> {
1203 let mut packer = row.packer();
1204
1205 for (col_name, col_type, decoder) in &self.decoders {
1206 let datum = decoder.decode(data, col_name, col_type, arena)?;
1207
1208 let datum = if let Some(new_data) = new_data
1209 && matches!(
1210 col_type.scalar_type,
1211 SqlScalarType::VarChar { max_length: None }
1212 )
1213 && matches!(datum, Datum::Null)
1214 {
1215 let update_mask = UpdateMask::try_from(new_data)?;
1216 let col_index = new_data
1217 .columns()
1218 .iter()
1219 .position(|c| c.name() == col_name.as_ref())
1220 .expect("column exists");
1221 if !update_mask.data_col_updated(col_index) {
1226 decoder.decode(new_data, col_name, col_type, arena)?
1227 } else {
1228 datum
1229 }
1230 } else {
1231 datum
1232 };
1233
1234 packer.push(datum);
1235 }
1236 Ok(())
1237 }
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242 use std::collections::BTreeSet;
1243 use std::sync::Arc;
1244
1245 use chrono::NaiveDateTime;
1246 use itertools::Itertools;
1247 use mz_ore::assert_contains;
1248 use mz_ore::collections::CollectionExt;
1249 use mz_repr::adt::numeric::NumericMaxScale;
1250 use mz_repr::adt::varchar::VarCharMaxLength;
1251 use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlScalarType};
1252 use tiberius::RowTestExt;
1253
1254 use crate::desc::{
1255 SqlServerCaptureInstanceRaw, SqlServerColumnDecodeType, SqlServerColumnDesc,
1256 SqlServerTableDesc, SqlServerTableRaw, tiberius_numeric_to_mz_numeric,
1257 };
1258
1259 use super::SqlServerColumnRaw;
1260
1261 impl SqlServerColumnRaw {
1262 fn new(name: &str, data_type: &str) -> Self {
1265 SqlServerColumnRaw {
1266 name: name.into(),
1267 data_type: data_type.into(),
1268 is_nullable: false,
1269 max_length: 0,
1270 precision: 0,
1271 scale: 0,
1272 is_computed: false,
1273 }
1274 }
1275
1276 fn nullable(mut self, nullable: bool) -> Self {
1277 self.is_nullable = nullable;
1278 self
1279 }
1280
1281 fn max_length(mut self, max_length: i16) -> Self {
1282 self.max_length = max_length;
1283 self
1284 }
1285
1286 fn precision(mut self, precision: u8) -> Self {
1287 self.precision = precision;
1288 self
1289 }
1290
1291 fn scale(mut self, scale: u8) -> Self {
1292 self.scale = scale;
1293 self
1294 }
1295 }
1296
1297 #[mz_ore::test]
1298 fn smoketest_column_raw() {
1299 let raw = SqlServerColumnRaw::new("foo", "bit");
1300 let col = SqlServerColumnDesc::new(&raw);
1301
1302 assert_eq!(&*col.name, "foo");
1303 assert_eq!(col.column_type, Some(SqlScalarType::Bool.nullable(false)));
1304 assert_eq!(col.decode_type, SqlServerColumnDecodeType::Bool);
1305
1306 let raw = SqlServerColumnRaw::new("foo", "decimal")
1307 .precision(20)
1308 .scale(10);
1309 let col = SqlServerColumnDesc::new(&raw);
1310
1311 let col_type = SqlScalarType::Numeric {
1312 max_scale: Some(NumericMaxScale::try_from(10i64).expect("known valid")),
1313 }
1314 .nullable(false);
1315 assert_eq!(col.column_type, Some(col_type));
1316 assert_eq!(col.decode_type, SqlServerColumnDecodeType::Numeric);
1317 }
1318
1319 #[mz_ore::test]
1320 fn smoketest_column_raw_invalid() {
1321 let raw = SqlServerColumnRaw::new("foo", "bad_data_type");
1322 let desc = SqlServerColumnDesc::new(&raw);
1323 let SqlServerColumnDecodeType::Unsupported { context } = desc.decode_type else {
1324 panic!("unexpected decode type {desc:?}");
1325 };
1326 assert_contains!(context, "'bad_data_type' is unimplemented");
1327
1328 let raw = SqlServerColumnRaw::new("foo", "decimal")
1329 .precision(100)
1330 .scale(10);
1331 let desc = SqlServerColumnDesc::new(&raw);
1332 assert!(matches!(
1333 desc.decode_type,
1334 SqlServerColumnDecodeType::Unsupported { .. }
1335 ));
1336
1337 let raw = SqlServerColumnRaw::new("foo", "varbinary").max_length(-1);
1338 let desc = SqlServerColumnDesc::new(&raw);
1339 let SqlServerColumnDecodeType::Unsupported { context } = desc.decode_type else {
1340 panic!("unexpected decode type {desc:?}");
1341 };
1342 assert_contains!(context, "columns with unlimited size do not support CDC");
1343 }
1344
1345 #[mz_ore::test]
1346 fn smoketest_decoder() {
1347 let sql_server_columns = [
1348 SqlServerColumnRaw::new("a", "varchar").max_length(16),
1349 SqlServerColumnRaw::new("b", "int").nullable(true),
1350 SqlServerColumnRaw::new("c", "bit"),
1351 ];
1352 let sql_server_desc = SqlServerTableRaw {
1353 schema_name: "my_schema".into(),
1354 name: "my_table".into(),
1355 capture_instance: Arc::new(SqlServerCaptureInstanceRaw {
1356 name: "my_table_CT".into(),
1357 create_date: NaiveDateTime::parse_from_str(
1358 "2024-01-01 00:00:00",
1359 "%Y-%m-%d %H:%M:%S",
1360 )
1361 .unwrap()
1362 .into(),
1363 }),
1364 columns: sql_server_columns.into(),
1365 };
1366 let sql_server_desc = SqlServerTableDesc::new(sql_server_desc, vec![]).unwrap();
1367
1368 let max_length = Some(VarCharMaxLength::try_from(16).unwrap());
1369 let relation_desc = RelationDesc::builder()
1370 .with_column("a", SqlScalarType::VarChar { max_length }.nullable(false))
1371 .with_column("c", SqlScalarType::Bool.nullable(false))
1373 .with_column("b", SqlScalarType::Int32.nullable(true))
1374 .finish();
1375
1376 let decoder = sql_server_desc
1378 .decoder(&relation_desc)
1379 .expect("known valid");
1380
1381 let sql_server_columns = [
1382 tiberius::Column::new("a".to_string(), tiberius::ColumnType::BigVarChar),
1383 tiberius::Column::new("b".to_string(), tiberius::ColumnType::Int4),
1384 tiberius::Column::new("c".to_string(), tiberius::ColumnType::Bit),
1385 ];
1386
1387 let data_a = [
1388 tiberius::ColumnData::String(Some("hello world".into())),
1389 tiberius::ColumnData::I32(Some(42)),
1390 tiberius::ColumnData::Bit(Some(true)),
1391 ];
1392 let sql_server_row_a = tiberius::Row::build(
1393 sql_server_columns
1394 .iter()
1395 .cloned()
1396 .zip_eq(data_a.into_iter()),
1397 );
1398
1399 let data_b = [
1400 tiberius::ColumnData::String(Some("foo bar".into())),
1401 tiberius::ColumnData::I32(None),
1402 tiberius::ColumnData::Bit(Some(false)),
1403 ];
1404 let sql_server_row_b =
1405 tiberius::Row::build(sql_server_columns.into_iter().zip_eq(data_b.into_iter()));
1406
1407 let mut rnd_row = Row::default();
1408 let arena = RowArena::default();
1409
1410 decoder
1411 .decode(&sql_server_row_a, &mut rnd_row, &arena, None)
1412 .unwrap();
1413 assert_eq!(
1414 &rnd_row,
1415 &Row::pack_slice(&[Datum::String("hello world"), Datum::True, Datum::Int32(42)])
1416 );
1417
1418 decoder
1419 .decode(&sql_server_row_b, &mut rnd_row, &arena, None)
1420 .unwrap();
1421 assert_eq!(
1422 &rnd_row,
1423 &Row::pack_slice(&[Datum::String("foo bar"), Datum::False, Datum::Null])
1424 );
1425 }
1426
1427 #[mz_ore::test]
1428 fn smoketest_decode_to_string() {
1429 #[track_caller]
1430 fn testcase(
1431 data_type: &'static str,
1432 col_type: tiberius::ColumnType,
1433 col_data: tiberius::ColumnData<'static>,
1434 ) {
1435 let columns = [SqlServerColumnRaw::new("a", data_type)];
1436 let sql_server_desc = SqlServerTableRaw {
1437 schema_name: "my_schema".into(),
1438 name: "my_table".into(),
1439 capture_instance: Arc::new(SqlServerCaptureInstanceRaw {
1440 name: "my_table_CT".into(),
1441 create_date: NaiveDateTime::parse_from_str(
1442 "2024-01-01 00:00:00",
1443 "%Y-%m-%d %H:%M:%S",
1444 )
1445 .unwrap()
1446 .into(),
1447 }),
1448 columns: columns.into(),
1449 };
1450 let mut sql_server_desc = SqlServerTableDesc::new(sql_server_desc, vec![]).unwrap();
1451 sql_server_desc.apply_text_columns(&BTreeSet::from(["a"]));
1452
1453 let relation_desc = RelationDesc::builder()
1455 .with_column("a", SqlScalarType::String.nullable(false))
1456 .finish();
1457
1458 let decoder = sql_server_desc
1460 .decoder(&relation_desc)
1461 .expect("known valid");
1462
1463 let sql_server_row = tiberius::Row::build([(
1464 tiberius::Column::new("a".to_string(), col_type),
1465 col_data,
1466 )]);
1467 let mut mz_row = Row::default();
1468 let arena = RowArena::new();
1469 decoder
1470 .decode(&sql_server_row, &mut mz_row, &arena, None)
1471 .unwrap();
1472
1473 let str_datum = mz_row.into_element();
1474 assert!(matches!(str_datum, Datum::String(_)));
1475 }
1476
1477 use tiberius::ColumnData;
1478
1479 testcase(
1480 "bit",
1481 tiberius::ColumnType::Bit,
1482 ColumnData::Bit(Some(true)),
1483 );
1484 testcase(
1485 "bit",
1486 tiberius::ColumnType::Bit,
1487 ColumnData::Bit(Some(false)),
1488 );
1489 testcase(
1490 "tinyint",
1491 tiberius::ColumnType::Int1,
1492 ColumnData::U8(Some(33)),
1493 );
1494 testcase(
1495 "smallint",
1496 tiberius::ColumnType::Int2,
1497 ColumnData::I16(Some(101)),
1498 );
1499 testcase(
1500 "int",
1501 tiberius::ColumnType::Int4,
1502 ColumnData::I32(Some(-42)),
1503 );
1504 {
1505 let datetime = tiberius::time::DateTime::new(10, 300);
1506 testcase(
1507 "datetime",
1508 tiberius::ColumnType::Datetime,
1509 ColumnData::DateTime(Some(datetime)),
1510 );
1511 }
1512 }
1513
1514 #[mz_ore::test]
1515 #[cfg_attr(miri, ignore)] fn smoketest_numeric_conversion() {
1517 let a = tiberius::numeric::Numeric::new_with_scale(12345, 2);
1518 let rnd = tiberius_numeric_to_mz_numeric(a);
1519 let og = mz_repr::adt::numeric::cx_datum().parse("123.45").unwrap();
1520 assert_eq!(og, rnd);
1521
1522 let a = tiberius::numeric::Numeric::new_with_scale(-99999, 5);
1523 let rnd = tiberius_numeric_to_mz_numeric(a);
1524 let og = mz_repr::adt::numeric::cx_datum().parse("-.99999").unwrap();
1525 assert_eq!(og, rnd);
1526
1527 let a = tiberius::numeric::Numeric::new_with_scale(1, 29);
1528 let rnd = tiberius_numeric_to_mz_numeric(a);
1529 let og = mz_repr::adt::numeric::cx_datum()
1530 .parse("0.00000000000000000000000000001")
1531 .unwrap();
1532 assert_eq!(og, rnd);
1533
1534 let a = tiberius::numeric::Numeric::new_with_scale(-111111111111111111, 0);
1535 let rnd = tiberius_numeric_to_mz_numeric(a);
1536 let og = mz_repr::adt::numeric::cx_datum()
1537 .parse("-111111111111111111")
1538 .unwrap();
1539 assert_eq!(og, rnd);
1540 }
1541
1542 }