1use base64::Engine;
26use chrono::{NaiveDateTime, SubsecRound};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_proto::{IntoRustIfSome, ProtoType, RustType};
30use mz_repr::adt::char::CharLength;
31use mz_repr::adt::numeric::{Numeric, NumericMaxScale};
32use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampPrecision};
33use mz_repr::adt::varchar::VarCharMaxLength;
34use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlColumnType, SqlScalarType};
35use proptest_derive::Arbitrary;
36use serde::{Deserialize, Serialize};
37
38use std::collections::BTreeSet;
39use std::sync::Arc;
40
41use crate::desc::proto_sql_server_table_constraint::ConstraintType;
42use crate::{SqlServerDecodeError, SqlServerError};
43
44include!(concat!(env!("OUT_DIR"), "/mz_sql_server_util.rs"));
45
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
56pub struct SqlServerTableDesc {
57 pub schema_name: Arc<str>,
59 pub name: Arc<str>,
61 pub columns: Box<[SqlServerColumnDesc]>,
63 pub constraints: Vec<SqlServerTableConstraint>,
65}
66
67impl SqlServerTableDesc {
68 pub fn new(
73 raw: SqlServerTableRaw,
74 raw_constraints: Vec<SqlServerTableConstraintRaw>,
75 ) -> Result<Self, SqlServerError> {
76 let columns: Box<[_]> = raw
77 .columns
78 .into_iter()
79 .map(SqlServerColumnDesc::new)
80 .collect();
81 let constraints = raw_constraints
82 .into_iter()
83 .map(SqlServerTableConstraint::try_from)
84 .collect::<Result<Vec<_>, _>>()?;
85 Ok(SqlServerTableDesc {
86 schema_name: raw.schema_name,
87 name: raw.name,
88 columns,
89 constraints,
90 })
91 }
92
93 pub fn qualified_name(&self) -> SqlServerQualifiedTableName {
95 SqlServerQualifiedTableName {
96 schema_name: Arc::clone(&self.schema_name),
97 table_name: Arc::clone(&self.name),
98 }
99 }
100
101 pub fn apply_text_columns(&mut self, text_columns: &BTreeSet<&str>) {
104 for column in &mut self.columns {
105 if text_columns.contains(column.name.as_ref()) {
106 column.represent_as_text();
107 }
108 }
109 }
110
111 pub fn apply_excl_columns(&mut self, excl_columns: &BTreeSet<&str>) {
114 for column in &mut self.columns {
115 if excl_columns.contains(column.name.as_ref()) {
116 column.exclude();
117 }
118 }
119 }
120
121 pub fn decoder(&self, desc: &RelationDesc) -> Result<SqlServerRowDecoder, SqlServerError> {
124 let decoder = SqlServerRowDecoder::try_new(self, desc)?;
125 Ok(decoder)
126 }
127}
128
129impl RustType<ProtoSqlServerTableDesc> for SqlServerTableDesc {
130 fn into_proto(&self) -> ProtoSqlServerTableDesc {
131 ProtoSqlServerTableDesc {
132 name: self.name.to_string(),
133 schema_name: self.schema_name.to_string(),
134 columns: self.columns.iter().map(|c| c.into_proto()).collect(),
135 constraints: self.constraints.iter().map(|c| c.into_proto()).collect(),
136 }
137 }
138
139 fn from_proto(proto: ProtoSqlServerTableDesc) -> Result<Self, mz_proto::TryFromProtoError> {
140 let columns = proto
141 .columns
142 .into_iter()
143 .map(|c| c.into_rust())
144 .collect::<Result<_, _>>()?;
145 let constraints = proto
146 .constraints
147 .into_iter()
148 .map(|c| c.into_rust())
149 .collect::<Result<_, _>>()?;
150 Ok(SqlServerTableDesc {
151 schema_name: proto.schema_name.into(),
152 name: proto.name.into(),
153 columns,
154 constraints,
155 })
156 }
157}
158
159#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
162pub enum SqlServerTableConstraintType {
163 PrimaryKey,
164 Unique,
165}
166
167impl TryFrom<String> for SqlServerTableConstraintType {
168 type Error = SqlServerError;
169
170 fn try_from(value: String) -> Result<Self, Self::Error> {
171 match value.as_str() {
172 "PRIMARY KEY" => Ok(Self::PrimaryKey),
173 "UNIQUE" => Ok(Self::Unique),
174 name => Err(SqlServerError::InvalidData {
175 column_name: "constraint_type".into(),
176 error: format!("Unknown constraint type: {name}"),
177 }),
178 }
179 }
180}
181
182impl RustType<proto_sql_server_table_constraint::ConstraintType> for SqlServerTableConstraintType {
183 fn into_proto(&self) -> proto_sql_server_table_constraint::ConstraintType {
184 match self {
185 SqlServerTableConstraintType::PrimaryKey => ConstraintType::PrimaryKey(()),
186 SqlServerTableConstraintType::Unique => ConstraintType::Unique(()),
187 }
188 }
189
190 fn from_proto(
191 proto: proto_sql_server_table_constraint::ConstraintType,
192 ) -> Result<Self, mz_proto::TryFromProtoError> {
193 Ok(match proto {
194 ConstraintType::PrimaryKey(_) => SqlServerTableConstraintType::PrimaryKey,
195 ConstraintType::Unique(_) => SqlServerTableConstraintType::Unique,
196 })
197 }
198}
199
200#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
202pub struct SqlServerTableConstraint {
203 pub constraint_name: String,
204 pub constraint_type: SqlServerTableConstraintType,
205 pub column_names: Vec<String>,
206}
207
208impl TryFrom<SqlServerTableConstraintRaw> for SqlServerTableConstraint {
209 type Error = SqlServerError;
210
211 fn try_from(value: SqlServerTableConstraintRaw) -> Result<Self, Self::Error> {
212 Ok(SqlServerTableConstraint {
213 constraint_name: value.constraint_name,
214 constraint_type: value.constraint_type.try_into()?,
215 column_names: value.columns,
216 })
217 }
218}
219
220impl RustType<ProtoSqlServerTableConstraint> for SqlServerTableConstraint {
221 fn into_proto(&self) -> ProtoSqlServerTableConstraint {
222 ProtoSqlServerTableConstraint {
223 constraint_name: self.constraint_name.clone(),
224 constraint_type: Some(self.constraint_type.into_proto()),
225 column_names: self.column_names.clone(),
226 }
227 }
228
229 fn from_proto(
230 proto: ProtoSqlServerTableConstraint,
231 ) -> Result<Self, mz_proto::TryFromProtoError> {
232 Ok(SqlServerTableConstraint {
233 constraint_name: proto.constraint_name,
234 constraint_type: proto
235 .constraint_type
236 .into_rust_if_some("ProtoSqlServerTableConstraint::constraint_type")?,
237 column_names: proto.column_names,
238 })
239 }
240}
241
/// The schema-qualified name of a SQL Server table.
///
/// Ordering compares `schema_name` first and `table_name` second.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct SqlServerQualifiedTableName {
    /// Name of the schema the table belongs to.
    pub schema_name: Arc<str>,
    /// Name of the table.
    pub table_name: Arc<str>,
}

