mz_pgrepr/
value.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::error::Error;
12use std::{io, str};
13
14use bytes::{BufMut, BytesMut};
15use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
16use mz_ore::cast::ReinterpretCast;
17use mz_pgwire_common::Format;
18use mz_repr::adt::array::ArrayDimension;
19use mz_repr::adt::char;
20use mz_repr::adt::date::Date;
21use mz_repr::adt::jsonb::JsonbRef;
22use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
23use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
24use mz_repr::adt::range::{Range, RangeInner};
25use mz_repr::adt::timestamp::CheckedTimestamp;
26use mz_repr::strconv::{self, Nestable};
27use mz_repr::{Datum, RelationType, RowArena, RowPacker, RowRef, ScalarType};
28use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
29use uuid::Uuid;
30
31use crate::types::{UINT2, UINT4, UINT8};
32use crate::{Interval, Jsonb, Numeric, Type, UInt2, UInt4, UInt8};
33
34pub mod interval;
35pub mod jsonb;
36pub mod numeric;
37pub mod record;
38pub mod unsigned;
39
/// A PostgreSQL datum.
///
/// `NULL` has no variant of its own. Positions that may hold `NULL` (the
/// elements of arrays, lists, maps, and records) are typed `Option<Value>`,
/// and [`Value::from_datum`] maps `Datum::Null` to `None`.
#[derive(Debug)]
pub enum Value {
    /// A variable-length, multi-dimensional array of values.
    Array {
        /// The dimensions of the array.
        dims: Vec<ArrayDimension>,
        /// The elements of the array. `None` represents a `NULL` element.
        elements: Vec<Option<Value>>,
    },
    /// A boolean value.
    Bool(bool),
    /// A byte array, i.e., a variable-length binary string.
    Bytea(Vec<u8>),
    /// A single-byte character.
    Char(u8),
    /// A date.
    Date(Date),
    /// A 4-byte floating point number.
    Float4(f32),
    /// An 8-byte floating point number.
    Float8(f64),
    /// A 2-byte signed integer.
    Int2(i16),
    /// A 4-byte signed integer.
    Int4(i32),
    /// An 8-byte signed integer.
    Int8(i64),
    /// A 2-byte unsigned integer.
    UInt2(UInt2),
    /// A 4-byte unsigned integer.
    UInt4(UInt4),
    /// An 8-byte unsigned integer.
    UInt8(UInt8),
    /// A time interval.
    Interval(Interval),
    /// A binary JSON blob.
    Jsonb(Jsonb),
    /// A sequence of homogeneous values. `None` represents a `NULL` element.
    List(Vec<Option<Value>>),
    /// A map of string keys and homogeneous values. `None` represents a
    /// `NULL` value.
    Map(BTreeMap<String, Option<Value>>),
    /// An identifier string of no more than 64 characters in length.
    // NOTE(review): the file imports `NAME_MAX_BYTES`; confirm that the limit
    // quoted above agrees with that constant.
    Name(String),
    /// An arbitrary precision number.
    Numeric(Numeric),
    /// An object identifier.
    ///
    /// Also used to carry `regclass`, `regproc`, and `regtype` values.
    Oid(u32),
    /// A sequence of heterogeneous values. `None` represents a `NULL` field.
    Record(Vec<Option<Value>>),
    /// A time.
    Time(NaiveTime),
    /// A date and time, without a timezone.
    Timestamp(CheckedTimestamp<NaiveDateTime>),
    /// A date and time, with a timezone.
    TimestampTz(CheckedTimestamp<DateTime<Utc>>),
    /// A variable-length string.
    Text(String),
    /// A fixed-length string.
    BpChar(String),
    /// A variable-length string with an optional limit.
    VarChar(String),
    /// A universally unique identifier.
    Uuid(Uuid),
    /// A small int vector.
    Int2Vector {
        /// The elements of the vector. The text encoder does not support
        /// `NULL` (`None`) elements.
        elements: Vec<Option<Value>>,
    },
    /// A Materialize timestamp.
    MzTimestamp(mz_repr::Timestamp),
    /// A contiguous range of values along a domain.
    Range(Range<Box<Value>>),
    /// A list of privileges granted to a role, that uses [`mz_repr::role_id::RoleId`]s for role
    /// references.
    MzAclItem(MzAclItem),
    /// A list of privileges granted to a user that uses [`mz_repr::adt::system::Oid`]s for role
    /// references. This type is used primarily for compatibility with PostgreSQL.
    AclItem(AclItem),
}
120
121impl Value {
122    /// Constructs a new `Value` from a Materialize datum.
123    ///
124    /// The conversion happens in the obvious manner, except that `Datum::Null`
125    /// is converted to `None` to align with how PostgreSQL handles NULL.
126    pub fn from_datum(datum: Datum, typ: &ScalarType) -> Option<Value> {
127        match (datum, typ) {
128            (Datum::Null, _) => None,
129            (Datum::True, ScalarType::Bool) => Some(Value::Bool(true)),
130            (Datum::False, ScalarType::Bool) => Some(Value::Bool(false)),
131            (Datum::Int16(i), ScalarType::Int16) => Some(Value::Int2(i)),
132            (Datum::Int32(i), ScalarType::Int32) => Some(Value::Int4(i)),
133            (Datum::Int64(i), ScalarType::Int64) => Some(Value::Int8(i)),
134            (Datum::UInt8(c), ScalarType::PgLegacyChar) => Some(Value::Char(c)),
135            (Datum::UInt16(u), ScalarType::UInt16) => Some(Value::UInt2(UInt2(u))),
136            (Datum::UInt32(oid), ScalarType::Oid) => Some(Value::Oid(oid)),
137            (Datum::UInt32(oid), ScalarType::RegClass) => Some(Value::Oid(oid)),
138            (Datum::UInt32(oid), ScalarType::RegProc) => Some(Value::Oid(oid)),
139            (Datum::UInt32(oid), ScalarType::RegType) => Some(Value::Oid(oid)),
140            (Datum::UInt32(u), ScalarType::UInt32) => Some(Value::UInt4(UInt4(u))),
141            (Datum::UInt64(u), ScalarType::UInt64) => Some(Value::UInt8(UInt8(u))),
142            (Datum::Float32(f), ScalarType::Float32) => Some(Value::Float4(*f)),
143            (Datum::Float64(f), ScalarType::Float64) => Some(Value::Float8(*f)),
144            (Datum::Numeric(d), ScalarType::Numeric { .. }) => Some(Value::Numeric(Numeric(d))),
145            (Datum::MzTimestamp(t), ScalarType::MzTimestamp) => Some(Value::MzTimestamp(t)),
146            (Datum::MzAclItem(mai), ScalarType::MzAclItem) => Some(Value::MzAclItem(mai)),
147            (Datum::AclItem(ai), ScalarType::AclItem) => Some(Value::AclItem(ai)),
148            (Datum::Date(d), ScalarType::Date) => Some(Value::Date(d)),
149            (Datum::Time(t), ScalarType::Time) => Some(Value::Time(t)),
150            (Datum::Timestamp(ts), ScalarType::Timestamp { .. }) => Some(Value::Timestamp(ts)),
151            (Datum::TimestampTz(ts), ScalarType::TimestampTz { .. }) => {
152                Some(Value::TimestampTz(ts))
153            }
154            (Datum::Interval(iv), ScalarType::Interval) => Some(Value::Interval(Interval(iv))),
155            (Datum::Bytes(b), ScalarType::Bytes) => Some(Value::Bytea(b.to_vec())),
156            (Datum::String(s), ScalarType::String) => Some(Value::Text(s.to_owned())),
157            (Datum::String(s), ScalarType::VarChar { .. }) => Some(Value::VarChar(s.to_owned())),
158            (Datum::String(s), ScalarType::Char { length }) => {
159                Some(Value::BpChar(char::format_str_pad(s, *length)))
160            }
161            (Datum::String(s), ScalarType::PgLegacyName) => Some(Value::Name(s.into())),
162            (_, ScalarType::Jsonb) => {
163                Some(Value::Jsonb(Jsonb(JsonbRef::from_datum(datum).to_owned())))
164            }
165            (Datum::Uuid(u), ScalarType::Uuid) => Some(Value::Uuid(u)),
166            (Datum::Array(array), ScalarType::Array(elem_type)) => {
167                let dims = array.dims().into_iter().collect();
168                let elements = array
169                    .elements()
170                    .iter()
171                    .map(|elem| Value::from_datum(elem, elem_type))
172                    .collect();
173                Some(Value::Array { dims, elements })
174            }
175            (Datum::Array(array), ScalarType::Int2Vector) => {
176                let dims = array.dims().into_iter();
177                assert!(dims.count() == 1, "int2vector must be 1 dimensional");
178                let elements = array
179                    .elements()
180                    .iter()
181                    .map(|elem| Value::from_datum(elem, &ScalarType::Int16))
182                    .collect();
183                Some(Value::Int2Vector { elements })
184            }
185            (Datum::List(list), ScalarType::List { element_type, .. }) => {
186                let elements = list
187                    .iter()
188                    .map(|elem| Value::from_datum(elem, element_type))
189                    .collect();
190                Some(Value::List(elements))
191            }
192            (Datum::List(record), ScalarType::Record { fields, .. }) => {
193                let fields = record
194                    .iter()
195                    .zip(fields)
196                    .map(|(e, (_name, ty))| Value::from_datum(e, &ty.scalar_type))
197                    .collect();
198                Some(Value::Record(fields))
199            }
200            (Datum::Map(dict), ScalarType::Map { value_type, .. }) => {
201                let entries = dict
202                    .iter()
203                    .map(|(k, v)| (k.to_owned(), Value::from_datum(v, value_type)))
204                    .collect();
205                Some(Value::Map(entries))
206            }
207            (Datum::Range(range), ScalarType::Range { element_type }) => {
208                let value_range = range.into_bounds(|b| {
209                    Box::new(
210                        Value::from_datum(b.datum(), element_type)
211                            .expect("RangeBounds never contain Datum::Null"),
212                    )
213                });
214                Some(Value::Range(value_range))
215            }
216            _ => panic!("can't serialize {}::{:?}", datum, typ),
217        }
218    }
219
220    /// Converts a Materialize datum from this value.
221    pub fn into_datum<'a>(self, buf: &'a RowArena, typ: &Type) -> Datum<'a> {
222        match self {
223            Value::Array { dims, elements } => {
224                let element_pg_type = match typ {
225                    Type::Array(t) => &*t,
226                    _ => panic!("Value::Array should have type Type::Array. Found {:?}", typ),
227                };
228                buf.make_datum(|packer| {
229                    packer
230                        .try_push_array(
231                            &dims,
232                            elements.into_iter().map(|element| match element {
233                                Some(element) => element.into_datum(buf, element_pg_type),
234                                None => Datum::Null,
235                            }),
236                        )
237                        .unwrap();
238                })
239            }
240            Value::Int2Vector { .. } => {
241                // This situation is handled gracefully by Value::decode; if we
242                // wind up here it's a programming error.
243                unreachable!("into_datum cannot be called on Value::Int2Vector");
244            }
245            Value::Bool(true) => Datum::True,
246            Value::Bool(false) => Datum::False,
247            Value::Bytea(b) => Datum::Bytes(buf.push_bytes(b)),
248            Value::Char(c) => Datum::UInt8(c),
249            Value::Date(d) => Datum::Date(d),
250            Value::Float4(f) => Datum::Float32(f.into()),
251            Value::Float8(f) => Datum::Float64(f.into()),
252            Value::Int2(i) => Datum::Int16(i),
253            Value::Int4(i) => Datum::Int32(i),
254            Value::Int8(i) => Datum::Int64(i),
255            Value::UInt2(u) => Datum::UInt16(u.0),
256            Value::UInt4(u) => Datum::UInt32(u.0),
257            Value::UInt8(u) => Datum::UInt64(u.0),
258            Value::Jsonb(js) => buf.push_unary_row(js.0.into_row()),
259            Value::List(elems) => {
260                let elem_pg_type = match typ {
261                    Type::List(t) => &*t,
262                    _ => panic!("Value::List should have type Type::List. Found {:?}", typ),
263                };
264                buf.make_datum(|packer| {
265                    packer.push_list(elems.into_iter().map(|elem| match elem {
266                        Some(elem) => elem.into_datum(buf, elem_pg_type),
267                        None => Datum::Null,
268                    }));
269                })
270            }
271            Value::Map(map) => {
272                let elem_pg_type = match typ {
273                    Type::Map { value_type } => &*value_type,
274                    _ => panic!("Value::Map should have type Type::Map. Found {:?}", typ),
275                };
276                buf.make_datum(|packer| {
277                    packer.push_dict_with(|row| {
278                        for (k, v) in map {
279                            row.push(Datum::String(&k));
280                            row.push(match v {
281                                Some(elem) => elem.into_datum(buf, elem_pg_type),
282                                None => Datum::Null,
283                            });
284                        }
285                    });
286                })
287            }
288            Value::Oid(oid) => Datum::UInt32(oid),
289            Value::Record(_) => {
290                // This situation is handled gracefully by Value::decode; if we
291                // wind up here it's a programming error.
292                unreachable!("into_datum cannot be called on Value::Record");
293            }
294            Value::Time(t) => Datum::Time(t),
295            Value::Timestamp(ts) => Datum::Timestamp(ts),
296            Value::TimestampTz(ts) => Datum::TimestampTz(ts),
297            Value::Interval(iv) => Datum::Interval(iv.0),
298            Value::Text(s) | Value::VarChar(s) | Value::Name(s) => {
299                Datum::String(buf.push_string(s))
300            }
301            Value::BpChar(s) => Datum::String(buf.push_string(s.trim_end().into())),
302            Value::Uuid(u) => Datum::Uuid(u),
303            Value::Numeric(n) => Datum::Numeric(n.0),
304            Value::MzTimestamp(t) => Datum::MzTimestamp(t),
305            Value::Range(range) => {
306                let elem_pg_type = match typ {
307                    Type::Range { element_type } => &*element_type,
308                    _ => panic!("Value::Range should have type Type::Range. Found {:?}", typ),
309                };
310                let range = range.into_bounds(|elem| elem.into_datum(buf, elem_pg_type));
311
312                buf.make_datum(|packer| packer.push_range(range).unwrap())
313            }
314            Value::MzAclItem(mz_acl_item) => Datum::MzAclItem(mz_acl_item),
315            Value::AclItem(acl_item) => Datum::AclItem(acl_item),
316        }
317    }
318
319    /// Serializes this value to `buf` in the specified `format`.
320    pub fn encode(&self, ty: &Type, format: Format, buf: &mut BytesMut) -> Result<(), io::Error> {
321        match format {
322            Format::Text => {
323                self.encode_text(buf);
324                Ok(())
325            }
326            Format::Binary => self.encode_binary(ty, buf),
327        }
328    }
329
330    /// Serializes this value to `buf` using the [text encoding
331    /// format](Format::Text).
332    pub fn encode_text(&self, buf: &mut BytesMut) -> Nestable {
333        match self {
334            Value::Array { dims, elements } => {
335                strconv::format_array(buf, dims, elements, |buf, elem| match elem {
336                    None => Ok::<_, ()>(buf.write_null()),
337                    Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
338                })
339                .expect("provided closure never fails")
340            }
341            Value::Int2Vector { elements } => {
342                strconv::format_legacy_vector(buf, elements, |buf, elem| {
343                    Ok::<_, ()>(
344                        elem.as_ref()
345                            .expect("Int2Vector does not support NULL values")
346                            .encode_text(buf.nonnull_buffer()),
347                    )
348                })
349                .expect("provided closure never fails")
350            }
351            Value::Bool(b) => strconv::format_bool(buf, *b),
352            Value::Bytea(b) => strconv::format_bytes(buf, b),
353            Value::Char(c) => {
354                buf.put_u8(*c);
355                Nestable::MayNeedEscaping
356            }
357            Value::Date(d) => strconv::format_date(buf, *d),
358            Value::Int2(i) => strconv::format_int16(buf, *i),
359            Value::Int4(i) => strconv::format_int32(buf, *i),
360            Value::Int8(i) => strconv::format_int64(buf, *i),
361            Value::UInt2(u) => strconv::format_uint16(buf, u.0),
362            Value::UInt4(u) => strconv::format_uint32(buf, u.0),
363            Value::UInt8(u) => strconv::format_uint64(buf, u.0),
364            Value::Interval(iv) => strconv::format_interval(buf, iv.0),
365            Value::Float4(f) => strconv::format_float32(buf, *f),
366            Value::Float8(f) => strconv::format_float64(buf, *f),
367            Value::Jsonb(js) => strconv::format_jsonb(buf, js.0.as_ref()),
368            Value::List(elems) => strconv::format_list(buf, elems, |buf, elem| match elem {
369                None => Ok::<_, ()>(buf.write_null()),
370                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
371            })
372            .expect("provided closure never fails"),
373            Value::Map(elems) => strconv::format_map(buf, elems, |buf, value| match value {
374                None => Ok::<_, ()>(buf.write_null()),
375                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
376            })
377            .expect("provided closure never fails"),
378            Value::Oid(oid) => strconv::format_uint32(buf, *oid),
379            Value::Record(elems) => strconv::format_record(buf, elems, |buf, elem| match elem {
380                None => Ok::<_, ()>(buf.write_null()),
381                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
382            })
383            .expect("provided closure never fails"),
384            Value::Text(s) | Value::VarChar(s) | Value::BpChar(s) | Value::Name(s) => {
385                strconv::format_string(buf, s)
386            }
387            Value::Time(t) => strconv::format_time(buf, *t),
388            Value::Timestamp(ts) => strconv::format_timestamp(buf, ts),
389            Value::TimestampTz(ts) => strconv::format_timestamptz(buf, ts),
390            Value::Uuid(u) => strconv::format_uuid(buf, *u),
391            Value::Numeric(d) => strconv::format_numeric(buf, &d.0),
392            Value::MzTimestamp(t) => strconv::format_mz_timestamp(buf, *t),
393            Value::Range(range) => strconv::format_range(buf, range, |buf, elem| match elem {
394                Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
395                None => Ok::<_, ()>(buf.write_null()),
396            })
397            .expect("provided closure never fails"),
398            Value::MzAclItem(mz_acl_item) => strconv::format_mz_acl_item(buf, *mz_acl_item),
399            Value::AclItem(acl_item) => strconv::format_acl_item(buf, *acl_item),
400        }
401    }
402
403    /// Serializes this value to `buf` using the [binary encoding
404    /// format](Format::Binary).
405    pub fn encode_binary(&self, ty: &Type, buf: &mut BytesMut) -> Result<(), io::Error> {
406        // NOTE: If implementing binary encoding for a previously unsupported `Value` type,
407        // please update the `can_encode_binary` method below.
408        let is_null = match self {
409            Value::Array { dims, elements } => {
410                let ndims = pg_len("number of array dimensions", dims.len())?;
411                let has_null = elements.iter().any(|e| e.is_none());
412                let elem_type = match ty {
413                    Type::Array(elem_type) => elem_type,
414                    _ => unreachable!(),
415                };
416                buf.put_i32(ndims);
417                buf.put_i32(has_null.into());
418                buf.put_u32(elem_type.oid());
419                for dim in dims {
420                    buf.put_i32(pg_len("array dimension length", dim.length)?);
421                    buf.put_i32(dim.lower_bound.try_into().map_err(|_| {
422                        io::Error::new(
423                            io::ErrorKind::Other,
424                            "array dimension lower bound does not fit into an i32",
425                        )
426                    })?);
427                }
428                for elem in elements {
429                    encode_element(buf, elem.as_ref(), elem_type)?;
430                }
431                Ok(postgres_types::IsNull::No)
432            }
433            // TODO: what is the binary format of vector types?
434            Value::Int2Vector { .. } => {
435                Err("binary encoding of int2vector is not implemented".into())
436            }
437            Value::Bool(b) => b.to_sql(&PgType::BOOL, buf),
438            Value::Bytea(b) => b.to_sql(&PgType::BYTEA, buf),
439            Value::Char(c) => i8::reinterpret_cast(*c).to_sql(&PgType::CHAR, buf),
440            Value::Date(d) => d.pg_epoch_days().to_sql(&PgType::DATE, buf),
441            Value::Float4(f) => f.to_sql(&PgType::FLOAT4, buf),
442            Value::Float8(f) => f.to_sql(&PgType::FLOAT8, buf),
443            Value::Int2(i) => i.to_sql(&PgType::INT2, buf),
444            Value::Int4(i) => i.to_sql(&PgType::INT4, buf),
445            Value::Int8(i) => i.to_sql(&PgType::INT8, buf),
446            Value::UInt2(u) => u.to_sql(&*UINT2, buf),
447            Value::UInt4(u) => u.to_sql(&*UINT4, buf),
448            Value::UInt8(u) => u.to_sql(&*UINT8, buf),
449            Value::Interval(iv) => iv.to_sql(&PgType::INTERVAL, buf),
450            Value::Jsonb(js) => js.to_sql(&PgType::JSONB, buf),
451            Value::List(_) => {
452                // A binary encoding for list is tricky. We only get one OID to
453                // describe the type of this list to the client. And we can't
454                // just up front allocate an OID for every possible list type,
455                // like PostgreSQL does for arrays, because, unlike arrays,
456                // lists can be arbitrarily nested.
457                //
458                // So, we'd need to synthesize a type with a stable OID whenever
459                // a new anonymous list type is *observed* in Materialize. Or we
460                // could mandate that only named list types can be sent over
461                // pgwire, and not anonymous list types, since named list types
462                // get a stable OID when they're created. Then we'd need to
463                // expose a table with the list OID -> element OID mapping for
464                // clients to query. And THEN we'd need to teach every client we
465                // care about how to query this table.
466                //
467                // This isn't intractible. It's how PostgreSQL's range type
468                // works, which is supported by many drivers. But our job is
469                // harder because most PostgreSQL drivers don't want to carry
470                // around code for Materialize-specific types. So we'd have to
471                // add type plugin infrastructure for those drivers, then
472                // distribute the list/map support as a plugin.
473                //
474                // Serializing the actual list would be simple, though: just a
475                // 32-bit integer describing the list length, followed by the
476                // encoding of each element in order.
477                //
478                // tl;dr it's a lot of work. For now, the recommended workaround
479                // is to either use the text encoding or convert the list to a
480                // different type (JSON, an array, unnest into rows) that does
481                // have a binary encoding.
482                Err("binary encoding of list types is not implemented".into())
483            }
484            Value::Map(_) => {
485                // Map binary encodings are hard for the same reason as list
486                // binary encodings (described above). You just have key and
487                // value OIDs to deal with rather than an element OID.
488                Err("binary encoding of map types is not implemented".into())
489            }
490            Value::Name(s) => s.to_sql(&PgType::NAME, buf),
491            Value::Oid(i) => i.to_sql(&PgType::OID, buf),
492            Value::Record(fields) => {
493                let nfields = pg_len("record field length", fields.len())?;
494                buf.put_i32(nfields);
495                let field_types = match ty {
496                    Type::Record(fields) => fields,
497                    _ => unreachable!(),
498                };
499                for (f, ty) in fields.iter().zip(field_types) {
500                    buf.put_u32(ty.oid());
501                    encode_element(buf, f.as_ref(), ty)?;
502                }
503                Ok(postgres_types::IsNull::No)
504            }
505            Value::Text(s) => s.to_sql(&PgType::TEXT, buf),
506            Value::BpChar(s) => s.to_sql(&PgType::BPCHAR, buf),
507            Value::VarChar(s) => s.to_sql(&PgType::VARCHAR, buf),
508            Value::Time(t) => t.to_sql(&PgType::TIME, buf),
509            Value::Timestamp(ts) => ts.to_sql(&PgType::TIMESTAMP, buf),
510            Value::TimestampTz(ts) => ts.to_sql(&PgType::TIMESTAMPTZ, buf),
511            Value::Uuid(u) => u.to_sql(&PgType::UUID, buf),
512            Value::Numeric(a) => a.to_sql(&PgType::NUMERIC, buf),
513            Value::MzTimestamp(t) => t.to_string().to_sql(&PgType::TEXT, buf),
514            Value::Range(range) => {
515                buf.put_u8(range.pg_flag_bits());
516
517                let elem_type = match ty {
518                    Type::Range { element_type } => element_type,
519                    _ => unreachable!(),
520                };
521
522                if let Some(RangeInner { lower, upper }) = &range.inner {
523                    for bound in [&lower.bound, &upper.bound] {
524                        if let Some(bound) = bound {
525                            let base = buf.len();
526                            buf.put_i32(0);
527                            bound.encode_binary(elem_type, buf)?;
528                            let len = pg_len("encoded range bound", buf.len() - base - 4)?;
529                            buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
530                        }
531                    }
532                }
533                Ok(postgres_types::IsNull::No)
534            }
535            Value::MzAclItem(mz_acl_item) => {
536                buf.extend_from_slice(&mz_acl_item.encode_binary());
537                Ok(postgres_types::IsNull::No)
538            }
539            Value::AclItem(_) => Err("aclitem has no binary encoding".into()),
540        }
541        .expect("encode_binary should never trigger a to_sql failure");
542        if let IsNull::Yes = is_null {
543            panic!("encode_binary impossibly called on a null value")
544        }
545        Ok(())
546    }
547
548    /// Static helper method to pre-validate that a given Datum corresponding to
549    /// the provided `ScalarType` can be converted into a `Value` and then encoded
550    /// as binary using `encode_binary` without an error.
551    pub fn can_encode_binary(typ: &ScalarType) -> bool {
552        match typ {
553            ScalarType::Bool => true,
554            ScalarType::Int16 => true,
555            ScalarType::Int32 => true,
556            ScalarType::Int64 => true,
557            ScalarType::PgLegacyChar => true,
558            ScalarType::UInt16 => true,
559            ScalarType::Oid => true,
560            ScalarType::RegClass => true,
561            ScalarType::RegProc => true,
562            ScalarType::RegType => true,
563            ScalarType::UInt32 => true,
564            ScalarType::UInt64 => true,
565            ScalarType::Float32 => true,
566            ScalarType::Float64 => true,
567            ScalarType::Numeric { .. } => true,
568            ScalarType::MzTimestamp => true,
569            ScalarType::MzAclItem => true,
570            ScalarType::AclItem => false, // "aclitem has no binary encoding"
571            ScalarType::Date => true,
572            ScalarType::Time => true,
573            ScalarType::Timestamp { .. } => true,
574            ScalarType::TimestampTz { .. } => true,
575            ScalarType::Interval => true,
576            ScalarType::Bytes => true,
577            ScalarType::String => true,
578            ScalarType::VarChar { .. } => true,
579            ScalarType::Char { .. } => true,
580            ScalarType::PgLegacyName => true,
581            ScalarType::Jsonb => true,
582            ScalarType::Uuid => true,
583            ScalarType::Array(elem_type) => Self::can_encode_binary(elem_type),
584            ScalarType::Int2Vector => false, // "binary encoding of int2vector is not implemented"
585            ScalarType::List { .. } => false, // "binary encoding of list types is not implemented"
586            ScalarType::Map { .. } => false, // "binary encoding of map types is not implemented"
587            ScalarType::Record { fields, .. } => fields
588                .iter()
589                .all(|(_, ty)| Self::can_encode_binary(&ty.scalar_type)),
590            ScalarType::Range { element_type } => Self::can_encode_binary(element_type),
591        }
592    }
593
594    /// Deserializes a value of type `ty` from `raw` using the specified
595    /// `format`.
596    pub fn decode(
597        format: Format,
598        ty: &Type,
599        raw: &[u8],
600    ) -> Result<Value, Box<dyn Error + Sync + Send>> {
601        match format {
602            Format::Text => Value::decode_text(ty, raw),
603            Format::Binary => Value::decode_binary(ty, raw),
604        }
605    }
606
607    /// Deserializes a value of type `ty` from `raw` using the [text encoding
608    /// format](Format::Text).
609    pub fn decode_text<'a>(
610        ty: &'a Type,
611        raw: &'a [u8],
612    ) -> Result<Value, Box<dyn Error + Sync + Send>> {
613        let s = str::from_utf8(raw)?;
614        Ok(match ty {
615            Type::Array(elem_type) => {
616                let (elements, dims) = strconv::parse_array(
617                    s,
618                    || None,
619                    |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
620                )?;
621                Value::Array { dims, elements }
622            }
623            Type::Int2Vector { .. } => {
624                return Err("input of Int2Vector types is not implemented".into());
625            }
626            Type::Bool => Value::Bool(strconv::parse_bool(s)?),
627            Type::Bytea => Value::Bytea(strconv::parse_bytes(s)?),
628            Type::Char => Value::Char(raw.get(0).copied().unwrap_or(0)),
629            Type::Date => Value::Date(strconv::parse_date(s)?),
630            Type::Float4 => Value::Float4(strconv::parse_float32(s)?),
631            Type::Float8 => Value::Float8(strconv::parse_float64(s)?),
632            Type::Int2 => Value::Int2(strconv::parse_int16(s)?),
633            Type::Int4 => Value::Int4(strconv::parse_int32(s)?),
634            Type::Int8 => Value::Int8(strconv::parse_int64(s)?),
635            Type::UInt2 => Value::UInt2(UInt2(strconv::parse_uint16(s)?)),
636            Type::UInt4 => Value::UInt4(UInt4(strconv::parse_uint32(s)?)),
637            Type::UInt8 => Value::UInt8(UInt8(strconv::parse_uint64(s)?)),
638            Type::Interval { .. } => Value::Interval(Interval(strconv::parse_interval(s)?)),
639            Type::Json => return Err("input of json types is not implemented".into()),
640            Type::Jsonb => Value::Jsonb(Jsonb(strconv::parse_jsonb(s)?)),
641            Type::List(elem_type) => Value::List(strconv::parse_list(
642                s,
643                matches!(**elem_type, Type::List(..)),
644                || None,
645                |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
646            )?),
647            Type::Map { value_type } => Value::Map(strconv::parse_map(
648                s,
649                matches!(**value_type, Type::Map { .. }),
650                |elem_text| {
651                    elem_text
652                        .map(|t| Value::decode_text(value_type, t.as_bytes()))
653                        .transpose()
654                },
655            )?),
656            Type::Name => Value::Name(strconv::parse_pg_legacy_name(s)),
657            Type::Numeric { .. } => Value::Numeric(Numeric(strconv::parse_numeric(s)?)),
658            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
659                Value::Oid(strconv::parse_oid(s)?)
660            }
661            Type::Record(_) => {
662                return Err("input of anonymous composite types is not implemented".into());
663            }
664            Type::Text => Value::Text(s.to_owned()),
665            Type::BpChar { .. } => Value::BpChar(s.to_owned()),
666            Type::VarChar { .. } => Value::VarChar(s.to_owned()),
667            Type::Time { .. } => Value::Time(strconv::parse_time(s)?),
668            Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
669            Type::Timestamp { .. } => Value::Timestamp(strconv::parse_timestamp(s)?),
670            Type::TimestampTz { .. } => Value::TimestampTz(strconv::parse_timestamptz(s)?),
671            Type::Uuid => Value::Uuid(Uuid::parse_str(s)?),
672            Type::MzTimestamp => Value::MzTimestamp(strconv::parse_mz_timestamp(s)?),
673            Type::Range { element_type } => Value::Range(strconv::parse_range(s, |elem_text| {
674                Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
675            })?),
676            Type::MzAclItem => Value::MzAclItem(strconv::parse_mz_acl_item(s)?),
677            Type::AclItem => Value::AclItem(strconv::parse_acl_item(s)?),
678        })
679    }
680
681    /// Deserializes a value of type `ty` from `s` using the [text encoding format](Format::Text).
682    pub fn decode_text_into_row<'a>(
683        ty: &'a Type,
684        s: &'a str,
685        packer: &mut RowPacker,
686    ) -> Result<(), Box<dyn Error + Sync + Send>> {
687        Ok(match ty {
688            Type::Array(elem_type) => {
689                let (elements, dims) =
690                    strconv::parse_array(s, || None, |elem_text| Ok::<_, String>(Some(elem_text)))?;
691                // SAFETY: The function returns the number of times it called `push` on the packer.
692                unsafe {
693                    packer.push_array_with_unchecked(&dims, |packer| {
694                        let mut nelements = 0;
695                        for element in elements {
696                            match element {
697                                Some(elem_text) => {
698                                    Value::decode_text_into_row(elem_type, &elem_text, packer)?
699                                }
700
701                                None => packer.push(Datum::Null),
702                            }
703                            nelements += 1;
704                        }
705                        Ok::<_, Box<dyn Error + Sync + Send>>(nelements)
706                    })?
707                }
708            }
709            Type::Int2Vector { .. } => {
710                return Err("input of Int2Vector types is not implemented".into());
711            }
712            Type::Bool => packer.push(Datum::from(strconv::parse_bool(s)?)),
713            Type::Bytea => packer.push(Datum::Bytes(&strconv::parse_bytes(s)?)),
714            Type::Char => packer.push(Datum::UInt8(s.as_bytes().get(0).copied().unwrap_or(0))),
715            Type::Date => packer.push(Datum::Date(strconv::parse_date(s)?)),
716            Type::Float4 => packer.push(Datum::Float32(strconv::parse_float32(s)?.into())),
717            Type::Float8 => packer.push(Datum::Float64(strconv::parse_float64(s)?.into())),
718            Type::Int2 => packer.push(Datum::Int16(strconv::parse_int16(s)?)),
719            Type::Int4 => packer.push(Datum::Int32(strconv::parse_int32(s)?)),
720            Type::Int8 => packer.push(Datum::Int64(strconv::parse_int64(s)?)),
721            Type::UInt2 => packer.push(Datum::UInt16(strconv::parse_uint16(s)?)),
722            Type::UInt4 => packer.push(Datum::UInt32(strconv::parse_uint32(s)?)),
723            Type::UInt8 => packer.push(Datum::UInt64(strconv::parse_uint64(s)?)),
724            Type::Interval { .. } => packer.push(Datum::Interval(strconv::parse_interval(s)?)),
725            Type::Json => return Err("input of json types is not implemented".into()),
726            Type::Jsonb => packer.push(strconv::parse_jsonb(s)?.into_row().unpack_first()),
727            Type::List(elem_type) => {
728                let elems = strconv::parse_list(
729                    s,
730                    matches!(**elem_type, Type::List(..)),
731                    || None,
732                    |elem_text| Ok::<_, String>(Some(elem_text)),
733                )?;
734                packer.push_list_with(|packer| {
735                    for elem in elems {
736                        match elem {
737                            Some(elem) => Value::decode_text_into_row(elem_type, &elem, packer)?,
738                            None => packer.push(Datum::Null),
739                        }
740                    }
741                    Ok::<_, Box<dyn Error + Sync + Send>>(())
742                })?;
743            }
744            Type::Map { value_type } => {
745                let map =
746                    strconv::parse_map(s, matches!(**value_type, Type::Map { .. }), |elem_text| {
747                        elem_text.map(Ok::<_, String>).transpose()
748                    })?;
749                packer.push_dict_with(|row| {
750                    for (k, v) in map {
751                        row.push(Datum::String(&k));
752                        match v {
753                            Some(elem) => Value::decode_text_into_row(value_type, &elem, row)?,
754                            None => row.push(Datum::Null),
755                        }
756                    }
757                    Ok::<_, Box<dyn Error + Sync + Send>>(())
758                })?;
759            }
760            Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
761            Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
762            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
763                packer.push(Datum::UInt32(strconv::parse_oid(s)?))
764            }
765            Type::Record(_) => {
766                return Err("input of anonymous composite types is not implemented".into());
767            }
768            Type::Text => packer.push(Datum::String(s)),
769            Type::BpChar { .. } => packer.push(Datum::String(s.trim_end())),
770            Type::VarChar { .. } => packer.push(Datum::String(s)),
771            Type::Time { .. } => packer.push(Datum::Time(strconv::parse_time(s)?)),
772            Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
773            Type::Timestamp { .. } => packer.push(Datum::Timestamp(strconv::parse_timestamp(s)?)),
774            Type::TimestampTz { .. } => {
775                packer.push(Datum::TimestampTz(strconv::parse_timestamptz(s)?))
776            }
777            Type::Uuid => packer.push(Datum::Uuid(Uuid::parse_str(s)?)),
778            Type::MzTimestamp => packer.push(Datum::MzTimestamp(strconv::parse_mz_timestamp(s)?)),
779            Type::Range { element_type } => {
780                let range = strconv::parse_range(s, |elem_text| {
781                    Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
782                })?;
783                // TODO: We should be able to push ranges without scratch space, but that requires
784                // a different `push_range` API.
785                let buf = RowArena::new();
786                let range = range.into_bounds(|elem| elem.into_datum(&buf, element_type));
787
788                packer.push_range(range).unwrap()
789            }
790            Type::MzAclItem => packer.push(Datum::MzAclItem(strconv::parse_mz_acl_item(s)?)),
791            Type::AclItem => packer.push(Datum::AclItem(strconv::parse_acl_item(s)?)),
792        })
793    }
794
795    /// Deserializes a value of type `ty` from `raw` using the [binary encoding
796    /// format](Format::Binary).
797    pub fn decode_binary(ty: &Type, raw: &[u8]) -> Result<Value, Box<dyn Error + Sync + Send>> {
798        match ty {
799            Type::Array(_) => Err("input of array types is not implemented".into()),
800            Type::Int2Vector => Err("input of int2vector types is not implemented".into()),
801            Type::Bool => bool::from_sql(ty.inner(), raw).map(Value::Bool),
802            Type::Bytea => Vec::<u8>::from_sql(ty.inner(), raw).map(Value::Bytea),
803            Type::Char => {
804                i8::from_sql(ty.inner(), raw).map(|c| Value::Char(u8::reinterpret_cast(c)))
805            }
806            Type::Date => {
807                let days = i32::from_sql(ty.inner(), raw)?;
808                Ok(Value::Date(Date::from_pg_epoch(days)?))
809            }
810            Type::Float4 => f32::from_sql(ty.inner(), raw).map(Value::Float4),
811            Type::Float8 => f64::from_sql(ty.inner(), raw).map(Value::Float8),
812            Type::Int2 => i16::from_sql(ty.inner(), raw).map(Value::Int2),
813            Type::Int4 => i32::from_sql(ty.inner(), raw).map(Value::Int4),
814            Type::Int8 => i64::from_sql(ty.inner(), raw).map(Value::Int8),
815            Type::UInt2 => UInt2::from_sql(ty.inner(), raw).map(Value::UInt2),
816            Type::UInt4 => UInt4::from_sql(ty.inner(), raw).map(Value::UInt4),
817            Type::UInt8 => UInt8::from_sql(ty.inner(), raw).map(Value::UInt8),
818            Type::Interval { .. } => Interval::from_sql(ty.inner(), raw).map(Value::Interval),
819            Type::Json => Err("input of json types is not implemented".into()),
820            Type::Jsonb => Jsonb::from_sql(ty.inner(), raw).map(Value::Jsonb),
821            Type::List(_) => Err("binary decoding of list types is not implemented".into()),
822            Type::Map { .. } => Err("binary decoding of map types is not implemented".into()),
823            Type::Name => {
824                let s = String::from_sql(ty.inner(), raw)?;
825                if s.len() > NAME_MAX_BYTES {
826                    return Err("identifier too long".into());
827                }
828                Ok(Value::Name(s))
829            }
830            Type::Numeric { .. } => Numeric::from_sql(ty.inner(), raw).map(Value::Numeric),
831            Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
832                u32::from_sql(ty.inner(), raw).map(Value::Oid)
833            }
834            Type::Record(_) => Err("input of anonymous composite types is not implemented".into()),
835            Type::Text => String::from_sql(ty.inner(), raw).map(Value::Text),
836            Type::BpChar { .. } => String::from_sql(ty.inner(), raw).map(Value::BpChar),
837            Type::VarChar { .. } => String::from_sql(ty.inner(), raw).map(Value::VarChar),
838            Type::Time { .. } => NaiveTime::from_sql(ty.inner(), raw).map(Value::Time),
839            Type::TimeTz { .. } => Err("input of timetz types is not implemented".into()),
840            Type::Timestamp { .. } => {
841                let ts = NaiveDateTime::from_sql(ty.inner(), raw)?;
842                Ok(Value::Timestamp(CheckedTimestamp::from_timestamplike(ts)?))
843            }
844            Type::TimestampTz { .. } => {
845                let ts = DateTime::<Utc>::from_sql(ty.inner(), raw)?;
846                Ok(Value::TimestampTz(CheckedTimestamp::from_timestamplike(
847                    ts,
848                )?))
849            }
850            Type::Uuid => Uuid::from_sql(ty.inner(), raw).map(Value::Uuid),
851            Type::MzTimestamp => {
852                let s = String::from_sql(ty.inner(), raw)?;
853                let t: mz_repr::Timestamp = s.parse()?;
854                Ok(Value::MzTimestamp(t))
855            }
856            Type::Range { .. } => Err("binary decoding of range types is not implemented".into()),
857            Type::MzAclItem => {
858                let mz_acl_item = MzAclItem::decode_binary(raw)?;
859                Ok(Value::MzAclItem(mz_acl_item))
860            }
861            Type::AclItem => Err("aclitem has no binary encoding".into()),
862        }
863    }
864}
865
866fn encode_element(buf: &mut BytesMut, elem: Option<&Value>, ty: &Type) -> Result<(), io::Error> {
867    match elem {
868        None => buf.put_i32(-1),
869        Some(elem) => {
870            let base = buf.len();
871            buf.put_i32(0);
872            elem.encode_binary(ty, buf)?;
873            let len = pg_len("encoded element", buf.len() - base - 4)?;
874            buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
875        }
876    }
877    Ok(())
878}
879
/// Converts `len` into the `i32` used for length fields.
///
/// # Errors
///
/// Returns an [`io::Error`] of kind [`io::ErrorKind::Other`] whose message
/// names `what` when `len` is too large for an `i32`.
fn pg_len(what: &str, len: usize) -> Result<i32, io::Error> {
    match i32::try_from(len) {
        Ok(len) => Ok(len),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::Other,
            format!("{what} does not fit into an i32"),
        )),
    }
}
888
889/// Converts a Materialize row into a vector of PostgreSQL values.
890///
891/// Calling this function is equivalent to mapping [`Value::from_datum`] over
892/// every datum in `row`.
893pub fn values_from_row(row: &RowRef, typ: &RelationType) -> Vec<Option<Value>> {
894    row.iter()
895        .zip(typ.column_types.iter())
896        .map(|(col, typ)| Value::from_datum(col, &typ.scalar_type))
897        .collect()
898}
899
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that a failed text decode reports the whole chain of parse errors in its message.
    ///
    /// A one-element boolean array with lower bound 0 is text-encoded and then decoded as an
    /// `int4` array; the resulting error string must match the expected message exactly.
    #[mz_ore::test]
    fn decode_text_error_smoke_test() {
        let dims = vec![ArrayDimension {
            lower_bound: 0,
            length: 1,
        }];
        let source = Value::Array {
            dims,
            elements: vec![Some(Value::Bool(true))],
        };

        let mut encoded = BytesMut::new();
        source.encode_text(&mut encoded);
        let encoded = encoded.to_vec();

        let target_type = Type::Array(Box::new(Type::Int4));
        let message = Value::decode_text(&target_type, &encoded)
            .map_err(|e| e.to_string())
            .unwrap_err();

        assert_eq!(
            message,
            "invalid input syntax for type array: Specifying array lower bounds is not supported: \"[0:0]={t}\"".to_string()
        );
    }
}