1use base64::Engine;
26use chrono::{NaiveDateTime, SubsecRound};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_proto::{IntoRustIfSome, ProtoType, RustType};
30use mz_repr::adt::char::CharLength;
31use mz_repr::adt::numeric::{Numeric, NumericMaxScale};
32use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampPrecision};
33use mz_repr::adt::varchar::VarCharMaxLength;
34use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlColumnType, SqlScalarType};
35use proptest_derive::Arbitrary;
36use serde::{Deserialize, Serialize};
37
38use std::collections::BTreeSet;
39use std::sync::Arc;
40
41use crate::desc::proto_sql_server_table_constraint::ConstraintType;
42use crate::{SqlServerDecodeError, SqlServerError};
43
44include!(concat!(env!("OUT_DIR"), "/mz_sql_server_util.rs"));
45
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
56pub struct SqlServerTableDesc {
57 pub schema_name: Arc<str>,
59 pub name: Arc<str>,
61 pub columns: Box<[SqlServerColumnDesc]>,
63 pub constraints: Vec<SqlServerTableConstraint>,
65}
66
67impl SqlServerTableDesc {
68 pub fn new(
73 raw: SqlServerTableRaw,
74 raw_constraints: Vec<SqlServerTableConstraintRaw>,
75 ) -> Result<Self, SqlServerError> {
76 let columns: Box<[_]> = raw
77 .columns
78 .into_iter()
79 .map(SqlServerColumnDesc::new)
80 .collect();
81 let constraints = raw_constraints
82 .into_iter()
83 .map(SqlServerTableConstraint::try_from)
84 .collect::<Result<Vec<_>, _>>()?;
85 Ok(SqlServerTableDesc {
86 schema_name: raw.schema_name,
87 name: raw.name,
88 columns,
89 constraints,
90 })
91 }
92
93 pub fn qualified_name(&self) -> SqlServerQualifiedTableName {
95 SqlServerQualifiedTableName {
96 schema_name: Arc::clone(&self.schema_name),
97 table_name: Arc::clone(&self.name),
98 }
99 }
100
101 pub fn apply_text_columns(&mut self, text_columns: &BTreeSet<&str>) {
104 for column in &mut self.columns {
105 if text_columns.contains(column.name.as_ref()) {
106 column.represent_as_text();
107 }
108 }
109 }
110
111 pub fn apply_excl_columns(&mut self, excl_columns: &BTreeSet<&str>) {
114 for column in &mut self.columns {
115 if excl_columns.contains(column.name.as_ref()) {
116 column.exclude();
117 }
118 }
119 }
120
121 pub fn decoder(&self, desc: &RelationDesc) -> Result<SqlServerRowDecoder, SqlServerError> {
124 let decoder = SqlServerRowDecoder::try_new(self, desc)?;
125 Ok(decoder)
126 }
127}
128
129impl RustType<ProtoSqlServerTableDesc> for SqlServerTableDesc {
130 fn into_proto(&self) -> ProtoSqlServerTableDesc {
131 ProtoSqlServerTableDesc {
132 name: self.name.to_string(),
133 schema_name: self.schema_name.to_string(),
134 columns: self.columns.iter().map(|c| c.into_proto()).collect(),
135 constraints: self.constraints.iter().map(|c| c.into_proto()).collect(),
136 }
137 }
138
139 fn from_proto(proto: ProtoSqlServerTableDesc) -> Result<Self, mz_proto::TryFromProtoError> {
140 let columns = proto
141 .columns
142 .into_iter()
143 .map(|c| c.into_rust())
144 .collect::<Result<_, _>>()?;
145 let constraints = proto
146 .constraints
147 .into_iter()
148 .map(|c| c.into_rust())
149 .collect::<Result<_, _>>()?;
150 Ok(SqlServerTableDesc {
151 schema_name: proto.schema_name.into(),
152 name: proto.name.into(),
153 columns,
154 constraints,
155 })
156 }
157}
158
159#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
162pub enum SqlServerTableConstraintType {
163 PrimaryKey,
164 Unique,
165}
166
167impl TryFrom<String> for SqlServerTableConstraintType {
168 type Error = SqlServerError;
169
170 fn try_from(value: String) -> Result<Self, Self::Error> {
171 match value.as_str() {
172 "PRIMARY KEY" => Ok(Self::PrimaryKey),
173 "UNIQUE" => Ok(Self::Unique),
174 name => Err(SqlServerError::InvalidData {
175 column_name: "constraint_type".into(),
176 error: format!("Unknown constraint type: {name}"),
177 }),
178 }
179 }
180}
181
182impl RustType<proto_sql_server_table_constraint::ConstraintType> for SqlServerTableConstraintType {
183 fn into_proto(&self) -> proto_sql_server_table_constraint::ConstraintType {
184 match self {
185 SqlServerTableConstraintType::PrimaryKey => ConstraintType::PrimaryKey(()),
186 SqlServerTableConstraintType::Unique => ConstraintType::Unique(()),
187 }
188 }
189
190 fn from_proto(
191 proto: proto_sql_server_table_constraint::ConstraintType,
192 ) -> Result<Self, mz_proto::TryFromProtoError> {
193 Ok(match proto {
194 ConstraintType::PrimaryKey(_) => SqlServerTableConstraintType::PrimaryKey,
195 ConstraintType::Unique(_) => SqlServerTableConstraintType::Unique,
196 })
197 }
198}
199
200#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
202pub struct SqlServerTableConstraint {
203 pub constraint_name: String,
204 pub constraint_type: SqlServerTableConstraintType,
205 pub column_names: Vec<String>,
206}
207
208impl TryFrom<SqlServerTableConstraintRaw> for SqlServerTableConstraint {
209 type Error = SqlServerError;
210
211 fn try_from(value: SqlServerTableConstraintRaw) -> Result<Self, Self::Error> {
212 Ok(SqlServerTableConstraint {
213 constraint_name: value.constraint_name,
214 constraint_type: value.constraint_type.try_into()?,
215 column_names: value.columns,
216 })
217 }
218}
219
220impl RustType<ProtoSqlServerTableConstraint> for SqlServerTableConstraint {
221 fn into_proto(&self) -> ProtoSqlServerTableConstraint {
222 ProtoSqlServerTableConstraint {
223 constraint_name: self.constraint_name.clone(),
224 constraint_type: Some(self.constraint_type.into_proto()),
225 column_names: self.column_names.clone(),
226 }
227 }
228
229 fn from_proto(
230 proto: ProtoSqlServerTableConstraint,
231 ) -> Result<Self, mz_proto::TryFromProtoError> {
232 Ok(SqlServerTableConstraint {
233 constraint_name: proto.constraint_name,
234 constraint_type: proto
235 .constraint_type
236 .into_rust_if_some("ProtoSqlServerTableConstraint::constraint_type")?,
237 column_names: proto.column_names,
238 })
239 }
240}
241
/// Schema-qualified name of a SQL Server table.
///
/// Formats as `[schema_name].[table_name]` through its [`std::fmt::Display`]
/// implementation, which also provides `.to_string()`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct SqlServerQualifiedTableName {
    /// Name of the schema that the table belongs to.
    pub schema_name: Arc<str>,
    /// Name of the table.
    pub table_name: Arc<str>,
}

