1use std::collections::BTreeMap;
11use std::error::Error;
12use std::{io, str};
13
14use bytes::{BufMut, BytesMut};
15use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
16use itertools::Itertools;
17use mz_ore::cast::ReinterpretCast;
18use mz_pgwire_common::Format;
19use mz_repr::adt::array::ArrayDimension;
20use mz_repr::adt::char;
21use mz_repr::adt::date::Date;
22use mz_repr::adt::jsonb::JsonbRef;
23use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
24use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
25use mz_repr::adt::range::{Range, RangeInner};
26use mz_repr::adt::timestamp::CheckedTimestamp;
27use mz_repr::strconv::{self, Nestable};
28use mz_repr::{Datum, RowArena, RowPacker, RowRef, SqlRelationType, SqlScalarType};
29use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
30use uuid::Uuid;
31
32use crate::types::{UINT2, UINT4, UINT8};
33use crate::value::error::IntoDatumError;
34use crate::{Interval, Jsonb, Numeric, Type, UInt2, UInt4, UInt8};
35
36pub mod error;
37pub mod interval;
38pub mod jsonb;
39pub mod numeric;
40pub mod record;
41pub mod unsigned;
42
43#[derive(Debug)]
45pub enum Value {
46 Array {
48 dims: Vec<ArrayDimension>,
50 elements: Vec<Option<Value>>,
52 },
53 Bool(bool),
55 Bytea(Vec<u8>),
57 Char(u8),
59 Date(Date),
61 Float4(f32),
63 Float8(f64),
65 Int2(i16),
67 Int4(i32),
69 Int8(i64),
71 UInt2(UInt2),
73 UInt4(UInt4),
75 UInt8(UInt8),
77 Interval(Interval),
79 Jsonb(Jsonb),
81 List(Vec<Option<Value>>),
83 Map(BTreeMap<String, Option<Value>>),
85 Name(String),
87 Numeric(Numeric),
89 Oid(u32),
91 Record(Vec<Option<Value>>),
93 Time(NaiveTime),
95 Timestamp(CheckedTimestamp<NaiveDateTime>),
97 TimestampTz(CheckedTimestamp<DateTime<Utc>>),
99 Text(String),
101 BpChar(String),
103 VarChar(String),
105 Uuid(Uuid),
107 Int2Vector {
109 elements: Vec<Option<Value>>,
111 },
112 MzTimestamp(mz_repr::Timestamp),
114 Range(Range<Box<Value>>),
116 MzAclItem(MzAclItem),
119 AclItem(AclItem),
122}
123
124impl Value {
125 pub fn from_datum(datum: Datum, typ: &SqlScalarType) -> Option<Value> {
130 match (datum, typ) {
131 (Datum::Null, _) => None,
132 (Datum::True, SqlScalarType::Bool) => Some(Value::Bool(true)),
133 (Datum::False, SqlScalarType::Bool) => Some(Value::Bool(false)),
134 (Datum::Int16(i), SqlScalarType::Int16) => Some(Value::Int2(i)),
135 (Datum::Int32(i), SqlScalarType::Int32) => Some(Value::Int4(i)),
136 (Datum::Int64(i), SqlScalarType::Int64) => Some(Value::Int8(i)),
137 (Datum::UInt8(c), SqlScalarType::PgLegacyChar) => Some(Value::Char(c)),
138 (Datum::UInt16(u), SqlScalarType::UInt16) => Some(Value::UInt2(UInt2(u))),
139 (Datum::UInt32(oid), SqlScalarType::Oid) => Some(Value::Oid(oid)),
140 (Datum::UInt32(oid), SqlScalarType::RegClass) => Some(Value::Oid(oid)),
141 (Datum::UInt32(oid), SqlScalarType::RegProc) => Some(Value::Oid(oid)),
142 (Datum::UInt32(oid), SqlScalarType::RegType) => Some(Value::Oid(oid)),
143 (Datum::UInt32(u), SqlScalarType::UInt32) => Some(Value::UInt4(UInt4(u))),
144 (Datum::UInt64(u), SqlScalarType::UInt64) => Some(Value::UInt8(UInt8(u))),
145 (Datum::Float32(f), SqlScalarType::Float32) => Some(Value::Float4(*f)),
146 (Datum::Float64(f), SqlScalarType::Float64) => Some(Value::Float8(*f)),
147 (Datum::Numeric(d), SqlScalarType::Numeric { .. }) => Some(Value::Numeric(Numeric(d))),
148 (Datum::MzTimestamp(t), SqlScalarType::MzTimestamp) => Some(Value::MzTimestamp(t)),
149 (Datum::MzAclItem(mai), SqlScalarType::MzAclItem) => Some(Value::MzAclItem(mai)),
150 (Datum::AclItem(ai), SqlScalarType::AclItem) => Some(Value::AclItem(ai)),
151 (Datum::Date(d), SqlScalarType::Date) => Some(Value::Date(d)),
152 (Datum::Time(t), SqlScalarType::Time) => Some(Value::Time(t)),
153 (Datum::Timestamp(ts), SqlScalarType::Timestamp { .. }) => Some(Value::Timestamp(ts)),
154 (Datum::TimestampTz(ts), SqlScalarType::TimestampTz { .. }) => {
155 Some(Value::TimestampTz(ts))
156 }
157 (Datum::Interval(iv), SqlScalarType::Interval) => Some(Value::Interval(Interval(iv))),
158 (Datum::Bytes(b), SqlScalarType::Bytes) => Some(Value::Bytea(b.to_vec())),
159 (Datum::String(s), SqlScalarType::String) => Some(Value::Text(s.to_owned())),
160 (Datum::String(s), SqlScalarType::VarChar { .. }) => Some(Value::VarChar(s.to_owned())),
161 (Datum::String(s), SqlScalarType::Char { length }) => {
162 Some(Value::BpChar(char::format_str_pad(s, *length)))
163 }
164 (Datum::String(s), SqlScalarType::PgLegacyName) => Some(Value::Name(s.into())),
165 (_, SqlScalarType::Jsonb) => {
166 Some(Value::Jsonb(Jsonb(JsonbRef::from_datum(datum).to_owned())))
167 }
168 (Datum::Uuid(u), SqlScalarType::Uuid) => Some(Value::Uuid(u)),
169 (Datum::Array(array), SqlScalarType::Array(elem_type)) => {
170 let dims = array.dims().into_iter().collect();
171 let elements = array
172 .elements()
173 .iter()
174 .map(|elem| Value::from_datum(elem, elem_type))
175 .collect();
176 Some(Value::Array { dims, elements })
177 }
178 (Datum::Array(array), SqlScalarType::Int2Vector) => {
179 assert!(
180 array.has_int2vector_dims(),
181 "int2vector must be 1 dimensional, or empty"
182 );
183 let elements = array
184 .elements()
185 .iter()
186 .map(|elem| Value::from_datum(elem, &SqlScalarType::Int16))
187 .collect();
188 Some(Value::Int2Vector { elements })
189 }
190 (Datum::List(list), SqlScalarType::List { element_type, .. }) => {
191 let elements = list
192 .iter()
193 .map(|elem| Value::from_datum(elem, element_type))
194 .collect();
195 Some(Value::List(elements))
196 }
197 (Datum::List(record), SqlScalarType::Record { fields, .. }) => {
198 let fields = record
199 .iter()
200 .zip_eq(fields)
201 .map(|(e, (_name, ty))| Value::from_datum(e, &ty.scalar_type))
202 .collect();
203 Some(Value::Record(fields))
204 }
205 (Datum::Map(dict), SqlScalarType::Map { value_type, .. }) => {
206 let entries = dict
207 .iter()
208 .map(|(k, v)| (k.to_owned(), Value::from_datum(v, value_type)))
209 .collect();
210 Some(Value::Map(entries))
211 }
212 (Datum::Range(range), SqlScalarType::Range { element_type }) => {
213 let value_range = range.into_bounds(|b| {
214 Box::new(
215 Value::from_datum(b.datum(), element_type)
216 .expect("RangeBounds never contain Datum::Null"),
217 )
218 });
219 Some(Value::Range(value_range))
220 }
221 _ => panic!("can't serialize {}::{:?}", datum, typ),
222 }
223 }
224
225 pub fn into_datum<'a>(
227 self,
228 buf: &'a RowArena,
229 typ: &Type,
230 ) -> Result<Datum<'a>, IntoDatumError> {
231 Ok(match self {
232 Value::Array { dims, elements } => {
233 let element_pg_type = match typ {
234 Type::Array(t) => &*t,
235 _ => panic!("Value::Array should have type Type::Array. Found {:?}", typ),
236 };
237 let elements: Result<Vec<_>, _> = elements
238 .into_iter()
239 .map(|element| match element {
240 Some(element) => element.into_datum(buf, element_pg_type),
241 None => Ok(Datum::Null),
242 })
243 .collect();
244 let elements = elements?;
245 buf.try_make_datum(|packer| {
246 packer
247 .try_push_array(&dims, elements.into_iter())
248 .map_err(IntoDatumError::from)
249 })?
250 }
251 Value::Int2Vector { .. } => {
252 unreachable!("into_datum cannot be called on Value::Int2Vector");
255 }
256 Value::Bool(true) => Datum::True,
257 Value::Bool(false) => Datum::False,
258 Value::Bytea(b) => Datum::Bytes(buf.push_bytes(b)),
259 Value::Char(c) => Datum::UInt8(c),
260 Value::Date(d) => Datum::Date(d),
261 Value::Float4(f) => Datum::Float32(f.into()),
262 Value::Float8(f) => Datum::Float64(f.into()),
263 Value::Int2(i) => Datum::Int16(i),
264 Value::Int4(i) => Datum::Int32(i),
265 Value::Int8(i) => Datum::Int64(i),
266 Value::UInt2(u) => Datum::UInt16(u.0),
267 Value::UInt4(u) => Datum::UInt32(u.0),
268 Value::UInt8(u) => Datum::UInt64(u.0),
269 Value::Jsonb(js) => buf.push_unary_row(js.0.into_row()),
270 Value::List(elems) => {
271 let elem_pg_type = match typ {
272 Type::List(t) => &*t,
273 _ => panic!("Value::List should have type Type::List. Found {:?}", typ),
274 };
275 let elems: Result<Vec<_>, _> = elems
276 .into_iter()
277 .map(|elem| match elem {
278 Some(elem) => elem.into_datum(buf, elem_pg_type),
279 None => Ok(Datum::Null),
280 })
281 .collect();
282 let elems = elems?;
283 buf.make_datum(|packer| packer.push_list(elems))
284 }
285 Value::Map(map) => {
286 let elem_pg_type = match typ {
287 Type::Map { value_type } => &*value_type,
288 _ => panic!("Value::Map should have type Type::Map. Found {:?}", typ),
289 };
290 buf.try_make_datum(|packer| {
291 packer.try_push_dict_with(|row| {
292 for (k, v) in map {
293 row.push(Datum::String(buf.push_string(k)));
294 let datum = match v {
295 Some(elem) => elem.into_datum(buf, elem_pg_type)?,
296 None => Datum::Null,
297 };
298 row.push(datum);
299 }
300 Ok::<_, IntoDatumError>(())
301 })
302 })?
303 }
304 Value::Oid(oid) => Datum::UInt32(oid),
305 Value::Record(_) => {
306 unreachable!("into_datum cannot be called on Value::Record");
309 }
310 Value::Time(t) => Datum::Time(t),
311 Value::Timestamp(ts) => Datum::Timestamp(ts),
312 Value::TimestampTz(ts) => Datum::TimestampTz(ts),
313 Value::Interval(iv) => Datum::Interval(iv.0),
314 Value::Text(s) | Value::VarChar(s) | Value::Name(s) => {
315 Datum::String(buf.push_string(s))
316 }
317 Value::BpChar(s) => Datum::String(buf.push_string(s.trim_end().into())),
318 Value::Uuid(u) => Datum::Uuid(u),
319 Value::Numeric(n) => Datum::Numeric(n.0),
320 Value::MzTimestamp(t) => Datum::MzTimestamp(t),
321 Value::Range(range) => {
322 let elem_pg_type = match typ {
323 Type::Range { element_type } => &*element_type,
324 _ => panic!("Value::Range should have type Type::Range. Found {:?}", typ),
325 };
326 let range = range.try_into_bounds(|elem| elem.into_datum(buf, elem_pg_type))?;
327 buf.try_make_datum(|packer| packer.push_range(range).map_err(IntoDatumError::from))?
328 }
329 Value::MzAclItem(mz_acl_item) => Datum::MzAclItem(mz_acl_item),
330 Value::AclItem(acl_item) => Datum::AclItem(acl_item),
331 })
332 }
333
334 pub fn into_datum_decode_error<'a>(
339 self,
340 buf: &'a RowArena,
341 typ: &Type,
342 context: &str,
343 ) -> Result<Datum<'a>, String> {
344 self.into_datum(buf, typ)
345 .map_err(|e| format!("unable to decode {}: {}", context, e))
346 }
347
348 pub fn encode(&self, ty: &Type, format: Format, buf: &mut BytesMut) -> Result<(), io::Error> {
350 match format {
351 Format::Text => {
352 self.encode_text(buf);
353 Ok(())
354 }
355 Format::Binary => self.encode_binary(ty, buf),
356 }
357 }
358
359 pub fn encode_text(&self, buf: &mut BytesMut) -> Nestable {
362 match self {
363 Value::Array { dims, elements } => {
364 strconv::format_array(buf, dims, elements, |buf, elem| match elem {
365 None => Ok::<_, ()>(buf.write_null()),
366 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
367 })
368 .expect("provided closure never fails")
369 }
370 Value::Int2Vector { elements } => {
371 strconv::format_legacy_vector(buf, elements, |buf, elem| {
372 Ok::<_, ()>(
373 elem.as_ref()
374 .expect("Int2Vector does not support NULL values")
375 .encode_text(buf.nonnull_buffer()),
376 )
377 })
378 .expect("provided closure never fails")
379 }
380 Value::Bool(b) => strconv::format_bool(buf, *b),
381 Value::Bytea(b) => strconv::format_bytes(buf, b),
382 Value::Char(c) => {
383 buf.put_u8(*c);
384 Nestable::MayNeedEscaping
385 }
386 Value::Date(d) => strconv::format_date(buf, *d),
387 Value::Int2(i) => strconv::format_int16(buf, *i),
388 Value::Int4(i) => strconv::format_int32(buf, *i),
389 Value::Int8(i) => strconv::format_int64(buf, *i),
390 Value::UInt2(u) => strconv::format_uint16(buf, u.0),
391 Value::UInt4(u) => strconv::format_uint32(buf, u.0),
392 Value::UInt8(u) => strconv::format_uint64(buf, u.0),
393 Value::Interval(iv) => strconv::format_interval(buf, iv.0),
394 Value::Float4(f) => strconv::format_float32(buf, *f),
395 Value::Float8(f) => strconv::format_float64(buf, *f),
396 Value::Jsonb(js) => strconv::format_jsonb(buf, js.0.as_ref()),
397 Value::List(elems) => strconv::format_list(buf, elems, |buf, elem| match elem {
398 None => Ok::<_, ()>(buf.write_null()),
399 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
400 })
401 .expect("provided closure never fails"),
402 Value::Map(elems) => strconv::format_map(buf, elems, |buf, value| match value {
403 None => Ok::<_, ()>(buf.write_null()),
404 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
405 })
406 .expect("provided closure never fails"),
407 Value::Oid(oid) => strconv::format_uint32(buf, *oid),
408 Value::Record(elems) => strconv::format_record(buf, elems, |buf, elem| match elem {
409 None => Ok::<_, ()>(buf.write_null()),
410 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
411 })
412 .expect("provided closure never fails"),
413 Value::Text(s) | Value::VarChar(s) | Value::BpChar(s) | Value::Name(s) => {
414 strconv::format_string(buf, s)
415 }
416 Value::Time(t) => strconv::format_time(buf, *t),
417 Value::Timestamp(ts) => strconv::format_timestamp(buf, ts),
418 Value::TimestampTz(ts) => strconv::format_timestamptz(buf, ts),
419 Value::Uuid(u) => strconv::format_uuid(buf, *u),
420 Value::Numeric(d) => strconv::format_numeric(buf, &d.0),
421 Value::MzTimestamp(t) => strconv::format_mz_timestamp(buf, *t),
422 Value::Range(range) => strconv::format_range(buf, range, |buf, elem| match elem {
423 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
424 None => Ok::<_, ()>(buf.write_null()),
425 })
426 .expect("provided closure never fails"),
427 Value::MzAclItem(mz_acl_item) => strconv::format_mz_acl_item(buf, *mz_acl_item),
428 Value::AclItem(acl_item) => strconv::format_acl_item(buf, *acl_item),
429 }
430 }
431
432 pub fn encode_binary(&self, ty: &Type, buf: &mut BytesMut) -> Result<(), io::Error> {
435 let is_null = match self {
438 Value::Array { dims, elements } => {
439 let ndims = pg_len("number of array dimensions", dims.len())?;
440 let has_null = elements.iter().any(|e| e.is_none());
441 let elem_type = match ty {
442 Type::Array(elem_type) => elem_type,
443 _ => unreachable!(),
444 };
445 buf.put_i32(ndims);
446 buf.put_i32(has_null.into());
447 buf.put_u32(elem_type.oid());
448 for dim in dims {
449 buf.put_i32(pg_len("array dimension length", dim.length)?);
450 buf.put_i32(dim.lower_bound.try_into().map_err(|_| {
451 io::Error::new(
452 io::ErrorKind::InvalidData,
453 "array dimension lower bound does not fit into an i32",
454 )
455 })?);
456 }
457 for elem in elements {
458 encode_element(buf, elem.as_ref(), elem_type)?;
459 }
460 Ok(postgres_types::IsNull::No)
461 }
462 Value::Int2Vector { .. } => {
464 Err("binary encoding of int2vector is not implemented".into())
465 }
466 Value::Bool(b) => b.to_sql(&PgType::BOOL, buf),
467 Value::Bytea(b) => b.to_sql(&PgType::BYTEA, buf),
468 Value::Char(c) => i8::reinterpret_cast(*c).to_sql(&PgType::CHAR, buf),
469 Value::Date(d) => d.pg_epoch_days().to_sql(&PgType::DATE, buf),
470 Value::Float4(f) => f.to_sql(&PgType::FLOAT4, buf),
471 Value::Float8(f) => f.to_sql(&PgType::FLOAT8, buf),
472 Value::Int2(i) => i.to_sql(&PgType::INT2, buf),
473 Value::Int4(i) => i.to_sql(&PgType::INT4, buf),
474 Value::Int8(i) => i.to_sql(&PgType::INT8, buf),
475 Value::UInt2(u) => u.to_sql(&*UINT2, buf),
476 Value::UInt4(u) => u.to_sql(&*UINT4, buf),
477 Value::UInt8(u) => u.to_sql(&*UINT8, buf),
478 Value::Interval(iv) => iv.to_sql(&PgType::INTERVAL, buf),
479 Value::Jsonb(js) => js.to_sql(&PgType::JSONB, buf),
480 Value::List(_) => {
481 Err("binary encoding of list types is not implemented".into())
512 }
513 Value::Map(_) => {
514 Err("binary encoding of map types is not implemented".into())
518 }
519 Value::Name(s) => s.to_sql(&PgType::NAME, buf),
520 Value::Oid(i) => i.to_sql(&PgType::OID, buf),
521 Value::Record(fields) => {
522 let nfields = pg_len("record field length", fields.len())?;
523 buf.put_i32(nfields);
524 let field_types = match ty {
525 Type::Record(fields) => fields,
526 _ => unreachable!(),
527 };
528 for (f, ty) in fields.iter().zip_eq(field_types) {
529 buf.put_u32(ty.oid());
530 encode_element(buf, f.as_ref(), ty)?;
531 }
532 Ok(postgres_types::IsNull::No)
533 }
534 Value::Text(s) => s.to_sql(&PgType::TEXT, buf),
535 Value::BpChar(s) => s.to_sql(&PgType::BPCHAR, buf),
536 Value::VarChar(s) => s.to_sql(&PgType::VARCHAR, buf),
537 Value::Time(t) => t.to_sql(&PgType::TIME, buf),
538 Value::Timestamp(ts) => ts.to_sql(&PgType::TIMESTAMP, buf),
539 Value::TimestampTz(ts) => ts.to_sql(&PgType::TIMESTAMPTZ, buf),
540 Value::Uuid(u) => u.to_sql(&PgType::UUID, buf),
541 Value::Numeric(a) => a.to_sql(&PgType::NUMERIC, buf),
542 Value::MzTimestamp(t) => t.to_string().to_sql(&PgType::TEXT, buf),
543 Value::Range(range) => {
544 buf.put_u8(range.pg_flag_bits());
545
546 let elem_type = match ty {
547 Type::Range { element_type } => element_type,
548 _ => unreachable!(),
549 };
550
551 if let Some(RangeInner { lower, upper }) = &range.inner {
552 for bound in [&lower.bound, &upper.bound] {
553 if let Some(bound) = bound {
554 let base = buf.len();
555 buf.put_i32(0);
556 bound.encode_binary(elem_type, buf)?;
557 let len = pg_len("encoded range bound", buf.len() - base - 4)?;
558 buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
559 }
560 }
561 }
562 Ok(postgres_types::IsNull::No)
563 }
564 Value::MzAclItem(mz_acl_item) => {
565 buf.extend_from_slice(&mz_acl_item.encode_binary());
566 Ok(postgres_types::IsNull::No)
567 }
568 Value::AclItem(_) => Err("aclitem has no binary encoding".into()),
569 }
570 .expect("encode_binary should never trigger a to_sql failure");
571 if let IsNull::Yes = is_null {
572 panic!("encode_binary impossibly called on a null value")
573 }
574 Ok(())
575 }
576
577 pub fn can_encode_binary(typ: &SqlScalarType) -> bool {
581 match typ {
582 SqlScalarType::Bool => true,
583 SqlScalarType::Int16 => true,
584 SqlScalarType::Int32 => true,
585 SqlScalarType::Int64 => true,
586 SqlScalarType::PgLegacyChar => true,
587 SqlScalarType::UInt16 => true,
588 SqlScalarType::Oid => true,
589 SqlScalarType::RegClass => true,
590 SqlScalarType::RegProc => true,
591 SqlScalarType::RegType => true,
592 SqlScalarType::UInt32 => true,
593 SqlScalarType::UInt64 => true,
594 SqlScalarType::Float32 => true,
595 SqlScalarType::Float64 => true,
596 SqlScalarType::Numeric { .. } => true,
597 SqlScalarType::MzTimestamp => true,
598 SqlScalarType::MzAclItem => true,
599 SqlScalarType::AclItem => false, SqlScalarType::Date => true,
601 SqlScalarType::Time => true,
602 SqlScalarType::Timestamp { .. } => true,
603 SqlScalarType::TimestampTz { .. } => true,
604 SqlScalarType::Interval => true,
605 SqlScalarType::Bytes => true,
606 SqlScalarType::String => true,
607 SqlScalarType::VarChar { .. } => true,
608 SqlScalarType::Char { .. } => true,
609 SqlScalarType::PgLegacyName => true,
610 SqlScalarType::Jsonb => true,
611 SqlScalarType::Uuid => true,
612 SqlScalarType::Array(elem_type) => Self::can_encode_binary(elem_type),
613 SqlScalarType::Int2Vector => false, SqlScalarType::List { .. } => false, SqlScalarType::Map { .. } => false, SqlScalarType::Record { fields, .. } => fields
617 .iter()
618 .all(|(_, ty)| Self::can_encode_binary(&ty.scalar_type)),
619 SqlScalarType::Range { element_type } => Self::can_encode_binary(element_type),
620 }
621 }
622
623 pub fn decode(
626 format: Format,
627 ty: &Type,
628 raw: &[u8],
629 ) -> Result<Value, Box<dyn Error + Sync + Send>> {
630 match format {
631 Format::Text => Value::decode_text(ty, raw),
632 Format::Binary => Value::decode_binary(ty, raw),
633 }
634 }
635
636 pub fn decode_text<'a>(
639 ty: &'a Type,
640 raw: &'a [u8],
641 ) -> Result<Value, Box<dyn Error + Sync + Send>> {
642 let s = str::from_utf8(raw)?;
643 Ok(match ty {
644 Type::Array(elem_type) => {
645 let (elements, dims) = strconv::parse_array(
646 s,
647 || None,
648 |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
649 )?;
650 Value::Array { dims, elements }
651 }
652 Type::Int2Vector { .. } => {
653 return Err("input of Int2Vector types is not implemented".into());
654 }
655 Type::Bool => Value::Bool(strconv::parse_bool(s)?),
656 Type::Bytea => Value::Bytea(strconv::parse_bytes(s)?),
657 Type::Char => Value::Char(raw.get(0).copied().unwrap_or(0)),
658 Type::Date => Value::Date(strconv::parse_date(s)?),
659 Type::Float4 => Value::Float4(strconv::parse_float32(s)?),
660 Type::Float8 => Value::Float8(strconv::parse_float64(s)?),
661 Type::Int2 => Value::Int2(strconv::parse_int16(s)?),
662 Type::Int4 => Value::Int4(strconv::parse_int32(s)?),
663 Type::Int8 => Value::Int8(strconv::parse_int64(s)?),
664 Type::UInt2 => Value::UInt2(UInt2(strconv::parse_uint16(s)?)),
665 Type::UInt4 => Value::UInt4(UInt4(strconv::parse_uint32(s)?)),
666 Type::UInt8 => Value::UInt8(UInt8(strconv::parse_uint64(s)?)),
667 Type::Interval { .. } => Value::Interval(Interval(strconv::parse_interval(s)?)),
668 Type::Json => return Err("input of json types is not implemented".into()),
669 Type::Jsonb => Value::Jsonb(Jsonb(strconv::parse_jsonb(s)?)),
670 Type::List(elem_type) => Value::List(strconv::parse_list(
671 s,
672 matches!(**elem_type, Type::List(..)),
673 || None,
674 |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
675 )?),
676 Type::Map { value_type } => Value::Map(strconv::parse_map(
677 s,
678 matches!(**value_type, Type::Map { .. }),
679 |elem_text| {
680 elem_text
681 .map(|t| Value::decode_text(value_type, t.as_bytes()))
682 .transpose()
683 },
684 )?),
685 Type::Name => Value::Name(strconv::parse_pg_legacy_name(s)),
686 Type::Numeric { .. } => Value::Numeric(Numeric(strconv::parse_numeric(s)?)),
687 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
688 Value::Oid(strconv::parse_oid(s)?)
689 }
690 Type::Record(_) => {
691 return Err("input of anonymous composite types is not implemented".into());
692 }
693 Type::Text => Value::Text(s.to_owned()),
694 Type::BpChar { .. } => Value::BpChar(s.to_owned()),
695 Type::VarChar { .. } => Value::VarChar(s.to_owned()),
696 Type::Time { .. } => Value::Time(strconv::parse_time(s)?),
697 Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
698 Type::Timestamp { .. } => Value::Timestamp(strconv::parse_timestamp(s)?),
699 Type::TimestampTz { .. } => Value::TimestampTz(strconv::parse_timestamptz(s)?),
700 Type::Uuid => Value::Uuid(Uuid::parse_str(s)?),
701 Type::MzTimestamp => Value::MzTimestamp(strconv::parse_mz_timestamp(s)?),
702 Type::Range { element_type } => Value::Range(strconv::parse_range(s, |elem_text| {
703 Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
704 })?),
705 Type::MzAclItem => Value::MzAclItem(strconv::parse_mz_acl_item(s)?),
706 Type::AclItem => Value::AclItem(strconv::parse_acl_item(s)?),
707 })
708 }
709
710 pub fn decode_text_into_row<'a>(
712 ty: &'a Type,
713 s: &'a str,
714 packer: &mut RowPacker,
715 ) -> Result<(), Box<dyn Error + Sync + Send>> {
716 Ok(match ty {
717 Type::Array(elem_type) => {
718 let (elements, dims) =
719 strconv::parse_array(s, || None, |elem_text| Ok::<_, String>(Some(elem_text)))?;
720 unsafe {
722 packer.push_array_with_unchecked(&dims, |packer| {
723 let mut nelements = 0;
724 for element in elements {
725 match element {
726 Some(elem_text) => {
727 Value::decode_text_into_row(elem_type, &elem_text, packer)?
728 }
729
730 None => packer.push(Datum::Null),
731 }
732 nelements += 1;
733 }
734 Ok::<_, Box<dyn Error + Sync + Send>>(nelements)
735 })?
736 }
737 }
738 Type::Int2Vector { .. } => {
739 return Err("input of Int2Vector types is not implemented".into());
740 }
741 Type::Bool => packer.push(Datum::from(strconv::parse_bool(s)?)),
742 Type::Bytea => packer.push(Datum::Bytes(&strconv::parse_bytes(s)?)),
743 Type::Char => packer.push(Datum::UInt8(s.as_bytes().get(0).copied().unwrap_or(0))),
744 Type::Date => packer.push(Datum::Date(strconv::parse_date(s)?)),
745 Type::Float4 => packer.push(Datum::Float32(strconv::parse_float32(s)?.into())),
746 Type::Float8 => packer.push(Datum::Float64(strconv::parse_float64(s)?.into())),
747 Type::Int2 => packer.push(Datum::Int16(strconv::parse_int16(s)?)),
748 Type::Int4 => packer.push(Datum::Int32(strconv::parse_int32(s)?)),
749 Type::Int8 => packer.push(Datum::Int64(strconv::parse_int64(s)?)),
750 Type::UInt2 => packer.push(Datum::UInt16(strconv::parse_uint16(s)?)),
751 Type::UInt4 => packer.push(Datum::UInt32(strconv::parse_uint32(s)?)),
752 Type::UInt8 => packer.push(Datum::UInt64(strconv::parse_uint64(s)?)),
753 Type::Interval { .. } => packer.push(Datum::Interval(strconv::parse_interval(s)?)),
754 Type::Json => return Err("input of json types is not implemented".into()),
755 Type::Jsonb => packer.push(strconv::parse_jsonb(s)?.into_row().unpack_first()),
756 Type::List(elem_type) => {
757 let elems = strconv::parse_list(
758 s,
759 matches!(**elem_type, Type::List(..)),
760 || None,
761 |elem_text| Ok::<_, String>(Some(elem_text)),
762 )?;
763 packer.push_list_with(|packer| {
764 for elem in elems {
765 match elem {
766 Some(elem) => Value::decode_text_into_row(elem_type, &elem, packer)?,
767 None => packer.push(Datum::Null),
768 }
769 }
770 Ok::<_, Box<dyn Error + Sync + Send>>(())
771 })?;
772 }
773 Type::Map { value_type } => {
774 let map =
775 strconv::parse_map(s, matches!(**value_type, Type::Map { .. }), |elem_text| {
776 elem_text.map(Ok::<_, String>).transpose()
777 })?;
778 packer.push_dict_with(|row| {
779 for (k, v) in map {
780 row.push(Datum::String(&k));
781 match v {
782 Some(elem) => Value::decode_text_into_row(value_type, &elem, row)?,
783 None => row.push(Datum::Null),
784 }
785 }
786 Ok::<_, Box<dyn Error + Sync + Send>>(())
787 })?;
788 }
789 Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
790 Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
791 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
792 packer.push(Datum::UInt32(strconv::parse_oid(s)?))
793 }
794 Type::Record(_) => {
795 return Err("input of anonymous composite types is not implemented".into());
796 }
797 Type::Text => packer.push(Datum::String(s)),
798 Type::BpChar { .. } => packer.push(Datum::String(s.trim_end())),
799 Type::VarChar { .. } => packer.push(Datum::String(s)),
800 Type::Time { .. } => packer.push(Datum::Time(strconv::parse_time(s)?)),
801 Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
802 Type::Timestamp { .. } => packer.push(Datum::Timestamp(strconv::parse_timestamp(s)?)),
803 Type::TimestampTz { .. } => {
804 packer.push(Datum::TimestampTz(strconv::parse_timestamptz(s)?))
805 }
806 Type::Uuid => packer.push(Datum::Uuid(Uuid::parse_str(s)?)),
807 Type::MzTimestamp => packer.push(Datum::MzTimestamp(strconv::parse_mz_timestamp(s)?)),
808 Type::Range { element_type } => {
809 let range = strconv::parse_range(s, |elem_text| {
810 Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
811 })?;
812 let buf = RowArena::new();
815 let range = range
816 .try_into_bounds(|elem| elem.into_datum(&buf, element_type))
817 .map_err(Box::<dyn Error + Sync + Send>::from)?;
818 packer
819 .push_range(range)
820 .map_err(Box::<dyn Error + Sync + Send>::from)?;
821 }
822 Type::MzAclItem => packer.push(Datum::MzAclItem(strconv::parse_mz_acl_item(s)?)),
823 Type::AclItem => packer.push(Datum::AclItem(strconv::parse_acl_item(s)?)),
824 })
825 }
826
827 pub fn decode_binary(ty: &Type, raw: &[u8]) -> Result<Value, Box<dyn Error + Sync + Send>> {
830 match ty {
831 Type::Array(_) => Err("input of array types is not implemented".into()),
832 Type::Int2Vector => Err("input of int2vector types is not implemented".into()),
833 Type::Bool => bool::from_sql(ty.inner(), raw).map(Value::Bool),
834 Type::Bytea => Vec::<u8>::from_sql(ty.inner(), raw).map(Value::Bytea),
835 Type::Char => {
836 i8::from_sql(ty.inner(), raw).map(|c| Value::Char(u8::reinterpret_cast(c)))
837 }
838 Type::Date => {
839 let days = i32::from_sql(ty.inner(), raw)?;
840 Ok(Value::Date(Date::from_pg_epoch(days)?))
841 }
842 Type::Float4 => f32::from_sql(ty.inner(), raw).map(Value::Float4),
843 Type::Float8 => f64::from_sql(ty.inner(), raw).map(Value::Float8),
844 Type::Int2 => i16::from_sql(ty.inner(), raw).map(Value::Int2),
845 Type::Int4 => i32::from_sql(ty.inner(), raw).map(Value::Int4),
846 Type::Int8 => i64::from_sql(ty.inner(), raw).map(Value::Int8),
847 Type::UInt2 => UInt2::from_sql(ty.inner(), raw).map(Value::UInt2),
848 Type::UInt4 => UInt4::from_sql(ty.inner(), raw).map(Value::UInt4),
849 Type::UInt8 => UInt8::from_sql(ty.inner(), raw).map(Value::UInt8),
850 Type::Interval { .. } => Interval::from_sql(ty.inner(), raw).map(Value::Interval),
851 Type::Json => Err("input of json types is not implemented".into()),
852 Type::Jsonb => Jsonb::from_sql(ty.inner(), raw).map(Value::Jsonb),
853 Type::List(_) => Err("binary decoding of list types is not implemented".into()),
854 Type::Map { .. } => Err("binary decoding of map types is not implemented".into()),
855 Type::Name => {
856 let s = String::from_sql(ty.inner(), raw)?;
857 if s.len() > NAME_MAX_BYTES {
858 return Err("identifier too long".into());
859 }
860 Ok(Value::Name(s))
861 }
862 Type::Numeric { .. } => Numeric::from_sql(ty.inner(), raw).map(Value::Numeric),
863 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
864 u32::from_sql(ty.inner(), raw).map(Value::Oid)
865 }
866 Type::Record(_) => Err("input of anonymous composite types is not implemented".into()),
867 Type::Text => String::from_sql(ty.inner(), raw).map(Value::Text),
868 Type::BpChar { .. } => String::from_sql(ty.inner(), raw).map(Value::BpChar),
869 Type::VarChar { .. } => String::from_sql(ty.inner(), raw).map(Value::VarChar),
870 Type::Time { .. } => NaiveTime::from_sql(ty.inner(), raw).map(Value::Time),
871 Type::TimeTz { .. } => Err("input of timetz types is not implemented".into()),
872 Type::Timestamp { .. } => {
873 let ts = NaiveDateTime::from_sql(ty.inner(), raw)?;
874 Ok(Value::Timestamp(CheckedTimestamp::from_timestamplike(ts)?))
875 }
876 Type::TimestampTz { .. } => {
877 let ts = DateTime::<Utc>::from_sql(ty.inner(), raw)?;
878 Ok(Value::TimestampTz(CheckedTimestamp::from_timestamplike(
879 ts,
880 )?))
881 }
882 Type::Uuid => Uuid::from_sql(ty.inner(), raw).map(Value::Uuid),
883 Type::MzTimestamp => {
884 let s = String::from_sql(ty.inner(), raw)?;
885 let t: mz_repr::Timestamp = s.parse()?;
886 Ok(Value::MzTimestamp(t))
887 }
888 Type::Range { .. } => Err("binary decoding of range types is not implemented".into()),
889 Type::MzAclItem => {
890 let mz_acl_item = MzAclItem::decode_binary(raw)?;
891 Ok(Value::MzAclItem(mz_acl_item))
892 }
893 Type::AclItem => Err("aclitem has no binary encoding".into()),
894 }
895 }
896}
897
898fn encode_element(buf: &mut BytesMut, elem: Option<&Value>, ty: &Type) -> Result<(), io::Error> {
899 match elem {
900 None => buf.put_i32(-1),
901 Some(elem) => {
902 let base = buf.len();
903 buf.put_i32(0);
904 elem.encode_binary(ty, buf)?;
905 let len = pg_len("encoded element", buf.len() - base - 4)?;
906 buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
907 }
908 }
909 Ok(())
910}
911
/// Converts `len` into the `i32` used for lengths in the binary wire format.
///
/// # Errors
///
/// Returns an `InvalidData` error with the message
/// `<what> does not fit into an i32` when `len` exceeds `i32::MAX`.
fn pg_len(what: &str, len: usize) -> Result<i32, io::Error> {
    match i32::try_from(len) {
        Ok(fits) => Ok(fits),
        Err(_) => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("{} does not fit into an i32", what),
        )),
    }
}
920
921pub fn values_from_row(row: &RowRef, typ: &SqlRelationType) -> Vec<Option<Value>> {
926 row.iter()
927 .zip_eq(typ.column_types.iter())
928 .map(|(col, typ)| Value::from_datum(col, &typ.scalar_type))
929 .collect()
930}
931
#[cfg(test)]
mod tests {
    use super::*;

    /// Text-encodes a one-element boolean array whose lower bound is 0, then
    /// decodes it as an `int4` array, and checks the full error message that
    /// the array parser reports.
    #[mz_ore::test]
    fn decode_text_error_smoke_test() {
        let source = Value::Array {
            dims: vec![ArrayDimension {
                lower_bound: 0,
                length: 1,
            }],
            elements: vec![Some(Value::Bool(true))],
        };

        let mut encoded = BytesMut::new();
        source.encode_text(&mut encoded);
        let encoded = encoded.to_vec();

        let target_type = Type::Array(Box::new(Type::Int4));
        let message = Value::decode_text(&target_type, &encoded)
            .map_err(|e| e.to_string())
            .unwrap_err();

        let expected = "invalid input syntax for type array: Specifying array lower bounds is not supported: \"[0:0]={t}\"".to_string();
        assert_eq!(message, expected);
    }
}