1use std::collections::BTreeMap;
11use std::error::Error;
12use std::{io, str};
13
14use bytes::{BufMut, BytesMut};
15use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
16use mz_ore::cast::ReinterpretCast;
17use mz_pgwire_common::Format;
18use mz_repr::adt::array::ArrayDimension;
19use mz_repr::adt::char;
20use mz_repr::adt::date::Date;
21use mz_repr::adt::jsonb::JsonbRef;
22use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
23use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
24use mz_repr::adt::range::{Range, RangeInner};
25use mz_repr::adt::timestamp::CheckedTimestamp;
26use mz_repr::strconv::{self, Nestable};
27use mz_repr::{Datum, RelationType, RowArena, RowPacker, RowRef, ScalarType};
28use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
29use uuid::Uuid;
30
31use crate::types::{UINT2, UINT4, UINT8};
32use crate::{Interval, Jsonb, Numeric, Type, UInt2, UInt4, UInt8};
33
34pub mod interval;
35pub mod jsonb;
36pub mod numeric;
37pub mod record;
38pub mod unsigned;
39
40#[derive(Debug)]
42pub enum Value {
43 Array {
45 dims: Vec<ArrayDimension>,
47 elements: Vec<Option<Value>>,
49 },
50 Bool(bool),
52 Bytea(Vec<u8>),
54 Char(u8),
56 Date(Date),
58 Float4(f32),
60 Float8(f64),
62 Int2(i16),
64 Int4(i32),
66 Int8(i64),
68 UInt2(UInt2),
70 UInt4(UInt4),
72 UInt8(UInt8),
74 Interval(Interval),
76 Jsonb(Jsonb),
78 List(Vec<Option<Value>>),
80 Map(BTreeMap<String, Option<Value>>),
82 Name(String),
84 Numeric(Numeric),
86 Oid(u32),
88 Record(Vec<Option<Value>>),
90 Time(NaiveTime),
92 Timestamp(CheckedTimestamp<NaiveDateTime>),
94 TimestampTz(CheckedTimestamp<DateTime<Utc>>),
96 Text(String),
98 BpChar(String),
100 VarChar(String),
102 Uuid(Uuid),
104 Int2Vector {
106 elements: Vec<Option<Value>>,
108 },
109 MzTimestamp(mz_repr::Timestamp),
111 Range(Range<Box<Value>>),
113 MzAclItem(MzAclItem),
116 AclItem(AclItem),
119}
120
121impl Value {
122 pub fn from_datum(datum: Datum, typ: &ScalarType) -> Option<Value> {
127 match (datum, typ) {
128 (Datum::Null, _) => None,
129 (Datum::True, ScalarType::Bool) => Some(Value::Bool(true)),
130 (Datum::False, ScalarType::Bool) => Some(Value::Bool(false)),
131 (Datum::Int16(i), ScalarType::Int16) => Some(Value::Int2(i)),
132 (Datum::Int32(i), ScalarType::Int32) => Some(Value::Int4(i)),
133 (Datum::Int64(i), ScalarType::Int64) => Some(Value::Int8(i)),
134 (Datum::UInt8(c), ScalarType::PgLegacyChar) => Some(Value::Char(c)),
135 (Datum::UInt16(u), ScalarType::UInt16) => Some(Value::UInt2(UInt2(u))),
136 (Datum::UInt32(oid), ScalarType::Oid) => Some(Value::Oid(oid)),
137 (Datum::UInt32(oid), ScalarType::RegClass) => Some(Value::Oid(oid)),
138 (Datum::UInt32(oid), ScalarType::RegProc) => Some(Value::Oid(oid)),
139 (Datum::UInt32(oid), ScalarType::RegType) => Some(Value::Oid(oid)),
140 (Datum::UInt32(u), ScalarType::UInt32) => Some(Value::UInt4(UInt4(u))),
141 (Datum::UInt64(u), ScalarType::UInt64) => Some(Value::UInt8(UInt8(u))),
142 (Datum::Float32(f), ScalarType::Float32) => Some(Value::Float4(*f)),
143 (Datum::Float64(f), ScalarType::Float64) => Some(Value::Float8(*f)),
144 (Datum::Numeric(d), ScalarType::Numeric { .. }) => Some(Value::Numeric(Numeric(d))),
145 (Datum::MzTimestamp(t), ScalarType::MzTimestamp) => Some(Value::MzTimestamp(t)),
146 (Datum::MzAclItem(mai), ScalarType::MzAclItem) => Some(Value::MzAclItem(mai)),
147 (Datum::AclItem(ai), ScalarType::AclItem) => Some(Value::AclItem(ai)),
148 (Datum::Date(d), ScalarType::Date) => Some(Value::Date(d)),
149 (Datum::Time(t), ScalarType::Time) => Some(Value::Time(t)),
150 (Datum::Timestamp(ts), ScalarType::Timestamp { .. }) => Some(Value::Timestamp(ts)),
151 (Datum::TimestampTz(ts), ScalarType::TimestampTz { .. }) => {
152 Some(Value::TimestampTz(ts))
153 }
154 (Datum::Interval(iv), ScalarType::Interval) => Some(Value::Interval(Interval(iv))),
155 (Datum::Bytes(b), ScalarType::Bytes) => Some(Value::Bytea(b.to_vec())),
156 (Datum::String(s), ScalarType::String) => Some(Value::Text(s.to_owned())),
157 (Datum::String(s), ScalarType::VarChar { .. }) => Some(Value::VarChar(s.to_owned())),
158 (Datum::String(s), ScalarType::Char { length }) => {
159 Some(Value::BpChar(char::format_str_pad(s, *length)))
160 }
161 (Datum::String(s), ScalarType::PgLegacyName) => Some(Value::Name(s.into())),
162 (_, ScalarType::Jsonb) => {
163 Some(Value::Jsonb(Jsonb(JsonbRef::from_datum(datum).to_owned())))
164 }
165 (Datum::Uuid(u), ScalarType::Uuid) => Some(Value::Uuid(u)),
166 (Datum::Array(array), ScalarType::Array(elem_type)) => {
167 let dims = array.dims().into_iter().collect();
168 let elements = array
169 .elements()
170 .iter()
171 .map(|elem| Value::from_datum(elem, elem_type))
172 .collect();
173 Some(Value::Array { dims, elements })
174 }
175 (Datum::Array(array), ScalarType::Int2Vector) => {
176 let dims = array.dims().into_iter();
177 assert!(dims.count() == 1, "int2vector must be 1 dimensional");
178 let elements = array
179 .elements()
180 .iter()
181 .map(|elem| Value::from_datum(elem, &ScalarType::Int16))
182 .collect();
183 Some(Value::Int2Vector { elements })
184 }
185 (Datum::List(list), ScalarType::List { element_type, .. }) => {
186 let elements = list
187 .iter()
188 .map(|elem| Value::from_datum(elem, element_type))
189 .collect();
190 Some(Value::List(elements))
191 }
192 (Datum::List(record), ScalarType::Record { fields, .. }) => {
193 let fields = record
194 .iter()
195 .zip(fields)
196 .map(|(e, (_name, ty))| Value::from_datum(e, &ty.scalar_type))
197 .collect();
198 Some(Value::Record(fields))
199 }
200 (Datum::Map(dict), ScalarType::Map { value_type, .. }) => {
201 let entries = dict
202 .iter()
203 .map(|(k, v)| (k.to_owned(), Value::from_datum(v, value_type)))
204 .collect();
205 Some(Value::Map(entries))
206 }
207 (Datum::Range(range), ScalarType::Range { element_type }) => {
208 let value_range = range.into_bounds(|b| {
209 Box::new(
210 Value::from_datum(b.datum(), element_type)
211 .expect("RangeBounds never contain Datum::Null"),
212 )
213 });
214 Some(Value::Range(value_range))
215 }
216 _ => panic!("can't serialize {}::{:?}", datum, typ),
217 }
218 }
219
220 pub fn into_datum<'a>(self, buf: &'a RowArena, typ: &Type) -> Datum<'a> {
222 match self {
223 Value::Array { dims, elements } => {
224 let element_pg_type = match typ {
225 Type::Array(t) => &*t,
226 _ => panic!("Value::Array should have type Type::Array. Found {:?}", typ),
227 };
228 buf.make_datum(|packer| {
229 packer
230 .try_push_array(
231 &dims,
232 elements.into_iter().map(|element| match element {
233 Some(element) => element.into_datum(buf, element_pg_type),
234 None => Datum::Null,
235 }),
236 )
237 .unwrap();
238 })
239 }
240 Value::Int2Vector { .. } => {
241 unreachable!("into_datum cannot be called on Value::Int2Vector");
244 }
245 Value::Bool(true) => Datum::True,
246 Value::Bool(false) => Datum::False,
247 Value::Bytea(b) => Datum::Bytes(buf.push_bytes(b)),
248 Value::Char(c) => Datum::UInt8(c),
249 Value::Date(d) => Datum::Date(d),
250 Value::Float4(f) => Datum::Float32(f.into()),
251 Value::Float8(f) => Datum::Float64(f.into()),
252 Value::Int2(i) => Datum::Int16(i),
253 Value::Int4(i) => Datum::Int32(i),
254 Value::Int8(i) => Datum::Int64(i),
255 Value::UInt2(u) => Datum::UInt16(u.0),
256 Value::UInt4(u) => Datum::UInt32(u.0),
257 Value::UInt8(u) => Datum::UInt64(u.0),
258 Value::Jsonb(js) => buf.push_unary_row(js.0.into_row()),
259 Value::List(elems) => {
260 let elem_pg_type = match typ {
261 Type::List(t) => &*t,
262 _ => panic!("Value::List should have type Type::List. Found {:?}", typ),
263 };
264 buf.make_datum(|packer| {
265 packer.push_list(elems.into_iter().map(|elem| match elem {
266 Some(elem) => elem.into_datum(buf, elem_pg_type),
267 None => Datum::Null,
268 }));
269 })
270 }
271 Value::Map(map) => {
272 let elem_pg_type = match typ {
273 Type::Map { value_type } => &*value_type,
274 _ => panic!("Value::Map should have type Type::Map. Found {:?}", typ),
275 };
276 buf.make_datum(|packer| {
277 packer.push_dict_with(|row| {
278 for (k, v) in map {
279 row.push(Datum::String(&k));
280 row.push(match v {
281 Some(elem) => elem.into_datum(buf, elem_pg_type),
282 None => Datum::Null,
283 });
284 }
285 });
286 })
287 }
288 Value::Oid(oid) => Datum::UInt32(oid),
289 Value::Record(_) => {
290 unreachable!("into_datum cannot be called on Value::Record");
293 }
294 Value::Time(t) => Datum::Time(t),
295 Value::Timestamp(ts) => Datum::Timestamp(ts),
296 Value::TimestampTz(ts) => Datum::TimestampTz(ts),
297 Value::Interval(iv) => Datum::Interval(iv.0),
298 Value::Text(s) | Value::VarChar(s) | Value::Name(s) => {
299 Datum::String(buf.push_string(s))
300 }
301 Value::BpChar(s) => Datum::String(buf.push_string(s.trim_end().into())),
302 Value::Uuid(u) => Datum::Uuid(u),
303 Value::Numeric(n) => Datum::Numeric(n.0),
304 Value::MzTimestamp(t) => Datum::MzTimestamp(t),
305 Value::Range(range) => {
306 let elem_pg_type = match typ {
307 Type::Range { element_type } => &*element_type,
308 _ => panic!("Value::Range should have type Type::Range. Found {:?}", typ),
309 };
310 let range = range.into_bounds(|elem| elem.into_datum(buf, elem_pg_type));
311
312 buf.make_datum(|packer| packer.push_range(range).unwrap())
313 }
314 Value::MzAclItem(mz_acl_item) => Datum::MzAclItem(mz_acl_item),
315 Value::AclItem(acl_item) => Datum::AclItem(acl_item),
316 }
317 }
318
319 pub fn encode(&self, ty: &Type, format: Format, buf: &mut BytesMut) -> Result<(), io::Error> {
321 match format {
322 Format::Text => {
323 self.encode_text(buf);
324 Ok(())
325 }
326 Format::Binary => self.encode_binary(ty, buf),
327 }
328 }
329
330 pub fn encode_text(&self, buf: &mut BytesMut) -> Nestable {
333 match self {
334 Value::Array { dims, elements } => {
335 strconv::format_array(buf, dims, elements, |buf, elem| match elem {
336 None => Ok::<_, ()>(buf.write_null()),
337 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
338 })
339 .expect("provided closure never fails")
340 }
341 Value::Int2Vector { elements } => {
342 strconv::format_legacy_vector(buf, elements, |buf, elem| {
343 Ok::<_, ()>(
344 elem.as_ref()
345 .expect("Int2Vector does not support NULL values")
346 .encode_text(buf.nonnull_buffer()),
347 )
348 })
349 .expect("provided closure never fails")
350 }
351 Value::Bool(b) => strconv::format_bool(buf, *b),
352 Value::Bytea(b) => strconv::format_bytes(buf, b),
353 Value::Char(c) => {
354 buf.put_u8(*c);
355 Nestable::MayNeedEscaping
356 }
357 Value::Date(d) => strconv::format_date(buf, *d),
358 Value::Int2(i) => strconv::format_int16(buf, *i),
359 Value::Int4(i) => strconv::format_int32(buf, *i),
360 Value::Int8(i) => strconv::format_int64(buf, *i),
361 Value::UInt2(u) => strconv::format_uint16(buf, u.0),
362 Value::UInt4(u) => strconv::format_uint32(buf, u.0),
363 Value::UInt8(u) => strconv::format_uint64(buf, u.0),
364 Value::Interval(iv) => strconv::format_interval(buf, iv.0),
365 Value::Float4(f) => strconv::format_float32(buf, *f),
366 Value::Float8(f) => strconv::format_float64(buf, *f),
367 Value::Jsonb(js) => strconv::format_jsonb(buf, js.0.as_ref()),
368 Value::List(elems) => strconv::format_list(buf, elems, |buf, elem| match elem {
369 None => Ok::<_, ()>(buf.write_null()),
370 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
371 })
372 .expect("provided closure never fails"),
373 Value::Map(elems) => strconv::format_map(buf, elems, |buf, value| match value {
374 None => Ok::<_, ()>(buf.write_null()),
375 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
376 })
377 .expect("provided closure never fails"),
378 Value::Oid(oid) => strconv::format_uint32(buf, *oid),
379 Value::Record(elems) => strconv::format_record(buf, elems, |buf, elem| match elem {
380 None => Ok::<_, ()>(buf.write_null()),
381 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
382 })
383 .expect("provided closure never fails"),
384 Value::Text(s) | Value::VarChar(s) | Value::BpChar(s) | Value::Name(s) => {
385 strconv::format_string(buf, s)
386 }
387 Value::Time(t) => strconv::format_time(buf, *t),
388 Value::Timestamp(ts) => strconv::format_timestamp(buf, ts),
389 Value::TimestampTz(ts) => strconv::format_timestamptz(buf, ts),
390 Value::Uuid(u) => strconv::format_uuid(buf, *u),
391 Value::Numeric(d) => strconv::format_numeric(buf, &d.0),
392 Value::MzTimestamp(t) => strconv::format_mz_timestamp(buf, *t),
393 Value::Range(range) => strconv::format_range(buf, range, |buf, elem| match elem {
394 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
395 None => Ok::<_, ()>(buf.write_null()),
396 })
397 .expect("provided closure never fails"),
398 Value::MzAclItem(mz_acl_item) => strconv::format_mz_acl_item(buf, *mz_acl_item),
399 Value::AclItem(acl_item) => strconv::format_acl_item(buf, *acl_item),
400 }
401 }
402
403 pub fn encode_binary(&self, ty: &Type, buf: &mut BytesMut) -> Result<(), io::Error> {
406 let is_null = match self {
409 Value::Array { dims, elements } => {
410 let ndims = pg_len("number of array dimensions", dims.len())?;
411 let has_null = elements.iter().any(|e| e.is_none());
412 let elem_type = match ty {
413 Type::Array(elem_type) => elem_type,
414 _ => unreachable!(),
415 };
416 buf.put_i32(ndims);
417 buf.put_i32(has_null.into());
418 buf.put_u32(elem_type.oid());
419 for dim in dims {
420 buf.put_i32(pg_len("array dimension length", dim.length)?);
421 buf.put_i32(dim.lower_bound.try_into().map_err(|_| {
422 io::Error::new(
423 io::ErrorKind::Other,
424 "array dimension lower bound does not fit into an i32",
425 )
426 })?);
427 }
428 for elem in elements {
429 encode_element(buf, elem.as_ref(), elem_type)?;
430 }
431 Ok(postgres_types::IsNull::No)
432 }
433 Value::Int2Vector { .. } => {
435 Err("binary encoding of int2vector is not implemented".into())
436 }
437 Value::Bool(b) => b.to_sql(&PgType::BOOL, buf),
438 Value::Bytea(b) => b.to_sql(&PgType::BYTEA, buf),
439 Value::Char(c) => i8::reinterpret_cast(*c).to_sql(&PgType::CHAR, buf),
440 Value::Date(d) => d.pg_epoch_days().to_sql(&PgType::DATE, buf),
441 Value::Float4(f) => f.to_sql(&PgType::FLOAT4, buf),
442 Value::Float8(f) => f.to_sql(&PgType::FLOAT8, buf),
443 Value::Int2(i) => i.to_sql(&PgType::INT2, buf),
444 Value::Int4(i) => i.to_sql(&PgType::INT4, buf),
445 Value::Int8(i) => i.to_sql(&PgType::INT8, buf),
446 Value::UInt2(u) => u.to_sql(&*UINT2, buf),
447 Value::UInt4(u) => u.to_sql(&*UINT4, buf),
448 Value::UInt8(u) => u.to_sql(&*UINT8, buf),
449 Value::Interval(iv) => iv.to_sql(&PgType::INTERVAL, buf),
450 Value::Jsonb(js) => js.to_sql(&PgType::JSONB, buf),
451 Value::List(_) => {
452 Err("binary encoding of list types is not implemented".into())
483 }
484 Value::Map(_) => {
485 Err("binary encoding of map types is not implemented".into())
489 }
490 Value::Name(s) => s.to_sql(&PgType::NAME, buf),
491 Value::Oid(i) => i.to_sql(&PgType::OID, buf),
492 Value::Record(fields) => {
493 let nfields = pg_len("record field length", fields.len())?;
494 buf.put_i32(nfields);
495 let field_types = match ty {
496 Type::Record(fields) => fields,
497 _ => unreachable!(),
498 };
499 for (f, ty) in fields.iter().zip(field_types) {
500 buf.put_u32(ty.oid());
501 encode_element(buf, f.as_ref(), ty)?;
502 }
503 Ok(postgres_types::IsNull::No)
504 }
505 Value::Text(s) => s.to_sql(&PgType::TEXT, buf),
506 Value::BpChar(s) => s.to_sql(&PgType::BPCHAR, buf),
507 Value::VarChar(s) => s.to_sql(&PgType::VARCHAR, buf),
508 Value::Time(t) => t.to_sql(&PgType::TIME, buf),
509 Value::Timestamp(ts) => ts.to_sql(&PgType::TIMESTAMP, buf),
510 Value::TimestampTz(ts) => ts.to_sql(&PgType::TIMESTAMPTZ, buf),
511 Value::Uuid(u) => u.to_sql(&PgType::UUID, buf),
512 Value::Numeric(a) => a.to_sql(&PgType::NUMERIC, buf),
513 Value::MzTimestamp(t) => t.to_string().to_sql(&PgType::TEXT, buf),
514 Value::Range(range) => {
515 buf.put_u8(range.pg_flag_bits());
516
517 let elem_type = match ty {
518 Type::Range { element_type } => element_type,
519 _ => unreachable!(),
520 };
521
522 if let Some(RangeInner { lower, upper }) = &range.inner {
523 for bound in [&lower.bound, &upper.bound] {
524 if let Some(bound) = bound {
525 let base = buf.len();
526 buf.put_i32(0);
527 bound.encode_binary(elem_type, buf)?;
528 let len = pg_len("encoded range bound", buf.len() - base - 4)?;
529 buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
530 }
531 }
532 }
533 Ok(postgres_types::IsNull::No)
534 }
535 Value::MzAclItem(mz_acl_item) => {
536 buf.extend_from_slice(&mz_acl_item.encode_binary());
537 Ok(postgres_types::IsNull::No)
538 }
539 Value::AclItem(_) => Err("aclitem has no binary encoding".into()),
540 }
541 .expect("encode_binary should never trigger a to_sql failure");
542 if let IsNull::Yes = is_null {
543 panic!("encode_binary impossibly called on a null value")
544 }
545 Ok(())
546 }
547
548 pub fn can_encode_binary(typ: &ScalarType) -> bool {
552 match typ {
553 ScalarType::Bool => true,
554 ScalarType::Int16 => true,
555 ScalarType::Int32 => true,
556 ScalarType::Int64 => true,
557 ScalarType::PgLegacyChar => true,
558 ScalarType::UInt16 => true,
559 ScalarType::Oid => true,
560 ScalarType::RegClass => true,
561 ScalarType::RegProc => true,
562 ScalarType::RegType => true,
563 ScalarType::UInt32 => true,
564 ScalarType::UInt64 => true,
565 ScalarType::Float32 => true,
566 ScalarType::Float64 => true,
567 ScalarType::Numeric { .. } => true,
568 ScalarType::MzTimestamp => true,
569 ScalarType::MzAclItem => true,
570 ScalarType::AclItem => false, ScalarType::Date => true,
572 ScalarType::Time => true,
573 ScalarType::Timestamp { .. } => true,
574 ScalarType::TimestampTz { .. } => true,
575 ScalarType::Interval => true,
576 ScalarType::Bytes => true,
577 ScalarType::String => true,
578 ScalarType::VarChar { .. } => true,
579 ScalarType::Char { .. } => true,
580 ScalarType::PgLegacyName => true,
581 ScalarType::Jsonb => true,
582 ScalarType::Uuid => true,
583 ScalarType::Array(elem_type) => Self::can_encode_binary(elem_type),
584 ScalarType::Int2Vector => false, ScalarType::List { .. } => false, ScalarType::Map { .. } => false, ScalarType::Record { fields, .. } => fields
588 .iter()
589 .all(|(_, ty)| Self::can_encode_binary(&ty.scalar_type)),
590 ScalarType::Range { element_type } => Self::can_encode_binary(element_type),
591 }
592 }
593
594 pub fn decode(
597 format: Format,
598 ty: &Type,
599 raw: &[u8],
600 ) -> Result<Value, Box<dyn Error + Sync + Send>> {
601 match format {
602 Format::Text => Value::decode_text(ty, raw),
603 Format::Binary => Value::decode_binary(ty, raw),
604 }
605 }
606
607 pub fn decode_text<'a>(
610 ty: &'a Type,
611 raw: &'a [u8],
612 ) -> Result<Value, Box<dyn Error + Sync + Send>> {
613 let s = str::from_utf8(raw)?;
614 Ok(match ty {
615 Type::Array(elem_type) => {
616 let (elements, dims) = strconv::parse_array(
617 s,
618 || None,
619 |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
620 )?;
621 Value::Array { dims, elements }
622 }
623 Type::Int2Vector { .. } => {
624 return Err("input of Int2Vector types is not implemented".into());
625 }
626 Type::Bool => Value::Bool(strconv::parse_bool(s)?),
627 Type::Bytea => Value::Bytea(strconv::parse_bytes(s)?),
628 Type::Char => Value::Char(raw.get(0).copied().unwrap_or(0)),
629 Type::Date => Value::Date(strconv::parse_date(s)?),
630 Type::Float4 => Value::Float4(strconv::parse_float32(s)?),
631 Type::Float8 => Value::Float8(strconv::parse_float64(s)?),
632 Type::Int2 => Value::Int2(strconv::parse_int16(s)?),
633 Type::Int4 => Value::Int4(strconv::parse_int32(s)?),
634 Type::Int8 => Value::Int8(strconv::parse_int64(s)?),
635 Type::UInt2 => Value::UInt2(UInt2(strconv::parse_uint16(s)?)),
636 Type::UInt4 => Value::UInt4(UInt4(strconv::parse_uint32(s)?)),
637 Type::UInt8 => Value::UInt8(UInt8(strconv::parse_uint64(s)?)),
638 Type::Interval { .. } => Value::Interval(Interval(strconv::parse_interval(s)?)),
639 Type::Json => return Err("input of json types is not implemented".into()),
640 Type::Jsonb => Value::Jsonb(Jsonb(strconv::parse_jsonb(s)?)),
641 Type::List(elem_type) => Value::List(strconv::parse_list(
642 s,
643 matches!(**elem_type, Type::List(..)),
644 || None,
645 |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
646 )?),
647 Type::Map { value_type } => Value::Map(strconv::parse_map(
648 s,
649 matches!(**value_type, Type::Map { .. }),
650 |elem_text| {
651 elem_text
652 .map(|t| Value::decode_text(value_type, t.as_bytes()))
653 .transpose()
654 },
655 )?),
656 Type::Name => Value::Name(strconv::parse_pg_legacy_name(s)),
657 Type::Numeric { .. } => Value::Numeric(Numeric(strconv::parse_numeric(s)?)),
658 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
659 Value::Oid(strconv::parse_oid(s)?)
660 }
661 Type::Record(_) => {
662 return Err("input of anonymous composite types is not implemented".into());
663 }
664 Type::Text => Value::Text(s.to_owned()),
665 Type::BpChar { .. } => Value::BpChar(s.to_owned()),
666 Type::VarChar { .. } => Value::VarChar(s.to_owned()),
667 Type::Time { .. } => Value::Time(strconv::parse_time(s)?),
668 Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
669 Type::Timestamp { .. } => Value::Timestamp(strconv::parse_timestamp(s)?),
670 Type::TimestampTz { .. } => Value::TimestampTz(strconv::parse_timestamptz(s)?),
671 Type::Uuid => Value::Uuid(Uuid::parse_str(s)?),
672 Type::MzTimestamp => Value::MzTimestamp(strconv::parse_mz_timestamp(s)?),
673 Type::Range { element_type } => Value::Range(strconv::parse_range(s, |elem_text| {
674 Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
675 })?),
676 Type::MzAclItem => Value::MzAclItem(strconv::parse_mz_acl_item(s)?),
677 Type::AclItem => Value::AclItem(strconv::parse_acl_item(s)?),
678 })
679 }
680
681 pub fn decode_text_into_row<'a>(
683 ty: &'a Type,
684 s: &'a str,
685 packer: &mut RowPacker,
686 ) -> Result<(), Box<dyn Error + Sync + Send>> {
687 Ok(match ty {
688 Type::Array(elem_type) => {
689 let (elements, dims) =
690 strconv::parse_array(s, || None, |elem_text| Ok::<_, String>(Some(elem_text)))?;
691 unsafe {
693 packer.push_array_with_unchecked(&dims, |packer| {
694 let mut nelements = 0;
695 for element in elements {
696 match element {
697 Some(elem_text) => {
698 Value::decode_text_into_row(elem_type, &elem_text, packer)?
699 }
700
701 None => packer.push(Datum::Null),
702 }
703 nelements += 1;
704 }
705 Ok::<_, Box<dyn Error + Sync + Send>>(nelements)
706 })?
707 }
708 }
709 Type::Int2Vector { .. } => {
710 return Err("input of Int2Vector types is not implemented".into());
711 }
712 Type::Bool => packer.push(Datum::from(strconv::parse_bool(s)?)),
713 Type::Bytea => packer.push(Datum::Bytes(&strconv::parse_bytes(s)?)),
714 Type::Char => packer.push(Datum::UInt8(s.as_bytes().get(0).copied().unwrap_or(0))),
715 Type::Date => packer.push(Datum::Date(strconv::parse_date(s)?)),
716 Type::Float4 => packer.push(Datum::Float32(strconv::parse_float32(s)?.into())),
717 Type::Float8 => packer.push(Datum::Float64(strconv::parse_float64(s)?.into())),
718 Type::Int2 => packer.push(Datum::Int16(strconv::parse_int16(s)?)),
719 Type::Int4 => packer.push(Datum::Int32(strconv::parse_int32(s)?)),
720 Type::Int8 => packer.push(Datum::Int64(strconv::parse_int64(s)?)),
721 Type::UInt2 => packer.push(Datum::UInt16(strconv::parse_uint16(s)?)),
722 Type::UInt4 => packer.push(Datum::UInt32(strconv::parse_uint32(s)?)),
723 Type::UInt8 => packer.push(Datum::UInt64(strconv::parse_uint64(s)?)),
724 Type::Interval { .. } => packer.push(Datum::Interval(strconv::parse_interval(s)?)),
725 Type::Json => return Err("input of json types is not implemented".into()),
726 Type::Jsonb => packer.push(strconv::parse_jsonb(s)?.into_row().unpack_first()),
727 Type::List(elem_type) => {
728 let elems = strconv::parse_list(
729 s,
730 matches!(**elem_type, Type::List(..)),
731 || None,
732 |elem_text| Ok::<_, String>(Some(elem_text)),
733 )?;
734 packer.push_list_with(|packer| {
735 for elem in elems {
736 match elem {
737 Some(elem) => Value::decode_text_into_row(elem_type, &elem, packer)?,
738 None => packer.push(Datum::Null),
739 }
740 }
741 Ok::<_, Box<dyn Error + Sync + Send>>(())
742 })?;
743 }
744 Type::Map { value_type } => {
745 let map =
746 strconv::parse_map(s, matches!(**value_type, Type::Map { .. }), |elem_text| {
747 elem_text.map(Ok::<_, String>).transpose()
748 })?;
749 packer.push_dict_with(|row| {
750 for (k, v) in map {
751 row.push(Datum::String(&k));
752 match v {
753 Some(elem) => Value::decode_text_into_row(value_type, &elem, row)?,
754 None => row.push(Datum::Null),
755 }
756 }
757 Ok::<_, Box<dyn Error + Sync + Send>>(())
758 })?;
759 }
760 Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
761 Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
762 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
763 packer.push(Datum::UInt32(strconv::parse_oid(s)?))
764 }
765 Type::Record(_) => {
766 return Err("input of anonymous composite types is not implemented".into());
767 }
768 Type::Text => packer.push(Datum::String(s)),
769 Type::BpChar { .. } => packer.push(Datum::String(s.trim_end())),
770 Type::VarChar { .. } => packer.push(Datum::String(s)),
771 Type::Time { .. } => packer.push(Datum::Time(strconv::parse_time(s)?)),
772 Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
773 Type::Timestamp { .. } => packer.push(Datum::Timestamp(strconv::parse_timestamp(s)?)),
774 Type::TimestampTz { .. } => {
775 packer.push(Datum::TimestampTz(strconv::parse_timestamptz(s)?))
776 }
777 Type::Uuid => packer.push(Datum::Uuid(Uuid::parse_str(s)?)),
778 Type::MzTimestamp => packer.push(Datum::MzTimestamp(strconv::parse_mz_timestamp(s)?)),
779 Type::Range { element_type } => {
780 let range = strconv::parse_range(s, |elem_text| {
781 Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
782 })?;
783 let buf = RowArena::new();
786 let range = range.into_bounds(|elem| elem.into_datum(&buf, element_type));
787
788 packer.push_range(range).unwrap()
789 }
790 Type::MzAclItem => packer.push(Datum::MzAclItem(strconv::parse_mz_acl_item(s)?)),
791 Type::AclItem => packer.push(Datum::AclItem(strconv::parse_acl_item(s)?)),
792 })
793 }
794
795 pub fn decode_binary(ty: &Type, raw: &[u8]) -> Result<Value, Box<dyn Error + Sync + Send>> {
798 match ty {
799 Type::Array(_) => Err("input of array types is not implemented".into()),
800 Type::Int2Vector => Err("input of int2vector types is not implemented".into()),
801 Type::Bool => bool::from_sql(ty.inner(), raw).map(Value::Bool),
802 Type::Bytea => Vec::<u8>::from_sql(ty.inner(), raw).map(Value::Bytea),
803 Type::Char => {
804 i8::from_sql(ty.inner(), raw).map(|c| Value::Char(u8::reinterpret_cast(c)))
805 }
806 Type::Date => {
807 let days = i32::from_sql(ty.inner(), raw)?;
808 Ok(Value::Date(Date::from_pg_epoch(days)?))
809 }
810 Type::Float4 => f32::from_sql(ty.inner(), raw).map(Value::Float4),
811 Type::Float8 => f64::from_sql(ty.inner(), raw).map(Value::Float8),
812 Type::Int2 => i16::from_sql(ty.inner(), raw).map(Value::Int2),
813 Type::Int4 => i32::from_sql(ty.inner(), raw).map(Value::Int4),
814 Type::Int8 => i64::from_sql(ty.inner(), raw).map(Value::Int8),
815 Type::UInt2 => UInt2::from_sql(ty.inner(), raw).map(Value::UInt2),
816 Type::UInt4 => UInt4::from_sql(ty.inner(), raw).map(Value::UInt4),
817 Type::UInt8 => UInt8::from_sql(ty.inner(), raw).map(Value::UInt8),
818 Type::Interval { .. } => Interval::from_sql(ty.inner(), raw).map(Value::Interval),
819 Type::Json => Err("input of json types is not implemented".into()),
820 Type::Jsonb => Jsonb::from_sql(ty.inner(), raw).map(Value::Jsonb),
821 Type::List(_) => Err("binary decoding of list types is not implemented".into()),
822 Type::Map { .. } => Err("binary decoding of map types is not implemented".into()),
823 Type::Name => {
824 let s = String::from_sql(ty.inner(), raw)?;
825 if s.len() > NAME_MAX_BYTES {
826 return Err("identifier too long".into());
827 }
828 Ok(Value::Name(s))
829 }
830 Type::Numeric { .. } => Numeric::from_sql(ty.inner(), raw).map(Value::Numeric),
831 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
832 u32::from_sql(ty.inner(), raw).map(Value::Oid)
833 }
834 Type::Record(_) => Err("input of anonymous composite types is not implemented".into()),
835 Type::Text => String::from_sql(ty.inner(), raw).map(Value::Text),
836 Type::BpChar { .. } => String::from_sql(ty.inner(), raw).map(Value::BpChar),
837 Type::VarChar { .. } => String::from_sql(ty.inner(), raw).map(Value::VarChar),
838 Type::Time { .. } => NaiveTime::from_sql(ty.inner(), raw).map(Value::Time),
839 Type::TimeTz { .. } => Err("input of timetz types is not implemented".into()),
840 Type::Timestamp { .. } => {
841 let ts = NaiveDateTime::from_sql(ty.inner(), raw)?;
842 Ok(Value::Timestamp(CheckedTimestamp::from_timestamplike(ts)?))
843 }
844 Type::TimestampTz { .. } => {
845 let ts = DateTime::<Utc>::from_sql(ty.inner(), raw)?;
846 Ok(Value::TimestampTz(CheckedTimestamp::from_timestamplike(
847 ts,
848 )?))
849 }
850 Type::Uuid => Uuid::from_sql(ty.inner(), raw).map(Value::Uuid),
851 Type::MzTimestamp => {
852 let s = String::from_sql(ty.inner(), raw)?;
853 let t: mz_repr::Timestamp = s.parse()?;
854 Ok(Value::MzTimestamp(t))
855 }
856 Type::Range { .. } => Err("binary decoding of range types is not implemented".into()),
857 Type::MzAclItem => {
858 let mz_acl_item = MzAclItem::decode_binary(raw)?;
859 Ok(Value::MzAclItem(mz_acl_item))
860 }
861 Type::AclItem => Err("aclitem has no binary encoding".into()),
862 }
863 }
864}
865
866fn encode_element(buf: &mut BytesMut, elem: Option<&Value>, ty: &Type) -> Result<(), io::Error> {
867 match elem {
868 None => buf.put_i32(-1),
869 Some(elem) => {
870 let base = buf.len();
871 buf.put_i32(0);
872 elem.encode_binary(ty, buf)?;
873 let len = pg_len("encoded element", buf.len() - base - 4)?;
874 buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
875 }
876 }
877 Ok(())
878}
879
880fn pg_len(what: &str, len: usize) -> Result<i32, io::Error> {
881 len.try_into().map_err(|_| {
882 io::Error::new(
883 io::ErrorKind::Other,
884 format!("{} does not fit into an i32", what),
885 )
886 })
887}
888
889pub fn values_from_row(row: &RowRef, typ: &RelationType) -> Vec<Option<Value>> {
894 row.iter()
895 .zip(typ.column_types.iter())
896 .map(|(col, typ)| Value::from_datum(col, &typ.scalar_type))
897 .collect()
898}
899
900#[cfg(test)]
901mod tests {
902 use super::*;
903
904 #[mz_ore::test]
906 fn decode_text_error_smoke_test() {
907 let bool_array = Value::Array {
908 dims: vec![ArrayDimension {
909 lower_bound: 0,
910 length: 1,
911 }],
912 elements: vec![Some(Value::Bool(true))],
913 };
914
915 let mut buf = BytesMut::new();
916 bool_array.encode_text(&mut buf);
917 let buf = buf.to_vec();
918
919 let int_array_tpe = Type::Array(Box::new(Type::Int4));
920 let decoded_int_array = Value::decode_text(&int_array_tpe, &buf);
921
922 assert_eq!(
923 decoded_int_array.map_err(|e| e.to_string()).unwrap_err(),
924 "invalid input syntax for type array: Specifying array lower bounds is not supported: \"[0:0]={t}\"".to_string()
925 );
926 }
927}