Skip to main content

mz_pgrepr/
value.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::error::Error;
12use std::{io, str};
13
14use bytes::{BufMut, BytesMut};
15use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
16use itertools::Itertools;
17use mz_ore::cast::ReinterpretCast;
18use mz_pgrepr_consts::oid::TYPE_INT2_OID;
19use mz_pgwire_common::Format;
20use mz_repr::adt::array::ArrayDimension;
21use mz_repr::adt::char;
22use mz_repr::adt::date::Date;
23use mz_repr::adt::jsonb::JsonbRef;
24use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
25use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
26use mz_repr::adt::range::{Range, RangeInner};
27use mz_repr::adt::timestamp::CheckedTimestamp;
28use mz_repr::strconv::{self, Nestable};
29use mz_repr::{Datum, RowArena, RowPacker, RowRef, SqlRelationType, SqlScalarType};
30use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
31use uuid::Uuid;
32
33use crate::types::{UINT2, UINT4, UINT8};
34use crate::value::error::IntoDatumError;
35use crate::{Interval, Jsonb, Numeric, Type, UInt2, UInt4, UInt8};
36
37pub mod error;
38pub mod interval;
39pub mod jsonb;
40pub mod numeric;
41pub mod record;
42pub mod unsigned;
43
/// A PostgreSQL datum.
///
/// A `Value` is built from a Materialize [`Datum`] with [`Value::from_datum`],
/// converted back with [`Value::into_datum`], serialized with
/// [`Value::encode`], and parsed with [`Value::decode`].
///
/// SQL NULL has no variant of its own: it is represented as `None` in an
/// `Option<Value>`, which is why container variants hold `Option<Value>`
/// elements.
#[derive(Debug)]
pub enum Value {
    /// A variable-length, multi-dimensional array of values.
    Array {
        /// The dimensions of the array.
        dims: Vec<ArrayDimension>,
        /// The elements of the array. `None` entries are NULL elements.
        elements: Vec<Option<Value>>,
    },
    /// A boolean value.
    Bool(bool),
    /// A byte array, i.e., a variable-length binary string.
    Bytea(Vec<u8>),
    /// A single-byte character.
    Char(u8),
    /// A date.
    Date(Date),
    /// A 4-byte floating point number.
    Float4(f32),
    /// An 8-byte floating point number.
    Float8(f64),
    /// A 2-byte signed integer.
    Int2(i16),
    /// A 4-byte signed integer.
    Int4(i32),
    /// An 8-byte signed integer.
    Int8(i64),
    /// A 2-byte unsigned integer.
    UInt2(UInt2),
    /// A 4-byte unsigned integer.
    UInt4(UInt4),
    /// An 8-byte unsigned integer.
    UInt8(UInt8),
    /// A time interval.
    Interval(Interval),
    /// A binary JSON blob.
    Jsonb(Jsonb),
    /// A sequence of homogeneous values.
    ///
    /// Lists have no binary encoding; see [`Value::can_encode_binary`].
    List(Vec<Option<Value>>),
    /// A map of string keys and homogeneous values.
    ///
    /// Maps have no binary encoding; see [`Value::can_encode_binary`].
    Map(BTreeMap<String, Option<Value>>),
    /// An identifier string of no more than 64 characters in length.
    Name(String),
    /// An arbitrary precision number.
    Numeric(Numeric),
    /// An object identifier.
    Oid(u32),
    /// A sequence of heterogeneous values.
    ///
    /// [`Value::into_datum`] must not be called on this variant.
    Record(Vec<Option<Value>>),
    /// A time.
    Time(NaiveTime),
    /// A date and time, without a timezone.
    Timestamp(CheckedTimestamp<NaiveDateTime>),
    /// A date and time, with a timezone.
    TimestampTz(CheckedTimestamp<DateTime<Utc>>),
    /// A variable-length string.
    Text(String),
    /// A fixed-length string.
    ///
    /// [`Value::into_datum`] strips trailing whitespace from the string before
    /// storing it in a datum.
    BpChar(String),
    /// A variable-length string with an optional limit.
    VarChar(String),
    /// A universally unique identifier.
    Uuid(Uuid),
    /// A small int vector.
    ///
    /// [`Value::into_datum`] must not be called on this variant.
    Int2Vector {
        /// The elements of the vector.
        elements: Vec<Option<Value>>,
    },
    /// A Materialize timestamp.
    MzTimestamp(mz_repr::Timestamp),
    /// A contiguous range of values along a domain.
    Range(Range<Box<Value>>),
    /// A list of privileges granted to a role, that uses [`mz_repr::role_id::RoleId`]s for role
    /// references.
    MzAclItem(MzAclItem),
    /// A list of privileges granted to a user that uses [`mz_repr::adt::system::Oid`]s for role
    /// references. This type is used primarily for compatibility with PostgreSQL.
    ///
    /// This variant has no binary encoding; see [`Value::can_encode_binary`].
    AclItem(AclItem),
}
124
125impl Value {
126    /// Constructs a new `Value` from a Materialize datum.
127    ///
128    /// The conversion happens in the obvious manner, except that `Datum::Null`
129    /// is converted to `None` to align with how PostgreSQL handles NULL.
130    pub fn from_datum(datum: Datum, typ: &SqlScalarType) -> Option<Value> {
131        match (datum, typ) {
132            (Datum::Null, _) => None,
133            (Datum::True, SqlScalarType::Bool) => Some(Value::Bool(true)),
134            (Datum::False, SqlScalarType::Bool) => Some(Value::Bool(false)),
135            (Datum::Int16(i), SqlScalarType::Int16) => Some(Value::Int2(i)),
136            (Datum::Int32(i), SqlScalarType::Int32) => Some(Value::Int4(i)),
137            (Datum::Int64(i), SqlScalarType::Int64) => Some(Value::Int8(i)),
138            (Datum::UInt8(c), SqlScalarType::PgLegacyChar) => Some(Value::Char(c)),
139            (Datum::UInt16(u), SqlScalarType::UInt16) => Some(Value::UInt2(UInt2(u))),
140            (Datum::UInt32(oid), SqlScalarType::Oid) => Some(Value::Oid(oid)),
141            (Datum::UInt32(oid), SqlScalarType::RegClass) => Some(Value::Oid(oid)),
142            (Datum::UInt32(oid), SqlScalarType::RegProc) => Some(Value::Oid(oid)),
143            (Datum::UInt32(oid), SqlScalarType::RegType) => Some(Value::Oid(oid)),
144            (Datum::UInt32(u), SqlScalarType::UInt32) => Some(Value::UInt4(UInt4(u))),
145            (Datum::UInt64(u), SqlScalarType::UInt64) => Some(Value::UInt8(UInt8(u))),
146            (Datum::Float32(f), SqlScalarType::Float32) => Some(Value::Float4(*f)),
147            (Datum::Float64(f), SqlScalarType::Float64) => Some(Value::Float8(*f)),
148            (Datum::Numeric(d), SqlScalarType::Numeric { .. }) => Some(Value::Numeric(Numeric(d))),
149            (Datum::MzTimestamp(t), SqlScalarType::MzTimestamp) => Some(Value::MzTimestamp(t)),
150            (Datum::MzAclItem(mai), SqlScalarType::MzAclItem) => Some(Value::MzAclItem(mai)),
151            (Datum::AclItem(ai), SqlScalarType::AclItem) => Some(Value::AclItem(ai)),
152            (Datum::Date(d), SqlScalarType::Date) => Some(Value::Date(d)),
153            (Datum::Time(t), SqlScalarType::Time) => Some(Value::Time(t)),
154            (Datum::Timestamp(ts), SqlScalarType::Timestamp { .. }) => Some(Value::Timestamp(ts)),
155            (Datum::TimestampTz(ts), SqlScalarType::TimestampTz { .. }) => {
156                Some(Value::TimestampTz(ts))
157            }
158            (Datum::Interval(iv), SqlScalarType::Interval) => Some(Value::Interval(Interval(iv))),
159            (Datum::Bytes(b), SqlScalarType::Bytes) => Some(Value::Bytea(b.to_vec())),
160            (Datum::String(s), SqlScalarType::String) => Some(Value::Text(s.to_owned())),
161            (Datum::String(s), SqlScalarType::VarChar { .. }) => Some(Value::VarChar(s.to_owned())),
162            (Datum::String(s), SqlScalarType::Char { length }) => {
163                Some(Value::BpChar(char::format_str_pad(s, *length)))
164            }
165            (Datum::String(s), SqlScalarType::PgLegacyName) => Some(Value::Name(s.into())),
166            (_, SqlScalarType::Jsonb) => {
167                Some(Value::Jsonb(Jsonb(JsonbRef::from_datum(datum).to_owned())))
168            }
169            (Datum::Uuid(u), SqlScalarType::Uuid) => Some(Value::Uuid(u)),
170            (Datum::Array(array), SqlScalarType::Array(elem_type)) => {
171                let dims = array.dims().into_iter().collect();
172                let elements = array
173                    .elements()
174                    .iter()
175                    .map(|elem| Value::from_datum(elem, elem_type))
176                    .collect();
177                Some(Value::Array { dims, elements })
178            }
179            (Datum::Array(array), SqlScalarType::Int2Vector) => {
180                assert!(
181                    array.has_int2vector_dims(),
182                    "int2vector must be 1 dimensional, or empty"
183                );
184                let elements = array
185                    .elements()
186                    .iter()
187                    .map(|elem| Value::from_datum(elem, &SqlScalarType::Int16))
188                    .collect();
189                Some(Value::Int2Vector { elements })
190            }
191            (Datum::List(list), SqlScalarType::List { element_type, .. }) => {
192                let elements = list
193                    .iter()
194                    .map(|elem| Value::from_datum(elem, element_type))
195                    .collect();
196                Some(Value::List(elements))
197            }
198            (Datum::List(record), SqlScalarType::Record { fields, .. }) => {
199                let fields = record
200                    .iter()
201                    .zip_eq(fields)
202                    .map(|(e, (_name, ty))| Value::from_datum(e, &ty.scalar_type))
203                    .collect();
204                Some(Value::Record(fields))
205            }
206            (Datum::Map(dict), SqlScalarType::Map { value_type, .. }) => {
207                let entries = dict
208                    .iter()
209                    .map(|(k, v)| (k.to_owned(), Value::from_datum(v, value_type)))
210                    .collect();
211                Some(Value::Map(entries))
212            }
213            (Datum::Range(range), SqlScalarType::Range { element_type }) => {
214                let value_range = range.into_bounds(|b| {
215                    Box::new(
216                        Value::from_datum(b.datum(), element_type)
217                            .expect("RangeBounds never contain Datum::Null"),
218                    )
219                });
220                Some(Value::Range(value_range))
221            }
222            _ => panic!("can't serialize {}::{:?}", datum, typ),
223        }
224    }
225
226    /// Converts a Materialize datum from this value.
227    pub fn into_datum<'a>(
228        self,
229        buf: &'a RowArena,
230        typ: &Type,
231    ) -> Result<Datum<'a>, IntoDatumError> {
232        Ok(match self {
233            Value::Array { dims, elements } => {
234                let element_pg_type = match typ {
235                    Type::Array(t) => &*t,
236                    _ => panic!("Value::Array should have type Type::Array. Found {:?}", typ),
237                };
238                let elements: Result<Vec<_>, _> = elements
239                    .into_iter()
240                    .map(|element| match element {
241                        Some(element) => element.into_datum(buf, element_pg_type),
242                        None => Ok(Datum::Null),
243                    })
244                    .collect();
245                let elements = elements?;
246                buf.try_make_datum(|packer| {
247                    packer
248                        .try_push_array(&dims, elements.into_iter())
249                        .map_err(IntoDatumError::from)
250                })?
251            }
252            Value::Int2Vector { .. } => {
253                // This situation is handled gracefully by Value::decode; if we
254                // wind up here it's a programming error.
255                unreachable!("into_datum cannot be called on Value::Int2Vector");
256            }
257            Value::Bool(true) => Datum::True,
258            Value::Bool(false) => Datum::False,
259            Value::Bytea(b) => Datum::Bytes(buf.push_bytes(b)),
260            Value::Char(c) => Datum::UInt8(c),
261            Value::Date(d) => Datum::Date(d),
262            Value::Float4(f) => Datum::Float32(f.into()),
263            Value::Float8(f) => Datum::Float64(f.into()),
264            Value::Int2(i) => Datum::Int16(i),
265            Value::Int4(i) => Datum::Int32(i),
266            Value::Int8(i) => Datum::Int64(i),
267            Value::UInt2(u) => Datum::UInt16(u.0),
268            Value::UInt4(u) => Datum::UInt32(u.0),
269            Value::UInt8(u) => Datum::UInt64(u.0),
270            Value::Jsonb(js) => buf.push_unary_row(js.0.into_row()),
271            Value::List(elems) => {
272                let elem_pg_type = match typ {
273                    Type::List(t) => &*t,
274                    _ => panic!("Value::List should have type Type::List. Found {:?}", typ),
275                };
276                let elems: Result<Vec<_>, _> = elems
277                    .into_iter()
278                    .map(|elem| match elem {
279                        Some(elem) => elem.into_datum(buf, elem_pg_type),
280                        None => Ok(Datum::Null),
281                    })
282                    .collect();
283                let elems = elems?;
284                buf.make_datum(|packer| packer.push_list(elems))
285            }
286            Value::Map(map) => {
287                let elem_pg_type = match typ {
288                    Type::Map { value_type } => &*value_type,
289                    _ => panic!("Value::Map should have type Type::Map. Found {:?}", typ),
290                };
291                buf.try_make_datum(|packer| {
292                    packer.try_push_dict_with(|row| {
293                        for (k, v) in map {
294                            row.push(Datum::String(buf.push_string(k)));
295                            let datum = match v {
296                                Some(elem) => elem.into_datum(buf, elem_pg_type)?,
297                                None => Datum::Null,
298                            };
299                            row.push(datum);
300                        }
301                        Ok::<_, IntoDatumError>(())
302                    })
303                })?
304            }
305            Value::Oid(oid) => Datum::UInt32(oid),
306            Value::Record(_) => {
307                // This situation is handled gracefully by Value::decode; if we
308                // wind up here it's a programming error.
309                unreachable!("into_datum cannot be called on Value::Record");
310            }
311            Value::Time(t) => Datum::Time(t),
312            Value::Timestamp(ts) => Datum::Timestamp(ts),
313            Value::TimestampTz(ts) => Datum::TimestampTz(ts),
314            Value::Interval(iv) => Datum::Interval(iv.0),
315            Value::Text(s) | Value::VarChar(s) | Value::Name(s) => {
316                Datum::String(buf.push_string(s))
317            }
318            Value::BpChar(s) => Datum::String(buf.push_string(s.trim_end().into())),
319            Value::Uuid(u) => Datum::Uuid(u),
320            Value::Numeric(n) => Datum::Numeric(n.0),
321            Value::MzTimestamp(t) => Datum::MzTimestamp(t),
322            Value::Range(range) => {
323                let elem_pg_type = match typ {
324                    Type::Range { element_type } => &*element_type,
325                    _ => panic!("Value::Range should have type Type::Range. Found {:?}", typ),
326                };
327                let range = range.try_into_bounds(|elem| elem.into_datum(buf, elem_pg_type))?;
328                buf.try_make_datum(|packer| packer.push_range(range).map_err(IntoDatumError::from))?
329            }
330            Value::MzAclItem(mz_acl_item) => Datum::MzAclItem(mz_acl_item),
331            Value::AclItem(acl_item) => Datum::AclItem(acl_item),
332        })
333    }
334
335    /// Like [`Self::into_datum`] but maps the error to a formatted string for decode/parameter contexts.
336    ///
337    /// Callers can then convert the `String` to their preferred error type (e.g. `io::Error`,
338    /// protocol error). The message is `"unable to decode {context}: {error}"`.
339    pub fn into_datum_decode_error<'a>(
340        self,
341        buf: &'a RowArena,
342        typ: &Type,
343        context: &str,
344    ) -> Result<Datum<'a>, String> {
345        self.into_datum(buf, typ)
346            .map_err(|e| format!("unable to decode {}: {}", context, e))
347    }
348
349    /// Serializes this value to `buf` in the specified `format`.
350    pub fn encode(&self, ty: &Type, format: Format, buf: &mut BytesMut) -> Result<(), io::Error> {
351        match format {
352            Format::Text => {
353                self.encode_text(buf);
354                Ok(())
355            }
356            Format::Binary => self.encode_binary(ty, buf),
357        }
358    }
359
360    /// Serializes this value to `buf` using the [text encoding
361    /// format](Format::Text).
362    pub fn encode_text(&self, buf: &mut BytesMut) -> Nestable {
363        match self {
364            Value::Array { dims, elements } => {
365                strconv::format_array(buf, dims, elements, |buf, elem| match elem {
366                    None => Ok::<_, ()>(buf.write_null()),
367                    Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
368                })
369                .expect("provided closure never fails")
370            }
371            Value::Int2Vector { elements } => {
372                strconv::format_legacy_vector(buf, elements, |buf, elem| {
373                    Ok::<_, ()>(
374                        elem.as_ref()
375                            .expect("Int2Vector does not support NULL values")
376                            .encode_text(buf.nonnull_buffer()),
377                    )
378                })
379                .expect("provided closure never fails")
380            }
381            Value::Bool(b) => strconv::format_bool(buf, *b),
382            Value::Bytea(b) => strconv::format_bytes(buf, b),
383            Value::Char(c) => {
384                buf.put_u8(*c);
385                Nestable::MayNeedEscaping
386            }
387            Value::Date(d) => strconv::format_date(buf, *d),
388            Value::Int2(i) => strconv::format_int16(buf, *i),
389            Value::Int4(i) => strconv::format_int32(buf, *i),
390            Value::Int8(i) => strconv::format_int64(buf, *i),
391            Value::UInt2(u) => strconv::format_uint16(buf, u.0),
392            Value::UInt4(u) => strconv::format_uint32(buf, u.0),
393            Value::UInt8(u) => strconv::format_uint64(buf, u.0),
394            Value::Interval(iv) => strconv::format_interval(buf, iv.0),
395            Value::Float4(f) => strconv::format_float32(buf, *f),
396            Value::Float8(f) => strconv::format_float64(buf, *f),
397            Value::Jsonb(js) => strconv::format_jsonb(buf, js.0.as_ref()),
398            Value::List(elems) => strconv::format_list(buf, elems, |buf, elem| match elem {
399                None => Ok::<_, ()>(buf.write_null()),
400                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
401            })
402            .expect("provided closure never fails"),
403            Value::Map(elems) => strconv::format_map(buf, elems, |buf, value| match value {
404                None => Ok::<_, ()>(buf.write_null()),
405                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
406            })
407            .expect("provided closure never fails"),
408            Value::Oid(oid) => strconv::format_uint32(buf, *oid),
409            Value::Record(elems) => strconv::format_record(buf, elems, |buf, elem| match elem {
410                None => Ok::<_, ()>(buf.write_null()),
411                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
412            })
413            .expect("provided closure never fails"),
414            Value::Text(s) | Value::VarChar(s) | Value::BpChar(s) | Value::Name(s) => {
415                strconv::format_string(buf, s)
416            }
417            Value::Time(t) => strconv::format_time(buf, *t),
418            Value::Timestamp(ts) => strconv::format_timestamp(buf, ts),
419            Value::TimestampTz(ts) => strconv::format_timestamptz(buf, ts),
420            Value::Uuid(u) => strconv::format_uuid(buf, *u),
421            Value::Numeric(d) => strconv::format_numeric(buf, &d.0),
422            Value::MzTimestamp(t) => strconv::format_mz_timestamp(buf, *t),
423            Value::Range(range) => strconv::format_range(buf, range, |buf, elem| match elem {
424                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
425                None => Ok::<_, ()>(buf.write_null()),
426            })
427            .expect("provided closure never fails"),
428            Value::MzAclItem(mz_acl_item) => strconv::format_mz_acl_item(buf, *mz_acl_item),
429            Value::AclItem(acl_item) => strconv::format_acl_item(buf, *acl_item),
430        }
431    }
432
433    /// Serializes this value to `buf` using the [binary encoding
434    /// format](Format::Binary).
435    pub fn encode_binary(&self, ty: &Type, buf: &mut BytesMut) -> Result<(), io::Error> {
436        // NOTE: If implementing binary encoding for a previously unsupported `Value` type,
437        // please update the `can_encode_binary` method below.
438        let is_null = match self {
439            Value::Array { dims, elements } => {
440                let ndims = pg_len("number of array dimensions", dims.len())?;
441                let has_null = elements.iter().any(|e| e.is_none());
442                let elem_type = match ty {
443                    Type::Array(elem_type) => elem_type,
444                    _ => unreachable!(),
445                };
446                buf.put_i32(ndims);
447                buf.put_i32(has_null.into());
448                buf.put_u32(elem_type.oid());
449                for dim in dims {
450                    buf.put_i32(pg_len("array dimension length", dim.length)?);
451                    buf.put_i32(dim.lower_bound.try_into().map_err(|_| {
452                        io::Error::new(
453                            io::ErrorKind::InvalidData,
454                            "array dimension lower bound does not fit into an i32",
455                        )
456                    })?);
457                }
458                for elem in elements {
459                    encode_element(buf, elem.as_ref(), elem_type)?;
460                }
461                Ok(postgres_types::IsNull::No)
462            }
463            Value::Int2Vector { elements } => {
464                // this should always be `false`, but there are exceptions in postgres
465                // feels better to compute this than to assert otherwise
466                let has_null = elements.iter().any(|e| e.is_none());
467                buf.put_i32(1);
468                buf.put_i32(has_null.into());
469                buf.put_u32(TYPE_INT2_OID);
470                buf.put_i32(pg_len("int2vector dimension length", elements.len())?);
471                buf.put_i32(0);
472                for elem in elements {
473                    encode_element(buf, elem.as_ref(), &Type::Int2)?;
474                }
475                Ok(postgres_types::IsNull::No)
476            }
477            Value::Bool(b) => b.to_sql(&PgType::BOOL, buf),
478            Value::Bytea(b) => b.to_sql(&PgType::BYTEA, buf),
479            Value::Char(c) => i8::reinterpret_cast(*c).to_sql(&PgType::CHAR, buf),
480            Value::Date(d) => d.pg_epoch_days().to_sql(&PgType::DATE, buf),
481            Value::Float4(f) => f.to_sql(&PgType::FLOAT4, buf),
482            Value::Float8(f) => f.to_sql(&PgType::FLOAT8, buf),
483            Value::Int2(i) => i.to_sql(&PgType::INT2, buf),
484            Value::Int4(i) => i.to_sql(&PgType::INT4, buf),
485            Value::Int8(i) => i.to_sql(&PgType::INT8, buf),
486            Value::UInt2(u) => u.to_sql(&*UINT2, buf),
487            Value::UInt4(u) => u.to_sql(&*UINT4, buf),
488            Value::UInt8(u) => u.to_sql(&*UINT8, buf),
489            Value::Interval(iv) => iv.to_sql(&PgType::INTERVAL, buf),
490            Value::Jsonb(js) => js.to_sql(&PgType::JSONB, buf),
491            Value::List(_) => {
492                // A binary encoding for list is tricky. We only get one OID to
493                // describe the type of this list to the client. And we can't
494                // just up front allocate an OID for every possible list type,
495                // like PostgreSQL does for arrays, because, unlike arrays,
496                // lists can be arbitrarily nested.
497                //
498                // So, we'd need to synthesize a type with a stable OID whenever
499                // a new anonymous list type is *observed* in Materialize. Or we
500                // could mandate that only named list types can be sent over
501                // pgwire, and not anonymous list types, since named list types
502                // get a stable OID when they're created. Then we'd need to
503                // expose a table with the list OID -> element OID mapping for
504                // clients to query. And THEN we'd need to teach every client we
505                // care about how to query this table.
506                //
507                // This isn't intractible. It's how PostgreSQL's range type
508                // works, which is supported by many drivers. But our job is
509                // harder because most PostgreSQL drivers don't want to carry
510                // around code for Materialize-specific types. So we'd have to
511                // add type plugin infrastructure for those drivers, then
512                // distribute the list/map support as a plugin.
513                //
514                // Serializing the actual list would be simple, though: just a
515                // 32-bit integer describing the list length, followed by the
516                // encoding of each element in order.
517                //
518                // tl;dr it's a lot of work. For now, the recommended workaround
519                // is to either use the text encoding or convert the list to a
520                // different type (JSON, an array, unnest into rows) that does
521                // have a binary encoding.
522                Err("binary encoding of list types is not implemented".into())
523            }
524            Value::Map(_) => {
525                // Map binary encodings are hard for the same reason as list
526                // binary encodings (described above). You just have key and
527                // value OIDs to deal with rather than an element OID.
528                Err("binary encoding of map types is not implemented".into())
529            }
530            Value::Name(s) => s.to_sql(&PgType::NAME, buf),
531            Value::Oid(i) => i.to_sql(&PgType::OID, buf),
532            Value::Record(fields) => {
533                let nfields = pg_len("record field length", fields.len())?;
534                buf.put_i32(nfields);
535                let field_types = match ty {
536                    Type::Record(fields) => fields,
537                    _ => unreachable!(),
538                };
539                for (f, ty) in fields.iter().zip_eq(field_types) {
540                    buf.put_u32(ty.oid());
541                    encode_element(buf, f.as_ref(), ty)?;
542                }
543                Ok(postgres_types::IsNull::No)
544            }
545            Value::Text(s) => s.to_sql(&PgType::TEXT, buf),
546            Value::BpChar(s) => s.to_sql(&PgType::BPCHAR, buf),
547            Value::VarChar(s) => s.to_sql(&PgType::VARCHAR, buf),
548            Value::Time(t) => t.to_sql(&PgType::TIME, buf),
549            Value::Timestamp(ts) => ts.to_sql(&PgType::TIMESTAMP, buf),
550            Value::TimestampTz(ts) => ts.to_sql(&PgType::TIMESTAMPTZ, buf),
551            Value::Uuid(u) => u.to_sql(&PgType::UUID, buf),
552            Value::Numeric(a) => a.to_sql(&PgType::NUMERIC, buf),
553            Value::MzTimestamp(t) => t.to_string().to_sql(&PgType::TEXT, buf),
554            Value::Range(range) => {
555                buf.put_u8(range.pg_flag_bits());
556
557                let elem_type = match ty {
558                    Type::Range { element_type } => element_type,
559                    _ => unreachable!(),
560                };
561
562                if let Some(RangeInner { lower, upper }) = &range.inner {
563                    for bound in [&lower.bound, &upper.bound] {
564                        if let Some(bound) = bound {
565                            let base = buf.len();
566                            buf.put_i32(0);
567                            bound.encode_binary(elem_type, buf)?;
568                            let len = pg_len("encoded range bound", buf.len() - base - 4)?;
569                            buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
570                        }
571                    }
572                }
573                Ok(postgres_types::IsNull::No)
574            }
575            Value::MzAclItem(mz_acl_item) => {
576                buf.extend_from_slice(&mz_acl_item.encode_binary());
577                Ok(postgres_types::IsNull::No)
578            }
579            Value::AclItem(_) => Err("aclitem has no binary encoding".into()),
580        }
581        .expect("encode_binary should never trigger a to_sql failure");
582        if let IsNull::Yes = is_null {
583            panic!("encode_binary impossibly called on a null value")
584        }
585        Ok(())
586    }
587
588    /// Static helper method to pre-validate that a given Datum corresponding to
589    /// the provided `SqlScalarType` can be converted into a `Value` and then encoded
590    /// as binary using `encode_binary` without an error.
591    pub fn can_encode_binary(typ: &SqlScalarType) -> bool {
592        match typ {
593            SqlScalarType::Bool => true,
594            SqlScalarType::Int16 => true,
595            SqlScalarType::Int32 => true,
596            SqlScalarType::Int64 => true,
597            SqlScalarType::PgLegacyChar => true,
598            SqlScalarType::UInt16 => true,
599            SqlScalarType::Oid => true,
600            SqlScalarType::RegClass => true,
601            SqlScalarType::RegProc => true,
602            SqlScalarType::RegType => true,
603            SqlScalarType::UInt32 => true,
604            SqlScalarType::UInt64 => true,
605            SqlScalarType::Float32 => true,
606            SqlScalarType::Float64 => true,
607            SqlScalarType::Numeric { .. } => true,
608            SqlScalarType::MzTimestamp => true,
609            SqlScalarType::MzAclItem => true,
610            SqlScalarType::AclItem => false, // "aclitem has no binary encoding"
611            SqlScalarType::Date => true,
612            SqlScalarType::Time => true,
613            SqlScalarType::Timestamp { .. } => true,
614            SqlScalarType::TimestampTz { .. } => true,
615            SqlScalarType::Interval => true,
616            SqlScalarType::Bytes => true,
617            SqlScalarType::String => true,
618            SqlScalarType::VarChar { .. } => true,
619            SqlScalarType::Char { .. } => true,
620            SqlScalarType::PgLegacyName => true,
621            SqlScalarType::Jsonb => true,
622            SqlScalarType::Uuid => true,
623            SqlScalarType::Array(elem_type) => Self::can_encode_binary(elem_type),
624            SqlScalarType::Int2Vector => true,
625            SqlScalarType::List { .. } => false, // "binary encoding of list types is not implemented"
626            SqlScalarType::Map { .. } => false, // "binary encoding of map types is not implemented"
627            SqlScalarType::Record { fields, .. } => fields
628                .iter()
629                .all(|(_, ty)| Self::can_encode_binary(&ty.scalar_type)),
630            SqlScalarType::Range { element_type } => Self::can_encode_binary(element_type),
631        }
632    }
633
634    /// Deserializes a value of type `ty` from `raw` using the specified
635    /// `format`.
636    pub fn decode(
637        format: Format,
638        ty: &Type,
639        raw: &[u8],
640    ) -> Result<Value, Box<dyn Error + Sync + Send>> {
641        match format {
642            Format::Text => Value::decode_text(ty, raw),
643            Format::Binary => Value::decode_binary(ty, raw),
644        }
645    }
646
647    /// Deserializes a value of type `ty` from `raw` using the [text encoding
648    /// format](Format::Text).
649    pub fn decode_text<'a>(
650        ty: &'a Type,
651        raw: &'a [u8],
652    ) -> Result<Value, Box<dyn Error + Sync + Send>> {
653        let s = str::from_utf8(raw)?;
654        Ok(match ty {
655            Type::Array(elem_type) => {
656                let (elements, dims) = strconv::parse_array(
657                    s,
658                    || None,
659                    |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
660                )?;
661                Value::Array { dims, elements }
662            }
663            Type::Int2Vector { .. } => {
664                return Err("input of Int2Vector types is not implemented".into());
665            }
666            Type::Bool => Value::Bool(strconv::parse_bool(s)?),
667            Type::Bytea => Value::Bytea(strconv::parse_bytes(s)?),
668            Type::Char => Value::Char(raw.get(0).copied().unwrap_or(0)),
669            Type::Date => Value::Date(strconv::parse_date(s)?),
670            Type::Float4 => Value::Float4(strconv::parse_float32(s)?),
671            Type::Float8 => Value::Float8(strconv::parse_float64(s)?),
672            Type::Int2 => Value::Int2(strconv::parse_int16(s)?),
673            Type::Int4 => Value::Int4(strconv::parse_int32(s)?),
674            Type::Int8 => Value::Int8(strconv::parse_int64(s)?),
675            Type::UInt2 => Value::UInt2(UInt2(strconv::parse_uint16(s)?)),
676            Type::UInt4 => Value::UInt4(UInt4(strconv::parse_uint32(s)?)),
677            Type::UInt8 => Value::UInt8(UInt8(strconv::parse_uint64(s)?)),
678            Type::Interval { .. } => Value::Interval(Interval(strconv::parse_interval(s)?)),
679            Type::Json => return Err("input of json types is not implemented".into()),
680            Type::Jsonb => Value::Jsonb(Jsonb(strconv::parse_jsonb(s)?)),
681            Type::List(elem_type) => Value::List(strconv::parse_list(
682                s,
683                matches!(**elem_type, Type::List(..)),
684                || None,
685                |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
686            )?),
687            Type::Map { value_type } => Value::Map(strconv::parse_map(
688                s,
689                matches!(**value_type, Type::Map { .. }),
690                |elem_text| {
691                    elem_text
692                        .map(|t| Value::decode_text(value_type, t.as_bytes()))
693                        .transpose()
694                },
695            )?),
696            Type::Name => Value::Name(strconv::parse_pg_legacy_name(s)),
697            Type::Numeric { .. } => Value::Numeric(Numeric(strconv::parse_numeric(s)?)),
698            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
699                Value::Oid(strconv::parse_oid(s)?)
700            }
701            Type::Record(_) => {
702                return Err("input of anonymous composite types is not implemented".into());
703            }
704            Type::Text => Value::Text(s.to_owned()),
705            Type::BpChar { .. } => Value::BpChar(s.to_owned()),
706            Type::VarChar { .. } => Value::VarChar(s.to_owned()),
707            Type::Time { .. } => Value::Time(strconv::parse_time(s)?),
708            Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
709            Type::Timestamp { .. } => Value::Timestamp(strconv::parse_timestamp(s)?),
710            Type::TimestampTz { .. } => Value::TimestampTz(strconv::parse_timestamptz(s)?),
711            Type::Uuid => Value::Uuid(Uuid::parse_str(s)?),
712            Type::MzTimestamp => Value::MzTimestamp(strconv::parse_mz_timestamp(s)?),
713            Type::Range { element_type } => Value::Range(strconv::parse_range(s, |elem_text| {
714                Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
715            })?),
716            Type::MzAclItem => Value::MzAclItem(strconv::parse_mz_acl_item(s)?),
717            Type::AclItem => Value::AclItem(strconv::parse_acl_item(s)?),
718        })
719    }
720
721    /// Deserializes a value of type `ty` from `s` using the [text encoding format](Format::Text).
722    pub fn decode_text_into_row<'a>(
723        ty: &'a Type,
724        s: &'a str,
725        packer: &mut RowPacker,
726    ) -> Result<(), Box<dyn Error + Sync + Send>> {
727        Ok(match ty {
728            Type::Array(elem_type) => {
729                let (elements, dims) =
730                    strconv::parse_array(s, || None, |elem_text| Ok::<_, String>(Some(elem_text)))?;
731                // SAFETY: The function returns the number of times it called `push` on the packer.
732                unsafe {
733                    packer.push_array_with_unchecked(&dims, |packer| {
734                        let mut nelements = 0;
735                        for element in elements {
736                            match element {
737                                Some(elem_text) => {
738                                    Value::decode_text_into_row(elem_type, &elem_text, packer)?
739                                }
740
741                                None => packer.push(Datum::Null),
742                            }
743                            nelements += 1;
744                        }
745                        Ok::<_, Box<dyn Error + Sync + Send>>(nelements)
746                    })?
747                }
748            }
749            Type::Int2Vector { .. } => {
750                return Err("input of Int2Vector types is not implemented".into());
751            }
752            Type::Bool => packer.push(Datum::from(strconv::parse_bool(s)?)),
753            Type::Bytea => packer.push(Datum::Bytes(&strconv::parse_bytes(s)?)),
754            Type::Char => packer.push(Datum::UInt8(s.as_bytes().get(0).copied().unwrap_or(0))),
755            Type::Date => packer.push(Datum::Date(strconv::parse_date(s)?)),
756            Type::Float4 => packer.push(Datum::Float32(strconv::parse_float32(s)?.into())),
757            Type::Float8 => packer.push(Datum::Float64(strconv::parse_float64(s)?.into())),
758            Type::Int2 => packer.push(Datum::Int16(strconv::parse_int16(s)?)),
759            Type::Int4 => packer.push(Datum::Int32(strconv::parse_int32(s)?)),
760            Type::Int8 => packer.push(Datum::Int64(strconv::parse_int64(s)?)),
761            Type::UInt2 => packer.push(Datum::UInt16(strconv::parse_uint16(s)?)),
762            Type::UInt4 => packer.push(Datum::UInt32(strconv::parse_uint32(s)?)),
763            Type::UInt8 => packer.push(Datum::UInt64(strconv::parse_uint64(s)?)),
764            Type::Interval { .. } => packer.push(Datum::Interval(strconv::parse_interval(s)?)),
765            Type::Json => return Err("input of json types is not implemented".into()),
766            Type::Jsonb => packer.push(strconv::parse_jsonb(s)?.into_row().unpack_first()),
767            Type::List(elem_type) => {
768                let elems = strconv::parse_list(
769                    s,
770                    matches!(**elem_type, Type::List(..)),
771                    || None,
772                    |elem_text| Ok::<_, String>(Some(elem_text)),
773                )?;
774                packer.push_list_with(|packer| {
775                    for elem in elems {
776                        match elem {
777                            Some(elem) => Value::decode_text_into_row(elem_type, &elem, packer)?,
778                            None => packer.push(Datum::Null),
779                        }
780                    }
781                    Ok::<_, Box<dyn Error + Sync + Send>>(())
782                })?;
783            }
784            Type::Map { value_type } => {
785                let map =
786                    strconv::parse_map(s, matches!(**value_type, Type::Map { .. }), |elem_text| {
787                        elem_text.map(Ok::<_, String>).transpose()
788                    })?;
789                packer.push_dict_with(|row| {
790                    for (k, v) in map {
791                        row.push(Datum::String(&k));
792                        match v {
793                            Some(elem) => Value::decode_text_into_row(value_type, &elem, row)?,
794                            None => row.push(Datum::Null),
795                        }
796                    }
797                    Ok::<_, Box<dyn Error + Sync + Send>>(())
798                })?;
799            }
800            Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
801            Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
802            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
803                packer.push(Datum::UInt32(strconv::parse_oid(s)?))
804            }
805            Type::Record(_) => {
806                return Err("input of anonymous composite types is not implemented".into());
807            }
808            Type::Text => packer.push(Datum::String(s)),
809            Type::BpChar { .. } => packer.push(Datum::String(s.trim_end())),
810            Type::VarChar { .. } => packer.push(Datum::String(s)),
811            Type::Time { .. } => packer.push(Datum::Time(strconv::parse_time(s)?)),
812            Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
813            Type::Timestamp { .. } => packer.push(Datum::Timestamp(strconv::parse_timestamp(s)?)),
814            Type::TimestampTz { .. } => {
815                packer.push(Datum::TimestampTz(strconv::parse_timestamptz(s)?))
816            }
817            Type::Uuid => packer.push(Datum::Uuid(Uuid::parse_str(s)?)),
818            Type::MzTimestamp => packer.push(Datum::MzTimestamp(strconv::parse_mz_timestamp(s)?)),
819            Type::Range { element_type } => {
820                let range = strconv::parse_range(s, |elem_text| {
821                    Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
822                })?;
823                // TODO: We should be able to push ranges without scratch space, but that requires
824                // a different `push_range` API.
825                let buf = RowArena::new();
826                let range = range
827                    .try_into_bounds(|elem| elem.into_datum(&buf, element_type))
828                    .map_err(Box::<dyn Error + Sync + Send>::from)?;
829                packer
830                    .push_range(range)
831                    .map_err(Box::<dyn Error + Sync + Send>::from)?;
832            }
833            Type::MzAclItem => packer.push(Datum::MzAclItem(strconv::parse_mz_acl_item(s)?)),
834            Type::AclItem => packer.push(Datum::AclItem(strconv::parse_acl_item(s)?)),
835        })
836    }
837
838    /// Deserializes a value of type `ty` from `raw` using the [binary encoding
839    /// format](Format::Binary).
840    pub fn decode_binary(ty: &Type, raw: &[u8]) -> Result<Value, Box<dyn Error + Sync + Send>> {
841        match ty {
842            Type::Array(_) => Err("input of array types is not implemented".into()),
843            Type::Int2Vector => Err("input of int2vector types is not implemented".into()),
844            Type::Bool => bool::from_sql(ty.inner(), raw).map(Value::Bool),
845            Type::Bytea => Vec::<u8>::from_sql(ty.inner(), raw).map(Value::Bytea),
846            Type::Char => {
847                i8::from_sql(ty.inner(), raw).map(|c| Value::Char(u8::reinterpret_cast(c)))
848            }
849            Type::Date => {
850                let days = i32::from_sql(ty.inner(), raw)?;
851                Ok(Value::Date(Date::from_pg_epoch(days)?))
852            }
853            Type::Float4 => f32::from_sql(ty.inner(), raw).map(Value::Float4),
854            Type::Float8 => f64::from_sql(ty.inner(), raw).map(Value::Float8),
855            Type::Int2 => i16::from_sql(ty.inner(), raw).map(Value::Int2),
856            Type::Int4 => i32::from_sql(ty.inner(), raw).map(Value::Int4),
857            Type::Int8 => i64::from_sql(ty.inner(), raw).map(Value::Int8),
858            Type::UInt2 => UInt2::from_sql(ty.inner(), raw).map(Value::UInt2),
859            Type::UInt4 => UInt4::from_sql(ty.inner(), raw).map(Value::UInt4),
860            Type::UInt8 => UInt8::from_sql(ty.inner(), raw).map(Value::UInt8),
861            Type::Interval { .. } => Interval::from_sql(ty.inner(), raw).map(Value::Interval),
862            Type::Json => Err("input of json types is not implemented".into()),
863            Type::Jsonb => Jsonb::from_sql(ty.inner(), raw).map(Value::Jsonb),
864            Type::List(_) => Err("binary decoding of list types is not implemented".into()),
865            Type::Map { .. } => Err("binary decoding of map types is not implemented".into()),
866            Type::Name => {
867                let s = String::from_sql(ty.inner(), raw)?;
868                if s.len() > NAME_MAX_BYTES {
869                    return Err("identifier too long".into());
870                }
871                Ok(Value::Name(s))
872            }
873            Type::Numeric { .. } => Numeric::from_sql(ty.inner(), raw).map(Value::Numeric),
874            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
875                u32::from_sql(ty.inner(), raw).map(Value::Oid)
876            }
877            Type::Record(_) => Err("input of anonymous composite types is not implemented".into()),
878            Type::Text => String::from_sql(ty.inner(), raw).map(Value::Text),
879            Type::BpChar { .. } => String::from_sql(ty.inner(), raw).map(Value::BpChar),
880            Type::VarChar { .. } => String::from_sql(ty.inner(), raw).map(Value::VarChar),
881            Type::Time { .. } => NaiveTime::from_sql(ty.inner(), raw).map(Value::Time),
882            Type::TimeTz { .. } => Err("input of timetz types is not implemented".into()),
883            Type::Timestamp { .. } => {
884                let ts = NaiveDateTime::from_sql(ty.inner(), raw)?;
885                Ok(Value::Timestamp(CheckedTimestamp::from_timestamplike(ts)?))
886            }
887            Type::TimestampTz { .. } => {
888                let ts = DateTime::<Utc>::from_sql(ty.inner(), raw)?;
889                Ok(Value::TimestampTz(CheckedTimestamp::from_timestamplike(
890                    ts,
891                )?))
892            }
893            Type::Uuid => Uuid::from_sql(ty.inner(), raw).map(Value::Uuid),
894            Type::MzTimestamp => {
895                let s = String::from_sql(ty.inner(), raw)?;
896                let t: mz_repr::Timestamp = s.parse()?;
897                Ok(Value::MzTimestamp(t))
898            }
899            Type::Range { .. } => Err("binary decoding of range types is not implemented".into()),
900            Type::MzAclItem => {
901                let mz_acl_item = MzAclItem::decode_binary(raw)?;
902                Ok(Value::MzAclItem(mz_acl_item))
903            }
904            Type::AclItem => Err("aclitem has no binary encoding".into()),
905        }
906    }
907}
908
909fn encode_element(buf: &mut BytesMut, elem: Option<&Value>, ty: &Type) -> Result<(), io::Error> {
910    match elem {
911        None => buf.put_i32(-1),
912        Some(elem) => {
913            let base = buf.len();
914            buf.put_i32(0);
915            elem.encode_binary(ty, buf)?;
916            let len = pg_len("encoded element", buf.len() - base - 4)?;
917            buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
918        }
919    }
920    Ok(())
921}
922
/// Converts `len` into an `i32`.
///
/// # Errors
///
/// Returns an [`io::ErrorKind::InvalidData`] error whose message names `what`
/// if `len` does not fit into an `i32`.
fn pg_len(what: &str, len: usize) -> Result<i32, io::Error> {
    match i32::try_from(len) {
        Ok(len) => Ok(len),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("{} does not fit into an i32", what),
        )),
    }
}
931
932/// Converts a Materialize row into a vector of PostgreSQL values.
933///
934/// Calling this function is equivalent to mapping [`Value::from_datum`] over
935/// every datum in `row`.
936pub fn values_from_row(row: &RowRef, typ: &SqlRelationType) -> Vec<Option<Value>> {
937    row.iter()
938        .zip_eq(typ.column_types.iter())
939        .map(|(col, typ)| Value::from_datum(col, &typ.scalar_type))
940        .collect()
941}
942
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that a text-decoding failure reports the whole chain of parse
    /// errors, from the outermost type down to the underlying cause.
    #[mz_ore::test]
    fn decode_text_error_smoke_test() {
        // A one-element boolean array with an explicit lower bound of 0.
        let dims = vec![ArrayDimension {
            lower_bound: 0,
            length: 1,
        }];
        let elements = vec![Some(Value::Bool(true))];
        let source = Value::Array { dims, elements };

        let mut encoded = BytesMut::new();
        source.encode_text(&mut encoded);
        let encoded = encoded.to_vec();

        // Decode the encoded text as an array of a different element type.
        let target_type = Type::Array(Box::new(Type::Int4));
        let message = Value::decode_text(&target_type, &encoded)
            .map_err(|e| e.to_string())
            .unwrap_err();

        assert_eq!(
            message,
            "invalid input syntax for type array: Specifying array lower bounds is not supported: \"[0:0]={t}\"".to_string()
        );
    }
}