1use std::collections::BTreeMap;
11use std::error::Error;
12use std::{io, str};
13
14use bytes::{BufMut, BytesMut};
15use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
16use itertools::Itertools;
17use mz_ore::cast::ReinterpretCast;
18use mz_pgrepr_consts::oid::TYPE_INT2_OID;
19use mz_pgwire_common::Format;
20use mz_repr::adt::array::ArrayDimension;
21use mz_repr::adt::char;
22use mz_repr::adt::date::Date;
23use mz_repr::adt::jsonb::JsonbRef;
24use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
25use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
26use mz_repr::adt::range::{Range, RangeInner};
27use mz_repr::adt::timestamp::CheckedTimestamp;
28use mz_repr::strconv::{self, Nestable};
29use mz_repr::{Datum, RowArena, RowPacker, RowRef, SqlRelationType, SqlScalarType};
30use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
31use uuid::Uuid;
32
33use crate::types::{UINT2, UINT4, UINT8};
34use crate::value::error::IntoDatumError;
35use crate::{Interval, Jsonb, Numeric, Type, UInt2, UInt4, UInt8};
36
37pub mod error;
38pub mod interval;
39pub mod jsonb;
40pub mod numeric;
41pub mod record;
42pub mod unsigned;
43
44#[derive(Debug)]
46pub enum Value {
47 Array {
49 dims: Vec<ArrayDimension>,
51 elements: Vec<Option<Value>>,
53 },
54 Bool(bool),
56 Bytea(Vec<u8>),
58 Char(u8),
60 Date(Date),
62 Float4(f32),
64 Float8(f64),
66 Int2(i16),
68 Int4(i32),
70 Int8(i64),
72 UInt2(UInt2),
74 UInt4(UInt4),
76 UInt8(UInt8),
78 Interval(Interval),
80 Jsonb(Jsonb),
82 List(Vec<Option<Value>>),
84 Map(BTreeMap<String, Option<Value>>),
86 Name(String),
88 Numeric(Numeric),
90 Oid(u32),
92 Record(Vec<Option<Value>>),
94 Time(NaiveTime),
96 Timestamp(CheckedTimestamp<NaiveDateTime>),
98 TimestampTz(CheckedTimestamp<DateTime<Utc>>),
100 Text(String),
102 BpChar(String),
104 VarChar(String),
106 Uuid(Uuid),
108 Int2Vector {
110 elements: Vec<Option<Value>>,
112 },
113 MzTimestamp(mz_repr::Timestamp),
115 Range(Range<Box<Value>>),
117 MzAclItem(MzAclItem),
120 AclItem(AclItem),
123}
124
125impl Value {
126 pub fn from_datum(datum: Datum, typ: &SqlScalarType) -> Option<Value> {
131 match (datum, typ) {
132 (Datum::Null, _) => None,
133 (Datum::True, SqlScalarType::Bool) => Some(Value::Bool(true)),
134 (Datum::False, SqlScalarType::Bool) => Some(Value::Bool(false)),
135 (Datum::Int16(i), SqlScalarType::Int16) => Some(Value::Int2(i)),
136 (Datum::Int32(i), SqlScalarType::Int32) => Some(Value::Int4(i)),
137 (Datum::Int64(i), SqlScalarType::Int64) => Some(Value::Int8(i)),
138 (Datum::UInt8(c), SqlScalarType::PgLegacyChar) => Some(Value::Char(c)),
139 (Datum::UInt16(u), SqlScalarType::UInt16) => Some(Value::UInt2(UInt2(u))),
140 (Datum::UInt32(oid), SqlScalarType::Oid) => Some(Value::Oid(oid)),
141 (Datum::UInt32(oid), SqlScalarType::RegClass) => Some(Value::Oid(oid)),
142 (Datum::UInt32(oid), SqlScalarType::RegProc) => Some(Value::Oid(oid)),
143 (Datum::UInt32(oid), SqlScalarType::RegType) => Some(Value::Oid(oid)),
144 (Datum::UInt32(u), SqlScalarType::UInt32) => Some(Value::UInt4(UInt4(u))),
145 (Datum::UInt64(u), SqlScalarType::UInt64) => Some(Value::UInt8(UInt8(u))),
146 (Datum::Float32(f), SqlScalarType::Float32) => Some(Value::Float4(*f)),
147 (Datum::Float64(f), SqlScalarType::Float64) => Some(Value::Float8(*f)),
148 (Datum::Numeric(d), SqlScalarType::Numeric { .. }) => Some(Value::Numeric(Numeric(d))),
149 (Datum::MzTimestamp(t), SqlScalarType::MzTimestamp) => Some(Value::MzTimestamp(t)),
150 (Datum::MzAclItem(mai), SqlScalarType::MzAclItem) => Some(Value::MzAclItem(mai)),
151 (Datum::AclItem(ai), SqlScalarType::AclItem) => Some(Value::AclItem(ai)),
152 (Datum::Date(d), SqlScalarType::Date) => Some(Value::Date(d)),
153 (Datum::Time(t), SqlScalarType::Time) => Some(Value::Time(t)),
154 (Datum::Timestamp(ts), SqlScalarType::Timestamp { .. }) => Some(Value::Timestamp(ts)),
155 (Datum::TimestampTz(ts), SqlScalarType::TimestampTz { .. }) => {
156 Some(Value::TimestampTz(ts))
157 }
158 (Datum::Interval(iv), SqlScalarType::Interval) => Some(Value::Interval(Interval(iv))),
159 (Datum::Bytes(b), SqlScalarType::Bytes) => Some(Value::Bytea(b.to_vec())),
160 (Datum::String(s), SqlScalarType::String) => Some(Value::Text(s.to_owned())),
161 (Datum::String(s), SqlScalarType::VarChar { .. }) => Some(Value::VarChar(s.to_owned())),
162 (Datum::String(s), SqlScalarType::Char { length }) => {
163 Some(Value::BpChar(char::format_str_pad(s, *length)))
164 }
165 (Datum::String(s), SqlScalarType::PgLegacyName) => Some(Value::Name(s.into())),
166 (_, SqlScalarType::Jsonb) => {
167 Some(Value::Jsonb(Jsonb(JsonbRef::from_datum(datum).to_owned())))
168 }
169 (Datum::Uuid(u), SqlScalarType::Uuid) => Some(Value::Uuid(u)),
170 (Datum::Array(array), SqlScalarType::Array(elem_type)) => {
171 let dims = array.dims().into_iter().collect();
172 let elements = array
173 .elements()
174 .iter()
175 .map(|elem| Value::from_datum(elem, elem_type))
176 .collect();
177 Some(Value::Array { dims, elements })
178 }
179 (Datum::Array(array), SqlScalarType::Int2Vector) => {
180 assert!(
181 array.has_int2vector_dims(),
182 "int2vector must be 1 dimensional, or empty"
183 );
184 let elements = array
185 .elements()
186 .iter()
187 .map(|elem| Value::from_datum(elem, &SqlScalarType::Int16))
188 .collect();
189 Some(Value::Int2Vector { elements })
190 }
191 (Datum::List(list), SqlScalarType::List { element_type, .. }) => {
192 let elements = list
193 .iter()
194 .map(|elem| Value::from_datum(elem, element_type))
195 .collect();
196 Some(Value::List(elements))
197 }
198 (Datum::List(record), SqlScalarType::Record { fields, .. }) => {
199 let fields = record
200 .iter()
201 .zip_eq(fields)
202 .map(|(e, (_name, ty))| Value::from_datum(e, &ty.scalar_type))
203 .collect();
204 Some(Value::Record(fields))
205 }
206 (Datum::Map(dict), SqlScalarType::Map { value_type, .. }) => {
207 let entries = dict
208 .iter()
209 .map(|(k, v)| (k.to_owned(), Value::from_datum(v, value_type)))
210 .collect();
211 Some(Value::Map(entries))
212 }
213 (Datum::Range(range), SqlScalarType::Range { element_type }) => {
214 let value_range = range.into_bounds(|b| {
215 Box::new(
216 Value::from_datum(b.datum(), element_type)
217 .expect("RangeBounds never contain Datum::Null"),
218 )
219 });
220 Some(Value::Range(value_range))
221 }
222 _ => panic!("can't serialize {}::{:?}", datum, typ),
223 }
224 }
225
226 pub fn into_datum<'a>(
228 self,
229 buf: &'a RowArena,
230 typ: &Type,
231 ) -> Result<Datum<'a>, IntoDatumError> {
232 Ok(match self {
233 Value::Array { dims, elements } => {
234 let element_pg_type = match typ {
235 Type::Array(t) => &*t,
236 _ => panic!("Value::Array should have type Type::Array. Found {:?}", typ),
237 };
238 let elements: Result<Vec<_>, _> = elements
239 .into_iter()
240 .map(|element| match element {
241 Some(element) => element.into_datum(buf, element_pg_type),
242 None => Ok(Datum::Null),
243 })
244 .collect();
245 let elements = elements?;
246 buf.try_make_datum(|packer| {
247 packer
248 .try_push_array(&dims, elements)
249 .map_err(IntoDatumError::from)
250 })?
251 }
252 Value::Int2Vector { .. } => {
253 unreachable!("into_datum cannot be called on Value::Int2Vector");
256 }
257 Value::Bool(true) => Datum::True,
258 Value::Bool(false) => Datum::False,
259 Value::Bytea(b) => Datum::Bytes(buf.push_bytes(b)),
260 Value::Char(c) => Datum::UInt8(c),
261 Value::Date(d) => Datum::Date(d),
262 Value::Float4(f) => Datum::Float32(f.into()),
263 Value::Float8(f) => Datum::Float64(f.into()),
264 Value::Int2(i) => Datum::Int16(i),
265 Value::Int4(i) => Datum::Int32(i),
266 Value::Int8(i) => Datum::Int64(i),
267 Value::UInt2(u) => Datum::UInt16(u.0),
268 Value::UInt4(u) => Datum::UInt32(u.0),
269 Value::UInt8(u) => Datum::UInt64(u.0),
270 Value::Jsonb(js) => buf.push_unary_row(js.0.into_row()),
271 Value::List(elems) => {
272 let elem_pg_type = match typ {
273 Type::List(t) => &*t,
274 _ => panic!("Value::List should have type Type::List. Found {:?}", typ),
275 };
276 let elems: Result<Vec<_>, _> = elems
277 .into_iter()
278 .map(|elem| match elem {
279 Some(elem) => elem.into_datum(buf, elem_pg_type),
280 None => Ok(Datum::Null),
281 })
282 .collect();
283 let elems = elems?;
284 buf.make_datum(|packer| packer.push_list(elems))
285 }
286 Value::Map(map) => {
287 let elem_pg_type = match typ {
288 Type::Map { value_type } => &*value_type,
289 _ => panic!("Value::Map should have type Type::Map. Found {:?}", typ),
290 };
291 buf.try_make_datum(|packer| {
292 packer.try_push_dict_with(|row| {
293 for (k, v) in map {
294 row.push(Datum::String(buf.push_string(k)));
295 let datum = match v {
296 Some(elem) => elem.into_datum(buf, elem_pg_type)?,
297 None => Datum::Null,
298 };
299 row.push(datum);
300 }
301 Ok::<_, IntoDatumError>(())
302 })
303 })?
304 }
305 Value::Oid(oid) => Datum::UInt32(oid),
306 Value::Record(_) => {
307 unreachable!("into_datum cannot be called on Value::Record");
310 }
311 Value::Time(t) => Datum::Time(t),
312 Value::Timestamp(ts) => Datum::Timestamp(ts),
313 Value::TimestampTz(ts) => Datum::TimestampTz(ts),
314 Value::Interval(iv) => Datum::Interval(iv.0),
315 Value::Text(s) | Value::VarChar(s) | Value::Name(s) => {
316 Datum::String(buf.push_string(s))
317 }
318 Value::BpChar(s) => Datum::String(buf.push_string(s.trim_end().into())),
319 Value::Uuid(u) => Datum::Uuid(u),
320 Value::Numeric(n) => Datum::Numeric(n.0),
321 Value::MzTimestamp(t) => Datum::MzTimestamp(t),
322 Value::Range(range) => {
323 let elem_pg_type = match typ {
324 Type::Range { element_type } => &*element_type,
325 _ => panic!("Value::Range should have type Type::Range. Found {:?}", typ),
326 };
327 let range = range.try_into_bounds(|elem| elem.into_datum(buf, elem_pg_type))?;
328 buf.try_make_datum(|packer| packer.push_range(range).map_err(IntoDatumError::from))?
329 }
330 Value::MzAclItem(mz_acl_item) => Datum::MzAclItem(mz_acl_item),
331 Value::AclItem(acl_item) => Datum::AclItem(acl_item),
332 })
333 }
334
335 pub fn into_datum_decode_error<'a>(
340 self,
341 buf: &'a RowArena,
342 typ: &Type,
343 context: &str,
344 ) -> Result<Datum<'a>, String> {
345 self.into_datum(buf, typ)
346 .map_err(|e| format!("unable to decode {}: {}", context, e))
347 }
348
349 pub fn encode(&self, ty: &Type, format: Format, buf: &mut BytesMut) -> Result<(), io::Error> {
351 match format {
352 Format::Text => {
353 self.encode_text(buf);
354 Ok(())
355 }
356 Format::Binary => self.encode_binary(ty, buf),
357 }
358 }
359
360 pub fn encode_text(&self, buf: &mut BytesMut) -> Nestable {
363 match self {
364 Value::Array { dims, elements } => {
365 strconv::format_array(buf, dims, elements, |buf, elem| match elem {
366 None => Ok::<_, ()>(buf.write_null()),
367 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
368 })
369 .expect("provided closure never fails")
370 }
371 Value::Int2Vector { elements } => {
372 strconv::format_legacy_vector(buf, elements, |buf, elem| {
373 Ok::<_, ()>(
374 elem.as_ref()
375 .expect("Int2Vector does not support NULL values")
376 .encode_text(buf.nonnull_buffer()),
377 )
378 })
379 .expect("provided closure never fails")
380 }
381 Value::Bool(b) => strconv::format_bool(buf, *b),
382 Value::Bytea(b) => strconv::format_bytes(buf, b),
383 Value::Char(c) => {
384 buf.put_u8(*c);
385 Nestable::MayNeedEscaping
386 }
387 Value::Date(d) => strconv::format_date(buf, *d),
388 Value::Int2(i) => strconv::format_int16(buf, *i),
389 Value::Int4(i) => strconv::format_int32(buf, *i),
390 Value::Int8(i) => strconv::format_int64(buf, *i),
391 Value::UInt2(u) => strconv::format_uint16(buf, u.0),
392 Value::UInt4(u) => strconv::format_uint32(buf, u.0),
393 Value::UInt8(u) => strconv::format_uint64(buf, u.0),
394 Value::Interval(iv) => strconv::format_interval(buf, iv.0),
395 Value::Float4(f) => strconv::format_float32(buf, *f),
396 Value::Float8(f) => strconv::format_float64(buf, *f),
397 Value::Jsonb(js) => strconv::format_jsonb(buf, js.0.as_ref()),
398 Value::List(elems) => strconv::format_list(buf, elems, |buf, elem| match elem {
399 None => Ok::<_, ()>(buf.write_null()),
400 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
401 })
402 .expect("provided closure never fails"),
403 Value::Map(elems) => strconv::format_map(buf, elems, |buf, value| match value {
404 None => Ok::<_, ()>(buf.write_null()),
405 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
406 })
407 .expect("provided closure never fails"),
408 Value::Oid(oid) => strconv::format_uint32(buf, *oid),
409 Value::Record(elems) => strconv::format_record(buf, elems, |buf, elem| match elem {
410 None => Ok::<_, ()>(buf.write_null()),
411 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
412 })
413 .expect("provided closure never fails"),
414 Value::Text(s) | Value::VarChar(s) | Value::BpChar(s) | Value::Name(s) => {
415 strconv::format_string(buf, s)
416 }
417 Value::Time(t) => strconv::format_time(buf, *t),
418 Value::Timestamp(ts) => strconv::format_timestamp(buf, ts),
419 Value::TimestampTz(ts) => strconv::format_timestamptz(buf, ts),
420 Value::Uuid(u) => strconv::format_uuid(buf, *u),
421 Value::Numeric(d) => strconv::format_numeric(buf, &d.0),
422 Value::MzTimestamp(t) => strconv::format_mz_timestamp(buf, *t),
423 Value::Range(range) => strconv::format_range(buf, range, |buf, elem| match elem {
424 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
425 None => Ok::<_, ()>(buf.write_null()),
426 })
427 .expect("provided closure never fails"),
428 Value::MzAclItem(mz_acl_item) => strconv::format_mz_acl_item(buf, *mz_acl_item),
429 Value::AclItem(acl_item) => strconv::format_acl_item(buf, *acl_item),
430 }
431 }
432
433 pub fn encode_binary(&self, ty: &Type, buf: &mut BytesMut) -> Result<(), io::Error> {
436 let is_null = match self {
439 Value::Array { dims, elements } => {
440 let ndims = pg_len("number of array dimensions", dims.len())?;
441 let has_null = elements.iter().any(|e| e.is_none());
442 let elem_type = match ty {
443 Type::Array(elem_type) => elem_type,
444 _ => unreachable!(),
445 };
446 buf.put_i32(ndims);
447 buf.put_i32(has_null.into());
448 buf.put_u32(elem_type.oid());
449 for dim in dims {
450 buf.put_i32(pg_len("array dimension length", dim.length)?);
451 buf.put_i32(dim.lower_bound.try_into().map_err(|_| {
452 io::Error::new(
453 io::ErrorKind::InvalidData,
454 "array dimension lower bound does not fit into an i32",
455 )
456 })?);
457 }
458 for elem in elements {
459 encode_element(buf, elem.as_ref(), elem_type)?;
460 }
461 Ok(postgres_types::IsNull::No)
462 }
463 Value::Int2Vector { elements } => {
464 let has_null = elements.iter().any(|e| e.is_none());
467 buf.put_i32(1);
468 buf.put_i32(has_null.into());
469 buf.put_u32(TYPE_INT2_OID);
470 buf.put_i32(pg_len("int2vector dimension length", elements.len())?);
471 buf.put_i32(0);
472 for elem in elements {
473 encode_element(buf, elem.as_ref(), &Type::Int2)?;
474 }
475 Ok(postgres_types::IsNull::No)
476 }
477 Value::Bool(b) => b.to_sql(&PgType::BOOL, buf),
478 Value::Bytea(b) => b.to_sql(&PgType::BYTEA, buf),
479 Value::Char(c) => i8::reinterpret_cast(*c).to_sql(&PgType::CHAR, buf),
480 Value::Date(d) => d.pg_epoch_days().to_sql(&PgType::DATE, buf),
481 Value::Float4(f) => f.to_sql(&PgType::FLOAT4, buf),
482 Value::Float8(f) => f.to_sql(&PgType::FLOAT8, buf),
483 Value::Int2(i) => i.to_sql(&PgType::INT2, buf),
484 Value::Int4(i) => i.to_sql(&PgType::INT4, buf),
485 Value::Int8(i) => i.to_sql(&PgType::INT8, buf),
486 Value::UInt2(u) => u.to_sql(&*UINT2, buf),
487 Value::UInt4(u) => u.to_sql(&*UINT4, buf),
488 Value::UInt8(u) => u.to_sql(&*UINT8, buf),
489 Value::Interval(iv) => iv.to_sql(&PgType::INTERVAL, buf),
490 Value::Jsonb(js) => js.to_sql(&PgType::JSONB, buf),
491 Value::List(_) => {
492 Err("binary encoding of list types is not implemented".into())
523 }
524 Value::Map(_) => {
525 Err("binary encoding of map types is not implemented".into())
529 }
530 Value::Name(s) => s.to_sql(&PgType::NAME, buf),
531 Value::Oid(i) => i.to_sql(&PgType::OID, buf),
532 Value::Record(fields) => {
533 let nfields = pg_len("record field length", fields.len())?;
534 buf.put_i32(nfields);
535 let field_types = match ty {
536 Type::Record(fields) => fields,
537 _ => unreachable!(),
538 };
539 for (f, ty) in fields.iter().zip_eq(field_types) {
540 buf.put_u32(ty.oid());
541 encode_element(buf, f.as_ref(), ty)?;
542 }
543 Ok(postgres_types::IsNull::No)
544 }
545 Value::Text(s) => s.to_sql(&PgType::TEXT, buf),
546 Value::BpChar(s) => s.to_sql(&PgType::BPCHAR, buf),
547 Value::VarChar(s) => s.to_sql(&PgType::VARCHAR, buf),
548 Value::Time(t) => t.to_sql(&PgType::TIME, buf),
549 Value::Timestamp(ts) => ts.to_sql(&PgType::TIMESTAMP, buf),
550 Value::TimestampTz(ts) => ts.to_sql(&PgType::TIMESTAMPTZ, buf),
551 Value::Uuid(u) => u.to_sql(&PgType::UUID, buf),
552 Value::Numeric(a) => a.to_sql(&PgType::NUMERIC, buf),
553 Value::MzTimestamp(t) => t.to_string().to_sql(&PgType::TEXT, buf),
554 Value::Range(range) => {
555 buf.put_u8(range.pg_flag_bits());
556
557 let elem_type = match ty {
558 Type::Range { element_type } => element_type,
559 _ => unreachable!(),
560 };
561
562 if let Some(RangeInner { lower, upper }) = &range.inner {
563 for bound in [&lower.bound, &upper.bound] {
564 if let Some(bound) = bound {
565 let base = buf.len();
566 buf.put_i32(0);
567 bound.encode_binary(elem_type, buf)?;
568 let len = pg_len("encoded range bound", buf.len() - base - 4)?;
569 buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
570 }
571 }
572 }
573 Ok(postgres_types::IsNull::No)
574 }
575 Value::MzAclItem(mz_acl_item) => {
576 buf.extend_from_slice(&mz_acl_item.encode_binary());
577 Ok(postgres_types::IsNull::No)
578 }
579 Value::AclItem(_) => Err("aclitem has no binary encoding".into()),
580 }
581 .expect("encode_binary should never trigger a to_sql failure");
582 if let IsNull::Yes = is_null {
583 panic!("encode_binary impossibly called on a null value")
584 }
585 Ok(())
586 }
587
588 pub fn binary_encoding_error(typ: &SqlScalarType) -> Result<(), &'static str> {
602 match typ {
603 SqlScalarType::Bool => Ok(()),
604 SqlScalarType::Int16 => Ok(()),
605 SqlScalarType::Int32 => Ok(()),
606 SqlScalarType::Int64 => Ok(()),
607 SqlScalarType::PgLegacyChar => Ok(()),
608 SqlScalarType::UInt16 => Ok(()),
609 SqlScalarType::Oid => Ok(()),
610 SqlScalarType::RegClass => Ok(()),
611 SqlScalarType::RegProc => Ok(()),
612 SqlScalarType::RegType => Ok(()),
613 SqlScalarType::UInt32 => Ok(()),
614 SqlScalarType::UInt64 => Ok(()),
615 SqlScalarType::Float32 => Ok(()),
616 SqlScalarType::Float64 => Ok(()),
617 SqlScalarType::Numeric { .. } => Ok(()),
618 SqlScalarType::MzTimestamp => Ok(()),
619 SqlScalarType::MzAclItem => Ok(()),
620 SqlScalarType::AclItem => Err("no binary output function available for type aclitem"),
621 SqlScalarType::Date => Ok(()),
622 SqlScalarType::Time => Ok(()),
623 SqlScalarType::Timestamp { .. } => Ok(()),
624 SqlScalarType::TimestampTz { .. } => Ok(()),
625 SqlScalarType::Interval => Ok(()),
626 SqlScalarType::Bytes => Ok(()),
627 SqlScalarType::String => Ok(()),
628 SqlScalarType::VarChar { .. } => Ok(()),
629 SqlScalarType::Char { .. } => Ok(()),
630 SqlScalarType::PgLegacyName => Ok(()),
631 SqlScalarType::Jsonb => Ok(()),
632 SqlScalarType::Uuid => Ok(()),
633 SqlScalarType::Array(elem_type) => Self::binary_encoding_error(elem_type),
634 SqlScalarType::Int2Vector => Ok(()),
635 SqlScalarType::List { .. } => Err("no binary output function available for type list"),
636 SqlScalarType::Map { .. } => Err("no binary output function available for type map"),
637 SqlScalarType::Record { fields, .. } => fields
638 .iter()
639 .try_for_each(|(_, ty)| Self::binary_encoding_error(&ty.scalar_type)),
640 SqlScalarType::Range { element_type } => Self::binary_encoding_error(element_type),
641 }
642 }
643
644 pub fn can_encode_binary(typ: &SqlScalarType) -> bool {
648 Self::binary_encoding_error(typ).is_ok()
649 }
650
651 pub fn decode(
654 format: Format,
655 ty: &Type,
656 raw: &[u8],
657 ) -> Result<Value, Box<dyn Error + Sync + Send>> {
658 match format {
659 Format::Text => Value::decode_text(ty, raw),
660 Format::Binary => Value::decode_binary(ty, raw),
661 }
662 }
663
664 pub fn decode_text<'a>(
667 ty: &'a Type,
668 raw: &'a [u8],
669 ) -> Result<Value, Box<dyn Error + Sync + Send>> {
670 let s = str::from_utf8(raw)?;
671 Ok(match ty {
672 Type::Array(elem_type) => {
673 let (elements, dims) = strconv::parse_array(
674 s,
675 || None,
676 |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
677 )?;
678 Value::Array { dims, elements }
679 }
680 Type::Int2Vector { .. } => {
681 return Err("input of Int2Vector types is not implemented".into());
682 }
683 Type::Bool => Value::Bool(strconv::parse_bool(s)?),
684 Type::Bytea => Value::Bytea(strconv::parse_bytes(s)?),
685 Type::Char => Value::Char(raw.get(0).copied().unwrap_or(0)),
686 Type::Date => Value::Date(strconv::parse_date(s)?),
687 Type::Float4 => Value::Float4(strconv::parse_float32(s)?),
688 Type::Float8 => Value::Float8(strconv::parse_float64(s)?),
689 Type::Int2 => Value::Int2(strconv::parse_int16(s)?),
690 Type::Int4 => Value::Int4(strconv::parse_int32(s)?),
691 Type::Int8 => Value::Int8(strconv::parse_int64(s)?),
692 Type::UInt2 => Value::UInt2(UInt2(strconv::parse_uint16(s)?)),
693 Type::UInt4 => Value::UInt4(UInt4(strconv::parse_uint32(s)?)),
694 Type::UInt8 => Value::UInt8(UInt8(strconv::parse_uint64(s)?)),
695 Type::Interval { .. } => Value::Interval(Interval(strconv::parse_interval(s)?)),
696 Type::Json => return Err("input of json types is not implemented".into()),
697 Type::Jsonb => Value::Jsonb(Jsonb(strconv::parse_jsonb(s)?)),
698 Type::List(elem_type) => Value::List(strconv::parse_list(
699 s,
700 matches!(**elem_type, Type::List(..)),
701 || None,
702 |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
703 )?),
704 Type::Map { value_type } => Value::Map(strconv::parse_map(
705 s,
706 matches!(**value_type, Type::Map { .. }),
707 |elem_text| {
708 elem_text
709 .map(|t| Value::decode_text(value_type, t.as_bytes()))
710 .transpose()
711 },
712 )?),
713 Type::Name => Value::Name(strconv::parse_pg_legacy_name(s)),
714 Type::Numeric { .. } => Value::Numeric(Numeric(strconv::parse_numeric(s)?)),
715 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
716 Value::Oid(strconv::parse_oid(s)?)
717 }
718 Type::Record(_) => {
719 return Err("input of anonymous composite types is not implemented".into());
720 }
721 Type::Text => Value::Text(s.to_owned()),
722 Type::BpChar { .. } => Value::BpChar(s.to_owned()),
723 Type::VarChar { .. } => Value::VarChar(s.to_owned()),
724 Type::Time { .. } => Value::Time(strconv::parse_time(s)?),
725 Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
726 Type::Timestamp { .. } => Value::Timestamp(strconv::parse_timestamp(s)?),
727 Type::TimestampTz { .. } => Value::TimestampTz(strconv::parse_timestamptz(s)?),
728 Type::Uuid => Value::Uuid(Uuid::parse_str(s)?),
729 Type::MzTimestamp => Value::MzTimestamp(strconv::parse_mz_timestamp(s)?),
730 Type::Range { element_type } => Value::Range(strconv::parse_range(s, |elem_text| {
731 Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
732 })?),
733 Type::MzAclItem => Value::MzAclItem(strconv::parse_mz_acl_item(s)?),
734 Type::AclItem => Value::AclItem(strconv::parse_acl_item(s)?),
735 })
736 }
737
738 pub fn decode_text_into_row<'a>(
740 ty: &'a Type,
741 s: &'a str,
742 packer: &mut RowPacker,
743 ) -> Result<(), Box<dyn Error + Sync + Send>> {
744 Ok(match ty {
745 Type::Array(elem_type) => {
746 let (elements, dims) =
747 strconv::parse_array(s, || None, |elem_text| Ok::<_, String>(Some(elem_text)))?;
748 unsafe {
750 packer.push_array_with_unchecked(&dims, |packer| {
751 let mut nelements = 0;
752 for element in elements {
753 match element {
754 Some(elem_text) => {
755 Value::decode_text_into_row(elem_type, &elem_text, packer)?
756 }
757
758 None => packer.push(Datum::Null),
759 }
760 nelements += 1;
761 }
762 Ok::<_, Box<dyn Error + Sync + Send>>(nelements)
763 })?
764 }
765 }
766 Type::Int2Vector { .. } => {
767 return Err("input of Int2Vector types is not implemented".into());
768 }
769 Type::Bool => packer.push(Datum::from(strconv::parse_bool(s)?)),
770 Type::Bytea => packer.push(Datum::Bytes(&strconv::parse_bytes(s)?)),
771 Type::Char => packer.push(Datum::UInt8(s.as_bytes().get(0).copied().unwrap_or(0))),
772 Type::Date => packer.push(Datum::Date(strconv::parse_date(s)?)),
773 Type::Float4 => packer.push(Datum::Float32(strconv::parse_float32(s)?.into())),
774 Type::Float8 => packer.push(Datum::Float64(strconv::parse_float64(s)?.into())),
775 Type::Int2 => packer.push(Datum::Int16(strconv::parse_int16(s)?)),
776 Type::Int4 => packer.push(Datum::Int32(strconv::parse_int32(s)?)),
777 Type::Int8 => packer.push(Datum::Int64(strconv::parse_int64(s)?)),
778 Type::UInt2 => packer.push(Datum::UInt16(strconv::parse_uint16(s)?)),
779 Type::UInt4 => packer.push(Datum::UInt32(strconv::parse_uint32(s)?)),
780 Type::UInt8 => packer.push(Datum::UInt64(strconv::parse_uint64(s)?)),
781 Type::Interval { .. } => packer.push(Datum::Interval(strconv::parse_interval(s)?)),
782 Type::Json => return Err("input of json types is not implemented".into()),
783 Type::Jsonb => packer.push(strconv::parse_jsonb(s)?.into_row().unpack_first()),
784 Type::List(elem_type) => {
785 let elems = strconv::parse_list(
786 s,
787 matches!(**elem_type, Type::List(..)),
788 || None,
789 |elem_text| Ok::<_, String>(Some(elem_text)),
790 )?;
791 packer.push_list_with(|packer| {
792 for elem in elems {
793 match elem {
794 Some(elem) => Value::decode_text_into_row(elem_type, &elem, packer)?,
795 None => packer.push(Datum::Null),
796 }
797 }
798 Ok::<_, Box<dyn Error + Sync + Send>>(())
799 })?;
800 }
801 Type::Map { value_type } => {
802 let map =
803 strconv::parse_map(s, matches!(**value_type, Type::Map { .. }), |elem_text| {
804 elem_text.map(Ok::<_, String>).transpose()
805 })?;
806 packer.push_dict_with(|row| {
807 for (k, v) in map {
808 row.push(Datum::String(&k));
809 match v {
810 Some(elem) => Value::decode_text_into_row(value_type, &elem, row)?,
811 None => row.push(Datum::Null),
812 }
813 }
814 Ok::<_, Box<dyn Error + Sync + Send>>(())
815 })?;
816 }
817 Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
818 Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
819 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
820 packer.push(Datum::UInt32(strconv::parse_oid(s)?))
821 }
822 Type::Record(_) => {
823 return Err("input of anonymous composite types is not implemented".into());
824 }
825 Type::Text => packer.push(Datum::String(s)),
826 Type::BpChar { .. } => packer.push(Datum::String(s.trim_end())),
827 Type::VarChar { .. } => packer.push(Datum::String(s)),
828 Type::Time { .. } => packer.push(Datum::Time(strconv::parse_time(s)?)),
829 Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
830 Type::Timestamp { .. } => packer.push(Datum::Timestamp(strconv::parse_timestamp(s)?)),
831 Type::TimestampTz { .. } => {
832 packer.push(Datum::TimestampTz(strconv::parse_timestamptz(s)?))
833 }
834 Type::Uuid => packer.push(Datum::Uuid(Uuid::parse_str(s)?)),
835 Type::MzTimestamp => packer.push(Datum::MzTimestamp(strconv::parse_mz_timestamp(s)?)),
836 Type::Range { element_type } => {
837 let range = strconv::parse_range(s, |elem_text| {
838 Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
839 })?;
840 let buf = RowArena::new();
843 let range = range
844 .try_into_bounds(|elem| elem.into_datum(&buf, element_type))
845 .map_err(Box::<dyn Error + Sync + Send>::from)?;
846 packer
847 .push_range(range)
848 .map_err(Box::<dyn Error + Sync + Send>::from)?;
849 }
850 Type::MzAclItem => packer.push(Datum::MzAclItem(strconv::parse_mz_acl_item(s)?)),
851 Type::AclItem => packer.push(Datum::AclItem(strconv::parse_acl_item(s)?)),
852 })
853 }
854
855 pub fn decode_binary(ty: &Type, raw: &[u8]) -> Result<Value, Box<dyn Error + Sync + Send>> {
858 match ty {
859 Type::Array(_) => Err("input of array types is not implemented".into()),
860 Type::Int2Vector => Err("input of int2vector types is not implemented".into()),
861 Type::Bool => bool::from_sql(ty.inner(), raw).map(Value::Bool),
862 Type::Bytea => Vec::<u8>::from_sql(ty.inner(), raw).map(Value::Bytea),
863 Type::Char => {
864 i8::from_sql(ty.inner(), raw).map(|c| Value::Char(u8::reinterpret_cast(c)))
865 }
866 Type::Date => {
867 let days = i32::from_sql(ty.inner(), raw)?;
868 Ok(Value::Date(Date::from_pg_epoch(days)?))
869 }
870 Type::Float4 => f32::from_sql(ty.inner(), raw).map(Value::Float4),
871 Type::Float8 => f64::from_sql(ty.inner(), raw).map(Value::Float8),
872 Type::Int2 => i16::from_sql(ty.inner(), raw).map(Value::Int2),
873 Type::Int4 => i32::from_sql(ty.inner(), raw).map(Value::Int4),
874 Type::Int8 => i64::from_sql(ty.inner(), raw).map(Value::Int8),
875 Type::UInt2 => UInt2::from_sql(ty.inner(), raw).map(Value::UInt2),
876 Type::UInt4 => UInt4::from_sql(ty.inner(), raw).map(Value::UInt4),
877 Type::UInt8 => UInt8::from_sql(ty.inner(), raw).map(Value::UInt8),
878 Type::Interval { .. } => Interval::from_sql(ty.inner(), raw).map(Value::Interval),
879 Type::Json => Err("input of json types is not implemented".into()),
880 Type::Jsonb => Jsonb::from_sql(ty.inner(), raw).map(Value::Jsonb),
881 Type::List(_) => Err("binary decoding of list types is not implemented".into()),
882 Type::Map { .. } => Err("binary decoding of map types is not implemented".into()),
883 Type::Name => {
884 let s = String::from_sql(ty.inner(), raw)?;
885 if s.len() > NAME_MAX_BYTES {
886 return Err("identifier too long".into());
887 }
888 Ok(Value::Name(s))
889 }
890 Type::Numeric { .. } => Numeric::from_sql(ty.inner(), raw).map(Value::Numeric),
891 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
892 u32::from_sql(ty.inner(), raw).map(Value::Oid)
893 }
894 Type::Record(_) => Err("input of anonymous composite types is not implemented".into()),
895 Type::Text => String::from_sql(ty.inner(), raw).map(Value::Text),
896 Type::BpChar { .. } => String::from_sql(ty.inner(), raw).map(Value::BpChar),
897 Type::VarChar { .. } => String::from_sql(ty.inner(), raw).map(Value::VarChar),
898 Type::Time { .. } => NaiveTime::from_sql(ty.inner(), raw).map(Value::Time),
899 Type::TimeTz { .. } => Err("input of timetz types is not implemented".into()),
900 Type::Timestamp { .. } => {
901 let ts = NaiveDateTime::from_sql(ty.inner(), raw)?;
902 Ok(Value::Timestamp(CheckedTimestamp::from_timestamplike(ts)?))
903 }
904 Type::TimestampTz { .. } => {
905 let ts = DateTime::<Utc>::from_sql(ty.inner(), raw)?;
906 Ok(Value::TimestampTz(CheckedTimestamp::from_timestamplike(
907 ts,
908 )?))
909 }
910 Type::Uuid => Uuid::from_sql(ty.inner(), raw).map(Value::Uuid),
911 Type::MzTimestamp => {
912 let s = String::from_sql(ty.inner(), raw)?;
913 let t: mz_repr::Timestamp = s.parse()?;
914 Ok(Value::MzTimestamp(t))
915 }
916 Type::Range { .. } => Err("binary decoding of range types is not implemented".into()),
917 Type::MzAclItem => {
918 let mz_acl_item = MzAclItem::decode_binary(raw)?;
919 Ok(Value::MzAclItem(mz_acl_item))
920 }
921 Type::AclItem => Err("aclitem has no binary encoding".into()),
922 }
923 }
924}
925
926fn encode_element(buf: &mut BytesMut, elem: Option<&Value>, ty: &Type) -> Result<(), io::Error> {
927 match elem {
928 None => buf.put_i32(-1),
929 Some(elem) => {
930 let base = buf.len();
931 buf.put_i32(0);
932 elem.encode_binary(ty, buf)?;
933 let len = pg_len("encoded element", buf.len() - base - 4)?;
934 buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
935 }
936 }
937 Ok(())
938}
939
940fn pg_len(what: &str, len: usize) -> Result<i32, io::Error> {
941 len.try_into().map_err(|_| {
942 io::Error::new(
943 io::ErrorKind::InvalidData,
944 format!("{} does not fit into an i32", what),
945 )
946 })
947}
948
949pub fn values_from_row(row: &RowRef, typ: &SqlRelationType) -> Vec<Option<Value>> {
954 row.iter()
955 .zip_eq(typ.column_types.iter())
956 .map(|(col, typ)| Value::from_datum(col, &typ.scalar_type))
957 .collect()
958}
959
960#[cfg(test)]
961mod tests {
962 use mz_repr::arb_datum_for_scalar;
963 use proptest::prelude::*;
964
965 use super::*;
966
967 #[mz_ore::test]
975 #[cfg_attr(miri, ignore)] fn proptest_binary_encoding_error_matches_encode_binary() {
977 let strat =
978 any::<SqlScalarType>().prop_flat_map(|ty| (Just(ty.clone()), arb_datum_for_scalar(ty)));
979 proptest!(ProptestConfig::with_cases(256), |((ty, prop_datum) in strat)| {
980 if Value::binary_encoding_error(&ty).is_err() {
984 return Ok(());
985 }
986 let datum = Datum::from(&prop_datum);
987 let value = match Value::from_datum(datum, &ty) {
988 Some(v) => v,
989 None => return Ok(()),
991 };
992 let pg_ty = Type::from(&ty);
993 let mut buf = BytesMut::new();
994 value
995 .encode_binary(&pg_ty, &mut buf)
996 .expect("encode_binary must succeed when binary_encoding_error returns Ok");
997 });
998 }
999
1000 #[mz_ore::test]
1002 fn decode_text_error_smoke_test() {
1003 let bool_array = Value::Array {
1004 dims: vec![ArrayDimension {
1005 lower_bound: 0,
1006 length: 1,
1007 }],
1008 elements: vec![Some(Value::Bool(true))],
1009 };
1010
1011 let mut buf = BytesMut::new();
1012 bool_array.encode_text(&mut buf);
1013 let buf = buf.to_vec();
1014
1015 let int_array_tpe = Type::Array(Box::new(Type::Int4));
1016 let decoded_int_array = Value::decode_text(&int_array_tpe, &buf);
1017
1018 assert_eq!(
1019 decoded_int_array.map_err(|e| e.to_string()).unwrap_err(),
1020 "invalid input syntax for type array: Specifying array lower bounds is not supported: \"[0:0]={t}\"".to_string()
1021 );
1022 }
1023}