/// Renders the name as `[schema].[table]`.
///
/// `Display` is implemented rather than `ToString` so the name can be used
/// directly in `format!`/`write!`; `.to_string()` keeps working through the
/// standard library's blanket `impl<T: Display> ToString for T`.
///
/// Note that the names are emitted verbatim: a `]` inside either part is not
/// escaped.
impl std::fmt::Display for SqlServerQualifiedTableName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}].[{}]", self.schema_name, self.table_name)
    }
}
256
257#[derive(Debug, Clone)]
262pub struct SqlServerTableRaw {
263 pub schema_name: Arc<str>,
265 pub name: Arc<str>,
267 pub capture_instance: Arc<SqlServerCaptureInstanceRaw>,
269 pub columns: Arc<[SqlServerColumnRaw]>,
271}
272
273#[derive(Debug, Clone)]
275pub struct SqlServerCaptureInstanceRaw {
276 pub name: Arc<str>,
278 pub create_date: Arc<NaiveDateTime>,
280}
281
282#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
284pub struct SqlServerColumnDesc {
285 pub name: Arc<str>,
287 pub column_type: Option<SqlColumnType>,
293 pub primary_key_constraint: Option<Arc<str>>,
296 pub decode_type: SqlServerColumnDecodeType,
298 pub raw_type: Arc<str>,
302}
303
304impl SqlServerColumnDesc {
305 pub fn new(raw: &SqlServerColumnRaw) -> Self {
307 let (column_type, decode_type) = match parse_data_type(raw) {
308 Ok((scalar_type, decode_type)) => {
309 let column_type = scalar_type.nullable(raw.is_nullable);
310 (Some(column_type), decode_type)
311 }
312 Err(err) => {
313 tracing::warn!(
314 ?err,
315 ?raw,
316 "found an unsupported data type when parsing raw data"
317 );
318 (
319 None,
320 SqlServerColumnDecodeType::Unsupported {
321 context: err.reason,
322 },
323 )
324 }
325 };
326 SqlServerColumnDesc {
327 name: Arc::clone(&raw.name),
328 primary_key_constraint: None,
329 column_type,
330 decode_type,
331 raw_type: Arc::clone(&raw.data_type),
332 }
333 }
334
335 pub fn represent_as_text(&mut self) {
337 self.column_type = self
338 .column_type
339 .as_ref()
340 .map(|ct| SqlScalarType::String.nullable(ct.nullable));
341 }
342
343 pub fn exclude(&mut self) {
345 self.column_type = None;
346 }
347
348 pub fn is_excluded(&self) -> bool {
350 self.column_type.is_none()
351 }
352}
353
354impl RustType<ProtoSqlServerColumnDesc> for SqlServerColumnDesc {
355 fn into_proto(&self) -> ProtoSqlServerColumnDesc {
356 ProtoSqlServerColumnDesc {
357 name: self.name.to_string(),
358 column_type: self.column_type.into_proto(),
359 primary_key_constraint: self.primary_key_constraint.as_ref().map(|v| v.to_string()),
360 decode_type: Some(self.decode_type.into_proto()),
361 raw_type: self.raw_type.to_string(),
362 }
363 }
364
365 fn from_proto(proto: ProtoSqlServerColumnDesc) -> Result<Self, mz_proto::TryFromProtoError> {
366 Ok(SqlServerColumnDesc {
367 name: proto.name.into(),
368 column_type: proto.column_type.into_rust()?,
369 primary_key_constraint: proto.primary_key_constraint.map(|v| v.into()),
370 decode_type: proto
371 .decode_type
372 .into_rust_if_some("ProtoSqlServerColumnDesc::decode_type")?,
373 raw_type: proto.raw_type.into(),
374 })
375 }
376}
377
/// Explains why a column's data type could not be mapped to a supported type.
///
/// Returned as the error of `parse_data_type`.
#[derive(Debug)]
// Not every field is read directly; they exist so the `Debug` output is
// informative.
#[allow(dead_code)]
pub struct UnsupportedDataType {
    /// Name of the offending column.
    column_name: String,
    /// Data type of the offending column.
    column_type: String,
    /// Human readable explanation of why the type is unsupported.
    reason: String,
}
386
387fn parse_data_type(
392 raw: &SqlServerColumnRaw,
393) -> Result<(SqlScalarType, SqlServerColumnDecodeType), UnsupportedDataType> {
394 if raw.is_computed {
398 return Err(UnsupportedDataType {
399 column_name: raw.name.to_string(),
400 column_type: format!("{} (computed)", raw.data_type.to_lowercase()),
401 reason: "column is computed".into(),
402 });
403 }
404
405 let scalar =
406 match raw.data_type.to_lowercase().as_str() {
407 "tinyint" => (SqlScalarType::Int16, SqlServerColumnDecodeType::U8),
408 "smallint" => (SqlScalarType::Int16, SqlServerColumnDecodeType::I16),
409 "int" => (SqlScalarType::Int32, SqlServerColumnDecodeType::I32),
410 "bigint" => (SqlScalarType::Int64, SqlServerColumnDecodeType::I64),
411 "bit" => (SqlScalarType::Bool, SqlServerColumnDecodeType::Bool),
412 "decimal" | "numeric" | "money" | "smallmoney" => {
413 if raw.precision > 38 || raw.scale > raw.precision {
420 tracing::warn!(
421 "unexpected value from SQL Server, precision of {} and scale of {}",
422 raw.precision,
423 raw.scale,
424 );
425 }
426 if raw.precision > 39 {
427 let reason = format!(
428 "precision of {} is greater than our maximum of 39",
429 raw.precision
430 );
431 return Err(UnsupportedDataType {
432 column_name: raw.name.to_string(),
433 column_type: raw.data_type.to_string(),
434 reason,
435 });
436 }
437
438 let raw_scale = usize::cast_from(raw.scale);
439 let max_scale =
440 NumericMaxScale::try_from(raw_scale).map_err(|_| UnsupportedDataType {
441 column_type: raw.data_type.to_string(),
442 column_name: raw.name.to_string(),
443 reason: format!("scale of {} is too large", raw.scale),
444 })?;
445 let column_type = SqlScalarType::Numeric {
446 max_scale: Some(max_scale),
447 };
448
449 (column_type, SqlServerColumnDecodeType::Numeric)
450 }
451 "real" | "float" | "double precision" => match raw.max_length {
459 4 => (SqlScalarType::Float32, SqlServerColumnDecodeType::F32),
462 8 => (SqlScalarType::Float64, SqlServerColumnDecodeType::F64),
463 _ => {
464 return Err(UnsupportedDataType {
465 column_name: raw.name.to_string(),
466 column_type: raw.data_type.to_string(),
467 reason: format!("unsupported length {}", raw.max_length),
468 });
469 }
470 },
471 dt @ ("char" | "nchar" | "sysname") => {
472 if raw.max_length == -1 {
475 return Err(UnsupportedDataType {
476 column_name: raw.name.to_string(),
477 column_type: raw.data_type.to_string(),
478 reason: "columns with unlimited size do not support CDC".to_string(),
479 });
480 }
481
482 let column_type = match dt {
483 "char" => {
484 let length =
485 if raw.max_length != -1 {
486 let length = CharLength::try_from(i64::from(raw.max_length))
487 .map_err(|e| UnsupportedDataType {
488 column_name: raw.name.to_string(),
489 column_type: raw.data_type.to_string(),
490 reason: e.to_string(),
491 })?;
492 Some(length)
493 } else {
494 None
495 };
496 SqlScalarType::Char { length }
497 }
498 "nchar" | "sysname" => SqlScalarType::String,
502 other => unreachable!("'{other}' checked above"),
503 };
504
505 (column_type, SqlServerColumnDecodeType::String)
506 }
507 "varchar" | "nvarchar" => {
508 let max_length =
519 if raw.max_length != -1 {
520 let length = VarCharMaxLength::try_from(i64::from(raw.max_length))
521 .map_err(|e| UnsupportedDataType {
522 column_name: raw.name.to_string(),
523 column_type: raw.data_type.to_string(),
524 reason: e.to_string(),
525 })?;
526 Some(length)
527 } else {
528 None
529 };
530 let column_type = SqlScalarType::VarChar { max_length };
531 (column_type, SqlServerColumnDecodeType::String)
532 }
533 "text" | "ntext" | "image" => {
534 mz_ore::soft_assert_eq_no_log!(raw.max_length, 16);
537
538 return Err(UnsupportedDataType {
540 column_name: raw.name.to_string(),
541 column_type: raw.data_type.to_string(),
542 reason: "columns with unlimited size do not support CDC".to_string(),
543 });
544 }
545 "xml" => {
546 if raw.max_length == -1 {
551 return Err(UnsupportedDataType {
552 column_name: raw.name.to_string(),
553 column_type: raw.data_type.to_string(),
554 reason: "columns with unlimited size do not support CDC".to_string(),
555 });
556 }
557 (SqlScalarType::String, SqlServerColumnDecodeType::Xml)
558 }
559 "binary" | "varbinary" => {
560 if raw.max_length == -1 {
565 return Err(UnsupportedDataType {
566 column_name: raw.name.to_string(),
567 column_type: raw.data_type.to_string(),
568 reason: "columns with unlimited size do not support CDC".to_string(),
569 });
570 }
571 (SqlScalarType::Bytes, SqlServerColumnDecodeType::Bytes)
572 }
573 "json" => (SqlScalarType::Jsonb, SqlServerColumnDecodeType::String),
574 "date" => (SqlScalarType::Date, SqlServerColumnDecodeType::NaiveDate),
575 "time" => (SqlScalarType::Time, SqlServerColumnDecodeType::NaiveTime),
588 dt @ ("smalldatetime" | "datetime" | "datetime2" | "datetimeoffset") => {
589 if raw.scale > 7 {
590 tracing::warn!("unexpected scale '{}' from SQL Server", raw.scale);
591 }
592 if raw.scale > mz_repr::adt::timestamp::MAX_PRECISION {
593 tracing::warn!("truncating scale of '{}' for '{}'", raw.scale, dt);
594 }
595 let precision = std::cmp::min(raw.scale, mz_repr::adt::timestamp::MAX_PRECISION);
596 let precision =
597 Some(TimestampPrecision::try_from(i64::from(precision)).expect("known to fit"));
598
599 match dt {
600 "smalldatetime" | "datetime" | "datetime2" => (
601 SqlScalarType::Timestamp { precision },
602 SqlServerColumnDecodeType::NaiveDateTime,
603 ),
604 "datetimeoffset" => (
605 SqlScalarType::TimestampTz { precision },
606 SqlServerColumnDecodeType::DateTime,
607 ),
608 other => unreachable!("'{other}' checked above"),
609 }
610 }
611 "uniqueidentifier" => (SqlScalarType::Uuid, SqlServerColumnDecodeType::Uuid),
612 other => {
625 return Err(UnsupportedDataType {
626 column_type: other.to_string(),
627 column_name: raw.name.to_string(),
628 reason: format!("'{other}' is unimplemented"),
629 });
630 }
631 };
632 Ok(scalar)
633}
634
/// Raw, unvalidated metadata about a single SQL Server column.
///
/// Interpreted by `parse_data_type` when building a `SqlServerColumnDesc`.
#[derive(Debug, Clone)]
pub struct SqlServerColumnRaw {
    /// Name of the column.
    pub name: Arc<str>,
    /// Name of the column's data type; matched case-insensitively.
    pub data_type: Arc<str>,
    /// Whether the column may contain `NULL`.
    pub is_nullable: bool,
    /// Maximum length of the column. A value of `-1` is treated as
    /// "unlimited size".
    // NOTE(review): presumably measured in bytes, as reported by SQL Server;
    // confirm against the query that populates this.
    pub max_length: i16,
    /// Precision of the column; only consulted for numeric types.
    pub precision: u8,
    /// Scale of the column; consulted for numeric and date-time types.
    pub scale: u8,
    /// Whether the column is computed. Computed columns are unsupported.
    pub is_computed: bool,
}
663
/// Raw, unvalidated description of a table constraint.
///
/// Validated by converting it into a `SqlServerTableConstraint`.
#[derive(Debug, Clone)]
pub struct SqlServerTableConstraintRaw {
    /// Name of the constraint.
    pub constraint_name: String,
    /// Textual constraint type, e.g. `"PRIMARY KEY"` or `"UNIQUE"`.
    pub constraint_type: String,
    /// Names of the columns the constraint applies to.
    pub columns: Vec<String>,
}
671
672#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
674pub enum SqlServerColumnDecodeType {
675 Bool,
676 U8,
677 I16,
678 I32,
679 I64,
680 F32,
681 F64,
682 String,
683 Bytes,
684 Uuid,
686 Numeric,
688 Xml,
690 NaiveDate,
692 NaiveTime,
694 DateTime,
696 NaiveDateTime,
698 Unsupported {
700 context: String,
702 },
703}
704
705impl SqlServerColumnDecodeType {
706 pub fn decode<'a>(
708 &self,
709 data: &'a tiberius::Row,
710 name: &'a str,
711 column: &'a SqlColumnType,
712 arena: &'a RowArena,
713 ) -> Result<Datum<'a>, SqlServerDecodeError> {
714 let maybe_datum = match (&column.scalar_type, self) {
715 (SqlScalarType::Bool, SqlServerColumnDecodeType::Bool) => data
716 .try_get(name)
717 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bool"))?
718 .map(|val: bool| if val { Datum::True } else { Datum::False }),
719 (SqlScalarType::Int16, SqlServerColumnDecodeType::U8) => data
720 .try_get(name)
721 .map_err(|_| SqlServerDecodeError::invalid_column(name, "u8"))?
722 .map(|val: u8| Datum::Int16(i16::cast_from(val))),
723 (SqlScalarType::Int16, SqlServerColumnDecodeType::I16) => data
724 .try_get(name)
725 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i16"))?
726 .map(Datum::Int16),
727 (SqlScalarType::Int32, SqlServerColumnDecodeType::I32) => data
728 .try_get(name)
729 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i32"))?
730 .map(Datum::Int32),
731 (SqlScalarType::Int64, SqlServerColumnDecodeType::I64) => data
732 .try_get(name)
733 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i64"))?
734 .map(Datum::Int64),
735 (SqlScalarType::Float32, SqlServerColumnDecodeType::F32) => data
736 .try_get(name)
737 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f32"))?
738 .map(|val: f32| Datum::Float32(ordered_float::OrderedFloat(val))),
739 (SqlScalarType::Float64, SqlServerColumnDecodeType::F64) => data
740 .try_get(name)
741 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f64"))?
742 .map(|val: f64| Datum::Float64(ordered_float::OrderedFloat(val))),
743 (SqlScalarType::String, SqlServerColumnDecodeType::String) => data
744 .try_get(name)
745 .map_err(|_| SqlServerDecodeError::invalid_column(name, "string"))?
746 .map(Datum::String),
747 (SqlScalarType::Char { length }, SqlServerColumnDecodeType::String) => data
748 .try_get(name)
749 .map_err(|_| SqlServerDecodeError::invalid_column(name, "char"))?
750 .map(|val: &str| match length {
751 Some(expected) => {
752 let found_chars = val.chars().count();
753 let expct_chars = usize::cast_from(expected.into_u32());
754 if found_chars != expct_chars {
755 Err(SqlServerDecodeError::invalid_char(
756 name,
757 expct_chars,
758 found_chars,
759 ))
760 } else {
761 Ok(Datum::String(val))
762 }
763 }
764 None => Ok(Datum::String(val)),
765 })
766 .transpose()?,
767 (SqlScalarType::VarChar { max_length }, SqlServerColumnDecodeType::String) => data
768 .try_get(name)
769 .map_err(|_| SqlServerDecodeError::invalid_column(name, "varchar"))?
770 .map(|val: &str| match max_length {
771 Some(max) => {
772 let found_chars = val.chars().count();
773 let max_chars = usize::cast_from(max.into_u32());
774 if found_chars > max_chars {
775 Err(SqlServerDecodeError::invalid_varchar(
776 name,
777 max_chars,
778 found_chars,
779 ))
780 } else {
781 Ok(Datum::String(val))
782 }
783 }
784 None => Ok(Datum::String(val)),
785 })
786 .transpose()?,
787 (SqlScalarType::Bytes, SqlServerColumnDecodeType::Bytes) => data
788 .try_get(name)
789 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bytes"))?
790 .map(Datum::Bytes),
791 (SqlScalarType::Uuid, SqlServerColumnDecodeType::Uuid) => data
792 .try_get(name)
793 .map_err(|_| SqlServerDecodeError::invalid_column(name, "uuid"))?
794 .map(Datum::Uuid),
795 (SqlScalarType::Numeric { .. }, SqlServerColumnDecodeType::Numeric) => data
796 .try_get(name)
797 .map_err(|_| SqlServerDecodeError::invalid_column(name, "numeric"))?
798 .map(|val: tiberius::numeric::Numeric| {
799 let numeric = tiberius_numeric_to_mz_numeric(val);
800 Datum::Numeric(OrderedDecimal(numeric))
801 }),
802 (SqlScalarType::String, SqlServerColumnDecodeType::Xml) => data
803 .try_get(name)
804 .map_err(|_| SqlServerDecodeError::invalid_column(name, "xml"))?
805 .map(|val: &tiberius::xml::XmlData| Datum::String(val.as_ref())),
806 (SqlScalarType::Date, SqlServerColumnDecodeType::NaiveDate) => data
807 .try_get(name)
808 .map_err(|_| SqlServerDecodeError::invalid_column(name, "date"))?
809 .map(|val: chrono::NaiveDate| {
810 let date = val
811 .try_into()
812 .map_err(|e| SqlServerDecodeError::invalid_date(name, e))?;
813 Ok::<_, SqlServerDecodeError>(Datum::Date(date))
814 })
815 .transpose()?,
816 (SqlScalarType::Time, SqlServerColumnDecodeType::NaiveTime) => data
817 .try_get(name)
818 .map_err(|_| SqlServerDecodeError::invalid_column(name, "time"))?
819 .map(|val: chrono::NaiveTime| {
820 let rounded = val.round_subsecs(6);
825 let val = if rounded < val {
827 val.trunc_subsecs(6)
828 } else {
829 val
830 };
831 Datum::Time(val)
832 }),
833 (SqlScalarType::Timestamp { precision }, SqlServerColumnDecodeType::NaiveDateTime) => {
834 data.try_get(name)
835 .map_err(|_| SqlServerDecodeError::invalid_column(name, "timestamp"))?
836 .map(|val: chrono::NaiveDateTime| {
837 let ts: CheckedTimestamp<chrono::NaiveDateTime> = val
838 .try_into()
839 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
840 let rounded = ts
841 .round_to_precision(*precision)
842 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
843 Ok::<_, SqlServerDecodeError>(Datum::Timestamp(rounded))
844 })
845 .transpose()?
846 }
847 (SqlScalarType::TimestampTz { precision }, SqlServerColumnDecodeType::DateTime) => data
848 .try_get(name)
849 .map_err(|_| SqlServerDecodeError::invalid_column(name, "timestamptz"))?
850 .map(|val: chrono::DateTime<chrono::Utc>| {
851 let ts: CheckedTimestamp<chrono::DateTime<chrono::Utc>> = val
852 .try_into()
853 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
854 let rounded = ts
855 .round_to_precision(*precision)
856 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
857 Ok::<_, SqlServerDecodeError>(Datum::TimestampTz(rounded))
858 })
859 .transpose()?,
860 (SqlScalarType::String, SqlServerColumnDecodeType::Bool) => data
862 .try_get(name)
863 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bool-text"))?
864 .map(|val: bool| {
865 if val {
866 Datum::String("true")
867 } else {
868 Datum::String("false")
869 }
870 }),
871 (SqlScalarType::String, SqlServerColumnDecodeType::U8) => data
872 .try_get(name)
873 .map_err(|_| SqlServerDecodeError::invalid_column(name, "u8-text"))?
874 .map(|val: u8| {
875 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
876 }),
877 (SqlScalarType::String, SqlServerColumnDecodeType::I16) => data
878 .try_get(name)
879 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i16-text"))?
880 .map(|val: i16| {
881 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
882 }),
883 (SqlScalarType::String, SqlServerColumnDecodeType::I32) => data
884 .try_get(name)
885 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i32-text"))?
886 .map(|val: i32| {
887 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
888 }),
889 (SqlScalarType::String, SqlServerColumnDecodeType::I64) => data
890 .try_get(name)
891 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i64-text"))?
892 .map(|val: i64| {
893 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
894 }),
895 (SqlScalarType::String, SqlServerColumnDecodeType::F32) => data
896 .try_get(name)
897 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f32-text"))?
898 .map(|val: f32| {
899 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
900 }),
901 (SqlScalarType::String, SqlServerColumnDecodeType::F64) => data
902 .try_get(name)
903 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f64-text"))?
904 .map(|val: f64| {
905 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
906 }),
907 (SqlScalarType::String, SqlServerColumnDecodeType::Uuid) => data
908 .try_get(name)
909 .map_err(|_| SqlServerDecodeError::invalid_column(name, "uuid-text"))?
910 .map(|val: uuid::Uuid| {
911 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
912 }),
913 (SqlScalarType::String, SqlServerColumnDecodeType::Bytes) => data
914 .try_get(name)
915 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bytes-text"))?
916 .map(|val: &[u8]| {
917 let encoded = base64::engine::general_purpose::STANDARD.encode(val);
918 arena.make_datum(|packer| packer.push(Datum::String(&encoded)))
919 }),
920 (SqlScalarType::String, SqlServerColumnDecodeType::Numeric) => data
921 .try_get(name)
922 .map_err(|_| SqlServerDecodeError::invalid_column(name, "numeric-text"))?
923 .map(|val: tiberius::numeric::Numeric| {
924 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
925 }),
926 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveDate) => data
927 .try_get(name)
928 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivedate-text"))?
929 .map(|val: chrono::NaiveDate| {
930 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
931 }),
932 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveTime) => data
933 .try_get(name)
934 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivetime-text"))?
935 .map(|val: chrono::NaiveTime| {
936 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
937 }),
938 (SqlScalarType::String, SqlServerColumnDecodeType::DateTime) => data
939 .try_get(name)
940 .map_err(|_| SqlServerDecodeError::invalid_column(name, "datetime-text"))?
941 .map(|val: chrono::DateTime<chrono::Utc>| {
942 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
943 }),
944 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveDateTime) => data
945 .try_get(name)
946 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivedatetime-text"))?
947 .map(|val: chrono::NaiveDateTime| {
948 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
949 }),
950 (column_type, decode_type) => {
951 return Err(SqlServerDecodeError::Unsupported {
952 sql_server_type: decode_type.clone(),
953 mz_type: column_type.clone(),
954 });
955 }
956 };
957
958 match (maybe_datum, column.nullable) {
959 (Some(datum), _) => Ok(datum),
960 (None, true) => Ok(Datum::Null),
961 (None, false) => Err(SqlServerDecodeError::InvalidData {
962 column_name: name.to_string(),
963 error: "found Null in non-nullable column".to_string(),
965 }),
966 }
967 }
968}
969
970impl RustType<proto_sql_server_column_desc::DecodeType> for SqlServerColumnDecodeType {
971 fn into_proto(&self) -> proto_sql_server_column_desc::DecodeType {
972 match self {
973 SqlServerColumnDecodeType::Bool => proto_sql_server_column_desc::DecodeType::Bool(()),
974 SqlServerColumnDecodeType::U8 => proto_sql_server_column_desc::DecodeType::U8(()),
975 SqlServerColumnDecodeType::I16 => proto_sql_server_column_desc::DecodeType::I16(()),
976 SqlServerColumnDecodeType::I32 => proto_sql_server_column_desc::DecodeType::I32(()),
977 SqlServerColumnDecodeType::I64 => proto_sql_server_column_desc::DecodeType::I64(()),
978 SqlServerColumnDecodeType::F32 => proto_sql_server_column_desc::DecodeType::F32(()),
979 SqlServerColumnDecodeType::F64 => proto_sql_server_column_desc::DecodeType::F64(()),
980 SqlServerColumnDecodeType::String => {
981 proto_sql_server_column_desc::DecodeType::String(())
982 }
983 SqlServerColumnDecodeType::Bytes => proto_sql_server_column_desc::DecodeType::Bytes(()),
984 SqlServerColumnDecodeType::Uuid => proto_sql_server_column_desc::DecodeType::Uuid(()),
985 SqlServerColumnDecodeType::Numeric => {
986 proto_sql_server_column_desc::DecodeType::Numeric(())
987 }
988 SqlServerColumnDecodeType::Xml => proto_sql_server_column_desc::DecodeType::Xml(()),
989 SqlServerColumnDecodeType::NaiveDate => {
990 proto_sql_server_column_desc::DecodeType::NaiveDate(())
991 }
992 SqlServerColumnDecodeType::NaiveTime => {
993 proto_sql_server_column_desc::DecodeType::NaiveTime(())
994 }
995 SqlServerColumnDecodeType::DateTime => {
996 proto_sql_server_column_desc::DecodeType::DateTime(())
997 }
998 SqlServerColumnDecodeType::NaiveDateTime => {
999 proto_sql_server_column_desc::DecodeType::NaiveDateTime(())
1000 }
1001 SqlServerColumnDecodeType::Unsupported { context } => {
1002 proto_sql_server_column_desc::DecodeType::Unsupported(context.clone())
1003 }
1004 }
1005 }
1006
1007 fn from_proto(
1008 proto: proto_sql_server_column_desc::DecodeType,
1009 ) -> Result<Self, mz_proto::TryFromProtoError> {
1010 let val = match proto {
1011 proto_sql_server_column_desc::DecodeType::Bool(()) => SqlServerColumnDecodeType::Bool,
1012 proto_sql_server_column_desc::DecodeType::U8(()) => SqlServerColumnDecodeType::U8,
1013 proto_sql_server_column_desc::DecodeType::I16(()) => SqlServerColumnDecodeType::I16,
1014 proto_sql_server_column_desc::DecodeType::I32(()) => SqlServerColumnDecodeType::I32,
1015 proto_sql_server_column_desc::DecodeType::I64(()) => SqlServerColumnDecodeType::I64,
1016 proto_sql_server_column_desc::DecodeType::F32(()) => SqlServerColumnDecodeType::F32,
1017 proto_sql_server_column_desc::DecodeType::F64(()) => SqlServerColumnDecodeType::F64,
1018 proto_sql_server_column_desc::DecodeType::String(()) => {
1019 SqlServerColumnDecodeType::String
1020 }
1021 proto_sql_server_column_desc::DecodeType::Bytes(()) => SqlServerColumnDecodeType::Bytes,
1022 proto_sql_server_column_desc::DecodeType::Uuid(()) => SqlServerColumnDecodeType::Uuid,
1023 proto_sql_server_column_desc::DecodeType::Numeric(()) => {
1024 SqlServerColumnDecodeType::Numeric
1025 }
1026 proto_sql_server_column_desc::DecodeType::Xml(()) => SqlServerColumnDecodeType::Xml,
1027 proto_sql_server_column_desc::DecodeType::NaiveDate(()) => {
1028 SqlServerColumnDecodeType::NaiveDate
1029 }
1030 proto_sql_server_column_desc::DecodeType::NaiveTime(()) => {
1031 SqlServerColumnDecodeType::NaiveTime
1032 }
1033 proto_sql_server_column_desc::DecodeType::DateTime(()) => {
1034 SqlServerColumnDecodeType::DateTime
1035 }
1036 proto_sql_server_column_desc::DecodeType::NaiveDateTime(()) => {
1037 SqlServerColumnDecodeType::NaiveDateTime
1038 }
1039 proto_sql_server_column_desc::DecodeType::Unsupported(context) => {
1040 SqlServerColumnDecodeType::Unsupported { context }
1041 }
1042 };
1043 Ok(val)
1044 }
1045}
1046
1047fn tiberius_numeric_to_mz_numeric(val: tiberius::numeric::Numeric) -> Numeric {
1050 let mut numeric = mz_repr::adt::numeric::cx_datum().from_i128(val.value());
1051 mz_repr::adt::numeric::cx_datum().scaleb(&mut numeric, &Numeric::from(-i32::from(val.scale())));
1054 numeric
1055}
1056
/// The bitmask read from the `__$update_mask` column of a change row.
///
/// Use [`UpdateMask::data_col_updated`] to query whether an individual
/// column is flagged.
#[derive(Debug)]
pub struct UpdateMask {
    /// Raw bytes of the mask, in the order they were read from the row.
    mask: Vec<u8>,
}
1065
1066impl TryFrom<&tiberius::Row> for UpdateMask {
1067 type Error = SqlServerDecodeError;
1068
1069 fn try_from(row: &tiberius::Row) -> Result<Self, Self::Error> {
1070 static UPDATE_MASK: &str = "__$update_mask";
1071
1072 let mask: Vec<u8> = row
1073 .try_get::<&[u8], _>(UPDATE_MASK)
1074 .inspect_err(|e| tracing::warn!("Failed extracting update mask: {e:?}"))
1075 .map_err(|_| SqlServerDecodeError::InvalidColumn {
1076 column_name: UPDATE_MASK.to_string(),
1077 as_type: "bytes",
1078 })?
1079 .ok_or_else(|| SqlServerDecodeError::InvalidData {
1080 column_name: UPDATE_MASK.to_string(),
1081 error: "column cannot be null".to_string(),
1082 })?
1083 .into();
1084 Ok(UpdateMask { mask })
1085 }
1086}
1087
1088impl UpdateMask {
1089 pub fn data_col_updated(&self, col_index: usize) -> bool {
1102 const CDC_METADATA_COL_COUNT: usize = 4;
1103
1104 if col_index < CDC_METADATA_COL_COUNT {
1105 return false;
1106 }
1107 let adj_col_index = col_index - CDC_METADATA_COL_COUNT;
1108 let byte_offset = adj_col_index / usize::cast_from(u8::BITS);
1109 assert!(
1110 byte_offset < self.mask.len(),
1111 "byte_offset = {byte_offset} mask_len = {}",
1112 self.mask.len()
1113 );
1114 let bit_offset = adj_col_index % usize::cast_from(u8::BITS);
1115 (self.mask[self.mask.len() - byte_offset - 1] >> bit_offset) & 1 == 1
1116 }
1117}
1118
1119#[derive(Debug)]
1124pub struct SqlServerRowDecoder {
1125 decoders: Vec<(Arc<str>, SqlColumnType, SqlServerColumnDecodeType)>,
1126}
1127
1128impl SqlServerRowDecoder {
1129 pub fn try_new(
1133 table: &SqlServerTableDesc,
1134 desc: &RelationDesc,
1135 ) -> Result<Self, SqlServerError> {
1136 let decoders = desc
1137 .iter()
1138 .map(|(col_name, col_type)| {
1139 let sql_server_col = table
1140 .columns
1141 .iter()
1142 .find(|col| col.name.as_ref() == col_name.as_str())
1143 .ok_or_else(|| {
1144 anyhow::anyhow!("no SQL Server column with name {col_name} found")
1146 })?;
1147 let Some(sql_server_col_typ) = sql_server_col.column_type.as_ref() else {
1148 return Err(SqlServerError::ProgrammingError(format!(
1149 "programming error, {col_name} should have been exluded",
1150 )));
1151 };
1152
1153 let matches = match (&sql_server_col_typ.scalar_type, &col_type.scalar_type) {
1161 (SqlScalarType::Timestamp { .. }, SqlScalarType::Timestamp { .. })
1162 | (SqlScalarType::TimestampTz { .. }, SqlScalarType::TimestampTz { .. }) => {
1163 sql_server_col_typ.nullable == col_type.nullable
1165 }
1166 (_, _) => sql_server_col_typ == col_type,
1167 };
1168 if !matches {
1169 return Err(SqlServerError::ProgrammingError(format!(
1170 "programming error, {col_name} has mismatched type {:?} vs {:?}",
1171 sql_server_col.column_type, col_type
1172 )));
1173 }
1174
1175 let name = Arc::clone(&sql_server_col.name);
1176 let decoder = sql_server_col.decode_type.clone();
1177 let col_typ = sql_server_col_typ.clone();
1182
1183 Ok::<_, SqlServerError>((name, col_typ, decoder))
1184 })
1185 .collect::<Result<_, _>>()?;
1186
1187 Ok(SqlServerRowDecoder { decoders })
1188 }
1189
1190 pub fn decode(
1197 &self,
1198 data: &tiberius::Row,
1199 row: &mut Row,
1200 arena: &RowArena,
1201 new_data: Option<&tiberius::Row>,
1202 ) -> Result<(), SqlServerDecodeError> {
1203 let mut packer = row.packer();
1204
1205 for (col_name, col_type, decoder) in &self.decoders {
1206 let datum = decoder.decode(data, col_name, col_type, arena)?;
1207
1208 let datum = if let Some(new_data) = new_data
1209 && matches!(
1210 col_type.scalar_type,
1211 SqlScalarType::VarChar { max_length: None }
1212 )
1213 && matches!(datum, Datum::Null)
1214 {
1215 let update_mask = UpdateMask::try_from(new_data)?;
1216 let col_index = new_data
1217 .columns()
1218 .iter()
1219 .position(|c| c.name() == col_name.as_ref())
1220 .expect("column exists");
1221 if !update_mask.data_col_updated(col_index) {
1226 decoder.decode(new_data, col_name, col_type, arena)?
1227 } else {
1228 datum
1229 }
1230 } else {
1231 datum
1232 };
1233
1234 packer.push(datum);
1235 }
1236 Ok(())
1237 }
1238
1239 pub fn included_column_names(&self) -> Vec<Arc<str>> {
1240 self.decoders
1241 .iter()
1242 .map(|decoder| Arc::clone(&decoder.0))
1243 .collect()
1244 }
1245}
1246
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::Arc;

    use chrono::NaiveDateTime;
    use itertools::Itertools;
    use mz_ore::assert_contains;
    use mz_ore::collections::CollectionExt;
    use mz_repr::adt::numeric::NumericMaxScale;
    use mz_repr::adt::varchar::VarCharMaxLength;
    use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlScalarType};
    use tiberius::RowTestExt;

    use crate::desc::{
        SqlServerCaptureInstanceRaw, SqlServerColumnDecodeType, SqlServerColumnDesc,
        SqlServerTableDesc, SqlServerTableRaw, tiberius_numeric_to_mz_numeric,
    };

    use super::SqlServerColumnRaw;

    // Test-only builder helpers for describing an upstream column.
    impl SqlServerColumnRaw {
        /// A non-nullable, non-computed column with zeroed length, precision
        /// and scale.
        fn new(name: &str, data_type: &str) -> Self {
            Self {
                name: name.into(),
                data_type: data_type.into(),
                is_nullable: false,
                max_length: 0,
                precision: 0,
                scale: 0,
                is_computed: false,
            }
        }

        fn nullable(self, nullable: bool) -> Self {
            Self {
                is_nullable: nullable,
                ..self
            }
        }

        fn max_length(self, max_length: i16) -> Self {
            Self { max_length, ..self }
        }

        fn precision(self, precision: u8) -> Self {
            Self { precision, ..self }
        }

        fn scale(self, scale: u8) -> Self {
            Self { scale, ..self }
        }
    }

    /// Supported upstream types map to the expected column and decode types.
    #[mz_ore::test]
    fn smoketest_column_raw() {
        // `bit` becomes a non-nullable boolean.
        let bit_raw = SqlServerColumnRaw::new("foo", "bit");
        let bit_col = SqlServerColumnDesc::new(&bit_raw);

        assert_eq!(&*bit_col.name, "foo");
        assert_eq!(
            bit_col.column_type,
            Some(SqlScalarType::Bool.nullable(false))
        );
        assert_eq!(bit_col.decode_type, SqlServerColumnDecodeType::Bool);

        // `decimal(20, 10)` becomes a numeric whose max scale is 10.
        let decimal_raw = SqlServerColumnRaw::new("foo", "decimal")
            .precision(20)
            .scale(10);
        let decimal_col = SqlServerColumnDesc::new(&decimal_raw);

        let max_scale = NumericMaxScale::try_from(10i64).expect("known valid");
        let expected_type = SqlScalarType::Numeric {
            max_scale: Some(max_scale),
        }
        .nullable(false);
        assert_eq!(decimal_col.column_type, Some(expected_type));
        assert_eq!(decimal_col.decode_type, SqlServerColumnDecodeType::Numeric);
    }

    /// Unsupported upstream types are reported via the `Unsupported` decode
    /// type rather than by failing construction.
    #[mz_ore::test]
    fn smoketest_column_raw_invalid() {
        // An unknown data type.
        let unknown_raw = SqlServerColumnRaw::new("foo", "bad_data_type");
        let unknown = SqlServerColumnDesc::new(&unknown_raw);
        let SqlServerColumnDecodeType::Unsupported { context } = unknown.decode_type else {
            panic!("unexpected decode type {unknown:?}");
        };
        assert_contains!(context, "'bad_data_type' is unimplemented");

        // A decimal whose precision is out of range.
        let wide_raw = SqlServerColumnRaw::new("foo", "decimal")
            .precision(100)
            .scale(10);
        let wide = SqlServerColumnDesc::new(&wide_raw);
        assert!(matches!(
            wide.decode_type,
            SqlServerColumnDecodeType::Unsupported { .. }
        ));

        // A `varbinary` with max_length -1 (unlimited size).
        let unbounded_raw = SqlServerColumnRaw::new("foo", "varbinary").max_length(-1);
        let unbounded = SqlServerColumnDesc::new(&unbounded_raw);
        let SqlServerColumnDecodeType::Unsupported { context } = unbounded.decode_type else {
            panic!("unexpected decode type {unbounded:?}");
        };
        assert_contains!(context, "columns with unlimited size do not support CDC");
    }

    /// Rows are decoded in the column order of the `RelationDesc`, which here
    /// differs from the upstream column order.
    #[mz_ore::test]
    fn smoketest_decoder() {
        let raw_columns = [
            SqlServerColumnRaw::new("a", "varchar").max_length(16),
            SqlServerColumnRaw::new("b", "int").nullable(true),
            SqlServerColumnRaw::new("c", "bit"),
        ];
        let create_date =
            NaiveDateTime::parse_from_str("2024-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap();
        let table_raw = SqlServerTableRaw {
            schema_name: "my_schema".into(),
            name: "my_table".into(),
            capture_instance: Arc::new(SqlServerCaptureInstanceRaw {
                name: "my_table_CT".into(),
                create_date: create_date.into(),
            }),
            columns: raw_columns.into(),
        };
        let table_desc = SqlServerTableDesc::new(table_raw, vec![]).unwrap();

        // Note the order: a, c, b.
        let max_length = Some(VarCharMaxLength::try_from(16).unwrap());
        let relation_desc = RelationDesc::builder()
            .with_column("a", SqlScalarType::VarChar { max_length }.nullable(false))
            .with_column("c", SqlScalarType::Bool.nullable(false))
            .with_column("b", SqlScalarType::Int32.nullable(true))
            .finish();

        let decoder = table_desc.decoder(&relation_desc).expect("known valid");

        // Upstream rows arrive in upstream order: a, b, c.
        let wire_columns = [
            tiberius::Column::new("a".to_string(), tiberius::ColumnType::BigVarChar),
            tiberius::Column::new("b".to_string(), tiberius::ColumnType::Int4),
            tiberius::Column::new("c".to_string(), tiberius::ColumnType::Bit),
        ];

        let values_a = [
            tiberius::ColumnData::String(Some("hello world".into())),
            tiberius::ColumnData::I32(Some(42)),
            tiberius::ColumnData::Bit(Some(true)),
        ];
        let upstream_row_a = tiberius::Row::build(wire_columns.iter().cloned().zip_eq(values_a));

        let values_b = [
            tiberius::ColumnData::String(Some("foo bar".into())),
            tiberius::ColumnData::I32(None),
            tiberius::ColumnData::Bit(Some(false)),
        ];
        let upstream_row_b = tiberius::Row::build(wire_columns.into_iter().zip_eq(values_b));

        let mut decoded = Row::default();
        let arena = RowArena::default();

        decoder
            .decode(&upstream_row_a, &mut decoded, &arena, None)
            .unwrap();
        let expected_a =
            Row::pack_slice(&[Datum::String("hello world"), Datum::True, Datum::Int32(42)]);
        assert_eq!(&decoded, &expected_a);

        // The same output row is reused; a NULL int decodes to `Datum::Null`.
        decoder
            .decode(&upstream_row_b, &mut decoded, &arena, None)
            .unwrap();
        let expected_b = Row::pack_slice(&[Datum::String("foo bar"), Datum::False, Datum::Null]);
        assert_eq!(&decoded, &expected_b);
    }

    /// Columns marked as text columns decode to strings, whatever their
    /// upstream type.
    #[mz_ore::test]
    fn smoketest_decode_to_string() {
        /// Decodes one value of the given upstream type from a single-column
        /// table whose column "a" is configured as a text column, and checks
        /// that the result is a string datum.
        #[track_caller]
        fn testcase(
            data_type: &'static str,
            col_type: tiberius::ColumnType,
            col_data: tiberius::ColumnData<'static>,
        ) {
            let create_date =
                NaiveDateTime::parse_from_str("2024-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap();
            let table_raw = SqlServerTableRaw {
                schema_name: "my_schema".into(),
                name: "my_table".into(),
                capture_instance: Arc::new(SqlServerCaptureInstanceRaw {
                    name: "my_table_CT".into(),
                    create_date: create_date.into(),
                }),
                columns: [SqlServerColumnRaw::new("a", data_type)].into(),
            };
            let mut table_desc = SqlServerTableDesc::new(table_raw, vec![]).unwrap();
            table_desc.apply_text_columns(&BTreeSet::from(["a"]));

            let relation_desc = RelationDesc::builder()
                .with_column("a", SqlScalarType::String.nullable(false))
                .finish();

            let decoder = table_desc.decoder(&relation_desc).expect("known valid");

            let wire_column = tiberius::Column::new("a".to_string(), col_type);
            let upstream_row = tiberius::Row::build([(wire_column, col_data)]);

            let mut decoded = Row::default();
            let arena = RowArena::new();
            decoder
                .decode(&upstream_row, &mut decoded, &arena, None)
                .unwrap();

            let only_datum = decoded.into_element();
            assert!(matches!(only_datum, Datum::String(_)));
        }

        use tiberius::ColumnData;

        testcase(
            "bit",
            tiberius::ColumnType::Bit,
            ColumnData::Bit(Some(true)),
        );
        testcase(
            "bit",
            tiberius::ColumnType::Bit,
            ColumnData::Bit(Some(false)),
        );
        testcase(
            "tinyint",
            tiberius::ColumnType::Int1,
            ColumnData::U8(Some(33)),
        );
        testcase(
            "smallint",
            tiberius::ColumnType::Int2,
            ColumnData::I16(Some(101)),
        );
        testcase(
            "int",
            tiberius::ColumnType::Int4,
            ColumnData::I32(Some(-42)),
        );
        {
            let datetime = tiberius::time::DateTime::new(10, 300);
            testcase(
                "datetime",
                tiberius::ColumnType::Datetime,
                ColumnData::DateTime(Some(datetime)),
            );
        }
    }

    /// `tiberius` numerics convert to the same value as parsing the
    /// equivalent decimal text.
    #[mz_ore::test]
    // NOTE(review): ignored under Miri; the reason is not visible here,
    // presumably the decimal library relies on FFI. Confirm.
    #[cfg_attr(miri, ignore)]
    fn smoketest_numeric_conversion() {
        // (unscaled value, scale, expected decimal text)
        let cases: [(i128, u8, &str); 4] = [
            (12345, 2, "123.45"),
            (-99999, 5, "-.99999"),
            (1, 29, "0.00000000000000000000000000001"),
            (-111111111111111111, 0, "-111111111111111111"),
        ];

        for (unscaled, scale, decimal_text) in cases {
            let source = tiberius::numeric::Numeric::new_with_scale(unscaled, scale);
            let converted = tiberius_numeric_to_mz_numeric(source);
            let expected = mz_repr::adt::numeric::cx_datum()
                .parse(decimal_text)
                .unwrap();
            assert_eq!(expected, converted);
        }
    }
}