tiberius/tds/codec/
column_data.rs

1mod binary;
2mod bit;
3mod bytes_mut_with_type_info;
4#[cfg(feature = "tds73")]
5mod date;
6#[cfg(feature = "tds73")]
7mod datetime2;
8mod datetimen;
9#[cfg(feature = "tds73")]
10mod datetimeoffsetn;
11mod fixed_len;
12mod float;
13mod guid;
14mod image;
15mod int;
16mod money;
17mod plp;
18mod string;
19mod text;
20#[cfg(feature = "tds73")]
21mod time;
22mod var_len;
23mod xml;
24
25use super::{Encode, FixedLenType, TypeInfo, VarLenType};
26#[cfg(feature = "tds73")]
27use crate::tds::time::{Date, DateTime2, DateTimeOffset, Time};
28use crate::{
29    tds::{time::DateTime, time::SmallDateTime, xml::XmlData, Numeric},
30    SqlReadBytes,
31};
32use bytes::BufMut;
33pub(crate) use bytes_mut_with_type_info::BytesMutWithTypeInfo;
34use std::borrow::{BorrowMut, Cow};
35use uuid::Uuid;
36
/// Largest UTF-8 byte length (`str::len`) for which `ColumnData::type_name`
/// still reports a string as `nvarchar(max)`; longer strings fall through to
/// the last `String` arm of that match.
const MAX_NVARCHAR_SIZE: usize = 1 << 30;
38
39#[derive(Clone, Debug, PartialEq)]
40/// A container of a value that can be represented as a TDS value.
41pub enum ColumnData<'a> {
42    /// 8-bit integer, unsigned.
43    U8(Option<u8>),
44    /// 16-bit integer, signed.
45    I16(Option<i16>),
46    /// 32-bit integer, signed.
47    I32(Option<i32>),
48    /// 64-bit integer, signed.
49    I64(Option<i64>),
50    /// 32-bit floating point number.
51    F32(Option<f32>),
52    /// 64-bit floating point number.
53    F64(Option<f64>),
54    /// Boolean.
55    Bit(Option<bool>),
56    /// A string value.
57    String(Option<Cow<'a, str>>),
58    /// A Guid (UUID) value.
59    Guid(Option<Uuid>),
60    /// Binary data.
61    Binary(Option<Cow<'a, [u8]>>),
62    /// Numeric value (a decimal).
63    Numeric(Option<Numeric>),
64    /// XML data.
65    Xml(Option<Cow<'a, XmlData>>),
66    /// DateTime value.
67    DateTime(Option<DateTime>),
68    /// A small DateTime value.
69    SmallDateTime(Option<SmallDateTime>),
70    #[cfg(feature = "tds73")]
71    #[cfg_attr(feature = "docs", doc(cfg(feature = "tds73")))]
72    /// Time value.
73    Time(Option<Time>),
74    #[cfg(feature = "tds73")]
75    #[cfg_attr(feature = "docs", doc(cfg(feature = "tds73")))]
76    /// Date value.
77    Date(Option<Date>),
78    #[cfg(feature = "tds73")]
79    #[cfg_attr(feature = "docs", doc(cfg(feature = "tds73")))]
80    /// DateTime2 value.
81    DateTime2(Option<DateTime2>),
82    #[cfg(feature = "tds73")]
83    #[cfg_attr(feature = "docs", doc(cfg(feature = "tds73")))]
84    /// DateTime2 value with an offset.
85    DateTimeOffset(Option<DateTimeOffset>),
86}
87
88impl<'a> ColumnData<'a> {
89    pub(crate) fn type_name(&self) -> Cow<'static, str> {
90        match self {
91            ColumnData::U8(_) => "tinyint".into(),
92            ColumnData::I16(_) => "smallint".into(),
93            ColumnData::I32(_) => "int".into(),
94            ColumnData::I64(_) => "bigint".into(),
95            ColumnData::F32(_) => "float(24)".into(),
96            ColumnData::F64(_) => "float(53)".into(),
97            ColumnData::Bit(_) => "bit".into(),
98            ColumnData::String(None) => "nvarchar(4000)".into(),
99            ColumnData::String(Some(ref s)) if s.len() <= 4000 => "nvarchar(4000)".into(),
100            ColumnData::String(Some(ref s)) if s.len() <= MAX_NVARCHAR_SIZE => {
101                "nvarchar(max)".into()
102            }
103            ColumnData::String(_) => "ntext(max)".into(),
104            ColumnData::Guid(_) => "uniqueidentifier".into(),
105            ColumnData::Binary(Some(ref b)) if b.len() <= 8000 => "varbinary(8000)".into(),
106            ColumnData::Binary(_) => "varbinary(max)".into(),
107            ColumnData::Numeric(Some(ref n)) => {
108                format!("numeric({},{})", n.precision(), n.scale()).into()
109            }
110            ColumnData::Numeric(None) => "numeric".into(),
111            ColumnData::Xml(_) => "xml".into(),
112            ColumnData::DateTime(_) => "datetime".into(),
113            ColumnData::SmallDateTime(_) => "smalldatetime".into(),
114            #[cfg(feature = "tds73")]
115            ColumnData::Time(_) => "time".into(),
116            #[cfg(feature = "tds73")]
117            ColumnData::Date(_) => "date".into(),
118            #[cfg(feature = "tds73")]
119            ColumnData::DateTime2(_) => "datetime2".into(),
120            #[cfg(feature = "tds73")]
121            ColumnData::DateTimeOffset(_) => "datetimeoffset".into(),
122        }
123    }
124
125    pub(crate) async fn decode<R>(src: &mut R, ctx: &TypeInfo) -> crate::Result<ColumnData<'a>>
126    where
127        R: SqlReadBytes + Unpin,
128    {
129        let res = match ctx {
130            TypeInfo::FixedLen(fixed_ty) => fixed_len::decode(src, fixed_ty).await?,
131            TypeInfo::VarLenSized(cx) => var_len::decode(src, cx).await?,
132            TypeInfo::VarLenSizedPrecision { ty, scale, .. } => match ty {
133                VarLenType::Decimaln | VarLenType::Numericn => {
134                    ColumnData::Numeric(Numeric::decode(src, *scale).await?)
135                }
136                _ => todo!(),
137            },
138            TypeInfo::Xml { schema, size } => xml::decode(src, *size, schema.clone()).await?,
139        };
140
141        Ok(res)
142    }
143}
144
145impl<'a> Encode<BytesMutWithTypeInfo<'a>> for ColumnData<'a> {
146    fn encode(self, dst: &mut BytesMutWithTypeInfo<'a>) -> crate::Result<()> {
147        match (self, dst.type_info()) {
148            (ColumnData::Bit(opt), Some(TypeInfo::VarLenSized(vlc)))
149                if vlc.r#type() == VarLenType::Bitn =>
150            {
151                if let Some(val) = opt {
152                    dst.put_u8(1);
153                    dst.put_u8(val as u8);
154                } else {
155                    dst.put_u8(0);
156                }
157            }
158            (ColumnData::Bit(Some(val)), Some(TypeInfo::FixedLen(FixedLenType::Bit))) => {
159                dst.put_u8(val as u8);
160            }
161            (ColumnData::Bit(Some(val)), None) => {
162                // if TypeInfo was not given, encode a TypeInfo
163                // the first 1 is part of TYPE_INFO
164                // the second 1 is part of TYPE_VARBYTE
165                let header = [VarLenType::Bitn as u8, 1, 1];
166                dst.extend_from_slice(&header);
167                dst.put_u8(val as u8);
168            }
169            (ColumnData::U8(opt), Some(TypeInfo::VarLenSized(vlc)))
170                if vlc.r#type() == VarLenType::Intn =>
171            {
172                if let Some(val) = opt {
173                    dst.put_u8(1);
174                    dst.put_u8(val);
175                } else {
176                    dst.put_u8(0);
177                }
178            }
179            (ColumnData::U8(Some(val)), Some(TypeInfo::FixedLen(FixedLenType::Int1))) => {
180                dst.put_u8(val);
181            }
182            (ColumnData::U8(Some(val)), None) => {
183                let header = [VarLenType::Intn as u8, 1, 1];
184                dst.extend_from_slice(&header);
185                dst.put_u8(val);
186            }
187            (ColumnData::I16(Some(val)), Some(TypeInfo::FixedLen(FixedLenType::Int2))) => {
188                dst.put_i16_le(val);
189            }
190            (ColumnData::I16(opt), Some(TypeInfo::VarLenSized(vlc)))
191                if vlc.r#type() == VarLenType::Intn =>
192            {
193                if let Some(val) = opt {
194                    dst.put_u8(2);
195                    dst.put_i16_le(val);
196                } else {
197                    dst.put_u8(0);
198                }
199            }
200            (ColumnData::I16(Some(val)), None) => {
201                let header = [VarLenType::Intn as u8, 2, 2];
202                dst.extend_from_slice(&header);
203
204                dst.put_i16_le(val);
205            }
206            (ColumnData::I32(Some(val)), Some(TypeInfo::FixedLen(FixedLenType::Int4))) => {
207                dst.put_i32_le(val);
208            }
209            (ColumnData::I32(opt), Some(TypeInfo::VarLenSized(vlc)))
210                if vlc.r#type() == VarLenType::Intn =>
211            {
212                if let Some(val) = opt {
213                    dst.put_u8(4);
214                    dst.put_i32_le(val);
215                } else {
216                    dst.put_u8(0);
217                }
218            }
219            (ColumnData::I32(Some(val)), None) => {
220                let header = [VarLenType::Intn as u8, 4, 4];
221                dst.extend_from_slice(&header);
222                dst.put_i32_le(val);
223            }
224            (ColumnData::I64(Some(val)), Some(TypeInfo::FixedLen(FixedLenType::Int8))) => {
225                dst.put_i64_le(val);
226            }
227            (ColumnData::I64(opt), Some(TypeInfo::VarLenSized(vlc)))
228                if vlc.r#type() == VarLenType::Intn =>
229            {
230                if let Some(val) = opt {
231                    dst.put_u8(8);
232                    dst.put_i64_le(val);
233                } else {
234                    dst.put_u8(0);
235                }
236            }
237            (ColumnData::I64(Some(val)), None) => {
238                let header = [VarLenType::Intn as u8, 8, 8];
239                dst.extend_from_slice(&header);
240                dst.put_i64_le(val);
241            }
242            (ColumnData::F32(Some(val)), Some(TypeInfo::FixedLen(FixedLenType::Float4))) => {
243                dst.put_f32_le(val);
244            }
245            (ColumnData::F32(opt), Some(TypeInfo::VarLenSized(vlc)))
246                if vlc.r#type() == VarLenType::Floatn =>
247            {
248                if let Some(val) = opt {
249                    dst.put_u8(4);
250                    dst.put_f32_le(val);
251                } else {
252                    dst.put_u8(0);
253                }
254            }
255            (ColumnData::F32(Some(val)), None) => {
256                let header = [VarLenType::Floatn as u8, 4, 4];
257                dst.extend_from_slice(&header);
258                dst.put_f32_le(val);
259            }
260            (ColumnData::F64(Some(val)), Some(TypeInfo::FixedLen(FixedLenType::Float8))) => {
261                dst.put_f64_le(val);
262            }
263            (ColumnData::F64(opt), Some(TypeInfo::VarLenSized(vlc)))
264                if vlc.r#type() == VarLenType::Floatn =>
265            {
266                if let Some(val) = opt {
267                    dst.put_u8(8);
268                    dst.put_f64_le(val);
269                } else {
270                    dst.put_u8(0);
271                }
272            }
273            (ColumnData::F64(Some(val)), None) => {
274                let header = [VarLenType::Floatn as u8, 8, 8];
275                dst.extend_from_slice(&header);
276                dst.put_f64_le(val);
277            }
278            (ColumnData::Guid(opt), Some(TypeInfo::VarLenSized(vlc)))
279                if vlc.r#type() == VarLenType::Guid =>
280            {
281                if let Some(uuid) = opt {
282                    dst.put_u8(16);
283
284                    let mut data = *uuid.as_bytes();
285                    super::guid::reorder_bytes(&mut data);
286                    dst.extend_from_slice(&data);
287                } else {
288                    dst.put_u8(0);
289                }
290            }
291            (ColumnData::Guid(Some(uuid)), None) => {
292                let header = [VarLenType::Guid as u8, 16, 16];
293                dst.extend_from_slice(&header);
294
295                let mut data = *uuid.as_bytes();
296                super::guid::reorder_bytes(&mut data);
297                dst.extend_from_slice(&data);
298            }
299            (ColumnData::String(opt), Some(TypeInfo::VarLenSized(vlc)))
300                if vlc.r#type() == VarLenType::BigChar
301                    || vlc.r#type() == VarLenType::BigVarChar =>
302            {
303                if let Some(str) = opt {
304                    let mut encoder = vlc.collation().as_ref().unwrap().encoding()?.new_encoder();
305                    let len = encoder
306                        .max_buffer_length_from_utf8_without_replacement(str.len())
307                        .unwrap();
308                    let mut bytes = Vec::with_capacity(len);
309                    let (res, _) = encoder.encode_from_utf8_to_vec_without_replacement(
310                        str.as_ref(),
311                        &mut bytes,
312                        true,
313                    );
314                    if let encoding_rs::EncoderResult::Unmappable(_) = res {
315                        return Err(crate::Error::Encoding("unrepresentable character".into()));
316                    }
317
318                    if bytes.len() > vlc.len() {
319                        return Err(crate::Error::BulkInput(
320                            format!(
321                                "Encoded string length {} exceed column limit {}",
322                                bytes.len(),
323                                vlc.len()
324                            )
325                            .into(),
326                        ));
327                    }
328
329                    if vlc.len() < 0xffff {
330                        dst.put_u16_le(bytes.len() as u16);
331                        dst.extend_from_slice(bytes.as_slice());
332                    } else {
333                        // unknown size
334                        dst.put_u64_le(0xfffffffffffffffe);
335
336                        assert!(
337                            str.len() < 0xffffffff,
338                            "if str longer than this, need to implement multiple blobs"
339                        );
340
341                        dst.put_u32_le(bytes.len() as u32);
342                        dst.extend_from_slice(bytes.as_slice());
343
344                        if !bytes.is_empty() {
345                            // no next blob
346                            dst.put_u32_le(0u32);
347                        }
348                    }
349                } else if vlc.len() < 0xffff {
350                    dst.put_u16_le(0xffff);
351                } else {
352                    dst.put_u64_le(0xffffffffffffffff)
353                }
354            }
355            (ColumnData::String(opt), Some(TypeInfo::VarLenSized(vlc)))
356                if vlc.r#type() == VarLenType::NVarchar || vlc.r#type() == VarLenType::NChar =>
357            {
358                if let Some(str) = opt {
359                    if vlc.len() < 0xffff {
360                        let len_pos = dst.len();
361                        dst.put_u16_le(0u16);
362
363                        for chr in str.encode_utf16() {
364                            dst.put_u16_le(chr);
365                        }
366
367                        let length = dst.len() - len_pos - 2;
368
369                        if length > vlc.len() {
370                            return Err(crate::Error::BulkInput(
371                                format!(
372                                    "Encoded string length {} exceed column limit {}",
373                                    length,
374                                    vlc.len()
375                                )
376                                .into(),
377                            ));
378                        }
379
380                        let dst: &mut [u8] = dst.borrow_mut();
381                        let mut dst = &mut dst[len_pos..];
382                        dst.put_u16_le(length as u16);
383                    } else {
384                        // unknown size
385                        dst.put_u64_le(0xfffffffffffffffe);
386
387                        assert!(
388                            str.len() < 0xffffffff,
389                            "if str longer than this, need to implement multiple blobs"
390                        );
391
392                        let len_pos = dst.len();
393                        dst.put_u32_le(0u32);
394
395                        for chr in str.encode_utf16() {
396                            dst.put_u16_le(chr);
397                        }
398
399                        let length = dst.len() - len_pos - 4;
400
401                        if length > vlc.len() {
402                            return Err(crate::Error::BulkInput(
403                                format!(
404                                    "Encoded string length {} exceed column limit {}",
405                                    length,
406                                    vlc.len()
407                                )
408                                .into(),
409                            ));
410                        }
411
412                        if length > 0 {
413                            // no next blob
414                            dst.put_u32_le(0u32);
415                        }
416
417                        let dst: &mut [u8] = dst.borrow_mut();
418                        let mut dst = &mut dst[len_pos..];
419                        dst.put_u32_le(length as u32);
420                    }
421                } else if vlc.len() < 0xffff {
422                    dst.put_u16_le(0xffff);
423                } else {
424                    dst.put_u64_le(0xffffffffffffffff)
425                }
426            }
427            (ColumnData::String(Some(ref s)), None) if s.len() <= 4000 => {
428                dst.put_u8(VarLenType::NVarchar as u8);
429                dst.put_u16_le(8000);
430                dst.extend_from_slice(&[0u8; 5][..]);
431
432                let mut length = 0u16;
433                let len_pos = dst.len();
434
435                dst.put_u16_le(length);
436
437                for chr in s.encode_utf16() {
438                    length += 1;
439                    dst.put_u16_le(chr);
440                }
441
442                let dst: &mut [u8] = dst.borrow_mut();
443                let bytes = (length * 2).to_le_bytes(); // u16, two bytes
444
445                for (i, byte) in bytes.iter().enumerate() {
446                    dst[len_pos + i] = *byte;
447                }
448            }
449            (ColumnData::String(Some(ref s)), None) => {
450                // length: 0xffff and raw collation
451                dst.put_u8(VarLenType::NVarchar as u8);
452                dst.extend_from_slice(&[0xff_u8; 2]);
453                dst.extend_from_slice(&[0u8; 5]);
454
455                // we cannot cheaply predetermine the length of the UCS2 string beforehand
456                // (2 * bytes(UTF8) is not always right) - so just let the SQL server handle it
457                dst.put_u64_le(0xfffffffffffffffe_u64);
458
459                // Write the varchar length
460                let mut length = 0u32;
461                let len_pos = dst.len();
462
463                dst.put_u32_le(length);
464
465                for chr in s.encode_utf16() {
466                    length += 1;
467                    dst.put_u16_le(chr);
468                }
469
470                if length > 0 {
471                    // PLP_TERMINATOR
472                    dst.put_u32_le(0);
473                }
474
475                let dst: &mut [u8] = dst.borrow_mut();
476                let bytes = (length * 2).to_le_bytes(); // u32, four bytes
477
478                for (i, byte) in bytes.iter().enumerate() {
479                    dst[len_pos + i] = *byte;
480                }
481            }
482            (ColumnData::Binary(opt), Some(TypeInfo::VarLenSized(vlc)))
483                if vlc.r#type() == VarLenType::BigBinary
484                    || vlc.r#type() == VarLenType::BigVarBin =>
485            {
486                if let Some(bytes) = opt {
487                    if bytes.len() > vlc.len() {
488                        return Err(crate::Error::BulkInput(
489                            format!(
490                                "Binary length {} exceed column limit {}",
491                                bytes.len(),
492                                vlc.len()
493                            )
494                            .into(),
495                        ));
496                    }
497
498                    if vlc.len() < 0xffff {
499                        dst.put_u16_le(bytes.len() as u16);
500                        dst.extend(bytes.into_owned());
501                    } else {
502                        // unknown size
503                        dst.put_u64_le(0xfffffffffffffffe);
504                        dst.put_u32_le(bytes.len() as u32);
505
506                        if !bytes.is_empty() {
507                            dst.extend(bytes.into_owned());
508                            dst.put_u32_le(0);
509                        }
510                    }
511                } else if vlc.len() < 0xffff {
512                    dst.put_u16_le(0xffff);
513                } else {
514                    dst.put_u64_le(0xffffffffffffffff);
515                }
516            }
517            (ColumnData::Binary(Some(bytes)), None) if bytes.len() <= 8000 => {
518                dst.put_u8(VarLenType::BigVarBin as u8);
519                dst.put_u16_le(8000);
520                dst.put_u16_le(bytes.len() as u16);
521                dst.extend(bytes.into_owned());
522            }
523            (ColumnData::Binary(Some(bytes)), None) => {
524                dst.put_u8(VarLenType::BigVarBin as u8);
525                // Max length
526                dst.put_u16_le(0xffff_u16);
527                // Also the length is unknown
528                dst.put_u64_le(0xfffffffffffffffe_u64);
529                // We'll write in one chunk, length is the whole bytes length
530                dst.put_u32_le(bytes.len() as u32);
531
532                if !bytes.is_empty() {
533                    // Payload
534                    dst.extend(bytes.into_owned());
535                    // PLP_TERMINATOR
536                    dst.put_u32_le(0);
537                }
538            }
539            (ColumnData::DateTime(opt), Some(TypeInfo::VarLenSized(vlc)))
540                if vlc.r#type() == VarLenType::Datetimen =>
541            {
542                if let Some(dt) = opt {
543                    dst.put_u8(8);
544                    dt.encode(dst)?;
545                } else {
546                    dst.put_u8(0);
547                }
548            }
549            (ColumnData::DateTime(Some(dt)), Some(TypeInfo::FixedLen(FixedLenType::Datetime))) => {
550                dt.encode(dst)?;
551            }
552            (ColumnData::DateTime(Some(dt)), None) => {
553                dst.extend_from_slice(&[VarLenType::Datetimen as u8, 8, 8]);
554                dt.encode(&mut *dst)?;
555            }
556            (ColumnData::SmallDateTime(opt), Some(TypeInfo::VarLenSized(vlc)))
557                if vlc.r#type() == VarLenType::Datetimen =>
558            {
559                if let Some(dt) = opt {
560                    dst.put_u8(4);
561                    dt.encode(dst)?;
562                } else {
563                    dst.put_u8(0);
564                }
565            }
566            (
567                ColumnData::SmallDateTime(Some(dt)),
568                Some(TypeInfo::FixedLen(FixedLenType::Datetime4)),
569            ) => {
570                dt.encode(dst)?;
571            }
572            (ColumnData::SmallDateTime(Some(dt)), None) => {
573                dst.extend_from_slice(&[VarLenType::Datetimen as u8, 4, 4]);
574                dt.encode(&mut *dst)?;
575            }
576            #[cfg(feature = "tds73")]
577            (ColumnData::Date(opt), Some(TypeInfo::VarLenSized(vlc)))
578                if vlc.r#type() == VarLenType::Daten =>
579            {
580                if let Some(dt) = opt {
581                    dst.put_u8(3);
582                    dt.encode(dst)?;
583                } else {
584                    dst.put_u8(0);
585                }
586            }
587            #[cfg(feature = "tds73")]
588            (ColumnData::Date(Some(date)), None) => {
589                dst.extend_from_slice(&[VarLenType::Daten as u8, 3]);
590                date.encode(&mut *dst)?;
591            }
592            #[cfg(feature = "tds73")]
593            (ColumnData::Time(opt), Some(TypeInfo::VarLenSized(vlc)))
594                if vlc.r#type() == VarLenType::Timen =>
595            {
596                if let Some(time) = opt {
597                    dst.put_u8(time.len()?);
598                    time.encode(dst)?;
599                } else {
600                    dst.put_u8(0);
601                }
602            }
603            #[cfg(feature = "tds73")]
604            (ColumnData::Time(Some(time)), None) => {
605                dst.extend_from_slice(&[VarLenType::Timen as u8, time.scale(), time.len()?]);
606                time.encode(&mut *dst)?;
607            }
608            #[cfg(feature = "tds73")]
609            (ColumnData::DateTime2(opt), Some(TypeInfo::VarLenSized(vlc)))
610                if vlc.r#type() == VarLenType::Datetime2 =>
611            {
612                if let Some(mut dt2) = opt {
613                    if dt2.time().scale() != vlc.len() as u8 {
614                        let time = dt2.time();
615                        let increments = (time.increments() as f64
616                            * 10_f64.powi(vlc.len() as i32 - time.scale() as i32))
617                            as u64;
618                        dt2 = DateTime2::new(dt2.date(), Time::new(increments, vlc.len() as u8));
619                    }
620                    dst.put_u8(dt2.time().len()? + 3);
621                    dt2.encode(dst)?;
622                } else {
623                    dst.put_u8(0);
624                }
625            }
626            #[cfg(feature = "tds73")]
627            (ColumnData::DateTime2(Some(dt)), None) => {
628                let len = dt.time().len()? + 3;
629                dst.extend_from_slice(&[VarLenType::Datetime2 as u8, dt.time().scale(), len]);
630                dt.encode(&mut *dst)?;
631            }
632            #[cfg(feature = "tds73")]
633            (ColumnData::DateTimeOffset(opt), Some(TypeInfo::VarLenSized(vlc)))
634                if vlc.r#type() == VarLenType::DatetimeOffsetn =>
635            {
636                if let Some(dto) = opt {
637                    dst.put_u8(dto.datetime2().time().len()? + 5);
638                    dto.encode(dst)?;
639                } else {
640                    dst.put_u8(0);
641                }
642            }
643            #[cfg(feature = "tds73")]
644            (ColumnData::DateTimeOffset(Some(dto)), None) => {
645                let headers = [
646                    VarLenType::DatetimeOffsetn as u8,
647                    dto.datetime2().time().scale(),
648                    dto.datetime2().time().len()? + 5,
649                ];
650
651                dst.extend_from_slice(&headers);
652                dto.encode(&mut *dst)?;
653            }
654            (ColumnData::Xml(opt), Some(TypeInfo::Xml { .. })) => {
655                if let Some(xml) = opt {
656                    xml.into_owned().encode(dst)?;
657                } else {
658                    dst.put_u64_le(0xffffffffffffffff_u64);
659                }
660            }
661            (ColumnData::Xml(Some(xml)), None) => {
662                dst.put_u8(VarLenType::Xml as u8);
663                dst.put_u8(0);
664                xml.into_owned().encode(&mut *dst)?;
665            }
666            (ColumnData::Numeric(opt), Some(TypeInfo::VarLenSizedPrecision { ty, scale, .. }))
667                if ty == &VarLenType::Numericn || ty == &VarLenType::Decimaln =>
668            {
669                if let Some(num) = opt {
670                    if scale != &num.scale() {
671                        todo!("this still need some work, if client scale not aligned with server, we need to do conversion but will lose precision")
672                    }
673                    num.encode(&mut *dst)?;
674                } else {
675                    dst.put_u8(0);
676                }
677            }
678            (ColumnData::Numeric(Some(num)), None) => {
679                let headers = &[
680                    VarLenType::Numericn as u8,
681                    num.len(),
682                    num.precision(),
683                    num.scale(),
684                ];
685
686                dst.extend_from_slice(headers);
687                num.encode(&mut *dst)?;
688            }
689            (_, None) => {
690                // None/null
691                dst.put_u8(FixedLenType::Null as u8);
692            }
693            (v, ref ti) => {
694                return Err(crate::Error::BulkInput(
695                    format!("invalid data type, expecting {:?} but found {:?}", ti, v).into(),
696                ));
697            }
698        }
699
700        Ok(())
701    }
702}
703
704#[cfg(test)]
705mod tests {
706    use super::*;
707    use crate::sql_read_bytes::test_utils::IntoSqlReadBytes;
708    use crate::tds::Collation;
709    use crate::{Error, VarLenContext};
710    use bytes::BytesMut;
711
712    async fn test_round_trip(ti: TypeInfo, d: ColumnData<'_>) {
713        let mut buf = BytesMut::new();
714        let mut buf_with_ti = BytesMutWithTypeInfo::new(&mut buf).with_type_info(&ti);
715
716        d.clone()
717            .encode(&mut buf_with_ti)
718            .expect("encode must succeed");
719
720        let reader = &mut buf.into_sql_read_bytes();
721        let nd = ColumnData::decode(reader, &ti)
722            .await
723            .expect("decode must succeed");
724
725        assert_eq!(nd, d);
726
727        reader
728            .read_u8()
729            .await
730            .expect_err("decode must consume entire buffer");
731    }
732
733    #[tokio::test]
734    async fn i32_with_varlen_int() {
735        test_round_trip(
736            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Intn, 4, None)),
737            ColumnData::I32(Some(42)),
738        )
739        .await;
740    }
741
742    #[tokio::test]
743    async fn none_with_varlen_int() {
744        test_round_trip(
745            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Intn, 4, None)),
746            ColumnData::I32(None),
747        )
748        .await;
749    }
750
751    #[tokio::test]
752    async fn i32_with_fixedlen_int() {
753        test_round_trip(
754            TypeInfo::FixedLen(FixedLenType::Int4),
755            ColumnData::I32(Some(42)),
756        )
757        .await;
758    }
759
760    #[tokio::test]
761    async fn bit_with_varlen_bit() {
762        test_round_trip(
763            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Bitn, 1, None)),
764            ColumnData::Bit(Some(true)),
765        )
766        .await;
767    }
768
769    #[tokio::test]
770    async fn none_with_varlen_bit() {
771        test_round_trip(
772            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Bitn, 1, None)),
773            ColumnData::Bit(None),
774        )
775        .await;
776    }
777
778    #[tokio::test]
779    async fn bit_with_fixedlen_bit() {
780        test_round_trip(
781            TypeInfo::FixedLen(FixedLenType::Bit),
782            ColumnData::Bit(Some(true)),
783        )
784        .await;
785    }
786
787    #[tokio::test]
788    async fn u8_with_varlen_int() {
789        test_round_trip(
790            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Intn, 1, None)),
791            ColumnData::U8(Some(8u8)),
792        )
793        .await;
794    }
795
796    #[tokio::test]
797    async fn none_u8_with_varlen_int() {
798        test_round_trip(
799            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Intn, 1, None)),
800            ColumnData::U8(None),
801        )
802        .await;
803    }
804
805    #[tokio::test]
806    async fn u8_with_fixedlen_int() {
807        test_round_trip(
808            TypeInfo::FixedLen(FixedLenType::Int1),
809            ColumnData::U8(Some(8u8)),
810        )
811        .await;
812    }
813
814    #[tokio::test]
815    async fn i16_with_varlen_intn() {
816        test_round_trip(
817            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Intn, 2, None)),
818            ColumnData::I16(Some(8i16)),
819        )
820        .await;
821    }
822
823    #[tokio::test]
824    async fn none_i16_with_varlen_intn() {
825        test_round_trip(
826            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Intn, 2, None)),
827            ColumnData::I16(None),
828        )
829        .await;
830    }
831
832    #[tokio::test]
833    async fn none_with_varlen_intn() {
834        test_round_trip(
835            TypeInfo::FixedLen(FixedLenType::Int2),
836            ColumnData::I16(Some(8i16)),
837        )
838        .await;
839    }
840
841    #[tokio::test]
842    async fn i64_with_varlen_intn() {
843        test_round_trip(
844            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Intn, 8, None)),
845            ColumnData::I64(Some(8i64)),
846        )
847        .await;
848    }
849
850    #[tokio::test]
851    async fn i64_none_with_varlen_intn() {
852        test_round_trip(
853            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Intn, 8, None)),
854            ColumnData::I64(None),
855        )
856        .await;
857    }
858
859    #[tokio::test]
860    async fn i64_with_fixedlen_int8() {
861        test_round_trip(
862            TypeInfo::FixedLen(FixedLenType::Int8),
863            ColumnData::I64(Some(8i64)),
864        )
865        .await;
866    }
867
868    #[tokio::test]
869    async fn f32_with_varlen_floatn() {
870        test_round_trip(
871            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Floatn, 4, None)),
872            ColumnData::F32(Some(8f32)),
873        )
874        .await;
875    }
876
877    #[tokio::test]
878    async fn null_f32_with_varlen_floatn() {
879        test_round_trip(
880            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Floatn, 4, None)),
881            ColumnData::F32(None),
882        )
883        .await;
884    }
885
886    #[tokio::test]
887    async fn f32_with_fixedlen_float4() {
888        test_round_trip(
889            TypeInfo::FixedLen(FixedLenType::Float4),
890            ColumnData::F32(Some(8f32)),
891        )
892        .await;
893    }
894
895    #[tokio::test]
896    async fn f64_with_varlen_floatn() {
897        test_round_trip(
898            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Floatn, 8, None)),
899            ColumnData::F64(Some(8f64)),
900        )
901        .await;
902    }
903
904    #[tokio::test]
905    async fn none_f64_with_varlen_floatn() {
906        test_round_trip(
907            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Floatn, 8, None)),
908            ColumnData::F64(None),
909        )
910        .await;
911    }
912
913    #[tokio::test]
914    async fn f64_with_fixedlen_float8() {
915        test_round_trip(
916            TypeInfo::FixedLen(FixedLenType::Float8),
917            ColumnData::F64(Some(8f64)),
918        )
919        .await;
920    }
921
922    #[tokio::test]
923    async fn guid_with_varlen_guid() {
924        test_round_trip(
925            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Guid, 16, None)),
926            ColumnData::Guid(Some(Uuid::new_v4())),
927        )
928        .await;
929    }
930
931    #[tokio::test]
932    async fn none_guid_with_varlen_guid() {
933        test_round_trip(
934            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Guid, 16, None)),
935            ColumnData::Guid(None),
936        )
937        .await;
938    }
939
940    #[tokio::test]
941    async fn numeric_with_varlen_sized_precision() {
942        test_round_trip(
943            TypeInfo::VarLenSizedPrecision {
944                ty: VarLenType::Numericn,
945                size: 17,
946                precision: 18,
947                scale: 0,
948            },
949            ColumnData::Numeric(Some(Numeric::new_with_scale(23, 0))),
950        )
951        .await;
952    }
953
954    #[tokio::test]
955    async fn none_numeric_with_varlen_sized_precision() {
956        test_round_trip(
957            TypeInfo::VarLenSizedPrecision {
958                ty: VarLenType::Numericn,
959                size: 17,
960                precision: 18,
961                scale: 0,
962            },
963            ColumnData::Numeric(None),
964        )
965        .await;
966    }
967
968    #[tokio::test]
969    async fn string_with_varlen_bigchar() {
970        test_round_trip(
971            TypeInfo::VarLenSized(VarLenContext::new(
972                VarLenType::BigChar,
973                40,
974                Some(Collation::new(13632521, 52)),
975            )),
976            ColumnData::String(Some("aaa".into())),
977        )
978        .await;
979    }
980
981    #[tokio::test]
982    async fn long_string_with_varlen_bigchar() {
983        test_round_trip(
984            TypeInfo::VarLenSized(VarLenContext::new(
985                VarLenType::BigChar,
986                0x8ffff,
987                Some(Collation::new(13632521, 52)),
988            )),
989            ColumnData::String(Some("aaa".into())),
990        )
991        .await;
992    }
993
994    #[tokio::test]
995    async fn none_long_string_with_varlen_bigchar() {
996        test_round_trip(
997            TypeInfo::VarLenSized(VarLenContext::new(
998                VarLenType::BigChar,
999                0x8ffff,
1000                Some(Collation::new(13632521, 52)),
1001            )),
1002            ColumnData::String(None),
1003        )
1004        .await;
1005    }
1006
1007    #[tokio::test]
1008    async fn none_string_with_varlen_bigchar() {
1009        test_round_trip(
1010            TypeInfo::VarLenSized(VarLenContext::new(
1011                VarLenType::BigChar,
1012                40,
1013                Some(Collation::new(13632521, 52)),
1014            )),
1015            ColumnData::String(None),
1016        )
1017        .await;
1018    }
1019
1020    #[tokio::test]
1021    async fn string_with_varlen_bigvarchar() {
1022        test_round_trip(
1023            TypeInfo::VarLenSized(VarLenContext::new(
1024                VarLenType::BigVarChar,
1025                40,
1026                Some(Collation::new(13632521, 52)),
1027            )),
1028            ColumnData::String(Some("aaa".into())),
1029        )
1030        .await;
1031    }
1032
1033    #[tokio::test]
1034    async fn none_string_with_varlen_bigvarchar() {
1035        test_round_trip(
1036            TypeInfo::VarLenSized(VarLenContext::new(
1037                VarLenType::BigVarChar,
1038                40,
1039                Some(Collation::new(13632521, 52)),
1040            )),
1041            ColumnData::String(None),
1042        )
1043        .await;
1044    }
1045
1046    #[tokio::test]
1047    async fn empty_string_with_varlen_bigvarchar() {
1048        test_round_trip(
1049            TypeInfo::VarLenSized(VarLenContext::new(
1050                VarLenType::BigVarChar,
1051                0x8ffff,
1052                Some(Collation::new(13632521, 52)),
1053            )),
1054            ColumnData::String(Some("".into())),
1055        )
1056        .await;
1057    }
1058
1059    #[tokio::test]
1060    async fn string_with_varlen_nvarchar() {
1061        test_round_trip(
1062            TypeInfo::VarLenSized(VarLenContext::new(
1063                VarLenType::NVarchar,
1064                40,
1065                Some(Collation::new(13632521, 52)),
1066            )),
1067            ColumnData::String(Some("hhh".into())),
1068        )
1069        .await;
1070    }
1071
1072    #[tokio::test]
1073    async fn none_string_with_varlen_nvarchar() {
1074        test_round_trip(
1075            TypeInfo::VarLenSized(VarLenContext::new(
1076                VarLenType::NVarchar,
1077                40,
1078                Some(Collation::new(13632521, 52)),
1079            )),
1080            ColumnData::String(None),
1081        )
1082        .await;
1083    }
1084
1085    #[tokio::test]
1086    async fn empty_string_with_varlen_nvarchar() {
1087        test_round_trip(
1088            TypeInfo::VarLenSized(VarLenContext::new(
1089                VarLenType::NVarchar,
1090                0x8ffff,
1091                Some(Collation::new(13632521, 52)),
1092            )),
1093            ColumnData::String(Some("".into())),
1094        )
1095        .await;
1096    }
1097
1098    #[tokio::test]
1099    async fn string_with_varlen_nchar() {
1100        test_round_trip(
1101            TypeInfo::VarLenSized(VarLenContext::new(
1102                VarLenType::NChar,
1103                40,
1104                Some(Collation::new(13632521, 52)),
1105            )),
1106            ColumnData::String(Some("hhh".into())),
1107        )
1108        .await;
1109    }
1110
1111    #[tokio::test]
1112    async fn long_string_with_varlen_nchar() {
1113        test_round_trip(
1114            TypeInfo::VarLenSized(VarLenContext::new(
1115                VarLenType::NChar,
1116                0x8ffff,
1117                Some(Collation::new(13632521, 52)),
1118            )),
1119            ColumnData::String(Some("hhh".into())),
1120        )
1121        .await;
1122    }
1123
1124    #[tokio::test]
1125    async fn none_long_string_with_varlen_nchar() {
1126        test_round_trip(
1127            TypeInfo::VarLenSized(VarLenContext::new(
1128                VarLenType::NChar,
1129                0x8ffff,
1130                Some(Collation::new(13632521, 52)),
1131            )),
1132            ColumnData::String(None),
1133        )
1134        .await;
1135    }
1136
1137    #[tokio::test]
1138    async fn none_string_with_varlen_nchar() {
1139        test_round_trip(
1140            TypeInfo::VarLenSized(VarLenContext::new(
1141                VarLenType::NChar,
1142                40,
1143                Some(Collation::new(13632521, 52)),
1144            )),
1145            ColumnData::String(None),
1146        )
1147        .await;
1148    }
1149
1150    #[tokio::test]
1151    async fn binary_with_varlen_bigbinary() {
1152        test_round_trip(
1153            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::BigBinary, 40, None)),
1154            ColumnData::Binary(Some(b"aaa".as_slice().into())),
1155        )
1156        .await;
1157    }
1158
1159    #[tokio::test]
1160    async fn long_binary_with_varlen_bigbinary() {
1161        test_round_trip(
1162            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::BigBinary, 0x8ffff, None)),
1163            ColumnData::Binary(Some(b"aaa".as_slice().into())),
1164        )
1165        .await;
1166    }
1167
1168    #[tokio::test]
1169    async fn none_binary_with_varlen_bigbinary() {
1170        test_round_trip(
1171            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::BigBinary, 40, None)),
1172            ColumnData::Binary(None),
1173        )
1174        .await;
1175    }
1176
1177    #[tokio::test]
1178    async fn none_long_binary_with_varlen_bigbinary() {
1179        test_round_trip(
1180            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::BigBinary, 0x8ffff, None)),
1181            ColumnData::Binary(None),
1182        )
1183        .await;
1184    }
1185
1186    #[tokio::test]
1187    async fn binary_with_varlen_bigvarbin() {
1188        test_round_trip(
1189            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::BigVarBin, 40, None)),
1190            ColumnData::Binary(Some(b"aaa".as_slice().into())),
1191        )
1192        .await;
1193    }
1194
1195    #[tokio::test]
1196    async fn none_binary_with_varlen_bigvarbin() {
1197        test_round_trip(
1198            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::BigVarBin, 40, None)),
1199            ColumnData::Binary(None),
1200        )
1201        .await;
1202    }
1203
1204    #[tokio::test]
1205    async fn empty_binary_with_varlen_bigvarbin() {
1206        test_round_trip(
1207            TypeInfo::VarLenSized(VarLenContext::new(
1208                VarLenType::BigVarBin,
1209                0x8ffff,
1210                Some(Collation::new(13632521, 52)),
1211            )),
1212            ColumnData::Binary(Some(b"".as_slice().into())),
1213        )
1214        .await;
1215    }
1216
1217    #[tokio::test]
1218    async fn datetime_with_varlen_datetimen() {
1219        test_round_trip(
1220            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Datetimen, 8, None)),
1221            ColumnData::DateTime(Some(DateTime::new(200, 3000))),
1222        )
1223        .await;
1224    }
1225
    // NOTE: this is inconsistent: decode will decode any `None` datetime to a smalldatetime,
    // ignoring the size. It is non-critical, so the test is left here as is.
