mz_expr/scalar/func/impls/
jsonb.rs1use std::collections::BTreeMap;
11use std::fmt;
12
13use mz_expr_derive::sqlfunc;
14use mz_lowertest::MzReflect;
15use mz_repr::adt::jsonb::{Jsonb, JsonbRef};
16use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
17use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
18use mz_repr::role_id::RoleId;
19use mz_repr::{ArrayRustType, Datum, Row, RowPacker, SqlColumnType, SqlScalarType, strconv};
20use mz_sql_parser::ast::RawClusterName;
21use mz_sql_parser::ast::display::AstDisplay;
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24
25use crate::EvalError;
26use crate::scalar::func::EagerUnaryFunc;
27use crate::scalar::func::impls::numeric::*;
28
29#[sqlfunc(
30 sqlname = "jsonb_to_text",
31 preserves_uniqueness = false,
32 inverse = to_unary!(super::CastStringToJsonb)
33)]
34pub fn cast_jsonb_to_string<'a>(a: JsonbRef<'a>) -> String {
35 let mut buf = String::new();
36 strconv::format_jsonb(&mut buf, a);
37 buf
38}
39
40#[sqlfunc(sqlname = "jsonb_to_smallint", is_monotone = true)]
41fn cast_jsonb_to_int16<'a>(a: JsonbRef<'a>) -> Result<i16, EvalError> {
42 match a.into_datum() {
43 Datum::Numeric(a) => cast_numeric_to_int16(a.into_inner()),
44 datum => Err(EvalError::InvalidJsonbCast {
45 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
46 to: "smallint".into(),
47 }),
48 }
49}
50
51#[sqlfunc(sqlname = "jsonb_to_integer", is_monotone = true)]
52fn cast_jsonb_to_int32<'a>(a: JsonbRef<'a>) -> Result<i32, EvalError> {
53 match a.into_datum() {
54 Datum::Numeric(a) => cast_numeric_to_int32(a.into_inner()),
55 datum => Err(EvalError::InvalidJsonbCast {
56 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
57 to: "integer".into(),
58 }),
59 }
60}
61
62#[sqlfunc(sqlname = "jsonb_to_bigint", is_monotone = true)]
63fn cast_jsonb_to_int64<'a>(a: JsonbRef<'a>) -> Result<i64, EvalError> {
64 match a.into_datum() {
65 Datum::Numeric(a) => cast_numeric_to_int64(a.into_inner()),
66 datum => Err(EvalError::InvalidJsonbCast {
67 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
68 to: "bigint".into(),
69 }),
70 }
71}
72
73#[sqlfunc(sqlname = "jsonb_to_real", is_monotone = true)]
74fn cast_jsonb_to_float32<'a>(a: JsonbRef<'a>) -> Result<f32, EvalError> {
75 match a.into_datum() {
76 Datum::Numeric(a) => cast_numeric_to_float32(a.into_inner()),
77 datum => Err(EvalError::InvalidJsonbCast {
78 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
79 to: "real".into(),
80 }),
81 }
82}
83
84#[sqlfunc(sqlname = "jsonb_to_double", is_monotone = true)]
85fn cast_jsonb_to_float64<'a>(a: JsonbRef<'a>) -> Result<f64, EvalError> {
86 match a.into_datum() {
87 Datum::Numeric(a) => cast_numeric_to_float64(a.into_inner()),
88 datum => Err(EvalError::InvalidJsonbCast {
89 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
90 to: "double precision".into(),
91 }),
92 }
93}
94
95#[derive(
96 Ord,
97 PartialOrd,
98 Clone,
99 Debug,
100 Eq,
101 PartialEq,
102 Serialize,
103 Deserialize,
104 Hash,
105 MzReflect
106)]
107pub struct CastJsonbToNumeric(pub Option<NumericMaxScale>);
108
109impl EagerUnaryFunc for CastJsonbToNumeric {
110 type Input<'a> = JsonbRef<'a>;
111 type Output<'a> = Result<Numeric, EvalError>;
112
113 fn call<'a>(&self, a: Self::Input<'a>) -> Self::Output<'a> {
114 match a.into_datum() {
115 Datum::Numeric(mut num) => match self.0 {
116 None => Ok(num.into_inner()),
117 Some(scale) => {
118 if numeric::rescale(&mut num.0, scale.into_u8()).is_err() {
119 return Err(EvalError::NumericFieldOverflow);
120 };
121 Ok(num.into_inner())
122 }
123 },
124 datum => Err(EvalError::InvalidJsonbCast {
125 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
126 to: "numeric".into(),
127 }),
128 }
129 }
130
131 fn output_sql_type(&self, input: SqlColumnType) -> SqlColumnType {
132 SqlScalarType::Numeric { max_scale: self.0 }.nullable(input.nullable)
133 }
134
135 fn is_monotone(&self) -> bool {
136 true
137 }
138}
139
140impl fmt::Display for CastJsonbToNumeric {
141 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142 f.write_str("jsonb_to_numeric")
143 }
144}
145
146#[sqlfunc(sqlname = "jsonb_to_boolean", is_monotone = true)]
147fn cast_jsonb_to_bool<'a>(a: JsonbRef<'a>) -> Result<bool, EvalError> {
148 match a.into_datum() {
149 Datum::True => Ok(true),
150 Datum::False => Ok(false),
151 datum => Err(EvalError::InvalidJsonbCast {
152 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
153 to: "boolean".into(),
154 }),
155 }
156}
157
158#[sqlfunc(sqlname = "jsonbable_to_jsonb")]
159fn cast_jsonbable_to_jsonb<'a>(a: JsonbRef<'a>) -> JsonbRef<'a> {
160 match a.into_datum() {
161 Datum::Numeric(n) => {
162 let n = n.into_inner();
163 let datum = if n.is_finite() {
164 Datum::from(n)
165 } else if n.is_nan() {
166 Datum::String("NaN")
167 } else if n.is_negative() {
168 Datum::String("-Infinity")
169 } else {
170 Datum::String("Infinity")
171 };
172 JsonbRef::from_datum(datum)
173 }
174 datum => JsonbRef::from_datum(datum),
175 }
176}
177
178#[sqlfunc]
179fn jsonb_array_length<'a>(a: JsonbRef<'a>) -> Result<Option<i32>, EvalError> {
180 match a.into_datum() {
181 Datum::List(list) => {
182 let count = list.iter().count();
183 match i32::try_from(count) {
184 Ok(len) => Ok(Some(len)),
185 Err(_) => Err(EvalError::Int32OutOfRange(count.to_string().into())),
186 }
187 }
188 _ => Ok(None),
189 }
190}
191
192#[sqlfunc]
193fn jsonb_typeof<'a>(a: JsonbRef<'a>) -> &'a str {
194 match a.into_datum() {
195 Datum::Map(_) => "object",
196 Datum::List(_) => "array",
197 Datum::String(_) => "string",
198 Datum::Numeric(_) => "number",
199 Datum::True | Datum::False => "boolean",
200 Datum::JsonNull => "null",
201 d => panic!("Not jsonb: {:?}", d),
202 }
203}
204
205#[sqlfunc]
206fn jsonb_strip_nulls<'a>(a: JsonbRef<'a>) -> Jsonb {
207 fn strip_nulls(a: Datum, row: &mut RowPacker) {
208 match a {
209 Datum::Map(dict) => row.push_dict_with(|row| {
210 for (k, v) in dict.iter() {
211 match v {
212 Datum::JsonNull => (),
213 _ => {
214 row.push(Datum::String(k));
215 strip_nulls(v, row);
216 }
217 }
218 }
219 }),
220 Datum::List(list) => row.push_list_with(|row| {
221 for elem in list.iter() {
222 strip_nulls(elem, row);
223 }
224 }),
225 _ => row.push(a),
226 }
227 }
228 let mut row = Row::default();
229 strip_nulls(a.into_datum(), &mut row.packer());
230 Jsonb::from_row(row)
231}
232
233#[sqlfunc]
234fn jsonb_pretty<'a>(a: JsonbRef<'a>) -> String {
235 let mut buf = String::new();
236 strconv::format_jsonb_pretty(&mut buf, a);
237 buf
238}
239
240fn jsonb_datum_to_u64<'a>(d: Datum<'a>) -> Result<u64, String> {
242 let Datum::Numeric(n) = d else {
243 return Err("expected numeric value".into());
244 };
245
246 let mut cx = numeric::cx_datum();
247 cx.try_into_u64(n.0)
248 .map_err(|_| format!("number out of u64 range: {n}"))
249}
250
251fn jsonb_datum_to_role_id(d: Datum) -> Result<RoleId, String> {
253 match d {
254 Datum::String("Public") => Ok(RoleId::Public),
255 Datum::String(other) => Err(format!("unexpected role ID variant: {other}")),
256 Datum::Map(dict) => {
257 let (key, val) = dict.iter().next().ok_or_else(|| "empty".to_string())?;
258 let n = jsonb_datum_to_u64(val)?;
259 match key {
260 "User" => Ok(RoleId::User(n)),
261 "System" => Ok(RoleId::System(n)),
262 "Predefined" => Ok(RoleId::Predefined(n)),
263 other => Err(format!("unexpected role ID variant: {other}")),
264 }
265 }
266 _ => Err("expected string or object".into()),
267 }
268}
269
270#[sqlfunc]
274fn parse_catalog_id<'a>(a: JsonbRef<'a>) -> Result<String, EvalError> {
275 let parse = || match a.into_datum() {
276 Datum::String(variant) => match variant {
278 "Explain" => Ok("e".to_string()),
279 "Public" => Ok("p".to_string()),
280 other => Err(format!("unexpected ID variant: {other}")),
281 },
282 Datum::Map(dict) => {
284 let (key, val) = dict.iter().next().ok_or_else(|| "empty".to_string())?;
285 let prefix = match key {
286 "IntrospectionSourceIndex" => "si",
287 "Predefined" => "g",
288 "System" => "s",
289 "Transient" => "t",
290 "User" => "u",
291 other => return Err(format!("unexpected ID variant: {other}")),
292 };
293 let n = jsonb_datum_to_u64(val)?;
294 Ok(format!("{prefix}{n}"))
295 }
296 _ => Err("expected string or object".into()),
297 };
298
299 parse().map_err(|e| EvalError::InvalidCatalogJson(e.into()))
300}
301
302#[sqlfunc]
304fn parse_catalog_privileges<'a>(a: JsonbRef<'a>) -> Result<ArrayRustType<MzAclItem>, EvalError> {
305 let parse_one = |datum| match datum {
306 Datum::Map(dict) => {
307 let mut grantee = None;
308 let mut grantor = None;
309 let mut acl_mode = None;
310 for (key, val) in dict.iter() {
311 match key {
312 "grantee" => {
313 let id = jsonb_datum_to_role_id(val)?;
314 grantee = Some(id);
315 }
316 "grantor" => {
317 let id = jsonb_datum_to_role_id(val)?;
318 grantor = Some(id);
319 }
320 "acl_mode" => {
321 let Datum::Map(mode_dict) = val else {
322 return Err(format!("unexpected acl_mode: {val}"));
323 };
324 let (key, val) = mode_dict.iter().next().ok_or("empty acl_mode")?;
325 if key != "bitflags" {
326 return Err(format!("unexpected acl_mode field: {key}"));
327 }
328 let bits = jsonb_datum_to_u64(val)?;
329 let Some(mode) = AclMode::from_bits(bits) else {
330 return Err(format!("invalid acl_mode bitflags: {bits}"));
331 };
332 acl_mode = Some(mode);
333 }
334 other => return Err(format!("unexpected privilege field: {other}")),
335 }
336 }
337 Ok(MzAclItem {
338 grantee: grantee.ok_or_else(|| format!("missing grantee: {dict:?}"))?,
339 grantor: grantor.ok_or_else(|| "missing grantor in privilege".to_string())?,
340 acl_mode: acl_mode.ok_or_else(|| "missing acl_mode in privilege".to_string())?,
341 })
342 }
343 other => Err(format!("expected object in array, found: {other}")),
344 };
345
346 let parse = || match a.into_datum() {
347 Datum::List(list) => {
348 let mut result = Vec::new();
349 for item in list.iter() {
350 result.push(parse_one(item)?);
351 }
352 Ok(result)
353 }
354 _ => Err("expected array".to_string()),
355 };
356
357 parse()
358 .map(ArrayRustType)
359 .map_err(|e| EvalError::InvalidCatalogJson(e.into()))
360}
361
362#[sqlfunc]
370fn parse_catalog_create_sql<'a>(a: &'a str) -> Result<Jsonb, EvalError> {
371 let parse = || -> Result<serde_json::Value, String> {
372 let mut stmts = mz_sql_parser::parser::parse_statements(a)
373 .map_err(|e| format!("failed to parse create_sql: {e}"))?;
374 let stmt = match stmts.len() {
375 1 => stmts.remove(0).ast,
376 n => return Err(format!("expected a single statement, found {n}")),
377 };
378
379 let mut info = BTreeMap::<&str, serde_json::Value>::new();
380
381 use mz_sql_parser::ast::Statement::*;
382 let item_type = match stmt {
383 CreateSecret(_) => "secret",
384 CreateConnection(stmt) => {
385 let connection_type = stmt.connection_type.as_str();
386 info.insert("connection_type", json!(connection_type));
387
388 "connection"
389 }
390 CreateView(_) => "view",
391 CreateMaterializedView(stmt) => {
392 let Some(in_cluster) = stmt.in_cluster else {
393 return Err("missing IN CLUSTER".into());
394 };
395 let cluster_id = match in_cluster {
396 RawClusterName::Unresolved(ident) => ident.into_string(),
397 RawClusterName::Resolved(s) => s,
398 };
399 info.insert("cluster_id", json!(cluster_id));
400
401 let mut definition = stmt.query.to_ast_string_stable();
402 definition.push(';');
403 info.insert("definition", json!(definition));
404
405 "materialized-view"
406 }
407 CreateContinualTask(_) => "continual-task",
408 CreateTable(_) | CreateTableFromSource(_) => "table",
409 CreateSource(_) | CreateWebhookSource(_) => "source",
410 CreateSubsource(_) => "subsource",
411 CreateSink(_) => "sink",
412 CreateIndex(_) => "index",
413 CreateType(_) => "type",
414 _ => return Err("not a CREATE item statement".into()),
415 };
416 info.insert("type", json!(item_type));
417
418 let info = info.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
419 Ok(info)
420 };
421
422 let val = parse().map_err(|e| EvalError::InvalidCatalogJson(e.into()))?;
423 let jsonb = Jsonb::from_serde_json(val).expect("valid JSONB");
424 Ok(jsonb)
425}