mz_sql_server_util/
desc.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Metadata about tables, columns, and other objects from SQL Server.
11//!
12//! ### Tables
13//!
14//! When creating a SQL Server source we will query system tables from the
15//! upstream instance to get a [`SqlServerTableRaw`]. From this raw information
16//! we create a [`SqlServerTableDesc`] which describes how the upstream table
17//! will get represented in Materialize.
18//!
19//! ### Rows
20//!
21//! With a [`SqlServerTableDesc`] and an [`mz_repr::RelationDesc`] we can
22//! create a [`SqlServerRowDecoder`] which will be used when running a source
23//! to efficiently decode [`tiberius::Row`]s into [`mz_repr::Row`]s.
24
25use base64::Engine;
26use chrono::{NaiveDateTime, SubsecRound};
27use dec::OrderedDecimal;
28use mz_ore::cast::CastFrom;
29use mz_proto::{IntoRustIfSome, ProtoType, RustType};
30use mz_repr::adt::char::CharLength;
31use mz_repr::adt::numeric::{Numeric, NumericMaxScale};
32use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampPrecision};
33use mz_repr::adt::varchar::VarCharMaxLength;
34use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlColumnType, SqlScalarType};
35use proptest_derive::Arbitrary;
36use serde::{Deserialize, Serialize};
37
38use std::collections::BTreeSet;
39use std::sync::Arc;
40
41use crate::desc::proto_sql_server_table_constraint::ConstraintType;
42use crate::{SqlServerDecodeError, SqlServerError};
43
44include!(concat!(env!("OUT_DIR"), "/mz_sql_server_util.rs"));
45
/// Materialize compatible description of a table in Microsoft SQL Server.
///
/// See [`SqlServerTableRaw`] for the raw information we read from the upstream
/// system.
///
/// Note: We map a [`SqlServerTableDesc`] to a Materialize [`RelationDesc`] as
/// part of purification. Specifically we use this description to generate a
/// SQL statement for the subsource and it's the _parsing of that statement_
/// which actually generates a [`RelationDesc`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
pub struct SqlServerTableDesc {
    /// Name of the schema that the table belongs to.
    pub schema_name: Arc<str>,
    /// Name of the table.
    pub name: Arc<str>,
    /// Columns for the table.
    ///
    /// Columns that are excluded from replication stay in this list; they are
    /// marked by having no `column_type` (see [`SqlServerColumnDesc::is_excluded`]).
    pub columns: Box<[SqlServerColumnDesc]>,
    /// Constraints for the table.
    pub constraints: Vec<SqlServerTableConstraint>,
}
66
67impl SqlServerTableDesc {
68    /// Creating a [`SqlServerTableDesc`] from a [`SqlServerTableRaw`] description.
69    ///
70    /// Note: Not all columns from SQL Server can be ingested into Materialize. To determine if a
71    /// column is supported see [`SqlServerColumnDesc::decode_type`].
72    pub fn new(
73        raw: SqlServerTableRaw,
74        raw_constraints: Vec<SqlServerTableConstraintRaw>,
75    ) -> Result<Self, SqlServerError> {
76        let columns: Box<[_]> = raw
77            .columns
78            .into_iter()
79            .map(SqlServerColumnDesc::new)
80            .collect();
81        let constraints = raw_constraints
82            .into_iter()
83            .map(SqlServerTableConstraint::try_from)
84            .collect::<Result<Vec<_>, _>>()?;
85        Ok(SqlServerTableDesc {
86            schema_name: raw.schema_name,
87            name: raw.name,
88            columns,
89            constraints,
90        })
91    }
92
93    /// Returns the [`SqlServerQualifiedTableName`] for this [`SqlServerTableDesc`].
94    pub fn qualified_name(&self) -> SqlServerQualifiedTableName {
95        SqlServerQualifiedTableName {
96            schema_name: Arc::clone(&self.schema_name),
97            table_name: Arc::clone(&self.name),
98        }
99    }
100
101    /// Update this [`SqlServerTableDesc`] to represent the specified columns
102    /// as text in Materialize.
103    pub fn apply_text_columns(&mut self, text_columns: &BTreeSet<&str>) {
104        for column in &mut self.columns {
105            if text_columns.contains(column.name.as_ref()) {
106                column.represent_as_text();
107            }
108        }
109    }
110
111    /// Update this [`SqlServerTableDesc`] to exclude the specified columns from being
112    /// replicated into Materialize.
113    pub fn apply_excl_columns(&mut self, excl_columns: &BTreeSet<&str>) {
114        for column in &mut self.columns {
115            if excl_columns.contains(column.name.as_ref()) {
116                column.exclude();
117            }
118        }
119    }
120
121    /// Returns a [`SqlServerRowDecoder`] which can be used to decode [`tiberius::Row`]s into
122    /// [`mz_repr::Row`]s that match the shape of the provided [`RelationDesc`].
123    pub fn decoder(&self, desc: &RelationDesc) -> Result<SqlServerRowDecoder, SqlServerError> {
124        let decoder = SqlServerRowDecoder::try_new(self, desc)?;
125        Ok(decoder)
126    }
127}
128
129impl RustType<ProtoSqlServerTableDesc> for SqlServerTableDesc {
130    fn into_proto(&self) -> ProtoSqlServerTableDesc {
131        ProtoSqlServerTableDesc {
132            name: self.name.to_string(),
133            schema_name: self.schema_name.to_string(),
134            columns: self.columns.iter().map(|c| c.into_proto()).collect(),
135            constraints: self.constraints.iter().map(|c| c.into_proto()).collect(),
136        }
137    }
138
139    fn from_proto(proto: ProtoSqlServerTableDesc) -> Result<Self, mz_proto::TryFromProtoError> {
140        let columns = proto
141            .columns
142            .into_iter()
143            .map(|c| c.into_rust())
144            .collect::<Result<_, _>>()?;
145        let constraints = proto
146            .constraints
147            .into_iter()
148            .map(|c| c.into_rust())
149            .collect::<Result<_, _>>()?;
150        Ok(SqlServerTableDesc {
151            schema_name: proto.schema_name.into(),
152            name: proto.name.into(),
153            columns,
154            constraints,
155        })
156    }
157}
158
/// SQL Server table constraint type (e.g. PRIMARY KEY, UNIQUE, etc.)
///
/// Only the constraint types listed here are recognized; converting any other
/// upstream name via `TryFrom<String>` is an error.
///
/// See <https://learn.microsoft.com/en-us/sql/relational-databases/system-information-schema-views/table-constraints-transact-sql?view=sql-server-ver17>
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
pub enum SqlServerTableConstraintType {
    /// Reported by upstream as `"PRIMARY KEY"`.
    PrimaryKey,
    /// Reported by upstream as `"UNIQUE"`.
    Unique,
}
166
167impl TryFrom<String> for SqlServerTableConstraintType {
168    type Error = SqlServerError;
169
170    fn try_from(value: String) -> Result<Self, Self::Error> {
171        match value.as_str() {
172            "PRIMARY KEY" => Ok(Self::PrimaryKey),
173            "UNIQUE" => Ok(Self::Unique),
174            name => Err(SqlServerError::InvalidData {
175                column_name: "constraint_type".into(),
176                error: format!("Unknown constraint type: {name}"),
177            }),
178        }
179    }
180}
181
182impl RustType<proto_sql_server_table_constraint::ConstraintType> for SqlServerTableConstraintType {
183    fn into_proto(&self) -> proto_sql_server_table_constraint::ConstraintType {
184        match self {
185            SqlServerTableConstraintType::PrimaryKey => ConstraintType::PrimaryKey(()),
186            SqlServerTableConstraintType::Unique => ConstraintType::Unique(()),
187        }
188    }
189
190    fn from_proto(
191        proto: proto_sql_server_table_constraint::ConstraintType,
192    ) -> Result<Self, mz_proto::TryFromProtoError> {
193        Ok(match proto {
194            ConstraintType::PrimaryKey(_) => SqlServerTableConstraintType::PrimaryKey,
195            ConstraintType::Unique(_) => SqlServerTableConstraintType::Unique,
196        })
197    }
198}
199
/// SQL Server table constraint.
///
/// Built from a [`SqlServerTableConstraintRaw`] once its constraint type has
/// been validated.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Arbitrary)]
pub struct SqlServerTableConstraint {
    /// Name of the constraint.
    pub constraint_name: String,
    /// Kind of constraint (e.g. primary key or unique).
    pub constraint_type: SqlServerTableConstraintType,
    /// Names of the columns that the constraint covers.
    pub column_names: Vec<String>,
}
207
208impl TryFrom<SqlServerTableConstraintRaw> for SqlServerTableConstraint {
209    type Error = SqlServerError;
210
211    fn try_from(value: SqlServerTableConstraintRaw) -> Result<Self, Self::Error> {
212        Ok(SqlServerTableConstraint {
213            constraint_name: value.constraint_name,
214            constraint_type: value.constraint_type.try_into()?,
215            column_names: value.columns,
216        })
217    }
218}
219
220impl RustType<ProtoSqlServerTableConstraint> for SqlServerTableConstraint {
221    fn into_proto(&self) -> ProtoSqlServerTableConstraint {
222        ProtoSqlServerTableConstraint {
223            constraint_name: self.constraint_name.clone(),
224            constraint_type: Some(self.constraint_type.into_proto()),
225            column_names: self.column_names.clone(),
226        }
227    }
228
229    fn from_proto(
230        proto: ProtoSqlServerTableConstraint,
231    ) -> Result<Self, mz_proto::TryFromProtoError> {
232        Ok(SqlServerTableConstraint {
233            constraint_name: proto.constraint_name,
234            constraint_type: proto
235                .constraint_type
236                .into_rust_if_some("ProtoSqlServerTableConstraint::constraint_type")?,
237            column_names: proto.column_names,
238        })
239    }
240}
241
/// Partially qualified name of a table from Microsoft SQL Server.
///
/// Formats (via [`std::fmt::Display`], and therefore `to_string()`) as
/// `[schema_name].[table_name]`.
///
/// Note: the names are interpolated verbatim; a `]` inside a name is not escaped.
///
/// TODO(sql_server3): Change this to use a &str.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct SqlServerQualifiedTableName {
    /// Name of the schema the table belongs to.
    pub schema_name: Arc<str>,
    /// Name of the table.
    pub table_name: Arc<str>,
}