1228    #[tokio::test]
1229    async fn none_datetime_with_varlen_datetimen() {
1230        test_round_trip(
1231            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Datetimen, 8, None)),
1232            ColumnData::DateTime(None),
1233        )
1234        .await;
1235    }
1236
1237    #[tokio::test]
1238    async fn datetime_with_fixedlen_datetime() {
1239        test_round_trip(
1240            TypeInfo::FixedLen(FixedLenType::Datetime),
1241            ColumnData::DateTime(Some(DateTime::new(200, 3000))),
1242        )
1243        .await;
1244    }
1245
1246    #[tokio::test]
1247    async fn smalldatetime_with_varlen_datetimen() {
1248        test_round_trip(
1249            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Datetimen, 4, None)),
1250            ColumnData::SmallDateTime(Some(SmallDateTime::new(200, 3000))),
1251        )
1252        .await;
1253    }
1254
1255    #[tokio::test]
1256    async fn none_smalldatetime_with_varlen_datetimen() {
1257        test_round_trip(
1258            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Datetimen, 4, None)),
1259            ColumnData::SmallDateTime(None),
1260        )
1261        .await;
1262    }
1263
1264    #[tokio::test]
1265    async fn smalldatetime_with_fixedlen_datetime4() {
1266        test_round_trip(
1267            TypeInfo::FixedLen(FixedLenType::Datetime4),
1268            ColumnData::SmallDateTime(Some(SmallDateTime::new(200, 3000))),
1269        )
1270        .await;
1271    }
1272
1273    #[cfg(feature = "tds73")]
1274    #[tokio::test]
1275    async fn date_with_varlen_daten() {
1276        test_round_trip(
1277            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Daten, 3, None)),
1278            ColumnData::Date(Some(Date::new(200))),
1279        )
1280        .await;
1281    }
1282
1283    #[cfg(feature = "tds73")]
1284    #[tokio::test]
1285    async fn none_date_with_varlen_daten() {
1286        test_round_trip(
1287            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Daten, 3, None)),
1288            ColumnData::Date(None),
1289        )
1290        .await;
1291    }
1292
1293    #[cfg(feature = "tds73")]
1294    #[tokio::test]
1295    async fn time_with_varlen_timen() {
1296        test_round_trip(
1297            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Timen, 7, None)),
1298            ColumnData::Time(Some(Time::new(55, 7))),
1299        )
1300        .await;
1301    }
1302
1303    #[cfg(feature = "tds73")]
1304    #[tokio::test]
1305    async fn none_time_with_varlen_timen() {
1306        test_round_trip(
1307            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Timen, 7, None)),
1308            ColumnData::Time(None),
1309        )
1310        .await;
1311    }
1312
1313    #[cfg(feature = "tds73")]
1314    #[tokio::test]
1315    async fn datetime2_with_varlen_datetime2() {
1316        test_round_trip(
1317            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Datetime2, 7, None)),
1318            ColumnData::DateTime2(Some(DateTime2::new(Date::new(55), Time::new(222, 7)))),
1319        )
1320        .await;
1321    }
1322
1323    #[cfg(feature = "tds73")]
1324    #[tokio::test]
1325    async fn none_datetime2_with_varlen_datetime2() {
1326        test_round_trip(
1327            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Datetime2, 7, None)),
1328            ColumnData::DateTime2(None),
1329        )
1330        .await;
1331    }
1332
1333    #[cfg(feature = "tds73")]
1334    #[tokio::test]
1335    async fn datetimeoffset_with_varlen_datetimeoffsetn() {
1336        test_round_trip(
1337            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::DatetimeOffsetn, 7, None)),
1338            ColumnData::DateTimeOffset(Some(DateTimeOffset::new(
1339                DateTime2::new(Date::new(55), Time::new(222, 7)),
1340                -8,
1341            ))),
1342        )
1343        .await;
1344    }
1345
1346    #[cfg(feature = "tds73")]
1347    #[tokio::test]
1348    async fn none_datetimeoffset_with_varlen_datetimeoffsetn() {
1349        test_round_trip(
1350            TypeInfo::VarLenSized(VarLenContext::new(VarLenType::DatetimeOffsetn, 7, None)),
1351            ColumnData::DateTimeOffset(None),
1352        )
1353        .await;
1354    }
1355
1356    #[cfg(feature = "tds73")]
1357    #[tokio::test]
1358    async fn xml_with_xml() {
1359        test_round_trip(
1360            TypeInfo::Xml {
1361                schema: None,
1362                size: 0xfffffffffffffffe_usize,
1363            },
1364            ColumnData::Xml(Some(Cow::Owned(XmlData::new("<a>ddd</a>")))),
1365        )
1366        .await;
1367    }
1368
1369    #[cfg(feature = "tds73")]
1370    #[tokio::test]
1371    async fn none_xml_with_xml() {
1372        test_round_trip(
1373            TypeInfo::Xml {
1374                schema: None,
1375                size: 0xfffffffffffffffe_usize,
1376            },
1377            ColumnData::Xml(None),
1378        )
1379        .await;
1380    }
1381
1382    #[tokio::test]
1383    async fn invalid_type_fails() {
1384        let data = vec![
1385            (
1386                TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Floatn, 4, None)),
1387                ColumnData::I32(Some(42)),
1388            ),
1389            (
1390                TypeInfo::VarLenSized(VarLenContext::new(VarLenType::Floatn, 4, None)),
1391                ColumnData::I32(None),
1392            ),
1393            (
1394                TypeInfo::FixedLen(FixedLenType::Int4),
1395                ColumnData::I32(None),
1396            ),
1397        ];
1398
1399        for (ti, d) in data {
1400            let mut buf = BytesMut::new();
1401            let mut buf_ti = BytesMutWithTypeInfo::new(&mut buf).with_type_info(&ti);
1402
1403            let err = d.encode(&mut buf_ti).expect_err("encode should fail");
1404
1405            if let Error::BulkInput(_) = err {
1406            } else {
1407                panic!("Expected: Error::BulkInput, got: {:?}", err);
1408            }
1409        }
1410    }
1411}