Skip to main content

mz_pgrepr/
value.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::error::Error;
12use std::{io, str};
13
14use bytes::{BufMut, BytesMut};
15use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
16use itertools::Itertools;
17use mz_ore::cast::ReinterpretCast;
18use mz_pgwire_common::Format;
19use mz_repr::adt::array::ArrayDimension;
20use mz_repr::adt::char;
21use mz_repr::adt::date::Date;
22use mz_repr::adt::jsonb::JsonbRef;
23use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
24use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
25use mz_repr::adt::range::{Range, RangeInner};
26use mz_repr::adt::timestamp::CheckedTimestamp;
27use mz_repr::strconv::{self, Nestable};
28use mz_repr::{Datum, RowArena, RowPacker, RowRef, SqlRelationType, SqlScalarType};
29use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
30use uuid::Uuid;
31
32use crate::types::{UINT2, UINT4, UINT8};
33use crate::{Interval, Jsonb, Numeric, Type, UInt2, UInt4, UInt8};
34
35pub mod interval;
36pub mod jsonb;
37pub mod numeric;
38pub mod record;
39pub mod unsigned;
40
41/// A PostgreSQL datum.
42#[derive(Debug)]
43pub enum Value {
44    /// A variable-length, multi-dimensional array of values.
45    Array {
46        /// The dimensions of the array.
47        dims: Vec<ArrayDimension>,
48        /// The elements of the array.
49        elements: Vec<Option<Value>>,
50    },
51    /// A boolean value.
52    Bool(bool),
53    /// A byte array, i.e., a variable-length binary string.
54    Bytea(Vec<u8>),
55    /// A single-byte character.
56    Char(u8),
57    /// A date.
58    Date(Date),
59    /// A 4-byte floating point number.
60    Float4(f32),
61    /// An 8-byte floating point number.
62    Float8(f64),
63    /// A 2-byte signed integer.
64    Int2(i16),
65    /// A 4-byte signed integer.
66    Int4(i32),
67    /// An 8-byte signed integer.
68    Int8(i64),
69    /// A 2-byte unsigned integer.
70    UInt2(UInt2),
71    /// A 4-byte unsigned integer.
72    UInt4(UInt4),
73    /// An 8-byte unsigned integer.
74    UInt8(UInt8),
75    /// A time interval.
76    Interval(Interval),
77    /// A binary JSON blob.
78    Jsonb(Jsonb),
79    /// A sequence of homogeneous values.
80    List(Vec<Option<Value>>),
81    /// A map of string keys and homogeneous values.
82    Map(BTreeMap<String, Option<Value>>),
83    /// An identifier string of no more than 64 characters in length.
84    Name(String),
85    /// An arbitrary precision number.
86    Numeric(Numeric),
87    /// An object identifier.
88    Oid(u32),
89    /// A sequence of heterogeneous values.
90    Record(Vec<Option<Value>>),
91    /// A time.
92    Time(NaiveTime),
93    /// A date and time, without a timezone.
94    Timestamp(CheckedTimestamp<NaiveDateTime>),
95    /// A date and time, with a timezone.
96    TimestampTz(CheckedTimestamp<DateTime<Utc>>),
97    /// A variable-length string.
98    Text(String),
99    /// A fixed-length string.
100    BpChar(String),
101    /// A variable-length string with an optional limit.
102    VarChar(String),
103    /// A universally unique identifier.
104    Uuid(Uuid),
105    /// A small int vector.
106    Int2Vector {
107        /// The elements of the vector.
108        elements: Vec<Option<Value>>,
109    },
110    /// A Materialize timestamp.
111    MzTimestamp(mz_repr::Timestamp),
112    /// A contiguous range of values along a domain.
113    Range(Range<Box<Value>>),
114    /// A list of privileges granted to a role, that uses [`mz_repr::role_id::RoleId`]s for role
115    /// references.
116    MzAclItem(MzAclItem),
117    /// A list of privileges granted to a user that uses [`mz_repr::adt::system::Oid`]s for role
118    /// references. This type is used primarily for compatibility with PostgreSQL.
119    AclItem(AclItem),
120}
121
122impl Value {
123    /// Constructs a new `Value` from a Materialize datum.
124    ///
125    /// The conversion happens in the obvious manner, except that `Datum::Null`
126    /// is converted to `None` to align with how PostgreSQL handles NULL.
127    pub fn from_datum(datum: Datum, typ: &SqlScalarType) -> Option<Value> {
128        match (datum, typ) {
129            (Datum::Null, _) => None,
130            (Datum::True, SqlScalarType::Bool) => Some(Value::Bool(true)),
131            (Datum::False, SqlScalarType::Bool) => Some(Value::Bool(false)),
132            (Datum::Int16(i), SqlScalarType::Int16) => Some(Value::Int2(i)),
133            (Datum::Int32(i), SqlScalarType::Int32) => Some(Value::Int4(i)),
134            (Datum::Int64(i), SqlScalarType::Int64) => Some(Value::Int8(i)),
135            (Datum::UInt8(c), SqlScalarType::PgLegacyChar) => Some(Value::Char(c)),
136            (Datum::UInt16(u), SqlScalarType::UInt16) => Some(Value::UInt2(UInt2(u))),
137            (Datum::UInt32(oid), SqlScalarType::Oid) => Some(Value::Oid(oid)),
138            (Datum::UInt32(oid), SqlScalarType::RegClass) => Some(Value::Oid(oid)),
139            (Datum::UInt32(oid), SqlScalarType::RegProc) => Some(Value::Oid(oid)),
140            (Datum::UInt32(oid), SqlScalarType::RegType) => Some(Value::Oid(oid)),
141            (Datum::UInt32(u), SqlScalarType::UInt32) => Some(Value::UInt4(UInt4(u))),
142            (Datum::UInt64(u), SqlScalarType::UInt64) => Some(Value::UInt8(UInt8(u))),
143            (Datum::Float32(f), SqlScalarType::Float32) => Some(Value::Float4(*f)),
144            (Datum::Float64(f), SqlScalarType::Float64) => Some(Value::Float8(*f)),
145            (Datum::Numeric(d), SqlScalarType::Numeric { .. }) => Some(Value::Numeric(Numeric(d))),
146            (Datum::MzTimestamp(t), SqlScalarType::MzTimestamp) => Some(Value::MzTimestamp(t)),
147            (Datum::MzAclItem(mai), SqlScalarType::MzAclItem) => Some(Value::MzAclItem(mai)),
148            (Datum::AclItem(ai), SqlScalarType::AclItem) => Some(Value::AclItem(ai)),
149            (Datum::Date(d), SqlScalarType::Date) => Some(Value::Date(d)),
150            (Datum::Time(t), SqlScalarType::Time) => Some(Value::Time(t)),
151            (Datum::Timestamp(ts), SqlScalarType::Timestamp { .. }) => Some(Value::Timestamp(ts)),
152            (Datum::TimestampTz(ts), SqlScalarType::TimestampTz { .. }) => {
153                Some(Value::TimestampTz(ts))
154            }
155            (Datum::Interval(iv), SqlScalarType::Interval) => Some(Value::Interval(Interval(iv))),
156            (Datum::Bytes(b), SqlScalarType::Bytes) => Some(Value::Bytea(b.to_vec())),
157            (Datum::String(s), SqlScalarType::String) => Some(Value::Text(s.to_owned())),
158            (Datum::String(s), SqlScalarType::VarChar { .. }) => Some(Value::VarChar(s.to_owned())),
159            (Datum::String(s), SqlScalarType::Char { length }) => {
160                Some(Value::BpChar(char::format_str_pad(s, *length)))
161            }
162            (Datum::String(s), SqlScalarType::PgLegacyName) => Some(Value::Name(s.into())),
163            (_, SqlScalarType::Jsonb) => {
164                Some(Value::Jsonb(Jsonb(JsonbRef::from_datum(datum).to_owned())))
165            }
166            (Datum::Uuid(u), SqlScalarType::Uuid) => Some(Value::Uuid(u)),
167            (Datum::Array(array), SqlScalarType::Array(elem_type)) => {
168                let dims = array.dims().into_iter().collect();
169                let elements = array
170                    .elements()
171                    .iter()
172                    .map(|elem| Value::from_datum(elem, elem_type))
173                    .collect();
174                Some(Value::Array { dims, elements })
175            }
176            (Datum::Array(array), SqlScalarType::Int2Vector) => {
177                assert!(
178                    array.has_int2vector_dims(),
179                    "int2vector must be 1 dimensional, or empty"
180                );
181                let elements = array
182                    .elements()
183                    .iter()
184                    .map(|elem| Value::from_datum(elem, &SqlScalarType::Int16))
185                    .collect();
186                Some(Value::Int2Vector { elements })
187            }
188            (Datum::List(list), SqlScalarType::List { element_type, .. }) => {
189                let elements = list
190                    .iter()
191                    .map(|elem| Value::from_datum(elem, element_type))
192                    .collect();
193                Some(Value::List(elements))
194            }
195            (Datum::List(record), SqlScalarType::Record { fields, .. }) => {
196                let fields = record
197                    .iter()
198                    .zip_eq(fields)
199                    .map(|(e, (_name, ty))| Value::from_datum(e, &ty.scalar_type))
200                    .collect();
201                Some(Value::Record(fields))
202            }
203            (Datum::Map(dict), SqlScalarType::Map { value_type, .. }) => {
204                let entries = dict
205                    .iter()
206                    .map(|(k, v)| (k.to_owned(), Value::from_datum(v, value_type)))
207                    .collect();
208                Some(Value::Map(entries))
209            }
210            (Datum::Range(range), SqlScalarType::Range { element_type }) => {
211                let value_range = range.into_bounds(|b| {
212                    Box::new(
213                        Value::from_datum(b.datum(), element_type)
214                            .expect("RangeBounds never contain Datum::Null"),
215                    )
216                });
217                Some(Value::Range(value_range))
218            }
219            _ => panic!("can't serialize {}::{:?}", datum, typ),
220        }
221    }
222
223    /// Converts a Materialize datum from this value.
224    pub fn into_datum<'a>(self, buf: &'a RowArena, typ: &Type) -> Datum<'a> {
225        match self {
226            Value::Array { dims, elements } => {
227                let element_pg_type = match typ {
228                    Type::Array(t) => &*t,
229                    _ => panic!("Value::Array should have type Type::Array. Found {:?}", typ),
230                };
231                buf.make_datum(|packer| {
232                    packer
233                        .try_push_array(
234                            &dims,
235                            elements.into_iter().map(|element| match element {
236                                Some(element) => element.into_datum(buf, element_pg_type),
237                                None => Datum::Null,
238                            }),
239                        )
240                        .unwrap();
241                })
242            }
243            Value::Int2Vector { .. } => {
244                // This situation is handled gracefully by Value::decode; if we
245                // wind up here it's a programming error.
246                unreachable!("into_datum cannot be called on Value::Int2Vector");
247            }
248            Value::Bool(true) => Datum::True,
249            Value::Bool(false) => Datum::False,
250            Value::Bytea(b) => Datum::Bytes(buf.push_bytes(b)),
251            Value::Char(c) => Datum::UInt8(c),
252            Value::Date(d) => Datum::Date(d),
253            Value::Float4(f) => Datum::Float32(f.into()),
254            Value::Float8(f) => Datum::Float64(f.into()),
255            Value::Int2(i) => Datum::Int16(i),
256            Value::Int4(i) => Datum::Int32(i),
257            Value::Int8(i) => Datum::Int64(i),
258            Value::UInt2(u) => Datum::UInt16(u.0),
259            Value::UInt4(u) => Datum::UInt32(u.0),
260            Value::UInt8(u) => Datum::UInt64(u.0),
261            Value::Jsonb(js) => buf.push_unary_row(js.0.into_row()),
262            Value::List(elems) => {
263                let elem_pg_type = match typ {
264                    Type::List(t) => &*t,
265                    _ => panic!("Value::List should have type Type::List. Found {:?}", typ),
266                };
267                buf.make_datum(|packer| {
268                    packer.push_list(elems.into_iter().map(|elem| match elem {
269                        Some(elem) => elem.into_datum(buf, elem_pg_type),
270                        None => Datum::Null,
271                    }));
272                })
273            }
274            Value::Map(map) => {
275                let elem_pg_type = match typ {
276                    Type::Map { value_type } => &*value_type,
277                    _ => panic!("Value::Map should have type Type::Map. Found {:?}", typ),
278                };
279                buf.make_datum(|packer| {
280                    packer.push_dict_with(|row| {
281                        for (k, v) in map {
282                            row.push(Datum::String(&k));
283                            row.push(match v {
284                                Some(elem) => elem.into_datum(buf, elem_pg_type),
285                                None => Datum::Null,
286                            });
287                        }
288                    });
289                })
290            }
291            Value::Oid(oid) => Datum::UInt32(oid),
292            Value::Record(_) => {
293                // This situation is handled gracefully by Value::decode; if we
294                // wind up here it's a programming error.
295                unreachable!("into_datum cannot be called on Value::Record");
296            }
297            Value::Time(t) => Datum::Time(t),
298            Value::Timestamp(ts) => Datum::Timestamp(ts),
299            Value::TimestampTz(ts) => Datum::TimestampTz(ts),
300            Value::Interval(iv) => Datum::Interval(iv.0),
301            Value::Text(s) | Value::VarChar(s) | Value::Name(s) => {
302                Datum::String(buf.push_string(s))
303            }
304            Value::BpChar(s) => Datum::String(buf.push_string(s.trim_end().into())),
305            Value::Uuid(u) => Datum::Uuid(u),
306            Value::Numeric(n) => Datum::Numeric(n.0),
307            Value::MzTimestamp(t) => Datum::MzTimestamp(t),
308            Value::Range(range) => {
309                let elem_pg_type = match typ {
310                    Type::Range { element_type } => &*element_type,
311                    _ => panic!("Value::Range should have type Type::Range. Found {:?}", typ),
312                };
313                let range = range.into_bounds(|elem| elem.into_datum(buf, elem_pg_type));
314
315                buf.make_datum(|packer| packer.push_range(range).unwrap())
316            }
317            Value::MzAclItem(mz_acl_item) => Datum::MzAclItem(mz_acl_item),
318            Value::AclItem(acl_item) => Datum::AclItem(acl_item),
319        }
320    }
321
322    /// Serializes this value to `buf` in the specified `format`.
323    pub fn encode(&self, ty: &Type, format: Format, buf: &mut BytesMut) -> Result<(), io::Error> {
324        match format {
325            Format::Text => {
326                self.encode_text(buf);
327                Ok(())
328            }
329            Format::Binary => self.encode_binary(ty, buf),
330        }
331    }
332
333    /// Serializes this value to `buf` using the [text encoding
334    /// format](Format::Text).
335    pub fn encode_text(&self, buf: &mut BytesMut) -> Nestable {
336        match self {
337            Value::Array { dims, elements } => {
338                strconv::format_array(buf, dims, elements, |buf, elem| match elem {
339                    None => Ok::<_, ()>(buf.write_null()),
340                    Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
341                })
342                .expect("provided closure never fails")
343            }
344            Value::Int2Vector { elements } => {
345                strconv::format_legacy_vector(buf, elements, |buf, elem| {
346                    Ok::<_, ()>(
347                        elem.as_ref()
348                            .expect("Int2Vector does not support NULL values")
349                            .encode_text(buf.nonnull_buffer()),
350                    )
351                })
352                .expect("provided closure never fails")
353            }
354            Value::Bool(b) => strconv::format_bool(buf, *b),
355            Value::Bytea(b) => strconv::format_bytes(buf, b),
356            Value::Char(c) => {
357                buf.put_u8(*c);
358                Nestable::MayNeedEscaping
359            }
360            Value::Date(d) => strconv::format_date(buf, *d),
361            Value::Int2(i) => strconv::format_int16(buf, *i),
362            Value::Int4(i) => strconv::format_int32(buf, *i),
363            Value::Int8(i) => strconv::format_int64(buf, *i),
364            Value::UInt2(u) => strconv::format_uint16(buf, u.0),
365            Value::UInt4(u) => strconv::format_uint32(buf, u.0),
366            Value::UInt8(u) => strconv::format_uint64(buf, u.0),
367            Value::Interval(iv) => strconv::format_interval(buf, iv.0),
368            Value::Float4(f) => strconv::format_float32(buf, *f),
369            Value::Float8(f) => strconv::format_float64(buf, *f),
370            Value::Jsonb(js) => strconv::format_jsonb(buf, js.0.as_ref()),
371            Value::List(elems) => strconv::format_list(buf, elems, |buf, elem| match elem {
372                None => Ok::<_, ()>(buf.write_null()),
373                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
374            })
375            .expect("provided closure never fails"),
376            Value::Map(elems) => strconv::format_map(buf, elems, |buf, value| match value {
377                None => Ok::<_, ()>(buf.write_null()),
378                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
379            })
380            .expect("provided closure never fails"),
381            Value::Oid(oid) => strconv::format_uint32(buf, *oid),
382            Value::Record(elems) => strconv::format_record(buf, elems, |buf, elem| match elem {
383                None => Ok::<_, ()>(buf.write_null()),
384                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
385            })
386            .expect("provided closure never fails"),
387            Value::Text(s) | Value::VarChar(s) | Value::BpChar(s) | Value::Name(s) => {
388                strconv::format_string(buf, s)
389            }
390            Value::Time(t) => strconv::format_time(buf, *t),
391            Value::Timestamp(ts) => strconv::format_timestamp(buf, ts),
392            Value::TimestampTz(ts) => strconv::format_timestamptz(buf, ts),
393            Value::Uuid(u) => strconv::format_uuid(buf, *u),
394            Value::Numeric(d) => strconv::format_numeric(buf, &d.0),
395            Value::MzTimestamp(t) => strconv::format_mz_timestamp(buf, *t),
396            Value::Range(range) => strconv::format_range(buf, range, |buf, elem| match elem {
397                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
398                None => Ok::<_, ()>(buf.write_null()),
399            })
400            .expect("provided closure never fails"),
401            Value::MzAclItem(mz_acl_item) => strconv::format_mz_acl_item(buf, *mz_acl_item),
402            Value::AclItem(acl_item) => strconv::format_acl_item(buf, *acl_item),
403        }
404    }
405
406    /// Serializes this value to `buf` using the [binary encoding
407    /// format](Format::Binary).
408    pub fn encode_binary(&self, ty: &Type, buf: &mut BytesMut) -> Result<(), io::Error> {
409        // NOTE: If implementing binary encoding for a previously unsupported `Value` type,
410        // please update the `can_encode_binary` method below.
411        let is_null = match self {
412            Value::Array { dims, elements } => {
413                let ndims = pg_len("number of array dimensions", dims.len())?;
414                let has_null = elements.iter().any(|e| e.is_none());
415                let elem_type = match ty {
416                    Type::Array(elem_type) => elem_type,
417                    _ => unreachable!(),
418                };
419                buf.put_i32(ndims);
420                buf.put_i32(has_null.into());
421                buf.put_u32(elem_type.oid());
422                for dim in dims {
423                    buf.put_i32(pg_len("array dimension length", dim.length)?);
424                    buf.put_i32(dim.lower_bound.try_into().map_err(|_| {
425                        io::Error::new(
426                            io::ErrorKind::Other,
427                            "array dimension lower bound does not fit into an i32",
428                        )
429                    })?);
430                }
431                for elem in elements {
432                    encode_element(buf, elem.as_ref(), elem_type)?;
433                }
434                Ok(postgres_types::IsNull::No)
435            }
436            // TODO: what is the binary format of vector types?
437            Value::Int2Vector { .. } => {
438                Err("binary encoding of int2vector is not implemented".into())
439            }
440            Value::Bool(b) => b.to_sql(&PgType::BOOL, buf),
441            Value::Bytea(b) => b.to_sql(&PgType::BYTEA, buf),
442            Value::Char(c) => i8::reinterpret_cast(*c).to_sql(&PgType::CHAR, buf),
443            Value::Date(d) => d.pg_epoch_days().to_sql(&PgType::DATE, buf),
444            Value::Float4(f) => f.to_sql(&PgType::FLOAT4, buf),
445            Value::Float8(f) => f.to_sql(&PgType::FLOAT8, buf),
446            Value::Int2(i) => i.to_sql(&PgType::INT2, buf),
447            Value::Int4(i) => i.to_sql(&PgType::INT4, buf),
448            Value::Int8(i) => i.to_sql(&PgType::INT8, buf),
449            Value::UInt2(u) => u.to_sql(&*UINT2, buf),
450            Value::UInt4(u) => u.to_sql(&*UINT4, buf),
451            Value::UInt8(u) => u.to_sql(&*UINT8, buf),
452            Value::Interval(iv) => iv.to_sql(&PgType::INTERVAL, buf),
453            Value::Jsonb(js) => js.to_sql(&PgType::JSONB, buf),
454            Value::List(_) => {
455                // A binary encoding for list is tricky. We only get one OID to
456                // describe the type of this list to the client. And we can't
457                // just up front allocate an OID for every possible list type,
458                // like PostgreSQL does for arrays, because, unlike arrays,
459                // lists can be arbitrarily nested.
460                //
461                // So, we'd need to synthesize a type with a stable OID whenever
462                // a new anonymous list type is *observed* in Materialize. Or we
463                // could mandate that only named list types can be sent over
464                // pgwire, and not anonymous list types, since named list types
465                // get a stable OID when they're created. Then we'd need to
466                // expose a table with the list OID -> element OID mapping for
467                // clients to query. And THEN we'd need to teach every client we
468                // care about how to query this table.
469                //
470                // This isn't intractible. It's how PostgreSQL's range type
471                // works, which is supported by many drivers. But our job is
472                // harder because most PostgreSQL drivers don't want to carry
473                // around code for Materialize-specific types. So we'd have to
474                // add type plugin infrastructure for those drivers, then
475                // distribute the list/map support as a plugin.
476                //
477                // Serializing the actual list would be simple, though: just a
478                // 32-bit integer describing the list length, followed by the
479                // encoding of each element in order.
480                //
481                // tl;dr it's a lot of work. For now, the recommended workaround
482                // is to either use the text encoding or convert the list to a
483                // different type (JSON, an array, unnest into rows) that does
484                // have a binary encoding.
485                Err("binary encoding of list types is not implemented".into())
486            }
487            Value::Map(_) => {
488                // Map binary encodings are hard for the same reason as list
489                // binary encodings (described above). You just have key and
490                // value OIDs to deal with rather than an element OID.
491                Err("binary encoding of map types is not implemented".into())
492            }
493            Value::Name(s) => s.to_sql(&PgType::NAME, buf),
494            Value::Oid(i) => i.to_sql(&PgType::OID, buf),
495            Value::Record(fields) => {
496                let nfields = pg_len("record field length", fields.len())?;
497                buf.put_i32(nfields);
498                let field_types = match ty {
499                    Type::Record(fields) => fields,
500                    _ => unreachable!(),
501                };
502                for (f, ty) in fields.iter().zip_eq(field_types) {
503                    buf.put_u32(ty.oid());
504                    encode_element(buf, f.as_ref(), ty)?;
505                }
506                Ok(postgres_types::IsNull::No)
507            }
508            Value::Text(s) => s.to_sql(&PgType::TEXT, buf),
509            Value::BpChar(s) => s.to_sql(&PgType::BPCHAR, buf),
510            Value::VarChar(s) => s.to_sql(&PgType::VARCHAR, buf),
511            Value::Time(t) => t.to_sql(&PgType::TIME, buf),
512            Value::Timestamp(ts) => ts.to_sql(&PgType::TIMESTAMP, buf),
513            Value::TimestampTz(ts) => ts.to_sql(&PgType::TIMESTAMPTZ, buf),
514            Value::Uuid(u) => u.to_sql(&PgType::UUID, buf),
515            Value::Numeric(a) => a.to_sql(&PgType::NUMERIC, buf),
516            Value::MzTimestamp(t) => t.to_string().to_sql(&PgType::TEXT, buf),
517            Value::Range(range) => {
518                buf.put_u8(range.pg_flag_bits());
519
520                let elem_type = match ty {
521                    Type::Range { element_type } => element_type,
522                    _ => unreachable!(),
523                };
524
525                if let Some(RangeInner { lower, upper }) = &range.inner {
526                    for bound in [&lower.bound, &upper.bound] {
527                        if let Some(bound) = bound {
528                            let base = buf.len();
529                            buf.put_i32(0);
530                            bound.encode_binary(elem_type, buf)?;
531                            let len = pg_len("encoded range bound", buf.len() - base - 4)?;
532                            buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
533                        }
534                    }
535                }
536                Ok(postgres_types::IsNull::No)
537            }
538            Value::MzAclItem(mz_acl_item) => {
539                buf.extend_from_slice(&mz_acl_item.encode_binary());
540                Ok(postgres_types::IsNull::No)
541            }
542            Value::AclItem(_) => Err("aclitem has no binary encoding".into()),
543        }
544        .expect("encode_binary should never trigger a to_sql failure");
545        if let IsNull::Yes = is_null {
546            panic!("encode_binary impossibly called on a null value")
547        }
548        Ok(())
549    }
550
551    /// Static helper method to pre-validate that a given Datum corresponding to
552    /// the provided `SqlScalarType` can be converted into a `Value` and then encoded
553    /// as binary using `encode_binary` without an error.
554    pub fn can_encode_binary(typ: &SqlScalarType) -> bool {
555        match typ {
556            SqlScalarType::Bool => true,
557            SqlScalarType::Int16 => true,
558            SqlScalarType::Int32 => true,
559            SqlScalarType::Int64 => true,
560            SqlScalarType::PgLegacyChar => true,
561            SqlScalarType::UInt16 => true,
562            SqlScalarType::Oid => true,
563            SqlScalarType::RegClass => true,
564            SqlScalarType::RegProc => true,
565            SqlScalarType::RegType => true,
566            SqlScalarType::UInt32 => true,
567            SqlScalarType::UInt64 => true,
568            SqlScalarType::Float32 => true,
569            SqlScalarType::Float64 => true,
570            SqlScalarType::Numeric { .. } => true,
571            SqlScalarType::MzTimestamp => true,
572            SqlScalarType::MzAclItem => true,
573            SqlScalarType::AclItem => false, // "aclitem has no binary encoding"
574            SqlScalarType::Date => true,
575            SqlScalarType::Time => true,
576            SqlScalarType::Timestamp { .. } => true,
577            SqlScalarType::TimestampTz { .. } => true,
578            SqlScalarType::Interval => true,
579            SqlScalarType::Bytes => true,
580            SqlScalarType::String => true,
581            SqlScalarType::VarChar { .. } => true,
582            SqlScalarType::Char { .. } => true,
583            SqlScalarType::PgLegacyName => true,
584            SqlScalarType::Jsonb => true,
585            SqlScalarType::Uuid => true,
586            SqlScalarType::Array(elem_type) => Self::can_encode_binary(elem_type),
587            SqlScalarType::Int2Vector => false, // "binary encoding of int2vector is not implemented"
588            SqlScalarType::List { .. } => false, // "binary encoding of list types is not implemented"
589            SqlScalarType::Map { .. } => false, // "binary encoding of map types is not implemented"
590            SqlScalarType::Record { fields, .. } => fields
591                .iter()
592                .all(|(_, ty)| Self::can_encode_binary(&ty.scalar_type)),
593            SqlScalarType::Range { element_type } => Self::can_encode_binary(element_type),
594        }
595    }
596
597    /// Deserializes a value of type `ty` from `raw` using the specified
598    /// `format`.
599    pub fn decode(
600        format: Format,
601        ty: &Type,
602        raw: &[u8],
603    ) -> Result<Value, Box<dyn Error + Sync + Send>> {
604        match format {
605            Format::Text => Value::decode_text(ty, raw),
606            Format::Binary => Value::decode_binary(ty, raw),
607        }
608    }
609
610    /// Deserializes a value of type `ty` from `raw` using the [text encoding
611    /// format](Format::Text).
612    pub fn decode_text<'a>(
613        ty: &'a Type,
614        raw: &'a [u8],
615    ) -> Result<Value, Box<dyn Error + Sync + Send>> {
616        let s = str::from_utf8(raw)?;
617        Ok(match ty {
618            Type::Array(elem_type) => {
619                let (elements, dims) = strconv::parse_array(
620                    s,
621                    || None,
622                    |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
623                )?;
624                Value::Array { dims, elements }
625            }
626            Type::Int2Vector { .. } => {
627                return Err("input of Int2Vector types is not implemented".into());
628            }
629            Type::Bool => Value::Bool(strconv::parse_bool(s)?),
630            Type::Bytea => Value::Bytea(strconv::parse_bytes(s)?),
631            Type::Char => Value::Char(raw.get(0).copied().unwrap_or(0)),
632            Type::Date => Value::Date(strconv::parse_date(s)?),
633            Type::Float4 => Value::Float4(strconv::parse_float32(s)?),
634            Type::Float8 => Value::Float8(strconv::parse_float64(s)?),
635            Type::Int2 => Value::Int2(strconv::parse_int16(s)?),
636            Type::Int4 => Value::Int4(strconv::parse_int32(s)?),
637            Type::Int8 => Value::Int8(strconv::parse_int64(s)?),
638            Type::UInt2 => Value::UInt2(UInt2(strconv::parse_uint16(s)?)),
639            Type::UInt4 => Value::UInt4(UInt4(strconv::parse_uint32(s)?)),
640            Type::UInt8 => Value::UInt8(UInt8(strconv::parse_uint64(s)?)),
641            Type::Interval { .. } => Value::Interval(Interval(strconv::parse_interval(s)?)),
642            Type::Json => return Err("input of json types is not implemented".into()),
643            Type::Jsonb => Value::Jsonb(Jsonb(strconv::parse_jsonb(s)?)),
644            Type::List(elem_type) => Value::List(strconv::parse_list(
645                s,
646                matches!(**elem_type, Type::List(..)),
647                || None,
648                |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
649            )?),
650            Type::Map { value_type } => Value::Map(strconv::parse_map(
651                s,
652                matches!(**value_type, Type::Map { .. }),
653                |elem_text| {
654                    elem_text
655                        .map(|t| Value::decode_text(value_type, t.as_bytes()))
656                        .transpose()
657                },
658            )?),
659            Type::Name => Value::Name(strconv::parse_pg_legacy_name(s)),
660            Type::Numeric { .. } => Value::Numeric(Numeric(strconv::parse_numeric(s)?)),
661            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
662                Value::Oid(strconv::parse_oid(s)?)
663            }
664            Type::Record(_) => {
665                return Err("input of anonymous composite types is not implemented".into());
666            }
667            Type::Text => Value::Text(s.to_owned()),
668            Type::BpChar { .. } => Value::BpChar(s.to_owned()),
669            Type::VarChar { .. } => Value::VarChar(s.to_owned()),
670            Type::Time { .. } => Value::Time(strconv::parse_time(s)?),
671            Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
672            Type::Timestamp { .. } => Value::Timestamp(strconv::parse_timestamp(s)?),
673            Type::TimestampTz { .. } => Value::TimestampTz(strconv::parse_timestamptz(s)?),
674            Type::Uuid => Value::Uuid(Uuid::parse_str(s)?),
675            Type::MzTimestamp => Value::MzTimestamp(strconv::parse_mz_timestamp(s)?),
676            Type::Range { element_type } => Value::Range(strconv::parse_range(s, |elem_text| {
677                Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
678            })?),
679            Type::MzAclItem => Value::MzAclItem(strconv::parse_mz_acl_item(s)?),
680            Type::AclItem => Value::AclItem(strconv::parse_acl_item(s)?),
681        })
682    }
683
684    /// Deserializes a value of type `ty` from `s` using the [text encoding format](Format::Text).
685    pub fn decode_text_into_row<'a>(
686        ty: &'a Type,
687        s: &'a str,
688        packer: &mut RowPacker,
689    ) -> Result<(), Box<dyn Error + Sync + Send>> {
690        Ok(match ty {
691            Type::Array(elem_type) => {
692                let (elements, dims) =
693                    strconv::parse_array(s, || None, |elem_text| Ok::<_, String>(Some(elem_text)))?;
694                // SAFETY: The function returns the number of times it called `push` on the packer.
695                unsafe {
696                    packer.push_array_with_unchecked(&dims, |packer| {
697                        let mut nelements = 0;
698                        for element in elements {
699                            match element {
700                                Some(elem_text) => {
701                                    Value::decode_text_into_row(elem_type, &elem_text, packer)?
702                                }
703
704                                None => packer.push(Datum::Null),
705                            }
706                            nelements += 1;
707                        }
708                        Ok::<_, Box<dyn Error + Sync + Send>>(nelements)
709                    })?
710                }
711            }
712            Type::Int2Vector { .. } => {
713                return Err("input of Int2Vector types is not implemented".into());
714            }
715            Type::Bool => packer.push(Datum::from(strconv::parse_bool(s)?)),
716            Type::Bytea => packer.push(Datum::Bytes(&strconv::parse_bytes(s)?)),
717            Type::Char => packer.push(Datum::UInt8(s.as_bytes().get(0).copied().unwrap_or(0))),
718            Type::Date => packer.push(Datum::Date(strconv::parse_date(s)?)),
719            Type::Float4 => packer.push(Datum::Float32(strconv::parse_float32(s)?.into())),
720            Type::Float8 => packer.push(Datum::Float64(strconv::parse_float64(s)?.into())),
721            Type::Int2 => packer.push(Datum::Int16(strconv::parse_int16(s)?)),
722            Type::Int4 => packer.push(Datum::Int32(strconv::parse_int32(s)?)),
723            Type::Int8 => packer.push(Datum::Int64(strconv::parse_int64(s)?)),
724            Type::UInt2 => packer.push(Datum::UInt16(strconv::parse_uint16(s)?)),
725            Type::UInt4 => packer.push(Datum::UInt32(strconv::parse_uint32(s)?)),
726            Type::UInt8 => packer.push(Datum::UInt64(strconv::parse_uint64(s)?)),
727            Type::Interval { .. } => packer.push(Datum::Interval(strconv::parse_interval(s)?)),
728            Type::Json => return Err("input of json types is not implemented".into()),
729            Type::Jsonb => packer.push(strconv::parse_jsonb(s)?.into_row().unpack_first()),
730            Type::List(elem_type) => {
731                let elems = strconv::parse_list(
732                    s,
733                    matches!(**elem_type, Type::List(..)),
734                    || None,
735                    |elem_text| Ok::<_, String>(Some(elem_text)),
736                )?;
737                packer.push_list_with(|packer| {
738                    for elem in elems {
739                        match elem {
740                            Some(elem) => Value::decode_text_into_row(elem_type, &elem, packer)?,
741                            None => packer.push(Datum::Null),
742                        }
743                    }
744                    Ok::<_, Box<dyn Error + Sync + Send>>(())
745                })?;
746            }
747            Type::Map { value_type } => {
748                let map =
749                    strconv::parse_map(s, matches!(**value_type, Type::Map { .. }), |elem_text| {
750                        elem_text.map(Ok::<_, String>).transpose()
751                    })?;
752                packer.push_dict_with(|row| {
753                    for (k, v) in map {
754                        row.push(Datum::String(&k));
755                        match v {
756                            Some(elem) => Value::decode_text_into_row(value_type, &elem, row)?,
757                            None => row.push(Datum::Null),
758                        }
759                    }
760                    Ok::<_, Box<dyn Error + Sync + Send>>(())
761                })?;
762            }
763            Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
764            Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
765            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
766                packer.push(Datum::UInt32(strconv::parse_oid(s)?))
767            }
768            Type::Record(_) => {
769                return Err("input of anonymous composite types is not implemented".into());
770            }
771            Type::Text => packer.push(Datum::String(s)),
772            Type::BpChar { .. } => packer.push(Datum::String(s.trim_end())),
773            Type::VarChar { .. } => packer.push(Datum::String(s)),
774            Type::Time { .. } => packer.push(Datum::Time(strconv::parse_time(s)?)),
775            Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
776            Type::Timestamp { .. } => packer.push(Datum::Timestamp(strconv::parse_timestamp(s)?)),
777            Type::TimestampTz { .. } => {
778                packer.push(Datum::TimestampTz(strconv::parse_timestamptz(s)?))
779            }
780            Type::Uuid => packer.push(Datum::Uuid(Uuid::parse_str(s)?)),
781            Type::MzTimestamp => packer.push(Datum::MzTimestamp(strconv::parse_mz_timestamp(s)?)),
782            Type::Range { element_type } => {
783                let range = strconv::parse_range(s, |elem_text| {
784                    Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
785                })?;
786                // TODO: We should be able to push ranges without scratch space, but that requires
787                // a different `push_range` API.
788                let buf = RowArena::new();
789                let range = range.into_bounds(|elem| elem.into_datum(&buf, element_type));
790
791                packer.push_range(range).unwrap()
792            }
793            Type::MzAclItem => packer.push(Datum::MzAclItem(strconv::parse_mz_acl_item(s)?)),
794            Type::AclItem => packer.push(Datum::AclItem(strconv::parse_acl_item(s)?)),
795        })
796    }
797
798    /// Deserializes a value of type `ty` from `raw` using the [binary encoding
799    /// format](Format::Binary).
800    pub fn decode_binary(ty: &Type, raw: &[u8]) -> Result<Value, Box<dyn Error + Sync + Send>> {
801        match ty {
802            Type::Array(_) => Err("input of array types is not implemented".into()),
803            Type::Int2Vector => Err("input of int2vector types is not implemented".into()),
804            Type::Bool => bool::from_sql(ty.inner(), raw).map(Value::Bool),
805            Type::Bytea => Vec::<u8>::from_sql(ty.inner(), raw).map(Value::Bytea),
806            Type::Char => {
807                i8::from_sql(ty.inner(), raw).map(|c| Value::Char(u8::reinterpret_cast(c)))
808            }
809            Type::Date => {
810                let days = i32::from_sql(ty.inner(), raw)?;
811                Ok(Value::Date(Date::from_pg_epoch(days)?))
812            }
813            Type::Float4 => f32::from_sql(ty.inner(), raw).map(Value::Float4),
814            Type::Float8 => f64::from_sql(ty.inner(), raw).map(Value::Float8),
815            Type::Int2 => i16::from_sql(ty.inner(), raw).map(Value::Int2),
816            Type::Int4 => i32::from_sql(ty.inner(), raw).map(Value::Int4),
817            Type::Int8 => i64::from_sql(ty.inner(), raw).map(Value::Int8),
818            Type::UInt2 => UInt2::from_sql(ty.inner(), raw).map(Value::UInt2),
819            Type::UInt4 => UInt4::from_sql(ty.inner(), raw).map(Value::UInt4),
820            Type::UInt8 => UInt8::from_sql(ty.inner(), raw).map(Value::UInt8),
821            Type::Interval { .. } => Interval::from_sql(ty.inner(), raw).map(Value::Interval),
822            Type::Json => Err("input of json types is not implemented".into()),
823            Type::Jsonb => Jsonb::from_sql(ty.inner(), raw).map(Value::Jsonb),
824            Type::List(_) => Err("binary decoding of list types is not implemented".into()),
825            Type::Map { .. } => Err("binary decoding of map types is not implemented".into()),
826            Type::Name => {
827                let s = String::from_sql(ty.inner(), raw)?;
828                if s.len() > NAME_MAX_BYTES {
829                    return Err("identifier too long".into());
830                }
831                Ok(Value::Name(s))
832            }
833            Type::Numeric { .. } => Numeric::from_sql(ty.inner(), raw).map(Value::Numeric),
834            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
835                u32::from_sql(ty.inner(), raw).map(Value::Oid)
836            }
837            Type::Record(_) => Err("input of anonymous composite types is not implemented".into()),
838            Type::Text => String::from_sql(ty.inner(), raw).map(Value::Text),
839            Type::BpChar { .. } => String::from_sql(ty.inner(), raw).map(Value::BpChar),
840            Type::VarChar { .. } => String::from_sql(ty.inner(), raw).map(Value::VarChar),
841            Type::Time { .. } => NaiveTime::from_sql(ty.inner(), raw).map(Value::Time),
842            Type::TimeTz { .. } => Err("input of timetz types is not implemented".into()),
843            Type::Timestamp { .. } => {
844                let ts = NaiveDateTime::from_sql(ty.inner(), raw)?;
845                Ok(Value::Timestamp(CheckedTimestamp::from_timestamplike(ts)?))
846            }
847            Type::TimestampTz { .. } => {
848                let ts = DateTime::<Utc>::from_sql(ty.inner(), raw)?;
849                Ok(Value::TimestampTz(CheckedTimestamp::from_timestamplike(
850                    ts,
851                )?))
852            }
853            Type::Uuid => Uuid::from_sql(ty.inner(), raw).map(Value::Uuid),
854            Type::MzTimestamp => {
855                let s = String::from_sql(ty.inner(), raw)?;
856                let t: mz_repr::Timestamp = s.parse()?;
857                Ok(Value::MzTimestamp(t))
858            }
859            Type::Range { .. } => Err("binary decoding of range types is not implemented".into()),
860            Type::MzAclItem => {
861                let mz_acl_item = MzAclItem::decode_binary(raw)?;
862                Ok(Value::MzAclItem(mz_acl_item))
863            }
864            Type::AclItem => Err("aclitem has no binary encoding".into()),
865        }
866    }
867}
868
869fn encode_element(buf: &mut BytesMut, elem: Option<&Value>, ty: &Type) -> Result<(), io::Error> {
870    match elem {
871        None => buf.put_i32(-1),
872        Some(elem) => {
873            let base = buf.len();
874            buf.put_i32(0);
875            elem.encode_binary(ty, buf)?;
876            let len = pg_len("encoded element", buf.len() - base - 4)?;
877            buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
878        }
879    }
880    Ok(())
881}
882
/// Converts `len` into an `i32` length.
///
/// `what` names the thing being measured and is only used in the error
/// message. Returns an [`io::Error`] of kind [`io::ErrorKind::Other`] when
/// `len` is larger than `i32::MAX`.
fn pg_len(what: &str, len: usize) -> Result<i32, io::Error> {
    match i32::try_from(len) {
        Ok(n) => Ok(n),
        Err(_) => {
            let msg = format!("{} does not fit into an i32", what);
            Err(io::Error::new(io::ErrorKind::Other, msg))
        }
    }
}
891
892/// Converts a Materialize row into a vector of PostgreSQL values.
893///
894/// Calling this function is equivalent to mapping [`Value::from_datum`] over
895/// every datum in `row`.
896pub fn values_from_row(row: &RowRef, typ: &SqlRelationType) -> Vec<Option<Value>> {
897    row.iter()
898        .zip_eq(typ.column_types.iter())
899        .map(|(col, typ)| Value::from_datum(col, &typ.scalar_type))
900        .collect()
901}
902
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that a text-decoding failure reports the whole chain of parse
    /// errors: a one-element boolean array is text-encoded and then decoded
    /// as an `int4` array, and the complete error message is compared.
    #[mz_ore::test]
    fn decode_text_error_smoke_test() {
        let dims = vec![ArrayDimension {
            lower_bound: 0,
            length: 1,
        }];
        let elements = vec![Some(Value::Bool(true))];
        let bool_array = Value::Array { dims, elements };

        let mut encoded = BytesMut::new();
        bool_array.encode_text(&mut encoded);
        let encoded = encoded.to_vec();

        let int_array_type = Type::Array(Box::new(Type::Int4));
        let message = Value::decode_text(&int_array_type, &encoded)
            .map_err(|e| e.to_string())
            .unwrap_err();

        assert_eq!(
            message,
            "invalid input syntax for type array: Specifying array lower bounds is not supported: \"[0:0]={t}\"".to_string()
        );
    }
}