1use std::collections::BTreeMap;
11use std::error::Error;
12use std::{io, str};
13
14use bytes::{BufMut, BytesMut};
15use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
16use itertools::Itertools;
17use mz_ore::cast::ReinterpretCast;
18use mz_pgwire_common::Format;
19use mz_repr::adt::array::ArrayDimension;
20use mz_repr::adt::char;
21use mz_repr::adt::date::Date;
22use mz_repr::adt::jsonb::JsonbRef;
23use mz_repr::adt::mz_acl_item::{AclItem, MzAclItem};
24use mz_repr::adt::pg_legacy_name::NAME_MAX_BYTES;
25use mz_repr::adt::range::{Range, RangeInner};
26use mz_repr::adt::timestamp::CheckedTimestamp;
27use mz_repr::strconv::{self, Nestable};
28use mz_repr::{Datum, RowArena, RowPacker, RowRef, SqlRelationType, SqlScalarType};
29use postgres_types::{FromSql, IsNull, ToSql, Type as PgType};
30use uuid::Uuid;
31
32use crate::types::{UINT2, UINT4, UINT8};
33use crate::{Interval, Jsonb, Numeric, Type, UInt2, UInt4, UInt8};
34
35pub mod interval;
36pub mod jsonb;
37pub mod numeric;
38pub mod record;
39pub mod unsigned;
40
41#[derive(Debug)]
43pub enum Value {
44 Array {
46 dims: Vec<ArrayDimension>,
48 elements: Vec<Option<Value>>,
50 },
51 Bool(bool),
53 Bytea(Vec<u8>),
55 Char(u8),
57 Date(Date),
59 Float4(f32),
61 Float8(f64),
63 Int2(i16),
65 Int4(i32),
67 Int8(i64),
69 UInt2(UInt2),
71 UInt4(UInt4),
73 UInt8(UInt8),
75 Interval(Interval),
77 Jsonb(Jsonb),
79 List(Vec<Option<Value>>),
81 Map(BTreeMap<String, Option<Value>>),
83 Name(String),
85 Numeric(Numeric),
87 Oid(u32),
89 Record(Vec<Option<Value>>),
91 Time(NaiveTime),
93 Timestamp(CheckedTimestamp<NaiveDateTime>),
95 TimestampTz(CheckedTimestamp<DateTime<Utc>>),
97 Text(String),
99 BpChar(String),
101 VarChar(String),
103 Uuid(Uuid),
105 Int2Vector {
107 elements: Vec<Option<Value>>,
109 },
110 MzTimestamp(mz_repr::Timestamp),
112 Range(Range<Box<Value>>),
114 MzAclItem(MzAclItem),
117 AclItem(AclItem),
120}
121
122impl Value {
123 pub fn from_datum(datum: Datum, typ: &SqlScalarType) -> Option<Value> {
128 match (datum, typ) {
129 (Datum::Null, _) => None,
130 (Datum::True, SqlScalarType::Bool) => Some(Value::Bool(true)),
131 (Datum::False, SqlScalarType::Bool) => Some(Value::Bool(false)),
132 (Datum::Int16(i), SqlScalarType::Int16) => Some(Value::Int2(i)),
133 (Datum::Int32(i), SqlScalarType::Int32) => Some(Value::Int4(i)),
134 (Datum::Int64(i), SqlScalarType::Int64) => Some(Value::Int8(i)),
135 (Datum::UInt8(c), SqlScalarType::PgLegacyChar) => Some(Value::Char(c)),
136 (Datum::UInt16(u), SqlScalarType::UInt16) => Some(Value::UInt2(UInt2(u))),
137 (Datum::UInt32(oid), SqlScalarType::Oid) => Some(Value::Oid(oid)),
138 (Datum::UInt32(oid), SqlScalarType::RegClass) => Some(Value::Oid(oid)),
139 (Datum::UInt32(oid), SqlScalarType::RegProc) => Some(Value::Oid(oid)),
140 (Datum::UInt32(oid), SqlScalarType::RegType) => Some(Value::Oid(oid)),
141 (Datum::UInt32(u), SqlScalarType::UInt32) => Some(Value::UInt4(UInt4(u))),
142 (Datum::UInt64(u), SqlScalarType::UInt64) => Some(Value::UInt8(UInt8(u))),
143 (Datum::Float32(f), SqlScalarType::Float32) => Some(Value::Float4(*f)),
144 (Datum::Float64(f), SqlScalarType::Float64) => Some(Value::Float8(*f)),
145 (Datum::Numeric(d), SqlScalarType::Numeric { .. }) => Some(Value::Numeric(Numeric(d))),
146 (Datum::MzTimestamp(t), SqlScalarType::MzTimestamp) => Some(Value::MzTimestamp(t)),
147 (Datum::MzAclItem(mai), SqlScalarType::MzAclItem) => Some(Value::MzAclItem(mai)),
148 (Datum::AclItem(ai), SqlScalarType::AclItem) => Some(Value::AclItem(ai)),
149 (Datum::Date(d), SqlScalarType::Date) => Some(Value::Date(d)),
150 (Datum::Time(t), SqlScalarType::Time) => Some(Value::Time(t)),
151 (Datum::Timestamp(ts), SqlScalarType::Timestamp { .. }) => Some(Value::Timestamp(ts)),
152 (Datum::TimestampTz(ts), SqlScalarType::TimestampTz { .. }) => {
153 Some(Value::TimestampTz(ts))
154 }
155 (Datum::Interval(iv), SqlScalarType::Interval) => Some(Value::Interval(Interval(iv))),
156 (Datum::Bytes(b), SqlScalarType::Bytes) => Some(Value::Bytea(b.to_vec())),
157 (Datum::String(s), SqlScalarType::String) => Some(Value::Text(s.to_owned())),
158 (Datum::String(s), SqlScalarType::VarChar { .. }) => Some(Value::VarChar(s.to_owned())),
159 (Datum::String(s), SqlScalarType::Char { length }) => {
160 Some(Value::BpChar(char::format_str_pad(s, *length)))
161 }
162 (Datum::String(s), SqlScalarType::PgLegacyName) => Some(Value::Name(s.into())),
163 (_, SqlScalarType::Jsonb) => {
164 Some(Value::Jsonb(Jsonb(JsonbRef::from_datum(datum).to_owned())))
165 }
166 (Datum::Uuid(u), SqlScalarType::Uuid) => Some(Value::Uuid(u)),
167 (Datum::Array(array), SqlScalarType::Array(elem_type)) => {
168 let dims = array.dims().into_iter().collect();
169 let elements = array
170 .elements()
171 .iter()
172 .map(|elem| Value::from_datum(elem, elem_type))
173 .collect();
174 Some(Value::Array { dims, elements })
175 }
176 (Datum::Array(array), SqlScalarType::Int2Vector) => {
177 assert!(
178 array.has_int2vector_dims(),
179 "int2vector must be 1 dimensional, or empty"
180 );
181 let elements = array
182 .elements()
183 .iter()
184 .map(|elem| Value::from_datum(elem, &SqlScalarType::Int16))
185 .collect();
186 Some(Value::Int2Vector { elements })
187 }
188 (Datum::List(list), SqlScalarType::List { element_type, .. }) => {
189 let elements = list
190 .iter()
191 .map(|elem| Value::from_datum(elem, element_type))
192 .collect();
193 Some(Value::List(elements))
194 }
195 (Datum::List(record), SqlScalarType::Record { fields, .. }) => {
196 let fields = record
197 .iter()
198 .zip_eq(fields)
199 .map(|(e, (_name, ty))| Value::from_datum(e, &ty.scalar_type))
200 .collect();
201 Some(Value::Record(fields))
202 }
203 (Datum::Map(dict), SqlScalarType::Map { value_type, .. }) => {
204 let entries = dict
205 .iter()
206 .map(|(k, v)| (k.to_owned(), Value::from_datum(v, value_type)))
207 .collect();
208 Some(Value::Map(entries))
209 }
210 (Datum::Range(range), SqlScalarType::Range { element_type }) => {
211 let value_range = range.into_bounds(|b| {
212 Box::new(
213 Value::from_datum(b.datum(), element_type)
214 .expect("RangeBounds never contain Datum::Null"),
215 )
216 });
217 Some(Value::Range(value_range))
218 }
219 _ => panic!("can't serialize {}::{:?}", datum, typ),
220 }
221 }
222
223 pub fn into_datum<'a>(self, buf: &'a RowArena, typ: &Type) -> Datum<'a> {
225 match self {
226 Value::Array { dims, elements } => {
227 let element_pg_type = match typ {
228 Type::Array(t) => &*t,
229 _ => panic!("Value::Array should have type Type::Array. Found {:?}", typ),
230 };
231 buf.make_datum(|packer| {
232 packer
233 .try_push_array(
234 &dims,
235 elements.into_iter().map(|element| match element {
236 Some(element) => element.into_datum(buf, element_pg_type),
237 None => Datum::Null,
238 }),
239 )
240 .unwrap();
241 })
242 }
243 Value::Int2Vector { .. } => {
244 unreachable!("into_datum cannot be called on Value::Int2Vector");
247 }
248 Value::Bool(true) => Datum::True,
249 Value::Bool(false) => Datum::False,
250 Value::Bytea(b) => Datum::Bytes(buf.push_bytes(b)),
251 Value::Char(c) => Datum::UInt8(c),
252 Value::Date(d) => Datum::Date(d),
253 Value::Float4(f) => Datum::Float32(f.into()),
254 Value::Float8(f) => Datum::Float64(f.into()),
255 Value::Int2(i) => Datum::Int16(i),
256 Value::Int4(i) => Datum::Int32(i),
257 Value::Int8(i) => Datum::Int64(i),
258 Value::UInt2(u) => Datum::UInt16(u.0),
259 Value::UInt4(u) => Datum::UInt32(u.0),
260 Value::UInt8(u) => Datum::UInt64(u.0),
261 Value::Jsonb(js) => buf.push_unary_row(js.0.into_row()),
262 Value::List(elems) => {
263 let elem_pg_type = match typ {
264 Type::List(t) => &*t,
265 _ => panic!("Value::List should have type Type::List. Found {:?}", typ),
266 };
267 buf.make_datum(|packer| {
268 packer.push_list(elems.into_iter().map(|elem| match elem {
269 Some(elem) => elem.into_datum(buf, elem_pg_type),
270 None => Datum::Null,
271 }));
272 })
273 }
274 Value::Map(map) => {
275 let elem_pg_type = match typ {
276 Type::Map { value_type } => &*value_type,
277 _ => panic!("Value::Map should have type Type::Map. Found {:?}", typ),
278 };
279 buf.make_datum(|packer| {
280 packer.push_dict_with(|row| {
281 for (k, v) in map {
282 row.push(Datum::String(&k));
283 row.push(match v {
284 Some(elem) => elem.into_datum(buf, elem_pg_type),
285 None => Datum::Null,
286 });
287 }
288 });
289 })
290 }
291 Value::Oid(oid) => Datum::UInt32(oid),
292 Value::Record(_) => {
293 unreachable!("into_datum cannot be called on Value::Record");
296 }
297 Value::Time(t) => Datum::Time(t),
298 Value::Timestamp(ts) => Datum::Timestamp(ts),
299 Value::TimestampTz(ts) => Datum::TimestampTz(ts),
300 Value::Interval(iv) => Datum::Interval(iv.0),
301 Value::Text(s) | Value::VarChar(s) | Value::Name(s) => {
302 Datum::String(buf.push_string(s))
303 }
304 Value::BpChar(s) => Datum::String(buf.push_string(s.trim_end().into())),
305 Value::Uuid(u) => Datum::Uuid(u),
306 Value::Numeric(n) => Datum::Numeric(n.0),
307 Value::MzTimestamp(t) => Datum::MzTimestamp(t),
308 Value::Range(range) => {
309 let elem_pg_type = match typ {
310 Type::Range { element_type } => &*element_type,
311 _ => panic!("Value::Range should have type Type::Range. Found {:?}", typ),
312 };
313 let range = range.into_bounds(|elem| elem.into_datum(buf, elem_pg_type));
314
315 buf.make_datum(|packer| packer.push_range(range).unwrap())
316 }
317 Value::MzAclItem(mz_acl_item) => Datum::MzAclItem(mz_acl_item),
318 Value::AclItem(acl_item) => Datum::AclItem(acl_item),
319 }
320 }
321
322 pub fn encode(&self, ty: &Type, format: Format, buf: &mut BytesMut) -> Result<(), io::Error> {
324 match format {
325 Format::Text => {
326 self.encode_text(buf);
327 Ok(())
328 }
329 Format::Binary => self.encode_binary(ty, buf),
330 }
331 }
332
333 pub fn encode_text(&self, buf: &mut BytesMut) -> Nestable {
336 match self {
337 Value::Array { dims, elements } => {
338 strconv::format_array(buf, dims, elements, |buf, elem| match elem {
339 None => Ok::<_, ()>(buf.write_null()),
340 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
341 })
342 .expect("provided closure never fails")
343 }
344 Value::Int2Vector { elements } => {
345 strconv::format_legacy_vector(buf, elements, |buf, elem| {
346 Ok::<_, ()>(
347 elem.as_ref()
348 .expect("Int2Vector does not support NULL values")
349 .encode_text(buf.nonnull_buffer()),
350 )
351 })
352 .expect("provided closure never fails")
353 }
354 Value::Bool(b) => strconv::format_bool(buf, *b),
355 Value::Bytea(b) => strconv::format_bytes(buf, b),
356 Value::Char(c) => {
357 buf.put_u8(*c);
358 Nestable::MayNeedEscaping
359 }
360 Value::Date(d) => strconv::format_date(buf, *d),
361 Value::Int2(i) => strconv::format_int16(buf, *i),
362 Value::Int4(i) => strconv::format_int32(buf, *i),
363 Value::Int8(i) => strconv::format_int64(buf, *i),
364 Value::UInt2(u) => strconv::format_uint16(buf, u.0),
365 Value::UInt4(u) => strconv::format_uint32(buf, u.0),
366 Value::UInt8(u) => strconv::format_uint64(buf, u.0),
367 Value::Interval(iv) => strconv::format_interval(buf, iv.0),
368 Value::Float4(f) => strconv::format_float32(buf, *f),
369 Value::Float8(f) => strconv::format_float64(buf, *f),
370 Value::Jsonb(js) => strconv::format_jsonb(buf, js.0.as_ref()),
371 Value::List(elems) => strconv::format_list(buf, elems, |buf, elem| match elem {
372 None => Ok::<_, ()>(buf.write_null()),
373 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
374 })
375 .expect("provided closure never fails"),
376 Value::Map(elems) => strconv::format_map(buf, elems, |buf, value| match value {
377 None => Ok::<_, ()>(buf.write_null()),
378 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
379 })
380 .expect("provided closure never fails"),
381 Value::Oid(oid) => strconv::format_uint32(buf, *oid),
382 Value::Record(elems) => strconv::format_record(buf, elems, |buf, elem| match elem {
383 None => Ok::<_, ()>(buf.write_null()),
384 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
385 })
386 .expect("provided closure never fails"),
387 Value::Text(s) | Value::VarChar(s) | Value::BpChar(s) | Value::Name(s) => {
388 strconv::format_string(buf, s)
389 }
390 Value::Time(t) => strconv::format_time(buf, *t),
391 Value::Timestamp(ts) => strconv::format_timestamp(buf, ts),
392 Value::TimestampTz(ts) => strconv::format_timestamptz(buf, ts),
393 Value::Uuid(u) => strconv::format_uuid(buf, *u),
394 Value::Numeric(d) => strconv::format_numeric(buf, &d.0),
395 Value::MzTimestamp(t) => strconv::format_mz_timestamp(buf, *t),
396 Value::Range(range) => strconv::format_range(buf, range, |buf, elem| match elem {
397 Some(elem) => Ok(elem.encode_text(buf.nonnull_buffer())),
398 None => Ok::<_, ()>(buf.write_null()),
399 })
400 .expect("provided closure never fails"),
401 Value::MzAclItem(mz_acl_item) => strconv::format_mz_acl_item(buf, *mz_acl_item),
402 Value::AclItem(acl_item) => strconv::format_acl_item(buf, *acl_item),
403 }
404 }
405
406 pub fn encode_binary(&self, ty: &Type, buf: &mut BytesMut) -> Result<(), io::Error> {
409 let is_null = match self {
412 Value::Array { dims, elements } => {
413 let ndims = pg_len("number of array dimensions", dims.len())?;
414 let has_null = elements.iter().any(|e| e.is_none());
415 let elem_type = match ty {
416 Type::Array(elem_type) => elem_type,
417 _ => unreachable!(),
418 };
419 buf.put_i32(ndims);
420 buf.put_i32(has_null.into());
421 buf.put_u32(elem_type.oid());
422 for dim in dims {
423 buf.put_i32(pg_len("array dimension length", dim.length)?);
424 buf.put_i32(dim.lower_bound.try_into().map_err(|_| {
425 io::Error::new(
426 io::ErrorKind::Other,
427 "array dimension lower bound does not fit into an i32",
428 )
429 })?);
430 }
431 for elem in elements {
432 encode_element(buf, elem.as_ref(), elem_type)?;
433 }
434 Ok(postgres_types::IsNull::No)
435 }
436 Value::Int2Vector { .. } => {
438 Err("binary encoding of int2vector is not implemented".into())
439 }
440 Value::Bool(b) => b.to_sql(&PgType::BOOL, buf),
441 Value::Bytea(b) => b.to_sql(&PgType::BYTEA, buf),
442 Value::Char(c) => i8::reinterpret_cast(*c).to_sql(&PgType::CHAR, buf),
443 Value::Date(d) => d.pg_epoch_days().to_sql(&PgType::DATE, buf),
444 Value::Float4(f) => f.to_sql(&PgType::FLOAT4, buf),
445 Value::Float8(f) => f.to_sql(&PgType::FLOAT8, buf),
446 Value::Int2(i) => i.to_sql(&PgType::INT2, buf),
447 Value::Int4(i) => i.to_sql(&PgType::INT4, buf),
448 Value::Int8(i) => i.to_sql(&PgType::INT8, buf),
449 Value::UInt2(u) => u.to_sql(&*UINT2, buf),
450 Value::UInt4(u) => u.to_sql(&*UINT4, buf),
451 Value::UInt8(u) => u.to_sql(&*UINT8, buf),
452 Value::Interval(iv) => iv.to_sql(&PgType::INTERVAL, buf),
453 Value::Jsonb(js) => js.to_sql(&PgType::JSONB, buf),
454 Value::List(_) => {
455 Err("binary encoding of list types is not implemented".into())
486 }
487 Value::Map(_) => {
488 Err("binary encoding of map types is not implemented".into())
492 }
493 Value::Name(s) => s.to_sql(&PgType::NAME, buf),
494 Value::Oid(i) => i.to_sql(&PgType::OID, buf),
495 Value::Record(fields) => {
496 let nfields = pg_len("record field length", fields.len())?;
497 buf.put_i32(nfields);
498 let field_types = match ty {
499 Type::Record(fields) => fields,
500 _ => unreachable!(),
501 };
502 for (f, ty) in fields.iter().zip_eq(field_types) {
503 buf.put_u32(ty.oid());
504 encode_element(buf, f.as_ref(), ty)?;
505 }
506 Ok(postgres_types::IsNull::No)
507 }
508 Value::Text(s) => s.to_sql(&PgType::TEXT, buf),
509 Value::BpChar(s) => s.to_sql(&PgType::BPCHAR, buf),
510 Value::VarChar(s) => s.to_sql(&PgType::VARCHAR, buf),
511 Value::Time(t) => t.to_sql(&PgType::TIME, buf),
512 Value::Timestamp(ts) => ts.to_sql(&PgType::TIMESTAMP, buf),
513 Value::TimestampTz(ts) => ts.to_sql(&PgType::TIMESTAMPTZ, buf),
514 Value::Uuid(u) => u.to_sql(&PgType::UUID, buf),
515 Value::Numeric(a) => a.to_sql(&PgType::NUMERIC, buf),
516 Value::MzTimestamp(t) => t.to_string().to_sql(&PgType::TEXT, buf),
517 Value::Range(range) => {
518 buf.put_u8(range.pg_flag_bits());
519
520 let elem_type = match ty {
521 Type::Range { element_type } => element_type,
522 _ => unreachable!(),
523 };
524
525 if let Some(RangeInner { lower, upper }) = &range.inner {
526 for bound in [&lower.bound, &upper.bound] {
527 if let Some(bound) = bound {
528 let base = buf.len();
529 buf.put_i32(0);
530 bound.encode_binary(elem_type, buf)?;
531 let len = pg_len("encoded range bound", buf.len() - base - 4)?;
532 buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
533 }
534 }
535 }
536 Ok(postgres_types::IsNull::No)
537 }
538 Value::MzAclItem(mz_acl_item) => {
539 buf.extend_from_slice(&mz_acl_item.encode_binary());
540 Ok(postgres_types::IsNull::No)
541 }
542 Value::AclItem(_) => Err("aclitem has no binary encoding".into()),
543 }
544 .expect("encode_binary should never trigger a to_sql failure");
545 if let IsNull::Yes = is_null {
546 panic!("encode_binary impossibly called on a null value")
547 }
548 Ok(())
549 }
550
551 pub fn can_encode_binary(typ: &SqlScalarType) -> bool {
555 match typ {
556 SqlScalarType::Bool => true,
557 SqlScalarType::Int16 => true,
558 SqlScalarType::Int32 => true,
559 SqlScalarType::Int64 => true,
560 SqlScalarType::PgLegacyChar => true,
561 SqlScalarType::UInt16 => true,
562 SqlScalarType::Oid => true,
563 SqlScalarType::RegClass => true,
564 SqlScalarType::RegProc => true,
565 SqlScalarType::RegType => true,
566 SqlScalarType::UInt32 => true,
567 SqlScalarType::UInt64 => true,
568 SqlScalarType::Float32 => true,
569 SqlScalarType::Float64 => true,
570 SqlScalarType::Numeric { .. } => true,
571 SqlScalarType::MzTimestamp => true,
572 SqlScalarType::MzAclItem => true,
573 SqlScalarType::AclItem => false, SqlScalarType::Date => true,
575 SqlScalarType::Time => true,
576 SqlScalarType::Timestamp { .. } => true,
577 SqlScalarType::TimestampTz { .. } => true,
578 SqlScalarType::Interval => true,
579 SqlScalarType::Bytes => true,
580 SqlScalarType::String => true,
581 SqlScalarType::VarChar { .. } => true,
582 SqlScalarType::Char { .. } => true,
583 SqlScalarType::PgLegacyName => true,
584 SqlScalarType::Jsonb => true,
585 SqlScalarType::Uuid => true,
586 SqlScalarType::Array(elem_type) => Self::can_encode_binary(elem_type),
587 SqlScalarType::Int2Vector => false, SqlScalarType::List { .. } => false, SqlScalarType::Map { .. } => false, SqlScalarType::Record { fields, .. } => fields
591 .iter()
592 .all(|(_, ty)| Self::can_encode_binary(&ty.scalar_type)),
593 SqlScalarType::Range { element_type } => Self::can_encode_binary(element_type),
594 }
595 }
596
597 pub fn decode(
600 format: Format,
601 ty: &Type,
602 raw: &[u8],
603 ) -> Result<Value, Box<dyn Error + Sync + Send>> {
604 match format {
605 Format::Text => Value::decode_text(ty, raw),
606 Format::Binary => Value::decode_binary(ty, raw),
607 }
608 }
609
610 pub fn decode_text<'a>(
613 ty: &'a Type,
614 raw: &'a [u8],
615 ) -> Result<Value, Box<dyn Error + Sync + Send>> {
616 let s = str::from_utf8(raw)?;
617 Ok(match ty {
618 Type::Array(elem_type) => {
619 let (elements, dims) = strconv::parse_array(
620 s,
621 || None,
622 |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
623 )?;
624 Value::Array { dims, elements }
625 }
626 Type::Int2Vector { .. } => {
627 return Err("input of Int2Vector types is not implemented".into());
628 }
629 Type::Bool => Value::Bool(strconv::parse_bool(s)?),
630 Type::Bytea => Value::Bytea(strconv::parse_bytes(s)?),
631 Type::Char => Value::Char(raw.get(0).copied().unwrap_or(0)),
632 Type::Date => Value::Date(strconv::parse_date(s)?),
633 Type::Float4 => Value::Float4(strconv::parse_float32(s)?),
634 Type::Float8 => Value::Float8(strconv::parse_float64(s)?),
635 Type::Int2 => Value::Int2(strconv::parse_int16(s)?),
636 Type::Int4 => Value::Int4(strconv::parse_int32(s)?),
637 Type::Int8 => Value::Int8(strconv::parse_int64(s)?),
638 Type::UInt2 => Value::UInt2(UInt2(strconv::parse_uint16(s)?)),
639 Type::UInt4 => Value::UInt4(UInt4(strconv::parse_uint32(s)?)),
640 Type::UInt8 => Value::UInt8(UInt8(strconv::parse_uint64(s)?)),
641 Type::Interval { .. } => Value::Interval(Interval(strconv::parse_interval(s)?)),
642 Type::Json => return Err("input of json types is not implemented".into()),
643 Type::Jsonb => Value::Jsonb(Jsonb(strconv::parse_jsonb(s)?)),
644 Type::List(elem_type) => Value::List(strconv::parse_list(
645 s,
646 matches!(**elem_type, Type::List(..)),
647 || None,
648 |elem_text| Value::decode_text(elem_type, elem_text.as_bytes()).map(Some),
649 )?),
650 Type::Map { value_type } => Value::Map(strconv::parse_map(
651 s,
652 matches!(**value_type, Type::Map { .. }),
653 |elem_text| {
654 elem_text
655 .map(|t| Value::decode_text(value_type, t.as_bytes()))
656 .transpose()
657 },
658 )?),
659 Type::Name => Value::Name(strconv::parse_pg_legacy_name(s)),
660 Type::Numeric { .. } => Value::Numeric(Numeric(strconv::parse_numeric(s)?)),
661 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
662 Value::Oid(strconv::parse_oid(s)?)
663 }
664 Type::Record(_) => {
665 return Err("input of anonymous composite types is not implemented".into());
666 }
667 Type::Text => Value::Text(s.to_owned()),
668 Type::BpChar { .. } => Value::BpChar(s.to_owned()),
669 Type::VarChar { .. } => Value::VarChar(s.to_owned()),
670 Type::Time { .. } => Value::Time(strconv::parse_time(s)?),
671 Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
672 Type::Timestamp { .. } => Value::Timestamp(strconv::parse_timestamp(s)?),
673 Type::TimestampTz { .. } => Value::TimestampTz(strconv::parse_timestamptz(s)?),
674 Type::Uuid => Value::Uuid(Uuid::parse_str(s)?),
675 Type::MzTimestamp => Value::MzTimestamp(strconv::parse_mz_timestamp(s)?),
676 Type::Range { element_type } => Value::Range(strconv::parse_range(s, |elem_text| {
677 Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
678 })?),
679 Type::MzAclItem => Value::MzAclItem(strconv::parse_mz_acl_item(s)?),
680 Type::AclItem => Value::AclItem(strconv::parse_acl_item(s)?),
681 })
682 }
683
684 pub fn decode_text_into_row<'a>(
686 ty: &'a Type,
687 s: &'a str,
688 packer: &mut RowPacker,
689 ) -> Result<(), Box<dyn Error + Sync + Send>> {
690 Ok(match ty {
691 Type::Array(elem_type) => {
692 let (elements, dims) =
693 strconv::parse_array(s, || None, |elem_text| Ok::<_, String>(Some(elem_text)))?;
694 unsafe {
696 packer.push_array_with_unchecked(&dims, |packer| {
697 let mut nelements = 0;
698 for element in elements {
699 match element {
700 Some(elem_text) => {
701 Value::decode_text_into_row(elem_type, &elem_text, packer)?
702 }
703
704 None => packer.push(Datum::Null),
705 }
706 nelements += 1;
707 }
708 Ok::<_, Box<dyn Error + Sync + Send>>(nelements)
709 })?
710 }
711 }
712 Type::Int2Vector { .. } => {
713 return Err("input of Int2Vector types is not implemented".into());
714 }
715 Type::Bool => packer.push(Datum::from(strconv::parse_bool(s)?)),
716 Type::Bytea => packer.push(Datum::Bytes(&strconv::parse_bytes(s)?)),
717 Type::Char => packer.push(Datum::UInt8(s.as_bytes().get(0).copied().unwrap_or(0))),
718 Type::Date => packer.push(Datum::Date(strconv::parse_date(s)?)),
719 Type::Float4 => packer.push(Datum::Float32(strconv::parse_float32(s)?.into())),
720 Type::Float8 => packer.push(Datum::Float64(strconv::parse_float64(s)?.into())),
721 Type::Int2 => packer.push(Datum::Int16(strconv::parse_int16(s)?)),
722 Type::Int4 => packer.push(Datum::Int32(strconv::parse_int32(s)?)),
723 Type::Int8 => packer.push(Datum::Int64(strconv::parse_int64(s)?)),
724 Type::UInt2 => packer.push(Datum::UInt16(strconv::parse_uint16(s)?)),
725 Type::UInt4 => packer.push(Datum::UInt32(strconv::parse_uint32(s)?)),
726 Type::UInt8 => packer.push(Datum::UInt64(strconv::parse_uint64(s)?)),
727 Type::Interval { .. } => packer.push(Datum::Interval(strconv::parse_interval(s)?)),
728 Type::Json => return Err("input of json types is not implemented".into()),
729 Type::Jsonb => packer.push(strconv::parse_jsonb(s)?.into_row().unpack_first()),
730 Type::List(elem_type) => {
731 let elems = strconv::parse_list(
732 s,
733 matches!(**elem_type, Type::List(..)),
734 || None,
735 |elem_text| Ok::<_, String>(Some(elem_text)),
736 )?;
737 packer.push_list_with(|packer| {
738 for elem in elems {
739 match elem {
740 Some(elem) => Value::decode_text_into_row(elem_type, &elem, packer)?,
741 None => packer.push(Datum::Null),
742 }
743 }
744 Ok::<_, Box<dyn Error + Sync + Send>>(())
745 })?;
746 }
747 Type::Map { value_type } => {
748 let map =
749 strconv::parse_map(s, matches!(**value_type, Type::Map { .. }), |elem_text| {
750 elem_text.map(Ok::<_, String>).transpose()
751 })?;
752 packer.push_dict_with(|row| {
753 for (k, v) in map {
754 row.push(Datum::String(&k));
755 match v {
756 Some(elem) => Value::decode_text_into_row(value_type, &elem, row)?,
757 None => row.push(Datum::Null),
758 }
759 }
760 Ok::<_, Box<dyn Error + Sync + Send>>(())
761 })?;
762 }
763 Type::Name => packer.push(Datum::String(&strconv::parse_pg_legacy_name(s))),
764 Type::Numeric { .. } => packer.push(Datum::Numeric(strconv::parse_numeric(s)?)),
765 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
766 packer.push(Datum::UInt32(strconv::parse_oid(s)?))
767 }
768 Type::Record(_) => {
769 return Err("input of anonymous composite types is not implemented".into());
770 }
771 Type::Text => packer.push(Datum::String(s)),
772 Type::BpChar { .. } => packer.push(Datum::String(s.trim_end())),
773 Type::VarChar { .. } => packer.push(Datum::String(s)),
774 Type::Time { .. } => packer.push(Datum::Time(strconv::parse_time(s)?)),
775 Type::TimeTz { .. } => return Err("input of timetz types is not implemented".into()),
776 Type::Timestamp { .. } => packer.push(Datum::Timestamp(strconv::parse_timestamp(s)?)),
777 Type::TimestampTz { .. } => {
778 packer.push(Datum::TimestampTz(strconv::parse_timestamptz(s)?))
779 }
780 Type::Uuid => packer.push(Datum::Uuid(Uuid::parse_str(s)?)),
781 Type::MzTimestamp => packer.push(Datum::MzTimestamp(strconv::parse_mz_timestamp(s)?)),
782 Type::Range { element_type } => {
783 let range = strconv::parse_range(s, |elem_text| {
784 Value::decode_text(element_type, elem_text.as_bytes()).map(Box::new)
785 })?;
786 let buf = RowArena::new();
789 let range = range.into_bounds(|elem| elem.into_datum(&buf, element_type));
790
791 packer.push_range(range).unwrap()
792 }
793 Type::MzAclItem => packer.push(Datum::MzAclItem(strconv::parse_mz_acl_item(s)?)),
794 Type::AclItem => packer.push(Datum::AclItem(strconv::parse_acl_item(s)?)),
795 })
796 }
797
798 pub fn decode_binary(ty: &Type, raw: &[u8]) -> Result<Value, Box<dyn Error + Sync + Send>> {
801 match ty {
802 Type::Array(_) => Err("input of array types is not implemented".into()),
803 Type::Int2Vector => Err("input of int2vector types is not implemented".into()),
804 Type::Bool => bool::from_sql(ty.inner(), raw).map(Value::Bool),
805 Type::Bytea => Vec::<u8>::from_sql(ty.inner(), raw).map(Value::Bytea),
806 Type::Char => {
807 i8::from_sql(ty.inner(), raw).map(|c| Value::Char(u8::reinterpret_cast(c)))
808 }
809 Type::Date => {
810 let days = i32::from_sql(ty.inner(), raw)?;
811 Ok(Value::Date(Date::from_pg_epoch(days)?))
812 }
813 Type::Float4 => f32::from_sql(ty.inner(), raw).map(Value::Float4),
814 Type::Float8 => f64::from_sql(ty.inner(), raw).map(Value::Float8),
815 Type::Int2 => i16::from_sql(ty.inner(), raw).map(Value::Int2),
816 Type::Int4 => i32::from_sql(ty.inner(), raw).map(Value::Int4),
817 Type::Int8 => i64::from_sql(ty.inner(), raw).map(Value::Int8),
818 Type::UInt2 => UInt2::from_sql(ty.inner(), raw).map(Value::UInt2),
819 Type::UInt4 => UInt4::from_sql(ty.inner(), raw).map(Value::UInt4),
820 Type::UInt8 => UInt8::from_sql(ty.inner(), raw).map(Value::UInt8),
821 Type::Interval { .. } => Interval::from_sql(ty.inner(), raw).map(Value::Interval),
822 Type::Json => Err("input of json types is not implemented".into()),
823 Type::Jsonb => Jsonb::from_sql(ty.inner(), raw).map(Value::Jsonb),
824 Type::List(_) => Err("binary decoding of list types is not implemented".into()),
825 Type::Map { .. } => Err("binary decoding of map types is not implemented".into()),
826 Type::Name => {
827 let s = String::from_sql(ty.inner(), raw)?;
828 if s.len() > NAME_MAX_BYTES {
829 return Err("identifier too long".into());
830 }
831 Ok(Value::Name(s))
832 }
833 Type::Numeric { .. } => Numeric::from_sql(ty.inner(), raw).map(Value::Numeric),
834 Type::Oid | Type::RegClass | Type::RegProc | Type::RegType => {
835 u32::from_sql(ty.inner(), raw).map(Value::Oid)
836 }
837 Type::Record(_) => Err("input of anonymous composite types is not implemented".into()),
838 Type::Text => String::from_sql(ty.inner(), raw).map(Value::Text),
839 Type::BpChar { .. } => String::from_sql(ty.inner(), raw).map(Value::BpChar),
840 Type::VarChar { .. } => String::from_sql(ty.inner(), raw).map(Value::VarChar),
841 Type::Time { .. } => NaiveTime::from_sql(ty.inner(), raw).map(Value::Time),
842 Type::TimeTz { .. } => Err("input of timetz types is not implemented".into()),
843 Type::Timestamp { .. } => {
844 let ts = NaiveDateTime::from_sql(ty.inner(), raw)?;
845 Ok(Value::Timestamp(CheckedTimestamp::from_timestamplike(ts)?))
846 }
847 Type::TimestampTz { .. } => {
848 let ts = DateTime::<Utc>::from_sql(ty.inner(), raw)?;
849 Ok(Value::TimestampTz(CheckedTimestamp::from_timestamplike(
850 ts,
851 )?))
852 }
853 Type::Uuid => Uuid::from_sql(ty.inner(), raw).map(Value::Uuid),
854 Type::MzTimestamp => {
855 let s = String::from_sql(ty.inner(), raw)?;
856 let t: mz_repr::Timestamp = s.parse()?;
857 Ok(Value::MzTimestamp(t))
858 }
859 Type::Range { .. } => Err("binary decoding of range types is not implemented".into()),
860 Type::MzAclItem => {
861 let mz_acl_item = MzAclItem::decode_binary(raw)?;
862 Ok(Value::MzAclItem(mz_acl_item))
863 }
864 Type::AclItem => Err("aclitem has no binary encoding".into()),
865 }
866 }
867}
868
869fn encode_element(buf: &mut BytesMut, elem: Option<&Value>, ty: &Type) -> Result<(), io::Error> {
870 match elem {
871 None => buf.put_i32(-1),
872 Some(elem) => {
873 let base = buf.len();
874 buf.put_i32(0);
875 elem.encode_binary(ty, buf)?;
876 let len = pg_len("encoded element", buf.len() - base - 4)?;
877 buf[base..base + 4].copy_from_slice(&len.to_be_bytes());
878 }
879 }
880 Ok(())
881}
882
883fn pg_len(what: &str, len: usize) -> Result<i32, io::Error> {
884 len.try_into().map_err(|_| {
885 io::Error::new(
886 io::ErrorKind::Other,
887 format!("{} does not fit into an i32", what),
888 )
889 })
890}
891
892pub fn values_from_row(row: &RowRef, typ: &SqlRelationType) -> Vec<Option<Value>> {
897 row.iter()
898 .zip_eq(typ.column_types.iter())
899 .map(|(col, typ)| Value::from_datum(col, &typ.scalar_type))
900 .collect()
901}
902
903#[cfg(test)]
904mod tests {
905 use super::*;
906
907 #[mz_ore::test]
909 fn decode_text_error_smoke_test() {
910 let bool_array = Value::Array {
911 dims: vec![ArrayDimension {
912 lower_bound: 0,
913 length: 1,
914 }],
915 elements: vec![Some(Value::Bool(true))],
916 };
917
918 let mut buf = BytesMut::new();
919 bool_array.encode_text(&mut buf);
920 let buf = buf.to_vec();
921
922 let int_array_tpe = Type::Array(Box::new(Type::Int4));
923 let decoded_int_array = Value::decode_text(&int_array_tpe, &buf);
924
925 assert_eq!(
926 decoded_int_array.map_err(|e| e.to_string()).unwrap_err(),
927 "invalid input syntax for type array: Specifying array lower bounds is not supported: \"[0:0]={t}\"".to_string()
928 );
929 }
930}