// Implement `Display` rather than `ToString` directly: the standard library's blanket
// `impl<T: Display> ToString for T` keeps `to_string()` working for existing callers while
// also allowing the name to be used with `{}` in format strings.
impl std::fmt::Display for SqlServerQualifiedTableName {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}].[{}]", self.schema_name, self.table_name)
    }
}
256
/// Raw metadata for a table from Microsoft SQL Server.
///
/// See [`SqlServerTableDesc`] for a refined description that is compatible
/// with Materialize.
#[derive(Debug, Clone)]
pub struct SqlServerTableRaw {
    /// Name of the schema the table belongs to.
    pub schema_name: Arc<str>,
    /// Name of the table.
    pub name: Arc<str>,
    /// The capture instance replicating changes.
    ///
    /// Note: this is not carried over into [`SqlServerTableDesc`].
    pub capture_instance: Arc<SqlServerCaptureInstanceRaw>,
    /// Columns for the table.
    pub columns: Arc<[SqlServerColumnRaw]>,
}
272
/// Raw capture instance metadata.
///
/// Referenced from [`SqlServerTableRaw::capture_instance`].
#[derive(Debug, Clone)]
pub struct SqlServerCaptureInstanceRaw {
    /// Name of the capture instance replicating changes.
    pub name: Arc<str>,
    /// The creation date of the capture instance.
    pub create_date: Arc<NaiveDateTime>,
}
281
/// Description of a column from a table in Microsoft SQL Server.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
pub struct SqlServerColumnDesc {
    /// Name of the column.
    pub name: Arc<str>,
    /// The intended data type of this column in Materialize. `None` indicates this
    /// column should be excluded when replicating into Materialize.
    ///
    /// Note: This type might differ from the `decode_type`, e.g. a user can
    /// specify `TEXT COLUMNS` to decode columns as text.
    pub column_type: Option<SqlColumnType>,
    /// This field is deprecated and will be removed in a future version. This exists only for
    /// the purpose of migrating from old representations.
    pub primary_key_constraint: Option<Arc<str>>,
    /// Rust type we should parse the data from a [`tiberius::Row`] as.
    pub decode_type: SqlServerColumnDecodeType,
    /// Raw type of the column as we read it from upstream.
    ///
    /// This is useful to keep around for debugging purposes.
    pub raw_type: Arc<str>,
}
303
304impl SqlServerColumnDesc {
305    /// Create a [`SqlServerColumnDesc`] from a [`SqlServerColumnRaw`] description.
306    pub fn new(raw: &SqlServerColumnRaw) -> Self {
307        let (column_type, decode_type) = match parse_data_type(raw) {
308            Ok((scalar_type, decode_type)) => {
309                let column_type = scalar_type.nullable(raw.is_nullable);
310                (Some(column_type), decode_type)
311            }
312            Err(err) => {
313                tracing::warn!(
314                    ?err,
315                    ?raw,
316                    "found an unsupported data type when parsing raw data"
317                );
318                (
319                    None,
320                    SqlServerColumnDecodeType::Unsupported {
321                        context: err.reason,
322                    },
323                )
324            }
325        };
326        SqlServerColumnDesc {
327            name: Arc::clone(&raw.name),
328            primary_key_constraint: None,
329            column_type,
330            decode_type,
331            raw_type: Arc::clone(&raw.data_type),
332        }
333    }
334
335    /// Change this [`SqlServerColumnDesc`] to be represented as text in Materialize.
336    pub fn represent_as_text(&mut self) {
337        self.column_type = self
338            .column_type
339            .as_ref()
340            .map(|ct| SqlScalarType::String.nullable(ct.nullable));
341    }
342
343    /// Exclude this [`SqlServerColumnDesc`] from being replicated into Materialize.
344    pub fn exclude(&mut self) {
345        self.column_type = None;
346    }
347
348    /// Check if this [`SqlServerColumnDesc`] is excluded from being replicated into Materialize.
349    pub fn is_excluded(&self) -> bool {
350        self.column_type.is_none()
351    }
352}
353
354impl RustType<ProtoSqlServerColumnDesc> for SqlServerColumnDesc {
355    fn into_proto(&self) -> ProtoSqlServerColumnDesc {
356        ProtoSqlServerColumnDesc {
357            name: self.name.to_string(),
358            column_type: self.column_type.into_proto(),
359            primary_key_constraint: self.primary_key_constraint.as_ref().map(|v| v.to_string()),
360            decode_type: Some(self.decode_type.into_proto()),
361            raw_type: self.raw_type.to_string(),
362        }
363    }
364
365    fn from_proto(proto: ProtoSqlServerColumnDesc) -> Result<Self, mz_proto::TryFromProtoError> {
366        Ok(SqlServerColumnDesc {
367            name: proto.name.into(),
368            column_type: proto.column_type.into_rust()?,
369            primary_key_constraint: proto.primary_key_constraint.map(|v| v.into()),
370            decode_type: proto
371                .decode_type
372                .into_rust_if_some("ProtoSqlServerColumnDesc::decode_type")?,
373            raw_type: proto.raw_type.into(),
374        })
375    }
376}
377
/// The raw datatype from SQL Server is not supported in Materialize.
///
/// Returned by `parse_data_type` when a column cannot be mapped to a Materialize type.
#[derive(Debug)]
// In the code visible around this type only `reason` is read directly; the other fields
// are surfaced through the derived `Debug` output when the error is logged.
#[allow(dead_code)]
pub struct UnsupportedDataType {
    /// Name of the column that could not be mapped.
    column_name: String,
    /// Name of the SQL Server data type of that column.
    column_type: String,
    /// Explanation of why the column is unsupported.
    reason: String,
}
386
387/// Parse a raw data type from SQL Server into a Materialize [`SqlScalarType`].
388///
389/// Returns the [`SqlScalarType`] that we'll map this column to and the [`SqlServerColumnDecodeType`]
390/// that we use to decode the raw value.
391fn parse_data_type(
392    raw: &SqlServerColumnRaw,
393) -> Result<(SqlScalarType, SqlServerColumnDecodeType), UnsupportedDataType> {
394    // The value of a computed column, persisted or not, will be readable by the snapshot, but will
395    // always be NULL in the CDC stream.  This can lead to issues in MZ (e.g. decoding errors,
396    // negative accumulations, etc.).
397    if raw.is_computed {
398        return Err(UnsupportedDataType {
399            column_name: raw.name.to_string(),
400            column_type: format!("{} (computed)", raw.data_type.to_lowercase()),
401            reason: "column is computed".into(),
402        });
403    }
404
405    let scalar = match raw.data_type.to_lowercase().as_str() {
406        "tinyint" => (SqlScalarType::Int16, SqlServerColumnDecodeType::U8),
407        "smallint" => (SqlScalarType::Int16, SqlServerColumnDecodeType::I16),
408        "int" => (SqlScalarType::Int32, SqlServerColumnDecodeType::I32),
409        "bigint" => (SqlScalarType::Int64, SqlServerColumnDecodeType::I64),
410        "bit" => (SqlScalarType::Bool, SqlServerColumnDecodeType::Bool),
411        "decimal" | "numeric" | "money" | "smallmoney" => {
412            // SQL Server supports a precision in the range of [1, 38] and then
413            // the scale is 0 <= scale <= precision.
414            //
415            // Materialize numerics are floating point with a fixed precision of 39.
416            //
417            // See: <https://learn.microsoft.com/en-us/sql/t-sql/data-types/decimal-and-numeric-transact-sql?view=sql-server-ver16#arguments>
418            if raw.precision > 38 || raw.scale > raw.precision {
419                tracing::warn!(
420                    "unexpected value from SQL Server, precision of {} and scale of {}",
421                    raw.precision,
422                    raw.scale,
423                );
424            }
425            if raw.precision > 39 {
426                let reason = format!(
427                    "precision of {} is greater than our maximum of 39",
428                    raw.precision
429                );
430                return Err(UnsupportedDataType {
431                    column_name: raw.name.to_string(),
432                    column_type: raw.data_type.to_string(),
433                    reason,
434                });
435            }
436
437            let raw_scale = usize::cast_from(raw.scale);
438            let max_scale =
439                NumericMaxScale::try_from(raw_scale).map_err(|_| UnsupportedDataType {
440                    column_type: raw.data_type.to_string(),
441                    column_name: raw.name.to_string(),
442                    reason: format!("scale of {} is too large", raw.scale),
443                })?;
444            let column_type = SqlScalarType::Numeric {
445                max_scale: Some(max_scale),
446            };
447
448            (column_type, SqlServerColumnDecodeType::Numeric)
449        }
450        // SQL Server has a few IEEE 754 floating point type names. The underlying type is float(n),
451        // where n is the number of bits used. SQL Server still ends up with only 2 distinct types
452        // as it treats 1 <= n <= 24 as n=24, and 25 <= n <= 53 as n=53.
453        //
454        // Additionally, `real` and `double precision` exist as synonyms of float(24) and float(53),
455        // respectively.  What doesn't appear to be documented is how these appear in `sys.types`.
456        // See <https://learn.microsoft.com/en-us/sql/t-sql/data-types/float-and-real-transact-sql?view=sql-server-ver17>
457        "real" | "float" | "double precision" => match raw.max_length {
458            // Decide the MZ type based on the number of bytes rather than the name, just in case
459            // there is inconsistency among versions.
460            4 => (SqlScalarType::Float32, SqlServerColumnDecodeType::F32),
461            8 => (SqlScalarType::Float64, SqlServerColumnDecodeType::F64),
462            _ => {
463                return Err(UnsupportedDataType {
464                    column_name: raw.name.to_string(),
465                    column_type: raw.data_type.to_string(),
466                    reason: format!("unsupported length {}", raw.max_length),
467                });
468            }
469        },
470        dt @ ("char" | "nchar" | "varchar" | "nvarchar" | "sysname") => {
471            // When the `max_length` is -1 SQL Server will not present us with the "before" value
472            // for updated columns.
473            //
474            // TODO(sql_server3): Support UPSERT semantics for SQL Server.
475            if raw.max_length == -1 {
476                return Err(UnsupportedDataType {
477                    column_name: raw.name.to_string(),
478                    column_type: raw.data_type.to_string(),
479                    reason: "columns with unlimited size do not support CDC".to_string(),
480                });
481            }
482
483            let column_type = match dt {
484                "char" => {
485                    let length = CharLength::try_from(i64::from(raw.max_length)).map_err(|e| {
486                        UnsupportedDataType {
487                            column_name: raw.name.to_string(),
488                            column_type: raw.data_type.to_string(),
489                            reason: e.to_string(),
490                        }
491                    })?;
492                    SqlScalarType::Char {
493                        length: Some(length),
494                    }
495                }
496                "varchar" => {
497                    let length =
498                        VarCharMaxLength::try_from(i64::from(raw.max_length)).map_err(|e| {
499                            UnsupportedDataType {
500                                column_name: raw.name.to_string(),
501                                column_type: raw.data_type.to_string(),
502                                reason: e.to_string(),
503                            }
504                        })?;
505                    SqlScalarType::VarChar {
506                        max_length: Some(length),
507                    }
508                }
509                // Determining the max character count for these types is difficult
510                // because of different character encodings, so we fallback to just
511                // representing them as "text".
512                "nchar" | "nvarchar" | "sysname" => SqlScalarType::String,
513                other => unreachable!("'{other}' checked above"),
514            };
515
516            (column_type, SqlServerColumnDecodeType::String)
517        }
518        "text" | "ntext" | "image" => {
519            // SQL Server docs indicate this should always be 16. There's no
520            // issue if it's not, but it's good to track.
521            mz_ore::soft_assert_eq_no_log!(raw.max_length, 16);
522
523            // TODO(sql_server3): Support UPSERT semantics for SQL Server.
524            return Err(UnsupportedDataType {
525                column_name: raw.name.to_string(),
526                column_type: raw.data_type.to_string(),
527                reason: "columns with unlimited size do not support CDC".to_string(),
528            });
529        }
530        "xml" => {
531            // When the `max_length` is -1 SQL Server will not present us with the "before" value
532            // for updated columns.
533            //
534            // TODO(sql_server3): Support UPSERT semantics for SQL Server.
535            if raw.max_length == -1 {
536                return Err(UnsupportedDataType {
537                    column_name: raw.name.to_string(),
538                    column_type: raw.data_type.to_string(),
539                    reason: "columns with unlimited size do not support CDC".to_string(),
540                });
541            }
542            (SqlScalarType::String, SqlServerColumnDecodeType::Xml)
543        }
544        "binary" | "varbinary" => {
545            // When the `max_length` is -1 if this column changes as part of an `UPDATE`
546            // or `DELETE` statement, SQL Server will not provide the "old" value for
547            // this column, but we need this value so we can emit a retraction.
548            //
549            // TODO(sql_server3): Support UPSERT semantics for SQL Server.
550            if raw.max_length == -1 {
551                return Err(UnsupportedDataType {
552                    column_name: raw.name.to_string(),
553                    column_type: raw.data_type.to_string(),
554                    reason: "columns with unlimited size do not support CDC".to_string(),
555                });
556            }
557
558            (SqlScalarType::Bytes, SqlServerColumnDecodeType::Bytes)
559        }
560        "json" => (SqlScalarType::Jsonb, SqlServerColumnDecodeType::String),
561        "date" => (SqlScalarType::Date, SqlServerColumnDecodeType::NaiveDate),
562        // SQL Server supports a scale of (and defaults to) 7 digits (aka 100 nanoseconds)
563        // for time related types.
564        //
565        // Internally Materialize supports a scale of 9 (aka nanoseconds), but for Postgres
566        // compatibility we constraint ourselves to a scale of 6 (aka microseconds). By
567        // default we will round values we get from  SQL Server to fit in Materialize.
568        //
569        // TODO(sql_server3): Support a "strict" mode where we're fail the creation of the
570        // source if the scale is too large.
571        // TODO(sql_server3): Support specifying a precision for SqlScalarType::Time.
572        //
573        // See: <https://learn.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver16>.
574        "time" => (SqlScalarType::Time, SqlServerColumnDecodeType::NaiveTime),
575        dt @ ("smalldatetime" | "datetime" | "datetime2" | "datetimeoffset") => {
576            if raw.scale > 7 {
577                tracing::warn!("unexpected scale '{}' from SQL Server", raw.scale);
578            }
579            if raw.scale > mz_repr::adt::timestamp::MAX_PRECISION {
580                tracing::warn!("truncating scale of '{}' for '{}'", raw.scale, dt);
581            }
582            let precision = std::cmp::min(raw.scale, mz_repr::adt::timestamp::MAX_PRECISION);
583            let precision =
584                Some(TimestampPrecision::try_from(i64::from(precision)).expect("known to fit"));
585
586            match dt {
587                "smalldatetime" | "datetime" | "datetime2" => (
588                    SqlScalarType::Timestamp { precision },
589                    SqlServerColumnDecodeType::NaiveDateTime,
590                ),
591                "datetimeoffset" => (
592                    SqlScalarType::TimestampTz { precision },
593                    SqlServerColumnDecodeType::DateTime,
594                ),
595                other => unreachable!("'{other}' checked above"),
596            }
597        }
598        "uniqueidentifier" => (SqlScalarType::Uuid, SqlServerColumnDecodeType::Uuid),
599        // TODO(sql_server3): Support reading the following types, at least as text:
600        //
601        // * geography
602        // * geometry
603        // * json (preview)
604        // * vector (preview)
605        //
606        // None of these types are implemented in `tiberius`, the crate that
607        // provides our SQL Server client, so we'll need to implement support
608        // for decoding them.
609        //
610        // See <https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-tds/355f7890-6e91-4978-ab76-2ded17ee09bc>.
611        other => {
612            return Err(UnsupportedDataType {
613                column_type: other.to_string(),
614                column_name: raw.name.to_string(),
615                reason: format!("'{other}' is unimplemented"),
616            });
617        }
618    };
619    Ok(scalar)
620}
621
/// Raw metadata for a column from a table in Microsoft SQL Server.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-catalog-views/sys-columns-transact-sql?view=sql-server-ver16>.
#[derive(Clone, Debug)]
pub struct SqlServerColumnRaw {
    /// Name of this column.
    pub name: Arc<str>,
    /// Name of the data type.
    pub data_type: Arc<str>,
    /// Whether or not the column is nullable.
    pub is_nullable: bool,
    /// Maximum length (in bytes) of the column.
    ///
    /// For `varchar(max)`, `nvarchar(max)`, `varbinary(max)`, or `xml` this will be `-1`. For
    /// `text`, `ntext`, and `image` columns this will be 16.
    ///
    /// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-catalog-views/sys-columns-transact-sql?view=sql-server-ver16>.
    ///
    /// TODO(sql_server2): Validate this value for `json` columns, which were introduced
    /// in Azure SQL 2024.
    pub max_length: i16,
    /// Precision of the column, if numeric-based; otherwise 0.
    pub precision: u8,
    /// Scale of the column, if numeric-based; otherwise 0.
    pub scale: u8,
    /// Whether the column is computed.
    pub is_computed: bool,
}
650
/// Raw metadata for a table constraint.
///
/// See [`SqlServerTableConstraint`] for the validated form.
#[derive(Clone, Debug)]
pub struct SqlServerTableConstraintRaw {
    /// Name of the constraint.
    pub constraint_name: String,
    /// Constraint type as an unparsed string, e.g. `"PRIMARY KEY"` or `"UNIQUE"`.
    pub constraint_type: String,
    /// Names of the columns that the constraint covers.
    pub columns: Vec<String>,
}
658
/// Rust type that we should use when reading a column from SQL Server.
///
/// The SQL Server type names mentioned on each variant are the ones `parse_data_type`
/// maps to that variant.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Arbitrary)]
pub enum SqlServerColumnDecodeType {
    /// Used for `bit` columns.
    Bool,
    /// Used for `tinyint` columns.
    U8,
    /// Used for `smallint` columns.
    I16,
    /// Used for `int` columns.
    I32,
    /// Used for `bigint` columns.
    I64,
    /// Used for `real`/`float`/`double precision` columns that are 4 bytes long.
    F32,
    /// Used for `real`/`float`/`double precision` columns that are 8 bytes long.
    F64,
    /// Used for `char`, `nchar`, `varchar`, `nvarchar`, `sysname`, and `json` columns.
    String,
    /// Used for `binary` and `varbinary` columns.
    Bytes,
    /// [`uuid::Uuid`].
    Uuid,
    /// [`tiberius::numeric::Numeric`].
    Numeric,
    /// [`tiberius::xml::XmlData`].
    Xml,
    /// [`chrono::NaiveDate`].
    NaiveDate,
    /// [`chrono::NaiveTime`].
    NaiveTime,
    /// [`chrono::DateTime`].
    DateTime,
    /// [`chrono::NaiveDateTime`].
    NaiveDateTime,
    /// Decoding this type isn't supported.
    Unsupported {
        /// Any additional context as to why this type isn't supported.
        context: String,
    },
}
691
692impl SqlServerColumnDecodeType {
693    /// Decode the column with `name` out of the provided `data`.
694    pub fn decode<'a>(
695        &self,
696        data: &'a tiberius::Row,
697        name: &'a str,
698        column: &'a SqlColumnType,
699        arena: &'a RowArena,
700    ) -> Result<Datum<'a>, SqlServerDecodeError> {
701        let maybe_datum = match (&column.scalar_type, self) {
702            (SqlScalarType::Bool, SqlServerColumnDecodeType::Bool) => data
703                .try_get(name)
704                .map_err(|_| SqlServerDecodeError::invalid_column(name, "bool"))?
705                .map(|val: bool| if val { Datum::True } else { Datum::False }),
706            (SqlScalarType::Int16, SqlServerColumnDecodeType::U8) => data
707                .try_get(name)
708                .map_err(|_| SqlServerDecodeError::invalid_column(name, "u8"))?
709                .map(|val: u8| Datum::Int16(i16::cast_from(val))),
710            (SqlScalarType::Int16, SqlServerColumnDecodeType::I16) => data
711                .try_get(name)
712                .map_err(|_| SqlServerDecodeError::invalid_column(name, "i16"))?
713                .map(Datum::Int16),
714            (SqlScalarType::Int32, SqlServerColumnDecodeType::I32) => data
715                .try_get(name)
716                .map_err(|_| SqlServerDecodeError::invalid_column(name, "i32"))?
717                .map(Datum::Int32),
718            (SqlScalarType::Int64, SqlServerColumnDecodeType::I64) => data
719                .try_get(name)
720                .map_err(|_| SqlServerDecodeError::invalid_column(name, "i64"))?
721                .map(Datum::Int64),
722            (SqlScalarType::Float32, SqlServerColumnDecodeType::F32) => data
723                .try_get(name)
724                .map_err(|_| SqlServerDecodeError::invalid_column(name, "f32"))?
725                .map(|val: f32| Datum::Float32(ordered_float::OrderedFloat(val))),
726            (SqlScalarType::Float64, SqlServerColumnDecodeType::F64) => data
727                .try_get(name)
728                .map_err(|_| SqlServerDecodeError::invalid_column(name, "f64"))?
729                .map(|val: f64| Datum::Float64(ordered_float::OrderedFloat(val))),
730            (SqlScalarType::String, SqlServerColumnDecodeType::String) => data
731                .try_get(name)
732                .map_err(|_| SqlServerDecodeError::invalid_column(name, "string"))?
733                .map(Datum::String),
734            (SqlScalarType::Char { length }, SqlServerColumnDecodeType::String) => data
735                .try_get(name)
736                .map_err(|_| SqlServerDecodeError::invalid_column(name, "char"))?
737                .map(|val: &str| match length {
738                    Some(expected) => {
739                        let found_chars = val.chars().count();
740                        let expct_chars = usize::cast_from(expected.into_u32());
741                        if found_chars != expct_chars {
742                            Err(SqlServerDecodeError::invalid_char(
743                                name,
744                                expct_chars,
745                                found_chars,
746                            ))
747                        } else {
748                            Ok(Datum::String(val))
749                        }
750                    }
751                    None => Ok(Datum::String(val)),
752                })
753                .transpose()?,
754            (SqlScalarType::VarChar { max_length }, SqlServerColumnDecodeType::String) => data
755                .try_get(name)
756                .map_err(|_| SqlServerDecodeError::invalid_column(name, "varchar"))?
757                .map(|val: &str| match max_length {
758                    Some(max) => {
759                        let found_chars = val.chars().count();
760                        let max_chars = usize::cast_from(max.into_u32());
761                        if found_chars > max_chars {
762                            Err(SqlServerDecodeError::invalid_varchar(
763                                name,
764                                max_chars,
765                                found_chars,
766                            ))
767                        } else {
768                            Ok(Datum::String(val))
769                        }
770                    }
771                    None => Ok(Datum::String(val)),
772                })
773                .transpose()?,
774            (SqlScalarType::Bytes, SqlServerColumnDecodeType::Bytes) => data
775                .try_get(name)
776                .map_err(|_| SqlServerDecodeError::invalid_column(name, "bytes"))?
777                .map(Datum::Bytes),
778            (SqlScalarType::Uuid, SqlServerColumnDecodeType::Uuid) => data
779                .try_get(name)
780                .map_err(|_| SqlServerDecodeError::invalid_column(name, "uuid"))?
781                .map(Datum::Uuid),
782            (SqlScalarType::Numeric { .. }, SqlServerColumnDecodeType::Numeric) => data
783                .try_get(name)
784                .map_err(|_| SqlServerDecodeError::invalid_column(name, "numeric"))?
785                .map(|val: tiberius::numeric::Numeric| {
786                    let numeric = tiberius_numeric_to_mz_numeric(val);
787                    Datum::Numeric(OrderedDecimal(numeric))
788                }),
789            (SqlScalarType::String, SqlServerColumnDecodeType::Xml) => data
790                .try_get(name)
791                .map_err(|_| SqlServerDecodeError::invalid_column(name, "xml"))?
792                .map(|val: &tiberius::xml::XmlData| Datum::String(val.as_ref())),
793            (SqlScalarType::Date, SqlServerColumnDecodeType::NaiveDate) => data
794                .try_get(name)
795                .map_err(|_| SqlServerDecodeError::invalid_column(name, "date"))?
796                .map(|val: chrono::NaiveDate| {
797                    let date = val
798                        .try_into()
799                        .map_err(|e| SqlServerDecodeError::invalid_date(name, e))?;
800                    Ok::<_, SqlServerDecodeError>(Datum::Date(date))
801                })
802                .transpose()?,
803            (SqlScalarType::Time, SqlServerColumnDecodeType::NaiveTime) => data
804                .try_get(name)
805                .map_err(|_| SqlServerDecodeError::invalid_column(name, "time"))?
806                .map(|val: chrono::NaiveTime| {
807                    // Postgres' maximum precision is 6 (aka microseconds).
808                    //
809                    // While the Postgres spec supports specifying a precision
810                    // Materialize does not.
811                    let rounded = val.round_subsecs(6);
812                    // Overflowed.
813                    let val = if rounded < val {
814                        val.trunc_subsecs(6)
815                    } else {
816                        val
817                    };
818                    Datum::Time(val)
819                }),
820            (SqlScalarType::Timestamp { precision }, SqlServerColumnDecodeType::NaiveDateTime) => {
821                data.try_get(name)
822                    .map_err(|_| SqlServerDecodeError::invalid_column(name, "timestamp"))?
823                    .map(|val: chrono::NaiveDateTime| {
824                        let ts: CheckedTimestamp<chrono::NaiveDateTime> = val
825                            .try_into()
826                            .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
827                        let rounded = ts
828                            .round_to_precision(*precision)
829                            .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
830                        Ok::<_, SqlServerDecodeError>(Datum::Timestamp(rounded))
831                    })
832                    .transpose()?
833            }
834            (SqlScalarType::TimestampTz { precision }, SqlServerColumnDecodeType::DateTime) => data
835                .try_get(name)
836                .map_err(|_| SqlServerDecodeError::invalid_column(name, "timestamptz"))?
837                .map(|val: chrono::DateTime<chrono::Utc>| {
838                    let ts: CheckedTimestamp<chrono::DateTime<chrono::Utc>> = val
839                        .try_into()
840                        .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
841                    let rounded = ts
842                        .round_to_precision(*precision)
843                        .map_err(|e| SqlServerDecodeError::invalid_timestamp(name, e))?;
844                    Ok::<_, SqlServerDecodeError>(Datum::TimestampTz(rounded))
845                })
846                .transpose()?,
847            // We support mapping any type to a string.
848            (SqlScalarType::String, SqlServerColumnDecodeType::Bool) => data
849                .try_get(name)
850                .map_err(|_| SqlServerDecodeError::invalid_column(name, "bool-text"))?
851                .map(|val: bool| {
852                    if val {
853                        Datum::String("true")
854                    } else {
855                        Datum::String("false")
856                    }
857                }),
858            (SqlScalarType::String, SqlServerColumnDecodeType::U8) => data
859                .try_get(name)
860                .map_err(|_| SqlServerDecodeError::invalid_column(name, "u8-text"))?
861                .map(|val: u8| {
862                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
863                }),
864            (SqlScalarType::String, SqlServerColumnDecodeType::I16) => data
865                .try_get(name)
866                .map_err(|_| SqlServerDecodeError::invalid_column(name, "i16-text"))?
867                .map(|val: i16| {
868                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
869                }),
870            (SqlScalarType::String, SqlServerColumnDecodeType::I32) => data
871                .try_get(name)
872                .map_err(|_| SqlServerDecodeError::invalid_column(name, "i32-text"))?
873                .map(|val: i32| {
874                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
875                }),
876            (SqlScalarType::String, SqlServerColumnDecodeType::I64) => data
877                .try_get(name)
878                .map_err(|_| SqlServerDecodeError::invalid_column(name, "i64-text"))?
879                .map(|val: i64| {
880                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
881                }),
882            (SqlScalarType::String, SqlServerColumnDecodeType::F32) => data
883                .try_get(name)
884                .map_err(|_| SqlServerDecodeError::invalid_column(name, "f32-text"))?
885                .map(|val: f32| {
886                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
887                }),
888            (SqlScalarType::String, SqlServerColumnDecodeType::F64) => data
889                .try_get(name)
890                .map_err(|_| SqlServerDecodeError::invalid_column(name, "f64-text"))?
891                .map(|val: f64| {
892                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
893                }),
894            (SqlScalarType::String, SqlServerColumnDecodeType::Uuid) => data
895                .try_get(name)
896                .map_err(|_| SqlServerDecodeError::invalid_column(name, "uuid-text"))?
897                .map(|val: uuid::Uuid| {
898                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
899                }),
900            (SqlScalarType::String, SqlServerColumnDecodeType::Bytes) => data
901                .try_get(name)
902                .map_err(|_| SqlServerDecodeError::invalid_column(name, "bytes-text"))?
903                .map(|val: &[u8]| {
904                    let encoded = base64::engine::general_purpose::STANDARD.encode(val);
905                    arena.make_datum(|packer| packer.push(Datum::String(&encoded)))
906                }),
907            (SqlScalarType::String, SqlServerColumnDecodeType::Numeric) => data
908                .try_get(name)
909                .map_err(|_| SqlServerDecodeError::invalid_column(name, "numeric-text"))?
910                .map(|val: tiberius::numeric::Numeric| {
911                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
912                }),
913            (SqlScalarType::String, SqlServerColumnDecodeType::NaiveDate) => data
914                .try_get(name)
915                .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivedate-text"))?
916                .map(|val: chrono::NaiveDate| {
917                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
918                }),
919            (SqlScalarType::String, SqlServerColumnDecodeType::NaiveTime) => data
920                .try_get(name)
921                .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivetime-text"))?
922                .map(|val: chrono::NaiveTime| {
923                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
924                }),
925            (SqlScalarType::String, SqlServerColumnDecodeType::DateTime) => data
926                .try_get(name)
927                .map_err(|_| SqlServerDecodeError::invalid_column(name, "datetime-text"))?
928                .map(|val: chrono::DateTime<chrono::Utc>| {
929                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
930                }),
931            (SqlScalarType::String, SqlServerColumnDecodeType::NaiveDateTime) => data
932                .try_get(name)
933                .map_err(|_| SqlServerDecodeError::invalid_column(name, "naivedatetime-text"))?
934                .map(|val: chrono::NaiveDateTime| {
935                    arena.make_datum(|packer| packer.push(Datum::String(&val.to_string())))
936                }),
937            (column_type, decode_type) => {
938                return Err(SqlServerDecodeError::Unsupported {
939                    sql_server_type: decode_type.clone(),
940                    mz_type: column_type.clone(),
941                });
942            }
943        };
944
945        match (maybe_datum, column.nullable) {
946            (Some(datum), _) => Ok(datum),
947            (None, true) => Ok(Datum::Null),
948            (None, false) => Err(SqlServerDecodeError::InvalidData {
949                column_name: name.to_string(),
950                // Note: This error string is durably recorded in Persist, do not change.
951                error: "found Null in non-nullable column".to_string(),
952            }),
953        }
954    }
955}
956
957impl RustType<proto_sql_server_column_desc::DecodeType> for SqlServerColumnDecodeType {
958    fn into_proto(&self) -> proto_sql_server_column_desc::DecodeType {
959        match self {
960            SqlServerColumnDecodeType::Bool => proto_sql_server_column_desc::DecodeType::Bool(()),
961            SqlServerColumnDecodeType::U8 => proto_sql_server_column_desc::DecodeType::U8(()),
962            SqlServerColumnDecodeType::I16 => proto_sql_server_column_desc::DecodeType::I16(()),
963            SqlServerColumnDecodeType::I32 => proto_sql_server_column_desc::DecodeType::I32(()),
964            SqlServerColumnDecodeType::I64 => proto_sql_server_column_desc::DecodeType::I64(()),
965            SqlServerColumnDecodeType::F32 => proto_sql_server_column_desc::DecodeType::F32(()),
966            SqlServerColumnDecodeType::F64 => proto_sql_server_column_desc::DecodeType::F64(()),
967            SqlServerColumnDecodeType::String => {
968                proto_sql_server_column_desc::DecodeType::String(())
969            }
970            SqlServerColumnDecodeType::Bytes => proto_sql_server_column_desc::DecodeType::Bytes(()),
971            SqlServerColumnDecodeType::Uuid => proto_sql_server_column_desc::DecodeType::Uuid(()),
972            SqlServerColumnDecodeType::Numeric => {
973                proto_sql_server_column_desc::DecodeType::Numeric(())
974            }
975            SqlServerColumnDecodeType::Xml => proto_sql_server_column_desc::DecodeType::Xml(()),
976            SqlServerColumnDecodeType::NaiveDate => {
977                proto_sql_server_column_desc::DecodeType::NaiveDate(())
978            }
979            SqlServerColumnDecodeType::NaiveTime => {
980                proto_sql_server_column_desc::DecodeType::NaiveTime(())
981            }
982            SqlServerColumnDecodeType::DateTime => {
983                proto_sql_server_column_desc::DecodeType::DateTime(())
984            }
985            SqlServerColumnDecodeType::NaiveDateTime => {
986                proto_sql_server_column_desc::DecodeType::NaiveDateTime(())
987            }
988            SqlServerColumnDecodeType::Unsupported { context } => {
989                proto_sql_server_column_desc::DecodeType::Unsupported(context.clone())
990            }
991        }
992    }
993
994    fn from_proto(
995        proto: proto_sql_server_column_desc::DecodeType,
996    ) -> Result<Self, mz_proto::TryFromProtoError> {
997        let val = match proto {
998            proto_sql_server_column_desc::DecodeType::Bool(()) => SqlServerColumnDecodeType::Bool,
999            proto_sql_server_column_desc::DecodeType::U8(()) => SqlServerColumnDecodeType::U8,
1000            proto_sql_server_column_desc::DecodeType::I16(()) => SqlServerColumnDecodeType::I16,
1001            proto_sql_server_column_desc::DecodeType::I32(()) => SqlServerColumnDecodeType::I32,
1002            proto_sql_server_column_desc::DecodeType::I64(()) => SqlServerColumnDecodeType::I64,
1003            proto_sql_server_column_desc::DecodeType::F32(()) => SqlServerColumnDecodeType::F32,
1004            proto_sql_server_column_desc::DecodeType::F64(()) => SqlServerColumnDecodeType::F64,
1005            proto_sql_server_column_desc::DecodeType::String(()) => {
1006                SqlServerColumnDecodeType::String
1007            }
1008            proto_sql_server_column_desc::DecodeType::Bytes(()) => SqlServerColumnDecodeType::Bytes,
1009            proto_sql_server_column_desc::DecodeType::Uuid(()) => SqlServerColumnDecodeType::Uuid,
1010            proto_sql_server_column_desc::DecodeType::Numeric(()) => {
1011                SqlServerColumnDecodeType::Numeric
1012            }
1013            proto_sql_server_column_desc::DecodeType::Xml(()) => SqlServerColumnDecodeType::Xml,
1014            proto_sql_server_column_desc::DecodeType::NaiveDate(()) => {
1015                SqlServerColumnDecodeType::NaiveDate
1016            }
1017            proto_sql_server_column_desc::DecodeType::NaiveTime(()) => {
1018                SqlServerColumnDecodeType::NaiveTime
1019            }
1020            proto_sql_server_column_desc::DecodeType::DateTime(()) => {
1021                SqlServerColumnDecodeType::DateTime
1022            }
1023            proto_sql_server_column_desc::DecodeType::NaiveDateTime(()) => {
1024                SqlServerColumnDecodeType::NaiveDateTime
1025            }
1026            proto_sql_server_column_desc::DecodeType::Unsupported(context) => {
1027                SqlServerColumnDecodeType::Unsupported { context }
1028            }
1029        };
1030        Ok(val)
1031    }
1032}
1033
1034/// Numerics in SQL Server have a maximum precision of 38 digits, where [`Numeric`]s in
1035/// Materialize have a maximum precision of 39 digits, so this conversion is infallible.
1036fn tiberius_numeric_to_mz_numeric(val: tiberius::numeric::Numeric) -> Numeric {
1037    let mut numeric = mz_repr::adt::numeric::cx_datum().from_i128(val.value());
1038    // Use scaleb to adjust the exponent directly, avoiding precision loss from division
1039    // scaleb(x, -n) computes x * 10^(-n)
1040    mz_repr::adt::numeric::cx_datum().scaleb(&mut numeric, &Numeric::from(-i32::from(val.scale())));
1041    numeric
1042}
1043
/// A decoder from [`tiberius::Row`] to [`mz_repr::Row`].
///
/// The goal of this type is to perform any expensive "downcasts" so in the hot
/// path of decoding rows we do the minimal amount of work.
#[derive(Debug)]
pub struct SqlServerRowDecoder {
    /// One `(column name, column type, decode type)` entry per output column.
    ///
    /// Entries are built by [`SqlServerRowDecoder::try_new`] in the column order of the
    /// [`RelationDesc`] it is given, and [`SqlServerRowDecoder::decode`] pushes one datum
    /// per entry in that same order.
    decoders: Vec<(Arc<str>, SqlColumnType, SqlServerColumnDecodeType)>,
}
1052
1053impl SqlServerRowDecoder {
1054    /// Try to create a [`SqlServerRowDecoder`] that will decode [`tiberius::Row`]s that match
1055    /// the shape of the provided [`SqlServerTableDesc`], to [`mz_repr::Row`]s that match the
1056    /// shape of the provided [`RelationDesc`].
1057    pub fn try_new(
1058        table: &SqlServerTableDesc,
1059        desc: &RelationDesc,
1060    ) -> Result<Self, SqlServerError> {
1061        let decoders = desc
1062            .iter()
1063            .map(|(col_name, col_type)| {
1064                let sql_server_col = table
1065                    .columns
1066                    .iter()
1067                    .find(|col| col.name.as_ref() == col_name.as_str())
1068                    .ok_or_else(|| {
1069                        // TODO(sql_server2): Structured Error.
1070                        anyhow::anyhow!("no SQL Server column with name {col_name} found")
1071                    })?;
1072                let Some(sql_server_col_typ) = sql_server_col.column_type.as_ref() else {
1073                    return Err(SqlServerError::ProgrammingError(format!(
1074                        "programming error, {col_name} should have been exluded",
1075                    )));
1076                };
1077
1078                // This shouldn't be true, but be defensive.
1079                //
1080                // TODO(sql_server2): Maybe allow the Materialize column type to be
1081                // more nullable than our decoding type?
1082                //
1083                // Sad. Our timestamp types don't roundtrip their precision through
1084                // parsing so we ignore the mismatch here.
1085                let matches = match (&sql_server_col_typ.scalar_type, &col_type.scalar_type) {
1086                    (SqlScalarType::Timestamp { .. }, SqlScalarType::Timestamp { .. })
1087                    | (SqlScalarType::TimestampTz { .. }, SqlScalarType::TimestampTz { .. }) => {
1088                        // Types match so check nullability.
1089                        sql_server_col_typ.nullable == col_type.nullable
1090                    }
1091                    (_, _) => sql_server_col_typ == col_type,
1092                };
1093                if !matches {
1094                    return Err(SqlServerError::ProgrammingError(format!(
1095                        "programming error, {col_name} has mismatched type {:?} vs {:?}",
1096                        sql_server_col.column_type, col_type
1097                    )));
1098                }
1099
1100                let name = Arc::clone(&sql_server_col.name);
1101                let decoder = sql_server_col.decode_type.clone();
1102                // Note: We specifically use the `SqlColumnType` from the SqlServerTableDesc
1103                // because it retains precision.
1104                //
1105                // See: <https://github.com/MaterializeInc/database-issues/issues/3179>.
1106                let col_typ = sql_server_col_typ.clone();
1107
1108                Ok::<_, SqlServerError>((name, col_typ, decoder))
1109            })
1110            .collect::<Result<_, _>>()?;
1111
1112        Ok(SqlServerRowDecoder { decoders })
1113    }
1114
1115    /// Decode data from the provided [`tiberius::Row`] into the provided [`Row`].
1116    pub fn decode(
1117        &self,
1118        data: &tiberius::Row,
1119        row: &mut Row,
1120        arena: &RowArena,
1121    ) -> Result<(), SqlServerDecodeError> {
1122        let mut packer = row.packer();
1123        for (col_name, col_type, decoder) in &self.decoders {
1124            let datum = decoder.decode(data, col_name, col_type, arena)?;
1125            packer.push(datum);
1126        }
1127        Ok(())
1128    }
1129}
1130
1131#[cfg(test)]
1132mod tests {
1133    use std::collections::BTreeSet;
1134    use std::sync::Arc;
1135
1136    use chrono::NaiveDateTime;
1137    use itertools::Itertools;
1138    use mz_ore::assert_contains;
1139    use mz_ore::collections::CollectionExt;
1140    use mz_repr::adt::numeric::NumericMaxScale;
1141    use mz_repr::adt::varchar::VarCharMaxLength;
1142    use mz_repr::{Datum, RelationDesc, Row, RowArena, SqlScalarType};
1143    use tiberius::RowTestExt;
1144
1145    use crate::desc::{
1146        SqlServerCaptureInstanceRaw, SqlServerColumnDecodeType, SqlServerColumnDesc,
1147        SqlServerTableDesc, SqlServerTableRaw, tiberius_numeric_to_mz_numeric,
1148    };
1149
1150    use super::SqlServerColumnRaw;
1151
1152    impl SqlServerColumnRaw {
1153        /// Create a new [`SqlServerColumnRaw`]. The specified `data_type` is
1154        /// _not_ checked for validity.
1155        fn new(name: &str, data_type: &str) -> Self {
1156            SqlServerColumnRaw {
1157                name: name.into(),
1158                data_type: data_type.into(),
1159                is_nullable: false,
1160                max_length: 0,
1161                precision: 0,
1162                scale: 0,
1163                is_computed: false,
1164            }
1165        }
1166
1167        fn nullable(mut self, nullable: bool) -> Self {
1168            self.is_nullable = nullable;
1169            self
1170        }
1171
1172        fn max_length(mut self, max_length: i16) -> Self {
1173            self.max_length = max_length;
1174            self
1175        }
1176
1177        fn precision(mut self, precision: u8) -> Self {
1178            self.precision = precision;
1179            self
1180        }
1181
1182        fn scale(mut self, scale: u8) -> Self {
1183            self.scale = scale;
1184            self
1185        }
1186    }
1187
1188    #[mz_ore::test]
1189    fn smoketest_column_raw() {
1190        let raw = SqlServerColumnRaw::new("foo", "bit");
1191        let col = SqlServerColumnDesc::new(&raw);
1192
1193        assert_eq!(&*col.name, "foo");
1194        assert_eq!(col.column_type, Some(SqlScalarType::Bool.nullable(false)));
1195        assert_eq!(col.decode_type, SqlServerColumnDecodeType::Bool);
1196
1197        let raw = SqlServerColumnRaw::new("foo", "decimal")
1198            .precision(20)
1199            .scale(10);
1200        let col = SqlServerColumnDesc::new(&raw);
1201
1202        let col_type = SqlScalarType::Numeric {
1203            max_scale: Some(NumericMaxScale::try_from(10i64).expect("known valid")),
1204        }
1205        .nullable(false);
1206        assert_eq!(col.column_type, Some(col_type));
1207        assert_eq!(col.decode_type, SqlServerColumnDecodeType::Numeric);
1208    }
1209
1210    #[mz_ore::test]
1211    fn smoketest_column_raw_invalid() {
1212        let raw = SqlServerColumnRaw::new("foo", "bad_data_type");
1213        let desc = SqlServerColumnDesc::new(&raw);
1214        let SqlServerColumnDecodeType::Unsupported { context } = desc.decode_type else {
1215            panic!("unexpected decode type {desc:?}");
1216        };
1217        assert_contains!(context, "'bad_data_type' is unimplemented");
1218
1219        let raw = SqlServerColumnRaw::new("foo", "decimal")
1220            .precision(100)
1221            .scale(10);
1222        let desc = SqlServerColumnDesc::new(&raw);
1223        assert!(matches!(
1224            desc.decode_type,
1225            SqlServerColumnDecodeType::Unsupported { .. }
1226        ));
1227
1228        let raw = SqlServerColumnRaw::new("foo", "varchar").max_length(-1);
1229        let desc = SqlServerColumnDesc::new(&raw);
1230        let SqlServerColumnDecodeType::Unsupported { context } = desc.decode_type else {
1231            panic!("unexpected decode type {desc:?}");
1232        };
1233        assert_contains!(context, "columns with unlimited size do not support CDC");
1234    }
1235
1236    #[mz_ore::test]
1237    fn smoketest_decoder() {
1238        let sql_server_columns = [
1239            SqlServerColumnRaw::new("a", "varchar").max_length(16),
1240            SqlServerColumnRaw::new("b", "int").nullable(true),
1241            SqlServerColumnRaw::new("c", "bit"),
1242        ];
1243        let sql_server_desc = SqlServerTableRaw {
1244            schema_name: "my_schema".into(),
1245            name: "my_table".into(),
1246            capture_instance: Arc::new(SqlServerCaptureInstanceRaw {
1247                name: "my_table_CT".into(),
1248                create_date: NaiveDateTime::parse_from_str(
1249                    "2024-01-01 00:00:00",
1250                    "%Y-%m-%d %H:%M:%S",
1251                )
1252                .unwrap()
1253                .into(),
1254            }),
1255            columns: sql_server_columns.into(),
1256        };
1257        let sql_server_desc = SqlServerTableDesc::new(sql_server_desc, vec![]).unwrap();
1258
1259        let max_length = Some(VarCharMaxLength::try_from(16).unwrap());
1260        let relation_desc = RelationDesc::builder()
1261            .with_column("a", SqlScalarType::VarChar { max_length }.nullable(false))
1262            // Note: In the upstream table 'c' is ordered after 'b'.
1263            .with_column("c", SqlScalarType::Bool.nullable(false))
1264            .with_column("b", SqlScalarType::Int32.nullable(true))
1265            .finish();
1266
1267        // This decoder should shape the SQL Server Rows into Rows compatible with the RelationDesc.
1268        let decoder = sql_server_desc
1269            .decoder(&relation_desc)
1270            .expect("known valid");
1271
1272        let sql_server_columns = [
1273            tiberius::Column::new("a".to_string(), tiberius::ColumnType::BigVarChar),
1274            tiberius::Column::new("b".to_string(), tiberius::ColumnType::Int4),
1275            tiberius::Column::new("c".to_string(), tiberius::ColumnType::Bit),
1276        ];
1277
1278        let data_a = [
1279            tiberius::ColumnData::String(Some("hello world".into())),
1280            tiberius::ColumnData::I32(Some(42)),
1281            tiberius::ColumnData::Bit(Some(true)),
1282        ];
1283        let sql_server_row_a = tiberius::Row::build(
1284            sql_server_columns
1285                .iter()
1286                .cloned()
1287                .zip_eq(data_a.into_iter()),
1288        );
1289
1290        let data_b = [
1291            tiberius::ColumnData::String(Some("foo bar".into())),
1292            tiberius::ColumnData::I32(None),
1293            tiberius::ColumnData::Bit(Some(false)),
1294        ];
1295        let sql_server_row_b =
1296            tiberius::Row::build(sql_server_columns.into_iter().zip_eq(data_b.into_iter()));
1297
1298        let mut rnd_row = Row::default();
1299        let arena = RowArena::default();
1300
1301        decoder
1302            .decode(&sql_server_row_a, &mut rnd_row, &arena)
1303            .unwrap();
1304        assert_eq!(
1305            &rnd_row,
1306            &Row::pack_slice(&[Datum::String("hello world"), Datum::True, Datum::Int32(42)])
1307        );
1308
1309        decoder
1310            .decode(&sql_server_row_b, &mut rnd_row, &arena)
1311            .unwrap();
1312        assert_eq!(
1313            &rnd_row,
1314            &Row::pack_slice(&[Datum::String("foo bar"), Datum::False, Datum::Null])
1315        );
1316    }
1317
1318    #[mz_ore::test]
1319    fn smoketest_decode_to_string() {
1320        #[track_caller]
1321        fn testcase(
1322            data_type: &'static str,
1323            col_type: tiberius::ColumnType,
1324            col_data: tiberius::ColumnData<'static>,
1325        ) {
1326            let columns = [SqlServerColumnRaw::new("a", data_type)];
1327            let sql_server_desc = SqlServerTableRaw {
1328                schema_name: "my_schema".into(),
1329                name: "my_table".into(),
1330                capture_instance: Arc::new(SqlServerCaptureInstanceRaw {
1331                    name: "my_table_CT".into(),
1332                    create_date: NaiveDateTime::parse_from_str(
1333                        "2024-01-01 00:00:00",
1334                        "%Y-%m-%d %H:%M:%S",
1335                    )
1336                    .unwrap()
1337                    .into(),
1338                }),
1339                columns: columns.into(),
1340            };
1341            let mut sql_server_desc = SqlServerTableDesc::new(sql_server_desc, vec![]).unwrap();
1342            sql_server_desc.apply_text_columns(&BTreeSet::from(["a"]));
1343
1344            // We should support decoding every datatype to a string.
1345            let relation_desc = RelationDesc::builder()
1346                .with_column("a", SqlScalarType::String.nullable(false))
1347                .finish();
1348
1349            // This decoder should shape the SQL Server Rows into Rows compatible with the RelationDesc.
1350            let decoder = sql_server_desc
1351                .decoder(&relation_desc)
1352                .expect("known valid");
1353
1354            let sql_server_row = tiberius::Row::build([(
1355                tiberius::Column::new("a".to_string(), col_type),
1356                col_data,
1357            )]);
1358            let mut mz_row = Row::default();
1359            let arena = RowArena::new();
1360            decoder
1361                .decode(&sql_server_row, &mut mz_row, &arena)
1362                .unwrap();
1363
1364            let str_datum = mz_row.into_element();
1365            assert!(matches!(str_datum, Datum::String(_)));
1366        }
1367
1368        use tiberius::ColumnData;
1369
1370        testcase(
1371            "bit",
1372            tiberius::ColumnType::Bit,
1373            ColumnData::Bit(Some(true)),
1374        );
1375        testcase(
1376            "bit",
1377            tiberius::ColumnType::Bit,
1378            ColumnData::Bit(Some(false)),
1379        );
1380        testcase(
1381            "tinyint",
1382            tiberius::ColumnType::Int1,
1383            ColumnData::U8(Some(33)),
1384        );
1385        testcase(
1386            "smallint",
1387            tiberius::ColumnType::Int2,
1388            ColumnData::I16(Some(101)),
1389        );
1390        testcase(
1391            "int",
1392            tiberius::ColumnType::Int4,
1393            ColumnData::I32(Some(-42)),
1394        );
1395        {
1396            let datetime = tiberius::time::DateTime::new(10, 300);
1397            testcase(
1398                "datetime",
1399                tiberius::ColumnType::Datetime,
1400                ColumnData::DateTime(Some(datetime)),
1401            );
1402        }
1403    }
1404
1405    #[mz_ore::test]
1406    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
1407    fn smoketest_numeric_conversion() {
1408        let a = tiberius::numeric::Numeric::new_with_scale(12345, 2);
1409        let rnd = tiberius_numeric_to_mz_numeric(a);
1410        let og = mz_repr::adt::numeric::cx_datum().parse("123.45").unwrap();
1411        assert_eq!(og, rnd);
1412
1413        let a = tiberius::numeric::Numeric::new_with_scale(-99999, 5);
1414        let rnd = tiberius_numeric_to_mz_numeric(a);
1415        let og = mz_repr::adt::numeric::cx_datum().parse("-.99999").unwrap();
1416        assert_eq!(og, rnd);
1417
1418        let a = tiberius::numeric::Numeric::new_with_scale(1, 29);
1419        let rnd = tiberius_numeric_to_mz_numeric(a);
1420        let og = mz_repr::adt::numeric::cx_datum()
1421            .parse("0.00000000000000000000000000001")
1422            .unwrap();
1423        assert_eq!(og, rnd);
1424
1425        let a = tiberius::numeric::Numeric::new_with_scale(-111111111111111111, 0);
1426        let rnd = tiberius_numeric_to_mz_numeric(a);
1427        let og = mz_repr::adt::numeric::cx_datum()
1428            .parse("-111111111111111111")
1429            .unwrap();
1430        assert_eq!(og, rnd);
1431    }
1432
1433    // TODO(sql_server2): Proptest the decoder.
1434}