Skip to main content

mz_expr/scalar/func/impls/
jsonb.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::fmt;
12
13use mz_expr_derive::sqlfunc;
14use mz_lowertest::MzReflect;
15use mz_repr::adt::jsonb::{Jsonb, JsonbRef};
16use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
17use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
18use mz_repr::role_id::RoleId;
19use mz_repr::{ArrayRustType, Datum, Row, RowPacker, SqlColumnType, SqlScalarType, strconv};
20use mz_sql_parser::ast::RawClusterName;
21use mz_sql_parser::ast::display::AstDisplay;
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24
25use crate::EvalError;
26use crate::scalar::func::EagerUnaryFunc;
27use crate::scalar::func::impls::numeric::*;
28
29#[sqlfunc(
30    sqlname = "jsonb_to_text",
31    preserves_uniqueness = false,
32    inverse = to_unary!(super::CastStringToJsonb)
33)]
34pub fn cast_jsonb_to_string<'a>(a: JsonbRef<'a>) -> String {
35    let mut buf = String::new();
36    strconv::format_jsonb(&mut buf, a);
37    buf
38}
39
40#[sqlfunc(sqlname = "jsonb_to_smallint", is_monotone = true)]
41fn cast_jsonb_to_int16<'a>(a: JsonbRef<'a>) -> Result<i16, EvalError> {
42    match a.into_datum() {
43        Datum::Numeric(a) => cast_numeric_to_int16(a.into_inner()),
44        datum => Err(EvalError::InvalidJsonbCast {
45            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
46            to: "smallint".into(),
47        }),
48    }
49}
50
51#[sqlfunc(sqlname = "jsonb_to_integer", is_monotone = true)]
52fn cast_jsonb_to_int32<'a>(a: JsonbRef<'a>) -> Result<i32, EvalError> {
53    match a.into_datum() {
54        Datum::Numeric(a) => cast_numeric_to_int32(a.into_inner()),
55        datum => Err(EvalError::InvalidJsonbCast {
56            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
57            to: "integer".into(),
58        }),
59    }
60}
61
62#[sqlfunc(sqlname = "jsonb_to_bigint", is_monotone = true)]
63fn cast_jsonb_to_int64<'a>(a: JsonbRef<'a>) -> Result<i64, EvalError> {
64    match a.into_datum() {
65        Datum::Numeric(a) => cast_numeric_to_int64(a.into_inner()),
66        datum => Err(EvalError::InvalidJsonbCast {
67            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
68            to: "bigint".into(),
69        }),
70    }
71}
72
73#[sqlfunc(sqlname = "jsonb_to_real", is_monotone = true)]
74fn cast_jsonb_to_float32<'a>(a: JsonbRef<'a>) -> Result<f32, EvalError> {
75    match a.into_datum() {
76        Datum::Numeric(a) => cast_numeric_to_float32(a.into_inner()),
77        datum => Err(EvalError::InvalidJsonbCast {
78            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
79            to: "real".into(),
80        }),
81    }
82}
83
84#[sqlfunc(sqlname = "jsonb_to_double", is_monotone = true)]
85fn cast_jsonb_to_float64<'a>(a: JsonbRef<'a>) -> Result<f64, EvalError> {
86    match a.into_datum() {
87        Datum::Numeric(a) => cast_numeric_to_float64(a.into_inner()),
88        datum => Err(EvalError::InvalidJsonbCast {
89            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
90            to: "double precision".into(),
91        }),
92    }
93}
94
95#[derive(
96    Ord,
97    PartialOrd,
98    Clone,
99    Debug,
100    Eq,
101    PartialEq,
102    Serialize,
103    Deserialize,
104    Hash,
105    MzReflect
106)]
107pub struct CastJsonbToNumeric(pub Option<NumericMaxScale>);
108
109impl EagerUnaryFunc for CastJsonbToNumeric {
110    type Input<'a> = JsonbRef<'a>;
111    type Output<'a> = Result<Numeric, EvalError>;
112
113    fn call<'a>(&self, a: Self::Input<'a>) -> Self::Output<'a> {
114        match a.into_datum() {
115            Datum::Numeric(mut num) => match self.0 {
116                None => Ok(num.into_inner()),
117                Some(scale) => {
118                    if numeric::rescale(&mut num.0, scale.into_u8()).is_err() {
119                        return Err(EvalError::NumericFieldOverflow);
120                    };
121                    Ok(num.into_inner())
122                }
123            },
124            datum => Err(EvalError::InvalidJsonbCast {
125                from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
126                to: "numeric".into(),
127            }),
128        }
129    }
130
131    fn output_sql_type(&self, input: SqlColumnType) -> SqlColumnType {
132        SqlScalarType::Numeric { max_scale: self.0 }.nullable(input.nullable)
133    }
134
135    fn is_monotone(&self) -> bool {
136        true
137    }
138}
139
140impl fmt::Display for CastJsonbToNumeric {
141    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142        f.write_str("jsonb_to_numeric")
143    }
144}
145
146#[sqlfunc(sqlname = "jsonb_to_boolean", is_monotone = true)]
147fn cast_jsonb_to_bool<'a>(a: JsonbRef<'a>) -> Result<bool, EvalError> {
148    match a.into_datum() {
149        Datum::True => Ok(true),
150        Datum::False => Ok(false),
151        datum => Err(EvalError::InvalidJsonbCast {
152            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
153            to: "boolean".into(),
154        }),
155    }
156}
157
158#[sqlfunc(sqlname = "jsonbable_to_jsonb")]
159fn cast_jsonbable_to_jsonb<'a>(a: JsonbRef<'a>) -> JsonbRef<'a> {
160    match a.into_datum() {
161        Datum::Numeric(n) => {
162            let n = n.into_inner();
163            let datum = if n.is_finite() {
164                Datum::from(n)
165            } else if n.is_nan() {
166                Datum::String("NaN")
167            } else if n.is_negative() {
168                Datum::String("-Infinity")
169            } else {
170                Datum::String("Infinity")
171            };
172            JsonbRef::from_datum(datum)
173        }
174        datum => JsonbRef::from_datum(datum),
175    }
176}
177
178#[sqlfunc]
179fn jsonb_array_length<'a>(a: JsonbRef<'a>) -> Result<Option<i32>, EvalError> {
180    match a.into_datum() {
181        Datum::List(list) => {
182            let count = list.iter().count();
183            match i32::try_from(count) {
184                Ok(len) => Ok(Some(len)),
185                Err(_) => Err(EvalError::Int32OutOfRange(count.to_string().into())),
186            }
187        }
188        _ => Ok(None),
189    }
190}
191
192#[sqlfunc]
193fn jsonb_typeof<'a>(a: JsonbRef<'a>) -> &'a str {
194    match a.into_datum() {
195        Datum::Map(_) => "object",
196        Datum::List(_) => "array",
197        Datum::String(_) => "string",
198        Datum::Numeric(_) => "number",
199        Datum::True | Datum::False => "boolean",
200        Datum::JsonNull => "null",
201        d => panic!("Not jsonb: {:?}", d),
202    }
203}
204
205#[sqlfunc]
206fn jsonb_strip_nulls<'a>(a: JsonbRef<'a>) -> Jsonb {
207    fn strip_nulls(a: Datum, row: &mut RowPacker) {
208        match a {
209            Datum::Map(dict) => row.push_dict_with(|row| {
210                for (k, v) in dict.iter() {
211                    match v {
212                        Datum::JsonNull => (),
213                        _ => {
214                            row.push(Datum::String(k));
215                            strip_nulls(v, row);
216                        }
217                    }
218                }
219            }),
220            Datum::List(list) => row.push_list_with(|row| {
221                for elem in list.iter() {
222                    strip_nulls(elem, row);
223                }
224            }),
225            _ => row.push(a),
226        }
227    }
228    let mut row = Row::default();
229    strip_nulls(a.into_datum(), &mut row.packer());
230    Jsonb::from_row(row)
231}
232
233#[sqlfunc]
234fn jsonb_pretty<'a>(a: JsonbRef<'a>) -> String {
235    let mut buf = String::new();
236    strconv::format_jsonb_pretty(&mut buf, a);
237    buf
238}
239
240/// Converts a JSONB `Datum` into a `u64`.
241fn jsonb_datum_to_u64<'a>(d: Datum<'a>) -> Result<u64, String> {
242    let Datum::Numeric(n) = d else {
243        return Err("expected numeric value".into());
244    };
245
246    let mut cx = numeric::cx_datum();
247    cx.try_into_u64(n.0)
248        .map_err(|_| format!("number out of u64 range: {n}"))
249}
250
251/// Converts a JSONB `Datum` into a `RoleId`.
252fn jsonb_datum_to_role_id(d: Datum) -> Result<RoleId, String> {
253    match d {
254        Datum::String("Public") => Ok(RoleId::Public),
255        Datum::String(other) => Err(format!("unexpected role ID variant: {other}")),
256        Datum::Map(dict) => {
257            let (key, val) = dict.iter().next().ok_or_else(|| "empty".to_string())?;
258            let n = jsonb_datum_to_u64(val)?;
259            match key {
260                "User" => Ok(RoleId::User(n)),
261                "System" => Ok(RoleId::System(n)),
262                "Predefined" => Ok(RoleId::Predefined(n)),
263                other => Err(format!("unexpected role ID variant: {other}")),
264            }
265        }
266        _ => Err("expected string or object".into()),
267    }
268}
269
270/// Converts a catalog JSON-serialized ID value into the appropriate string format.
271///
272/// Supports all of Materialize's various ID types of the form `<prefix><u64>`.
273#[sqlfunc]
274fn parse_catalog_id<'a>(a: JsonbRef<'a>) -> Result<String, EvalError> {
275    let parse = || match a.into_datum() {
276        // Unit variant, e.g. "Public"
277        Datum::String(variant) => match variant {
278            "Explain" => Ok("e".to_string()),
279            "Public" => Ok("p".to_string()),
280            other => Err(format!("unexpected ID variant: {other}")),
281        },
282        // Newtype variant, e.g. {"User": 1}
283        Datum::Map(dict) => {
284            let (key, val) = dict.iter().next().ok_or_else(|| "empty".to_string())?;
285            let prefix = match key {
286                "IntrospectionSourceIndex" => "si",
287                "Predefined" => "g",
288                "System" => "s",
289                "Transient" => "t",
290                "User" => "u",
291                other => return Err(format!("unexpected ID variant: {other}")),
292            };
293            let n = jsonb_datum_to_u64(val)?;
294            Ok(format!("{prefix}{n}"))
295        }
296        _ => Err("expected string or object".into()),
297    };
298
299    parse().map_err(|e| EvalError::InvalidCatalogJson(e.into()))
300}
301
302/// Converts a catalog JSON-serialized privilege array into an `mz_aclitem[]`.
303#[sqlfunc]
304fn parse_catalog_privileges<'a>(a: JsonbRef<'a>) -> Result<ArrayRustType<MzAclItem>, EvalError> {
305    let parse_one = |datum| match datum {
306        Datum::Map(dict) => {
307            let mut grantee = None;
308            let mut grantor = None;
309            let mut acl_mode = None;
310            for (key, val) in dict.iter() {
311                match key {
312                    "grantee" => {
313                        let id = jsonb_datum_to_role_id(val)?;
314                        grantee = Some(id);
315                    }
316                    "grantor" => {
317                        let id = jsonb_datum_to_role_id(val)?;
318                        grantor = Some(id);
319                    }
320                    "acl_mode" => {
321                        let Datum::Map(mode_dict) = val else {
322                            return Err(format!("unexpected acl_mode: {val}"));
323                        };
324                        let (key, val) = mode_dict.iter().next().ok_or("empty acl_mode")?;
325                        if key != "bitflags" {
326                            return Err(format!("unexpected acl_mode field: {key}"));
327                        }
328                        let bits = jsonb_datum_to_u64(val)?;
329                        let Some(mode) = AclMode::from_bits(bits) else {
330                            return Err(format!("invalid acl_mode bitflags: {bits}"));
331                        };
332                        acl_mode = Some(mode);
333                    }
334                    other => return Err(format!("unexpected privilege field: {other}")),
335                }
336            }
337            Ok(MzAclItem {
338                grantee: grantee.ok_or_else(|| format!("missing grantee: {dict:?}"))?,
339                grantor: grantor.ok_or_else(|| "missing grantor in privilege".to_string())?,
340                acl_mode: acl_mode.ok_or_else(|| "missing acl_mode in privilege".to_string())?,
341            })
342        }
343        other => Err(format!("expected object in array, found: {other}")),
344    };
345
346    let parse = || match a.into_datum() {
347        Datum::List(list) => {
348            let mut result = Vec::new();
349            for item in list.iter() {
350                result.push(parse_one(item)?);
351            }
352            Ok(result)
353        }
354        _ => Err("expected array".to_string()),
355    };
356
357    parse()
358        .map(ArrayRustType)
359        .map_err(|e| EvalError::InvalidCatalogJson(e.into()))
360}
361
362/// Parses a catalog `create_sql` string into a JSONB object.
363///
364/// The returned JSONB does not fully reflect the parsed SQL and instead contains only fields
365/// required by current callers.
366///
367// TODO: This function isn't parsing JSONB and therefore shouldn't live in the `jsonb` module.
368//       Consider moving all the `parse_catalog_*` functions into their own module.
369#[sqlfunc]
370fn parse_catalog_create_sql<'a>(a: &'a str) -> Result<Jsonb, EvalError> {
371    let parse = || -> Result<serde_json::Value, String> {
372        let mut stmts = mz_sql_parser::parser::parse_statements(a)
373            .map_err(|e| format!("failed to parse create_sql: {e}"))?;
374        let stmt = match stmts.len() {
375            1 => stmts.remove(0).ast,
376            n => return Err(format!("expected a single statement, found {n}")),
377        };
378
379        let mut info = BTreeMap::<&str, serde_json::Value>::new();
380
381        use mz_sql_parser::ast::Statement::*;
382        let item_type = match stmt {
383            CreateSecret(_) => "secret",
384            CreateConnection(stmt) => {
385                let connection_type = stmt.connection_type.as_str();
386                info.insert("connection_type", json!(connection_type));
387
388                "connection"
389            }
390            CreateView(_) => "view",
391            CreateMaterializedView(stmt) => {
392                let Some(in_cluster) = stmt.in_cluster else {
393                    return Err("missing IN CLUSTER".into());
394                };
395                let cluster_id = match in_cluster {
396                    RawClusterName::Unresolved(ident) => ident.into_string(),
397                    RawClusterName::Resolved(s) => s,
398                };
399                info.insert("cluster_id", json!(cluster_id));
400
401                let mut definition = stmt.query.to_ast_string_stable();
402                definition.push(';');
403                info.insert("definition", json!(definition));
404
405                "materialized-view"
406            }
407            CreateContinualTask(_) => "continual-task",
408            CreateTable(_) | CreateTableFromSource(_) => "table",
409            CreateSource(_) | CreateWebhookSource(_) => "source",
410            CreateSubsource(_) => "subsource",
411            CreateSink(_) => "sink",
412            CreateIndex(_) => "index",
413            CreateType(_) => "type",
414            _ => return Err("not a CREATE item statement".into()),
415        };
416        info.insert("type", json!(item_type));
417
418        let info = info.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
419        Ok(info)
420    };
421
422    let val = parse().map_err(|e| EvalError::InvalidCatalogJson(e.into()))?;
423    let jsonb = Jsonb::from_serde_json(val).expect("valid JSONB");
424    Ok(jsonb)
425}