// `Display` is implemented rather than `ToString` so the name can be used
// directly in `format!`/`write!`; `ToString` comes from the blanket impl.
impl std::fmt::Display for SqlServerQualifiedTableName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}].[{}]", self.schema_name, self.table_name)
    }
}
256
257#[derive(Debug, Clone)]
262pub struct SqlServerTableRaw {
263 pub schema_name: Arc<str>,
265 pub name: Arc<str>,
267 pub capture_instance: Arc<SqlServerCaptureInstanceRaw>,
269 pub columns: Arc<[SqlServerColumnRaw]>,
271}
272
273#[derive(Debug, Clone)]
275pub struct SqlServerCaptureInstanceRaw {
276 pub name: Arc<str>,
278 pub create_date: Arc<NaiveDateTime>,
280}
281
282#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
284pub struct SqlServerColumnDesc {
285 pub name: Arc<str>,
287 pub column_type: Option<SqlColumnType>,
293 pub primary_key_constraint: Option<Arc<str>>,
296 pub decode_type: SqlServerColumnDecodeType,
298 pub raw_type: Arc<str>,
302}
303
304impl SqlServerColumnDesc {
305 pub fn new(raw: &SqlServerColumnRaw) -> Self {
307 let (column_type, decode_type) = match parse_data_type(raw) {
308 Ok((scalar_type, decode_type)) => {
309 let column_type = scalar_type.nullable(raw.is_nullable);
310 (Some(column_type), decode_type)
311 }
312 Err(err) => {
313 tracing::warn!(
314 ?err,
315 ?raw,
316 "found an unsupported data type when parsing raw data"
317 );
318 (
319 None,
320 SqlServerColumnDecodeType::Unsupported {
321 context: err.reason,
322 },
323 )
324 }
325 };
326 SqlServerColumnDesc {
327 name: Arc::clone(&raw.name),
328 primary_key_constraint: None,
329 column_type,
330 decode_type,
331 raw_type: Arc::clone(&raw.data_type),
332 }
333 }
334
335 pub fn represent_as_text(&mut self) {
337 self.column_type = self
338 .column_type
339 .as_ref()
340 .map(|ct| SqlScalarType::String.nullable(ct.nullable));
341 }
342
343 pub fn exclude(&mut self) {
345 self.column_type = None;
346 }
347
348 pub fn is_excluded(&self) -> bool {
350 self.column_type.is_none()
351 }
352}
353
354impl RustType<ProtoSqlServerColumnDesc> for SqlServerColumnDesc {
355 fn into_proto(&self) -> ProtoSqlServerColumnDesc {
356 ProtoSqlServerColumnDesc {
357 name: self.name.to_string(),
358 column_type: self.column_type.into_proto(),
359 primary_key_constraint: self.primary_key_constraint.as_ref().map(|v| v.to_string()),
360 decode_type: Some(self.decode_type.into_proto()),
361 raw_type: self.raw_type.to_string(),
362 }
363 }
364
365 fn from_proto(proto: ProtoSqlServerColumnDesc) -> Result<Self, mz_proto::TryFromProtoError> {
366 Ok(SqlServerColumnDesc {
367 name: proto.name.into(),
368 column_type: proto.column_type.into_rust()?,
369 primary_key_constraint: proto.primary_key_constraint.map(|v| v.into()),
370 decode_type: proto
371 .decode_type
372 .into_rust_if_some("ProtoSqlServerColumnDesc::decode_type")?,
373 raw_type: proto.raw_type.into(),
374 })
375 }
376}
377
/// The raw data type of a column cannot be represented by Materialize.
#[derive(Debug)]
// `column_name` and `column_type` are only surfaced through the `Debug` output.
#[allow(dead_code)]
pub struct UnsupportedDataType {
    /// Name of the offending column.
    column_name: String,
    /// Data type of the offending column.
    column_type: String,
    /// Human readable explanation of why the type is unsupported.
    reason: String,
}
386
387fn parse_data_type(
392 raw: &SqlServerColumnRaw,
393) -> Result<(SqlScalarType, SqlServerColumnDecodeType), UnsupportedDataType> {
394 if raw.is_computed {
398 return Err(UnsupportedDataType {
399 column_name: raw.name.to_string(),
400 column_type: format!("{} (computed)", raw.data_type.to_lowercase()),
401 reason: "column is computed".into(),
402 });
403 }
404
405 let scalar = match raw.data_type.to_lowercase().as_str() {
406 "tinyint" => (SqlScalarType::Int16, SqlServerColumnDecodeType::U8),
407 "smallint" => (SqlScalarType::Int16, SqlServerColumnDecodeType::I16),
408 "int" => (SqlScalarType::Int32, SqlServerColumnDecodeType::I32),
409 "bigint" => (SqlScalarType::Int64, SqlServerColumnDecodeType::I64),
410 "bit" => (SqlScalarType::Bool, SqlServerColumnDecodeType::Bool),
411 "decimal" | "numeric" | "money" | "smallmoney" => {
412 if raw.precision > 38 || raw.scale > raw.precision {
419 tracing::warn!(
420 "unexpected value from SQL Server, precision of {} and scale of {}",
421 raw.precision,
422 raw.scale,
423 );
424 }
425 if raw.precision > 39 {
426 let reason = format!(
427 "precision of {} is greater than our maximum of 39",
428 raw.precision
429 );
430 return Err(UnsupportedDataType {
431 column_name: raw.name.to_string(),
432 column_type: raw.data_type.to_string(),
433 reason,
434 });
435 }
436
437 let raw_scale = usize::cast_from(raw.scale);
438 let max_scale =
439 NumericMaxScale::try_from(raw_scale).map_err(|_| UnsupportedDataType {
440 column_type: raw.data_type.to_string(),
441 column_name: raw.name.to_string(),
442 reason: format!("scale of {} is too large", raw.scale),
443 })?;
444 let column_type = SqlScalarType::Numeric {
445 max_scale: Some(max_scale),
446 };
447
448 (column_type, SqlServerColumnDecodeType::Numeric)
449 }
450 "real" | "float" | "double precision" => match raw.max_length {
458 4 => (SqlScalarType::Float32, SqlServerColumnDecodeType::F32),
461 8 => (SqlScalarType::Float64, SqlServerColumnDecodeType::F64),
462 _ => {
463 return Err(UnsupportedDataType {
464 column_name: raw.name.to_string(),
465 column_type: raw.data_type.to_string(),
466 reason: format!("unsupported length {}", raw.max_length),
467 });
468 }
469 },
470 dt @ ("char" | "nchar" | "varchar" | "nvarchar" | "sysname") => {
471 if raw.max_length == -1 {
476 return Err(UnsupportedDataType {
477 column_name: raw.name.to_string(),
478 column_type: raw.data_type.to_string(),
479 reason: "columns with unlimited size do not support CDC".to_string(),
480 });
481 }
482
483 let column_type = match dt {
484 "char" => {
485 let length = CharLength::try_from(i64::from(raw.max_length)).map_err(|e| {
486 UnsupportedDataType {
487 column_name: raw.name.to_string(),
488 column_type: raw.data_type.to_string(),
489 reason: e.to_string(),
490 }
491 })?;
492 SqlScalarType::Char {
493 length: Some(length),
494 }
495 }
496 "varchar" => {
497 let length =
498 VarCharMaxLength::try_from(i64::from(raw.max_length)).map_err(|e| {
499 UnsupportedDataType {
500 column_name: raw.name.to_string(),
501 column_type: raw.data_type.to_string(),
502 reason: e.to_string(),
503 }
504 })?;
505 SqlScalarType::VarChar {
506 max_length: Some(length),
507 }
508 }
509 "nchar" | "nvarchar" | "sysname" => SqlScalarType::String,
513 other => unreachable!("'{other}' checked above"),
514 };
515
516 (column_type, SqlServerColumnDecodeType::String)
517 }
518 "text" | "ntext" | "image" => {
519 mz_ore::soft_assert_eq_no_log!(raw.max_length, 16);
522
523 return Err(UnsupportedDataType {
525 column_name: raw.name.to_string(),
526 column_type: raw.data_type.to_string(),
527 reason: "columns with unlimited size do not support CDC".to_string(),
528 });
529 }
530 "xml" => {
531 if raw.max_length == -1 {
536 return Err(UnsupportedDataType {
537 column_name: raw.name.to_string(),
538 column_type: raw.data_type.to_string(),
539 reason: "columns with unlimited size do not support CDC".to_string(),
540 });
541 }
542 (SqlScalarType::String, SqlServerColumnDecodeType::Xml)
543 }
544 "binary" | "varbinary" => {
545 if raw.max_length == -1 {
551 return Err(UnsupportedDataType {
552 column_name: raw.name.to_string(),
553 column_type: raw.data_type.to_string(),
554 reason: "columns with unlimited size do not support CDC".to_string(),
555 });
556 }
557
558 (SqlScalarType::Bytes, SqlServerColumnDecodeType::Bytes)
559 }
560 "json" => (SqlScalarType::Jsonb, SqlServerColumnDecodeType::String),
561 "date" => (SqlScalarType::Date, SqlServerColumnDecodeType::NaiveDate),
562 "time" => (SqlScalarType::Time, SqlServerColumnDecodeType::NaiveTime),
575 dt @ ("smalldatetime" | "datetime" | "datetime2" | "datetimeoffset") => {
576 if raw.scale > 7 {
577 tracing::warn!("unexpected scale '{}' from SQL Server", raw.scale);
578 }
579 if raw.scale > mz_repr::adt::timestamp::MAX_PRECISION {
580 tracing::warn!("truncating scale of '{}' for '{}'", raw.scale, dt);
581 }
582 let precision = std::cmp::min(raw.scale, mz_repr::adt::timestamp::MAX_PRECISION);
583 let precision =
584 Some(TimestampPrecision::try_from(i64::from(precision)).expect("known to fit"));
585
586 match dt {
587 "smalldatetime" | "datetime" | "datetime2" => (
588 SqlScalarType::Timestamp { precision },
589 SqlServerColumnDecodeType::NaiveDateTime,
590 ),
591 "datetimeoffset" => (
592 SqlScalarType::TimestampTz { precision },
593 SqlServerColumnDecodeType::DateTime,
594 ),
595 other => unreachable!("'{other}' checked above"),
596 }
597 }
598 "uniqueidentifier" => (SqlScalarType::Uuid, SqlServerColumnDecodeType::Uuid),
599 other => {
612 return Err(UnsupportedDataType {
613 column_type: other.to_string(),
614 column_name: raw.name.to_string(),
615 reason: format!("'{other}' is unimplemented"),
616 });
617 }
618 };
619 Ok(scalar)
620}
621
/// Raw metadata for a column from a table in Microsoft SQL Server.
///
/// See [`SqlServerColumnDesc`] for the refined description that is built from
/// this type.
#[derive(Clone, Debug)]
pub struct SqlServerColumnRaw {
    /// Name of this column.
    pub name: Arc<str>,
    /// Name of the data type, as reported by SQL Server.
    pub data_type: Arc<str>,
    /// Whether or not the column is nullable.
    pub is_nullable: bool,
    /// Maximum length of the column; `-1` is treated as "unlimited size".
    pub max_length: i16,
    /// Precision of the column; consulted for the decimal-like types.
    pub precision: u8,
    /// Scale of the column; consulted for decimal-like and date/time types.
    pub scale: u8,
    /// Whether the column is computed. Computed columns are unsupported.
    pub is_computed: bool,
}
650
/// Raw metadata for a table constraint, before its type has been parsed.
///
/// See [`SqlServerTableConstraint`] for the refined form.
#[derive(Clone, Debug)]
pub struct SqlServerTableConstraintRaw {
    /// Name of the constraint.
    pub constraint_name: String,
    /// Textual constraint type, e.g. `"PRIMARY KEY"` or `"UNIQUE"`.
    pub constraint_type: String,
    /// Names of the columns that the constraint covers.
    pub columns: Vec<String>,
}
658
659#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
661pub enum SqlServerColumnDecodeType {
662 Bool,
663 U8,
664 I16,
665 I32,
666 I64,
667 F32,
668 F64,
669 String,
670 Bytes,
671 Uuid,
673 Numeric,
675 Xml,
677 NaiveDate,
679 NaiveTime,
681 DateTime,
683 NaiveDateTime,
685 Unsupported {
687 context: String,
689 },
690}
691
692impl SqlServerColumnDecodeType {
693 pub fn decode<'a>(
695 &self,
696 data: &'a tiberius::Row,
697 name: &'a str,
698 column: &'a SqlColumnType,
699 arena: &'a RowArena,
700 ) -> Result<Datum<'a>, SqlServerDecodeError> {
701 let maybe_datum = match (&column.scalar_type, self) {
702 (SqlScalarType::Bool, SqlServerColumnDecodeType::Bool) => data
703 .try_get(name)
704 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bool"))?
705 .map(|val: bool| if val { Datum::True } else { Datum::False }),
706 (SqlScalarType::Int16, SqlServerColumnDecodeType::U8) => data
707 .try_get(name)
708 .map_err(|_| SqlServerDecodeError::invalid_column(name, "u8"))?
709 .map(|val: u8| Datum::Int16(i16::cast_from(val))),
710 (SqlScalarType::Int16, SqlServerColumnDecodeType::I16) => data
711 .try_get(name)
712 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i16"))?
713 .map(Datum::Int16),
714 (SqlScalarType::Int32, SqlServerColumnDecodeType::I32) => data
715 .try_get(name)
716 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i32"))?
717 .map(Datum::Int32),
718 (SqlScalarType::Int64, SqlServerColumnDecodeType::I64) => data
719 .try_get(name)
720 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i64"))?
721 .map(Datum::Int64),
722 (SqlScalarType::Float32, SqlServerColumnDecodeType::F32) => data
723 .try_get(name)
724 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f32"))?
725 .map(|val: f32| Datum::Float32(ordered_float::OrderedFloat(val))),
726 (SqlScalarType::Float64, SqlServerColumnDecodeType::F64) => data
727 .try_get(name)
728 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f64"))?
729 .map(|val: f64| Datum::Float64(ordered_float::OrderedFloat(val))),
730 (SqlScalarType::String, SqlServerColumnDecodeType::String) => data
731 .try_get(name)
732 .map_err(|_| SqlServerDecodeError::invalid_column(name, "string"))?
733 .map(Datum::String),
734 (SqlScalarType::Char { length }, SqlServerColumnDecodeType::String) => data
735 .try_get(name)
736 .map_err(|_| SqlServerDecodeError::invalid_column(name, "char"))?
737 .map(|val: &str| match length {
738 Some(expected) => {
739 let found_chars = val.chars().count();
740 let expct_chars = usize::cast_from(expected.into_u32());
741 if found_chars != expct_chars {
742 Err(SqlServerDecodeError::invalid_char(
743 name,
744 expct_chars,
745 found_chars,
746 ))
747 } else {
748 Ok(Datum::String(val))
749 }
750 }
751 None => Ok(Datum::String(val)),
752 })
753 .transpose()?,
754 (SqlScalarType::VarChar { max_length }, SqlServerColumnDecodeType::String) => data
755 .try_get(name)
756 .map_err(|_| SqlServerDecodeError::invalid_column(name, "varchar"))?
757 .map(|val: &str| match max_length {
758 Some(max) => {
759 let found_chars = val.chars().count();
760 let max_chars = usize::cast_from(max.into_u32());
761 if found_chars > max_chars {
762 Err(SqlServerDecodeError::invalid_varchar(
763 name,
764 max_chars,
765 found_chars,
766 ))
767 } else {
768 Ok(Datum::String(val))
769 }
770 }
771 None => Ok(Datum::String(val)),
772 })
773 .transpose()?,
774 (SqlScalarType::Bytes, SqlServerColumnDecodeType::Bytes) => data
775 .try_get(name)
776 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bytes"))?
777 .map(Datum::Bytes),
778 (SqlScalarType::Uuid, SqlServerColumnDecodeType::Uuid) => data
779 .try_get(name)
780 .map_err(|_| SqlServerDecodeError::invalid_column(name, "uuid"))?
781 .map(Datum::Uuid),
782 (SqlScalarType::Numeric { .. }, SqlServerColumnDecodeType::Numeric) => data
783 .try_get(name)
784 .map_err(|_| SqlServerDecodeError::invalid_column(name, "numeric"))?
785 .map(|val: tiberius::numeric::Numeric| {
786 let numeric = tiberius_numeric_to_mz_numeric(val);
787 Datum::Numeric(OrderedDecimal(numeric))
788 }),
789 (SqlScalarType::String, SqlServerColumnDecodeType::Xml) => data
790 .try_get(name)
791 .map_err(|_| SqlServerDecodeError::invalid_column(name, "xml"))?
792 .map(|val: &tiberius::xml::XmlData| Datum::String(val.as_ref())),
793 (SqlScalarType::Date, SqlServerColumnDecodeType::NaiveDate) => data
794 .try_get(name)
795 .map_err(|_| SqlServerDecodeError::invalid_column(name, "date"))?
796 .map(|val: chrono::NaiveDate| {
797 let date = val
798 .try_into()
799 .map_err(|e| SqlServerDecodeError::invalid_date(name, e))?;
800 Ok::<_, SqlServerDecodeError>(Datum::Date(date))
801 })
802 .transpose()?,
803 (SqlScalarType::Time, SqlServerColumnDecodeType::NaiveTime) => data
804 .try_get(name)
805 .map_err(|_| SqlServerDecodeError::invalid_column(name, "time"))?
806 .map(|val: chrono::NaiveTime| {
807 let rounded = val.round_subsecs(6);
812 let val = if rounded < val {
814 val.trunc_subsecs(6)
815 } else {
816 val
817 };
818 Datum::Time(val)
819 }),
820 (SqlScalarType::Timestamp { precision }, SqlServerColumnDecodeType::NaiveDateTime) => {
821 data.try_get(name)
822 .map_err(|_| SqlServerDecodeError::invalid_column(name, "timestamp"))?
823 .map(|val: chrono::NaiveDateTime| {
824 let ts: CheckedTimestamp<chrono::NaiveDateTime> = val
825 .try_into()
826 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
827 let rounded = ts
828 .round_to_precision(*precision)
829 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
830 Ok::<_, SqlServerDecodeError>(Datum::Timestamp(rounded))
831 })
832 .transpose()?
833 }
834 (SqlScalarType::TimestampTz { precision }, SqlServerColumnDecodeType::DateTime) => data
835 .try_get(name)
836 .map_err(|_| SqlServerDecodeError::invalid_column(name, "timestamptz"))?
837 .map(|val: chrono::DateTime<chrono::Utc>| {
838 let ts: CheckedTimestamp<chrono::DateTime<chrono::Utc>> = val
839 .try_into()
840 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
841 let rounded = ts
842 .round_to_precision(*precision)
843 .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
844 Ok::<_, SqlServerDecodeError>(Datum::TimestampTz(rounded))
845 })
846 .transpose()?,
847 (SqlScalarType::String, SqlServerColumnDecodeType::Bool) => data
849 .try_get(name)
850 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bool-text"))?
851 .map(|val: bool| {
852 if val {
853 Datum::String("true")
854 } else {
855 Datum::String("false")
856 }
857 }),
858 (SqlScalarType::String, SqlServerColumnDecodeType::U8) => data
859 .try_get(name)
860 .map_err(|_| SqlServerDecodeError::invalid_column(name, "u8-text"))?
861 .map(|val: u8| {
862 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
863 }),
864 (SqlScalarType::String, SqlServerColumnDecodeType::I16) => data
865 .try_get(name)
866 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i16-text"))?
867 .map(|val: i16| {
868 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
869 }),
870 (SqlScalarType::String, SqlServerColumnDecodeType::I32) => data
871 .try_get(name)
872 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i32-text"))?
873 .map(|val: i32| {
874 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
875 }),
876 (SqlScalarType::String, SqlServerColumnDecodeType::I64) => data
877 .try_get(name)
878 .map_err(|_| SqlServerDecodeError::invalid_column(name, "i64-text"))?
879 .map(|val: i64| {
880 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
881 }),
882 (SqlScalarType::String, SqlServerColumnDecodeType::F32) => data
883 .try_get(name)
884 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f32-text"))?
885 .map(|val: f32| {
886 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
887 }),
888 (SqlScalarType::String, SqlServerColumnDecodeType::F64) => data
889 .try_get(name)
890 .map_err(|_| SqlServerDecodeError::invalid_column(name, "f64-text"))?
891 .map(|val: f64| {
892 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
893 }),
894 (SqlScalarType::String, SqlServerColumnDecodeType::Uuid) => data
895 .try_get(name)
896 .map_err(|_| SqlServerDecodeError::invalid_column(name, "uuid-text"))?
897 .map(|val: uuid::Uuid| {
898 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
899 }),
900 (SqlScalarType::String, SqlServerColumnDecodeType::Bytes) => data
901 .try_get(name)
902 .map_err(|_| SqlServerDecodeError::invalid_column(name, "bytes-text"))?
903 .map(|val: &[u8]| {
904 let encoded = base64::engine::general_purpose::STANDARD.encode(val);
905 arena.make_datum(|packer| packer.push(Datum::String(&encoded)))
906 }),
907 (SqlScalarType::String, SqlServerColumnDecodeType::Numeric) => data
908 .try_get(name)
909 .map_err(|_| SqlServerDecodeError::invalid_column(name, "numeric-text"))?
910 .map(|val: tiberius::numeric::Numeric| {
911 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
912 }),
913 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveDate) => data
914 .try_get(name)
915 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivedate-text"))?
916 .map(|val: chrono::NaiveDate| {
917 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
918 }),
919 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveTime) => data
920 .try_get(name)
921 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivetime-text"))?
922 .map(|val: chrono::NaiveTime| {
923 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
924 }),
925 (SqlScalarType::String, SqlServerColumnDecodeType::DateTime) => data
926 .try_get(name)
927 .map_err(|_| SqlServerDecodeError::invalid_column(name, "datetime-text"))?
928 .map(|val: chrono::DateTime<chrono::Utc>| {
929 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
930 }),
931 (SqlScalarType::String, SqlServerColumnDecodeType::NaiveDateTime) => data
932 .try_get(name)
933 .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivedatetime-text"))?
934 .map(|val: chrono::NaiveDateTime| {
935 arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
936 }),
937 (column_type, decode_type) => {
938 return Err(SqlServerDecodeError::Unsupported {
939 sql_server_type: decode_type.clone(),
940 mz_type: column_type.clone(),
941 });
942 }
943 };
944
945 match (maybe_datum, column.nullable) {
946 (Some(datum), _) => Ok(datum),
947 (None, true) => Ok(Datum::Null),
948 (None, false) => Err(SqlServerDecodeError::InvalidData {
949 column_name: name.to_string(),
950 error: "found Null in non-nullable column".to_string(),
952 }),
953 }
954 }
955}
956
957impl RustType<proto_sql_server_column_desc::DecodeType> for SqlServerColumnDecodeType {
958 fn into_proto(&self) -> proto_sql_server_column_desc::DecodeType {
959 match self {
960 SqlServerColumnDecodeType::Bool => proto_sql_server_column_desc::DecodeType::Bool(()),
961 SqlServerColumnDecodeType::U8 => proto_sql_server_column_desc::DecodeType::U8(()),
962 SqlServerColumnDecodeType::I16 => proto_sql_server_column_desc::DecodeType::I16(()),
963 SqlServerColumnDecodeType::I32 => proto_sql_server_column_desc::DecodeType::I32(()),
964 SqlServerColumnDecodeType::I64 => proto_sql_server_column_desc::DecodeType::I64(()),
965 SqlServerColumnDecodeType::F32 => proto_sql_server_column_desc::DecodeType::F32(()),
966 SqlServerColumnDecodeType::F64 => proto_sql_server_column_desc::DecodeType::F64(()),
967 SqlServerColumnDecodeType::String => {
968 proto_sql_server_column_desc::DecodeType::String(())
969 }
970 SqlServerColumnDecodeType::Bytes => proto_sql_server_column_desc::DecodeType::Bytes(()),
971 SqlServerColumnDecodeType::Uuid => proto_sql_server_column_desc::DecodeType::Uuid(()),
972 SqlServerColumnDecodeType::Numeric => {
973 proto_sql_server_column_desc::DecodeType::Numeric(())
974 }
975 SqlServerColumnDecodeType::Xml => proto_sql_server_column_desc::DecodeType::Xml(()),
976 SqlServerColumnDecodeType::NaiveDate => {
977 proto_sql_server_column_desc::DecodeType::NaiveDate(())
978 }
979 SqlServerColumnDecodeType::NaiveTime => {
980 proto_sql_server_column_desc::DecodeType::NaiveTime(())
981 }
982 SqlServerColumnDecodeType::DateTime => {
983 proto_sql_server_column_desc::DecodeType::DateTime(())
984 }
985 SqlServerColumnDecodeType::NaiveDateTime => {
986 proto_sql_server_column_desc::DecodeType::NaiveDateTime(())
987 }
988 SqlServerColumnDecodeType::Unsupported { context } => {
989 proto_sql_server_column_desc::DecodeType::Unsupported(context.clone())
990 }
991 }
992 }
993
994 fn from_proto(
995 proto: proto_sql_server_column_desc::DecodeType,
996 ) -> Result<Self, mz_proto::TryFromProtoError> {
997 let val = match proto {
998 proto_sql_server_column_desc::DecodeType::Bool(()) => SqlServerColumnDecodeType::Bool,
999 proto_sql_server_column_desc::DecodeType::U8(()) => SqlServerColumnDecodeType::U8,
1000 proto_sql_server_column_desc::DecodeType::I16(()) => SqlServerColumnDecodeType::I16,
1001 proto_sql_server_column_desc::DecodeType::I32(()) => SqlServerColumnDecodeType::I32,
1002 proto_sql_server_column_desc::DecodeType::I64(()) => SqlServerColumnDecodeType::I64,
1003 proto_sql_server_column_desc::DecodeType::F32(()) => SqlServerColumnDecodeType::F32,
1004 proto_sql_server_column_desc::DecodeType::F64(()) => SqlServerColumnDecodeType::F64,
1005 proto_sql_server_column_desc::DecodeType::String(()) => {
1006 SqlServerColumnDecodeType::String
1007 }
1008 proto_sql_server_column_desc::DecodeType::Bytes(()) => SqlServerColumnDecodeType::Bytes,
1009 proto_sql_server_column_desc::DecodeType::Uuid(()) => SqlServerColumnDecodeType::Uuid,
1010 proto_sql_server_column_desc::DecodeType::Numeric(()) => {
1011 SqlServerColumnDecodeType::Numeric
1012 }
1013 proto_sql_server_column_desc::DecodeType::Xml(()) => SqlServerColumnDecodeType::Xml,
1014 proto_sql_server_column_desc::DecodeType::NaiveDate(()) => {
1015 SqlServerColumnDecodeType::NaiveDate
1016 }
1017 proto_sql_server_column_desc::DecodeType::NaiveTime(()) => {
1018 SqlServerColumnDecodeType::NaiveTime
1019 }
1020 proto_sql_server_column_desc::DecodeType::DateTime(()) => {
1021 SqlServerColumnDecodeType::DateTime
1022 }
1023 proto_sql_server_column_desc::DecodeType::NaiveDateTime(()) => {
1024 SqlServerColumnDecodeType::NaiveDateTime
1025 }
1026 proto_sql_server_column_desc::DecodeType::Unsupported(context) => {
1027 SqlServerColumnDecodeType::Unsupported { context }
1028 }
1029 };
1030 Ok(val)
1031 }
1032}
1033
1034fn tiberius_numeric_to_mz_numeric(val: tiberius::numeric::Numeric) -> Numeric {
1037 let mut numeric = mz_repr::adt::numeric::cx_datum().from_i128(val.value());
1038 mz_repr::adt::numeric::cx_datum().scaleb(&mut numeric, &Numeric::from(-i32::from(val.scale())));
1041 numeric
1042}
1043
1044#[derive(Debug)]
1049pub struct SqlServerRowDecoder {
1050 decoders: Vec<(Arc<str>, SqlColumnType, SqlServerColumnDecodeType)>,
1051}
1052
1053impl SqlServerRowDecoder {
1054 pub fn try_new(
1058 table: &SqlServerTableDesc,
1059 desc: &RelationDesc,
1060 ) -> Result<Self, SqlServerError> {
1061 let decoders = desc
1062 .iter()
1063 .map(|(col_name, col_type)| {
1064 let sql_server_col = table
1065 .columns
1066 .iter()
1067 .find(|col| col.name.as_ref() == col_name.as_str())
1068 .ok_or_else(|| {
1069 anyhow::anyhow!("no SQL Server column with name {col_name} found")
1071 })?;
1072 let Some(sql_server_col_typ) = sql_server_col.column_type.as_ref() else {
1073 return Err(SqlServerError::ProgrammingError(format!(
1074 "programming error, {col_name} should have been exluded",
1075 )));
1076 };
1077
1078 let matches = match (&sql_server_col_typ.scalar_type, &col_type.scalar_type) {
1086 (SqlScalarType::Timestamp { .. }, SqlScalarType::Timestamp { .. })
1087 | (SqlScalarType::TimestampTz { .. }, SqlScalarType::TimestampTz { .. }) => {
1088 sql_server_col_typ.nullable == col_type.nullable
1090 }
1091 (_, _) => sql_server_col_typ == col_type,
1092 };
1093 if !matches {
1094 return Err(SqlServerError::ProgrammingError(format!(
1095 "programming error, {col_name} has mismatched type {:?} vs {:?}",
1096 sql_server_col.column_type, col_type
1097 )));
1098 }
1099
1100 let name = Arc::clone(&sql_server_col.name);
1101 let decoder = sql_server_col.decode_type.clone();
1102 let col_typ = sql_server_col_typ.clone();
1107
1108 Ok::<_, SqlServerError>((name, col_typ, decoder))
1109 })
1110 .collect::<Result<_, _>>()?;
1111
1112 Ok(SqlServerRowDecoder { decoders })
1113 }
1114
1115 pub fn decode(
1117 &self,
1118 data: &tiberius::Row,
1119 row: &mut Row,
1120 arena: &RowArena,
1121 ) -> Result<(), SqlServerDecodeError> {
1122 let mut packer = row.packer();
1123 for (col_name, col_type, decoder) in &self.decoders {
1124 let datum = decoder.decode(data, col_name, col_type, arena)?;
1125 packer.push(datum);
1126 }
1127 Ok(())
1128 }
1129}
1130
1131#[cfg(test)]
1132mod tests {
1133 use std::collections::BTreeSet;
1134 use std::sync::Arc;
1135
1136 use chrono::NaiveDateTime;
1137 use itertools::Itertools;
1138 use mz_ore::assert_contains;
1139 use mz_ore::collections::CollectionExt;
1140 use mz_repr::adt::numeric::NumericMaxScale;
1141 use mz_repr::adt::varchar::VarCharMaxLength;
1142 use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlScalarType};
1143 use tiberius::RowTestExt;
1144
1145 use crate::desc::{
1146 SqlServerCaptureInstanceRaw, SqlServerColumnDecodeType, SqlServerColumnDesc,
1147 SqlServerTableDesc, SqlServerTableRaw, tiberius_numeric_to_mz_numeric,
1148 };
1149
1150 use super::SqlServerColumnRaw;
1151
1152 impl SqlServerColumnRaw {
1153 fn new(name: &str, data_type: &str) -> Self {
1156 SqlServerColumnRaw {
1157 name: name.into(),
1158 data_type: data_type.into(),
1159 is_nullable: false,
1160 max_length: 0,
1161 precision: 0,
1162 scale: 0,
1163 is_computed: false,
1164 }
1165 }
1166
1167 fn nullable(mut self, nullable: bool) -> Self {
1168 self.is_nullable = nullable;
1169 self
1170 }
1171
1172 fn max_length(mut self, max_length: i16) -> Self {
1173 self.max_length = max_length;
1174 self
1175 }
1176
1177 fn precision(mut self, precision: u8) -> Self {
1178 self.precision = precision;
1179 self
1180 }
1181
1182 fn scale(mut self, scale: u8) -> Self {
1183 self.scale = scale;
1184 self
1185 }
1186 }
1187
1188 #[mz_ore::test]
1189 fn smoketest_column_raw() {
1190 let raw = SqlServerColumnRaw::new("foo", "bit");
1191 let col = SqlServerColumnDesc::new(&raw);
1192
1193 assert_eq!(&*col.name, "foo");
1194 assert_eq!(col.column_type, Some(SqlScalarType::Bool.nullable(false)));
1195 assert_eq!(col.decode_type, SqlServerColumnDecodeType::Bool);
1196
1197 let raw = SqlServerColumnRaw::new("foo", "decimal")
1198 .precision(20)
1199 .scale(10);
1200 let col = SqlServerColumnDesc::new(&raw);
1201
1202 let col_type = SqlScalarType::Numeric {
1203 max_scale: Some(NumericMaxScale::try_from(10i64).expect("known valid")),
1204 }
1205 .nullable(false);
1206 assert_eq!(col.column_type, Some(col_type));
1207 assert_eq!(col.decode_type, SqlServerColumnDecodeType::Numeric);
1208 }
1209
1210 #[mz_ore::test]
1211 fn smoketest_column_raw_invalid() {
1212 let raw = SqlServerColumnRaw::new("foo", "bad_data_type");
1213 let desc = SqlServerColumnDesc::new(&raw);
1214 let SqlServerColumnDecodeType::Unsupported { context } = desc.decode_type else {
1215 panic!("unexpected decode type {desc:?}");
1216 };
1217 assert_contains!(context, "'bad_data_type' is unimplemented");
1218
1219 let raw = SqlServerColumnRaw::new("foo", "decimal")
1220 .precision(100)
1221 .scale(10);
1222 let desc = SqlServerColumnDesc::new(&raw);
1223 assert!(matches!(
1224 desc.decode_type,
1225 SqlServerColumnDecodeType::Unsupported { .. }
1226 ));
1227
1228 let raw = SqlServerColumnRaw::new("foo", "varchar").max_length(-1);
1229 let desc = SqlServerColumnDesc::new(&raw);
1230 let SqlServerColumnDecodeType::Unsupported { context } = desc.decode_type else {
1231 panic!("unexpected decode type {desc:?}");
1232 };
1233 assert_contains!(context, "columns with unlimited size do not support CDC");
1234 }
1235
1236 #[mz_ore::test]
1237 fn smoketest_decoder() {
1238 let sql_server_columns = [
1239 SqlServerColumnRaw::new("a", "varchar").max_length(16),
1240 SqlServerColumnRaw::new("b", "int").nullable(true),
1241 SqlServerColumnRaw::new("c", "bit"),
1242 ];
1243 let sql_server_desc = SqlServerTableRaw {
1244 schema_name: "my_schema".into(),
1245 name: "my_table".into(),
1246 capture_instance: Arc::new(SqlServerCaptureInstanceRaw {
1247 name: "my_table_CT".into(),
1248 create_date: NaiveDateTime::parse_from_str(
1249 "2024-01-01 00:00:00",
1250 "%Y-%m-%d %H:%M:%S",
1251 )
1252 .unwrap()
1253 .into(),
1254 }),
1255 columns: sql_server_columns.into(),
1256 };
1257 let sql_server_desc = SqlServerTableDesc::new(sql_server_desc, vec![]).unwrap();
1258
1259 let max_length = Some(VarCharMaxLength::try_from(16).unwrap());
1260 let relation_desc = RelationDesc::builder()
1261 .with_column("a", SqlScalarType::VarChar { max_length }.nullable(false))
1262 .with_column("c", SqlScalarType::Bool.nullable(false))
1264 .with_column("b", SqlScalarType::Int32.nullable(true))
1265 .finish();
1266
1267 let decoder = sql_server_desc
1269 .decoder(&relation_desc)
1270 .expect("known valid");
1271
1272 let sql_server_columns = [
1273 tiberius::Column::new("a".to_string(), tiberius::ColumnType::BigVarChar),
1274 tiberius::Column::new("b".to_string(), tiberius::ColumnType::Int4),
1275 tiberius::Column::new("c".to_string(), tiberius::ColumnType::Bit),
1276 ];
1277
1278 let data_a = [
1279 tiberius::ColumnData::String(Some("hello world".into())),
1280 tiberius::ColumnData::I32(Some(42)),
1281 tiberius::ColumnData::Bit(Some(true)),
1282 ];
1283 let sql_server_row_a = tiberius::Row::build(
1284 sql_server_columns
1285 .iter()
1286 .cloned()
1287 .zip_eq(data_a.into_iter()),
1288 );
1289
1290 let data_b = [
1291 tiberius::ColumnData::String(Some("foo bar".into())),
1292 tiberius::ColumnData::I32(None),
1293 tiberius::ColumnData::Bit(Some(false)),
1294 ];
1295 let sql_server_row_b =
1296 tiberius::Row::build(sql_server_columns.into_iter().zip_eq(data_b.into_iter()));
1297
1298 let mut rnd_row = Row::default();
1299 let arena = RowArena::default();
1300
1301 decoder
1302 .decode(&sql_server_row_a, &mut rnd_row, &arena)
1303 .unwrap();
1304 assert_eq!(
1305 &rnd_row,
1306 &Row::pack_slice(&[Datum::String("hello world"), Datum::True, Datum::Int32(42)])
1307 );
1308
1309 decoder
1310 .decode(&sql_server_row_b, &mut rnd_row, &arena)
1311 .unwrap();
1312 assert_eq!(
1313 &rnd_row,
1314 &Row::pack_slice(&[Datum::String("foo bar"), Datum::False, Datum::Null])
1315 );
1316 }
1317
1318 #[mz_ore::test]
1319 fn smoketest_decode_to_string() {
1320 #[track_caller]
1321 fn testcase(
1322 data_type: &'static str,
1323 col_type: tiberius::ColumnType,
1324 col_data: tiberius::ColumnData<'static>,
1325 ) {
1326 let columns = [SqlServerColumnRaw::new("a", data_type)];
1327 let sql_server_desc = SqlServerTableRaw {
1328 schema_name: "my_schema".into(),
1329 name: "my_table".into(),
1330 capture_instance: Arc::new(SqlServerCaptureInstanceRaw {
1331 name: "my_table_CT".into(),
1332 create_date: NaiveDateTime::parse_from_str(
1333 "2024-01-01 00:00:00",
1334 "%Y-%m-%d %H:%M:%S",
1335 )
1336 .unwrap()
1337 .into(),
1338 }),
1339 columns: columns.into(),
1340 };
1341 let mut sql_server_desc = SqlServerTableDesc::new(sql_server_desc, vec![]).unwrap();
1342 sql_server_desc.apply_text_columns(&BTreeSet::from(["a"]));
1343
1344 let relation_desc = RelationDesc::builder()
1346 .with_column("a", SqlScalarType::String.nullable(false))
1347 .finish();
1348
1349 let decoder = sql_server_desc
1351 .decoder(&relation_desc)
1352 .expect("known valid");
1353
1354 let sql_server_row = tiberius::Row::build([(
1355 tiberius::Column::new("a".to_string(), col_type),
1356 col_data,
1357 )]);
1358 let mut mz_row = Row::default();
1359 let arena = RowArena::new();
1360 decoder
1361 .decode(&sql_server_row, &mut mz_row, &arena)
1362 .unwrap();
1363
1364 let str_datum = mz_row.into_element();
1365 assert!(matches!(str_datum, Datum::String(_)));
1366 }
1367
1368 use tiberius::ColumnData;
1369
1370 testcase(
1371 "bit",
1372 tiberius::ColumnType::Bit,
1373 ColumnData::Bit(Some(true)),
1374 );
1375 testcase(
1376 "bit",
1377 tiberius::ColumnType::Bit,
1378 ColumnData::Bit(Some(false)),
1379 );
1380 testcase(
1381 "tinyint",
1382 tiberius::ColumnType::Int1,
1383 ColumnData::U8(Some(33)),
1384 );
1385 testcase(
1386 "smallint",
1387 tiberius::ColumnType::Int2,
1388 ColumnData::I16(Some(101)),
1389 );
1390 testcase(
1391 "int",
1392 tiberius::ColumnType::Int4,
1393 ColumnData::I32(Some(-42)),
1394 );
1395 {
1396 let datetime = tiberius::time::DateTime::new(10, 300);
1397 testcase(
1398 "datetime",
1399 tiberius::ColumnType::Datetime,
1400 ColumnData::DateTime(Some(datetime)),
1401 );
1402 }
1403 }
1404
1405 #[mz_ore::test]
1406 #[cfg_attr(miri, ignore)] fn smoketest_numeric_conversion() {
1408 let a = tiberius::numeric::Numeric::new_with_scale(12345, 2);
1409 let rnd = tiberius_numeric_to_mz_numeric(a);
1410 let og = mz_repr::adt::numeric::cx_datum().parse("123.45").unwrap();
1411 assert_eq!(og, rnd);
1412
1413 let a = tiberius::numeric::Numeric::new_with_scale(-99999, 5);
1414 let rnd = tiberius_numeric_to_mz_numeric(a);
1415 let og = mz_repr::adt::numeric::cx_datum().parse("-.99999").unwrap();
1416 assert_eq!(og, rnd);
1417
1418 let a = tiberius::numeric::Numeric::new_with_scale(1, 29);
1419 let rnd = tiberius_numeric_to_mz_numeric(a);
1420 let og = mz_repr::adt::numeric::cx_datum()
1421 .parse("0.00000000000000000000000000001")
1422 .unwrap();
1423 assert_eq!(og, rnd);
1424
1425 let a = tiberius::numeric::Numeric::new_with_scale(-111111111111111111, 0);
1426 let rnd = tiberius_numeric_to_mz_numeric(a);
1427 let og = mz_repr::adt::numeric::cx_datum()
1428 .parse("-111111111111111111")
1429 .unwrap();
1430 assert_eq!(og, rnd);
1431 }
1432
1433 }