mysql_common/binlog/decimal/
mod.rs

1// Copyright (c) 2020 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9//! Functions and types related to the mysql decimal type.
10
11use byteorder::{BigEndian as BE, ReadBytesExt, WriteBytesExt};
12
13use std::{
14    cmp::{Ord, Ordering, PartialEq, PartialOrd},
15    fmt,
16    io::{self, Read, Write},
17    mem::size_of,
18    str::FromStr,
19};
20
21#[cfg(test)]
22mod test;
23
24#[derive(Debug, Clone, Copy)]
25pub struct ParseDecimalError;
26
27/// Type of a base 9 digit.
28pub type Digit = i32;
29
30/// Number of decimal digits per `Digit`.
31pub const DIG_PER_DEC: usize = 9;
32
33/// Base of the `Digit`.
34pub const DIG_BASE: usize = 1_000_000_000;
35
36/// Number of bytes required to store given number of decimal digits.
37pub const DIG_TO_BYTES: [u8; DIG_PER_DEC + 1] = [0, 1, 1, 2, 2, 3, 3, 4, 4, 4];
38
39pub const POWERS_10: [i32; DIG_PER_DEC + 1] = [
40    1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000,
41];
42
43/// MySql decimal.
44///
45/// The only purpose of this type is to parse binary decimal from binlogs,
46/// so it isn't meant to be an alternative to `decimal_t`.
47///
48/// This type supports:
49///
50/// *   serialization/deserialization to/from binary format
51///     (see `read_bin` and `write_bin` functions);
52/// *   parsing from decimal string/buffer (see `Decimal::parse_str_bytes`, `FromStr` impl);
53/// *   conversion to decimal string (using `Display`).
54///
55/// # Notes
56///
57/// *   `Ord` and `Eq` impls are relied on the binary representation,
58///     i.e. both `rhs` and `lhs` will be serialized into temporary buffers;
59/// *   even though MySql's `string2decimal` function allows scientific notation,
60///     this implementation denies it.
61#[derive(Default, Debug, Eq, Clone)]
62pub struct Decimal {
63    /// The number of *decimal* digits (NOT number of `Digit`s!) before the point.
64    intg: usize,
65    /// The number of decimal digits after the point.
66    frac: usize,
67    /// `false` means positive, `true` means negative.
68    sign: bool,
69    /// Array of `Digit`s.
70    buf: Vec<Digit>,
71}
72
73impl Decimal {
74    pub fn bin_size(&self) -> usize {
75        decimal_bin_size(self.intg + self.frac, self.frac)
76    }
77
78    /// See [`Decimal::parse_str_bytes`].
79    #[deprecated = "use parse_str_bytes"]
80    pub fn parse_bytes(bytes: &[u8]) -> Result<Self, ParseDecimalError> {
81        match std::str::from_utf8(bytes) {
82            Ok(string) => Decimal::from_str(string),
83            Err(_) => Err(ParseDecimalError),
84        }
85    }
86
87    /// Runs `Decimal::from_str` on the given bytes.
88    pub fn parse_str_bytes(bytes: &[u8]) -> Result<Self, ParseDecimalError> {
89        macro_rules! decimal_str {
90            ($x:ident) => {
91                if $x
92                    .iter()
93                    .all(|x| x.is_ascii_digit() || *x == b'+' || matches!(x, b'-'..=b'.'))
94                {
95                    // SAFETY: UTF-8 is asserted by the if condition
96                    Some(unsafe { std::str::from_utf8_unchecked($x) })
97                } else {
98                    None
99                }
100            };
101        }
102        Decimal::from_str(decimal_str!(bytes).ok_or(ParseDecimalError)?)
103    }
104
105    pub fn write_bin<T: Write>(&self, mut output: T) -> io::Result<()> {
106        // result bits must be inverted if the sign is negative,
107        // we'll XOR it with `mask` to achieve this.
108        let mask: Digit = if self.sign {
109            // XOR with this mask will invert bits
110            -1
111        } else {
112            // XOR with this mask will do nothing
113            0
114        };
115
116        let mut out_buf = Vec::with_capacity(self.buf.len() * size_of::<Digit>());
117
118        let mut digits = self.buf.iter();
119
120        let mut intg = self.intg;
121        let num_prefix_digits = self.intg % DIG_PER_DEC;
122        if num_prefix_digits > 0 {
123            let digit = *digits.next().expect("decimal is ill-formed");
124            match DIG_TO_BYTES[num_prefix_digits] {
125                1 => out_buf.write_i8((digit ^ mask) as i8)?,
126                2 => out_buf.write_i16::<BE>((digit ^ mask) as i16)?,
127                3 => out_buf.write_i24::<BE>((digit ^ mask) as i32)?,
128                4 => out_buf.write_i32::<BE>((digit ^ mask) as i32)?,
129                _ => unreachable!(),
130            }
131            intg -= num_prefix_digits;
132        }
133        while intg > 0 {
134            let digit = *digits.next().expect("decimal is ill-formed");
135            out_buf.write_i32::<BE>((digit ^ mask) as i32)?;
136            intg -= DIG_PER_DEC;
137        }
138        let mut frac = self.frac;
139        while frac > 0 {
140            let len = std::cmp::min(DIG_PER_DEC, frac);
141            let mut digit = *digits.next().expect("decimal is ill-formed");
142            if len < DIG_PER_DEC {
143                digit /= POWERS_10[DIG_PER_DEC - len];
144                match DIG_TO_BYTES[len] {
145                    1 => out_buf.write_i8((digit ^ mask) as i8)?,
146                    2 => out_buf.write_i16::<BE>((digit ^ mask) as i16)?,
147                    3 => out_buf.write_i24::<BE>((digit ^ mask) as i32)?,
148                    4 => out_buf.write_i32::<BE>((digit ^ mask) as i32)?,
149                    _ => unreachable!(),
150                }
151            } else {
152                out_buf.write_i32::<BE>((digit ^ mask) as i32)?;
153            }
154            frac -= len
155        }
156
157        out_buf[0] ^= 0x80;
158
159        output.write_all(&out_buf)
160    }
161
162    /// Reads packed representation of a [`Decimal`].
163    ///
164    /// Packed representation is:
165    ///
166    /// 1. precision (u8)
167    /// 2. scale (u8)
168    /// 3. serialized decimal value (see [`Decimal::read_bin`])
169    pub fn read_packed<T: Read>(mut input: T, keep_precision: bool) -> io::Result<Self> {
170        let mut precision_and_scale = [0_u8, 0_u8];
171        input.read_exact(&mut precision_and_scale)?;
172        Self::read_bin(
173            input,
174            precision_and_scale[0] as usize,
175            precision_and_scale[1] as usize,
176            keep_precision,
177        )
178    }
179
180    /// Reads serialized representation of a decimal value.
181    ///
182    /// The value is usually written in the packed form (see [`Decimal::read_packed`]).
183    pub fn read_bin<T: Read>(
184        mut input: T,
185        precision: usize,
186        scale: usize,
187        keep_prec: bool,
188    ) -> io::Result<Self> {
189        let mut out = Self::default();
190
191        let bin_size = decimal_bin_size(precision, scale);
192        let mut buffer = vec![0_u8; bin_size];
193        input.read_exact(&mut buffer)?;
194
195        // we should invert back the very first bit of a binary representation
196        if let Some(x) = buffer.get_mut(0) {
197            *x ^= 0x80
198        }
199
200        // is it negative or not
201        let mask = if buffer.first().copied().unwrap_or(0) & 0x80 == 0 {
202            // positive, so mask should do nothing
203            0
204        } else {
205            // negative, so mask should invert bits
206            -1
207        };
208
209        let intg = precision - scale;
210        let prefix_len = intg % DIG_PER_DEC;
211        let intg_full = intg / DIG_PER_DEC;
212
213        let frac = scale;
214        let suffix_len = frac % DIG_PER_DEC;
215        let frac_full = frac / DIG_PER_DEC;
216
217        out.sign = mask != 0;
218        out.intg = intg;
219        out.frac = frac;
220
221        let mut input = &buffer[..];
222
223        let mut trimmed = keep_prec;
224        if prefix_len > 0 {
225            let len = DIG_TO_BYTES[prefix_len];
226            let x = match len {
227                1 => input.read_i8()? as i32,
228                2 => input.read_i16::<BE>()? as i32,
229                3 => input.read_i24::<BE>()?,
230                4 => input.read_i32::<BE>()?,
231                _ => unreachable!(),
232            } ^ mask;
233            if x == 0 && !trimmed {
234                out.intg -= prefix_len;
235            } else {
236                trimmed = true;
237                out.buf.push(x);
238            }
239        }
240        for _ in 0..intg_full {
241            let x = input.read_i32::<BE>()? ^ mask;
242            if x == 0 && !trimmed {
243                out.intg -= DIG_PER_DEC;
244            } else {
245                trimmed = true;
246                out.buf.push(x);
247            }
248        }
249        for _ in 0..frac_full {
250            out.buf.push(input.read_i32::<BE>()? ^ mask);
251        }
252        if suffix_len > 0 {
253            let len = DIG_TO_BYTES[suffix_len];
254            let mut x = match len {
255                1 => input.read_i8()? as i32,
256                2 => input.read_i16::<BE>()? as i32,
257                3 => input.read_i24::<BE>()?,
258                4 => input.read_i32::<BE>()?,
259                _ => unreachable!(),
260            } ^ mask;
261            x *= POWERS_10[DIG_PER_DEC - suffix_len];
262            out.buf.push(x);
263        }
264
265        if out.intg == 0 && out.frac == 0 {
266            out.intg = 1;
267            out.frac = 0;
268            out.sign = false;
269            out.buf.resize(1, 0);
270            out.buf[0] = 0;
271        }
272
273        Ok(out)
274    }
275}
276
277impl Ord for Decimal {
278    fn cmp(&self, other: &Self) -> Ordering {
279        let mut left = Vec::with_capacity(self.bin_size());
280        let mut right = Vec::with_capacity(self.bin_size());
281        self.write_bin(&mut left).expect("OOM");
282        other.write_bin(&mut right).expect("OOM");
283        left.cmp(&right)
284    }
285}
286
287impl PartialOrd for Decimal {
288    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
289        Some(self.cmp(other))
290    }
291}
292
293impl PartialEq for Decimal {
294    fn eq(&self, other: &Self) -> bool {
295        self.cmp(other) == Ordering::Equal
296    }
297}
298
299impl fmt::Display for Decimal {
300    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301        let prefix_len = self.intg % DIG_PER_DEC;
302        let suffix_len = self.frac % DIG_PER_DEC;
303
304        let mut intg = self.intg;
305        if self.sign {
306            '-'.fmt(f)?;
307        }
308
309        let mut i = 0;
310
311        while i < self.buf.len() && intg > 0 {
312            let x = self.buf[i];
313            i += 1;
314            if prefix_len > 0 && i == 1 {
315                x.fmt(f)?;
316                intg -= prefix_len;
317            } else {
318                if i == 1 {
319                    x.fmt(f)?;
320                } else {
321                    write!(f, "{:09}", x)?;
322                }
323                intg -= DIG_PER_DEC;
324            }
325        }
326
327        if self.intg == 0 {
328            '0'.fmt(f)?;
329        }
330
331        if i < self.buf.len() {
332            '.'.fmt(f)?;
333        }
334
335        while i < self.buf.len() {
336            let mut x = self.buf[i];
337            i += 1;
338            if i == self.buf.len() && suffix_len > 0 {
339                x /= POWERS_10[DIG_PER_DEC - suffix_len];
340                write!(&mut *f, "{:0width$}", x, width = suffix_len)?;
341            } else {
342                write!(f, "{:09}", x)?;
343            }
344        }
345
346        Ok(())
347    }
348}
349
350impl FromStr for Decimal {
351    type Err = ParseDecimalError;
352    fn from_str(mut from: &str) -> Result<Self, Self::Err> {
353        let mut out = Decimal::default();
354
355        from = from.trim();
356
357        if from.is_empty() {
358            return Err(ParseDecimalError);
359        }
360
361        // Skip leading zeros.
362        while from.starts_with("00") {
363            from = &from[1..];
364        }
365
366        if from.starts_with('-') {
367            out.sign = true;
368            from = &from[1..];
369        } else if from.starts_with('+') {
370            from = &from[1..];
371        }
372
373        let point_idx = from.find('.').unwrap_or(from.len());
374        let (mut integral, mut fractional) = from.split_at(point_idx);
375        fractional = fractional.get(1..).unwrap_or(fractional);
376
377        out.intg = integral.len();
378        out.frac = fractional.len();
379
380        if out.intg + out.frac == 0 {
381            return Err(ParseDecimalError);
382        }
383
384        if integral.bytes().any(|x| !x.is_ascii_digit())
385            || fractional.bytes().any(|x| !x.is_ascii_digit())
386        {
387            return Err(ParseDecimalError);
388        }
389
390        let mut prefix_len = integral.len() % DIG_PER_DEC;
391        if prefix_len == 0 {
392            prefix_len = DIG_PER_DEC;
393        }
394        while !integral.is_empty() {
395            let prefix = &integral[..prefix_len];
396            let x: i32 = prefix.parse().expect("should not fail");
397            out.buf.push(x);
398            integral = &integral[prefix_len..];
399            prefix_len = DIG_PER_DEC;
400        }
401
402        while !fractional.is_empty() {
403            let len = std::cmp::min(DIG_PER_DEC, fractional.len());
404            let prefix = &fractional[..len];
405            let mut x: i32 = prefix.parse().expect("should not fail");
406            if len < DIG_PER_DEC {
407                x *= POWERS_10[DIG_PER_DEC - len];
408            }
409            out.buf.push(x);
410            fractional = &fractional[len..];
411        }
412
413        if out.buf.iter().all(|x| *x == 0) {
414            out.sign = false;
415        }
416
417        Ok(out)
418    }
419}
420
421/// Returns binary representation size (in bytes) for given precision and scale.
422#[inline]
423pub fn decimal_bin_size(precision: usize, scale: usize) -> usize {
424    let intg = precision - scale;
425    let intg0 = intg / DIG_PER_DEC;
426    let frac0 = scale / DIG_PER_DEC;
427    let intg0x = intg - intg0 * DIG_PER_DEC;
428    let frac0x = scale - frac0 * DIG_PER_DEC;
429
430    intg0 * size_of::<Digit>()
431        + DIG_TO_BYTES[intg0x] as usize
432        + frac0 * size_of::<Digit>()
433        + DIG_TO_BYTES[frac0x] as usize
434}