Skip to main content

mz_expr/scalar/func/impls/
jsonb.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::fmt;
12
13use mz_expr_derive::sqlfunc;
14use mz_lowertest::MzReflect;
15use mz_repr::adt::jsonb::{Jsonb, JsonbRef};
16use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
17use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
18use mz_repr::role_id::RoleId;
19use mz_repr::{ArrayRustType, Datum, Row, RowPacker, SqlColumnType, SqlScalarType, strconv};
20use mz_sql_parser::ast::display::AstDisplay;
21use mz_sql_parser::ast::{AstInfo, Format, FormatSpecifier, RawClusterName, RawItemName};
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24
25use crate::EvalError;
26use crate::scalar::func::EagerUnaryFunc;
27use crate::scalar::func::impls::numeric::*;
28
29#[sqlfunc(
30    sqlname = "jsonb_to_text",
31    preserves_uniqueness = false,
32    inverse = to_unary!(super::CastStringToJsonb)
33)]
34pub fn cast_jsonb_to_string<'a>(a: JsonbRef<'a>) -> String {
35    let mut buf = String::new();
36    strconv::format_jsonb(&mut buf, a);
37    buf
38}
39
40#[sqlfunc(sqlname = "jsonb_to_smallint", is_monotone = true)]
41fn cast_jsonb_to_int16<'a>(a: JsonbRef<'a>) -> Result<i16, EvalError> {
42    match a.into_datum() {
43        Datum::Numeric(a) => cast_numeric_to_int16(a.into_inner()),
44        datum => Err(EvalError::InvalidJsonbCast {
45            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
46            to: "smallint".into(),
47        }),
48    }
49}
50
51#[sqlfunc(sqlname = "jsonb_to_integer", is_monotone = true)]
52fn cast_jsonb_to_int32<'a>(a: JsonbRef<'a>) -> Result<i32, EvalError> {
53    match a.into_datum() {
54        Datum::Numeric(a) => cast_numeric_to_int32(a.into_inner()),
55        datum => Err(EvalError::InvalidJsonbCast {
56            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
57            to: "integer".into(),
58        }),
59    }
60}
61
62#[sqlfunc(sqlname = "jsonb_to_bigint", is_monotone = true)]
63fn cast_jsonb_to_int64<'a>(a: JsonbRef<'a>) -> Result<i64, EvalError> {
64    match a.into_datum() {
65        Datum::Numeric(a) => cast_numeric_to_int64(a.into_inner()),
66        datum => Err(EvalError::InvalidJsonbCast {
67            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
68            to: "bigint".into(),
69        }),
70    }
71}
72
73#[sqlfunc(sqlname = "jsonb_to_real", is_monotone = true)]
74fn cast_jsonb_to_float32<'a>(a: JsonbRef<'a>) -> Result<f32, EvalError> {
75    match a.into_datum() {
76        Datum::Numeric(a) => cast_numeric_to_float32(a.into_inner()),
77        datum => Err(EvalError::InvalidJsonbCast {
78            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
79            to: "real".into(),
80        }),
81    }
82}
83
84#[sqlfunc(sqlname = "jsonb_to_double", is_monotone = true)]
85fn cast_jsonb_to_float64<'a>(a: JsonbRef<'a>) -> Result<f64, EvalError> {
86    match a.into_datum() {
87        Datum::Numeric(a) => cast_numeric_to_float64(a.into_inner()),
88        datum => Err(EvalError::InvalidJsonbCast {
89            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
90            to: "double precision".into(),
91        }),
92    }
93}
94
95#[derive(
96    Ord,
97    PartialOrd,
98    Clone,
99    Debug,
100    Eq,
101    PartialEq,
102    Serialize,
103    Deserialize,
104    Hash,
105    MzReflect
106)]
107pub struct CastJsonbToNumeric(pub Option<NumericMaxScale>);
108
109impl EagerUnaryFunc for CastJsonbToNumeric {
110    type Input<'a> = JsonbRef<'a>;
111    type Output<'a> = Result<Numeric, EvalError>;
112
113    fn call<'a>(&self, a: Self::Input<'a>) -> Self::Output<'a> {
114        match a.into_datum() {
115            Datum::Numeric(mut num) => match self.0 {
116                None => Ok(num.into_inner()),
117                Some(scale) => {
118                    if numeric::rescale(&mut num.0, scale.into_u8()).is_err() {
119                        return Err(EvalError::NumericFieldOverflow);
120                    };
121                    Ok(num.into_inner())
122                }
123            },
124            datum => Err(EvalError::InvalidJsonbCast {
125                from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
126                to: "numeric".into(),
127            }),
128        }
129    }
130
131    fn output_sql_type(&self, input: SqlColumnType) -> SqlColumnType {
132        SqlScalarType::Numeric { max_scale: self.0 }.nullable(input.nullable)
133    }
134
135    fn is_monotone(&self) -> bool {
136        true
137    }
138}
139
140impl fmt::Display for CastJsonbToNumeric {
141    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142        f.write_str("jsonb_to_numeric")
143    }
144}
145
146#[sqlfunc(sqlname = "jsonb_to_boolean", is_monotone = true)]
147fn cast_jsonb_to_bool<'a>(a: JsonbRef<'a>) -> Result<bool, EvalError> {
148    match a.into_datum() {
149        Datum::True => Ok(true),
150        Datum::False => Ok(false),
151        datum => Err(EvalError::InvalidJsonbCast {
152            from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
153            to: "boolean".into(),
154        }),
155    }
156}
157
158#[sqlfunc(sqlname = "jsonbable_to_jsonb")]
159fn cast_jsonbable_to_jsonb<'a>(a: JsonbRef<'a>) -> JsonbRef<'a> {
160    match a.into_datum() {
161        Datum::Numeric(n) => {
162            let n = n.into_inner();
163            let datum = if n.is_finite() {
164                Datum::from(n)
165            } else if n.is_nan() {
166                Datum::String("NaN")
167            } else if n.is_negative() {
168                Datum::String("-Infinity")
169            } else {
170                Datum::String("Infinity")
171            };
172            JsonbRef::from_datum(datum)
173        }
174        datum => JsonbRef::from_datum(datum),
175    }
176}
177
178#[sqlfunc]
179fn jsonb_array_length<'a>(a: JsonbRef<'a>) -> Result<Option<i32>, EvalError> {
180    match a.into_datum() {
181        Datum::List(list) => {
182            let count = list.iter().count();
183            match i32::try_from(count) {
184                Ok(len) => Ok(Some(len)),
185                Err(_) => Err(EvalError::Int32OutOfRange(count.to_string().into())),
186            }
187        }
188        _ => Ok(None),
189    }
190}
191
192#[sqlfunc]
193fn jsonb_typeof<'a>(a: JsonbRef<'a>) -> &'a str {
194    match a.into_datum() {
195        Datum::Map(_) => "object",
196        Datum::List(_) => "array",
197        Datum::String(_) => "string",
198        Datum::Numeric(_) => "number",
199        Datum::True | Datum::False => "boolean",
200        Datum::JsonNull => "null",
201        d => panic!("Not jsonb: {:?}", d),
202    }
203}
204
205#[sqlfunc]
206fn jsonb_strip_nulls<'a>(a: JsonbRef<'a>) -> Jsonb {
207    fn strip_nulls(a: Datum, row: &mut RowPacker) {
208        match a {
209            Datum::Map(dict) => row.push_dict_with(|row| {
210                for (k, v) in dict.iter() {
211                    match v {
212                        Datum::JsonNull => (),
213                        _ => {
214                            row.push(Datum::String(k));
215                            strip_nulls(v, row);
216                        }
217                    }
218                }
219            }),
220            Datum::List(list) => row.push_list_with(|row| {
221                for elem in list.iter() {
222                    strip_nulls(elem, row);
223                }
224            }),
225            _ => row.push(a),
226        }
227    }
228    let mut row = Row::default();
229    strip_nulls(a.into_datum(), &mut row.packer());
230    Jsonb::from_row(row)
231}
232
233#[sqlfunc]
234fn jsonb_pretty<'a>(a: JsonbRef<'a>) -> String {
235    let mut buf = String::new();
236    strconv::format_jsonb_pretty(&mut buf, a);
237    buf
238}
239
240/// Converts a JSONB `Datum` into a `u64`.
241fn jsonb_datum_to_u64<'a>(d: Datum<'a>) -> Result<u64, String> {
242    let Datum::Numeric(n) = d else {
243        return Err("expected numeric value".into());
244    };
245
246    let mut cx = numeric::cx_datum();
247    cx.try_into_u64(n.0)
248        .map_err(|_| format!("number out of u64 range: {n}"))
249}
250
251/// Converts a JSONB `Datum` into a `RoleId`.
252fn jsonb_datum_to_role_id(d: Datum) -> Result<RoleId, String> {
253    match d {
254        Datum::String("Public") => Ok(RoleId::Public),
255        Datum::String(other) => Err(format!("unexpected role ID variant: {other}")),
256        Datum::Map(dict) => {
257            let (key, val) = dict.iter().next().ok_or_else(|| "empty".to_string())?;
258            let n = jsonb_datum_to_u64(val)?;
259            match key {
260                "User" => Ok(RoleId::User(n)),
261                "System" => Ok(RoleId::System(n)),
262                "Predefined" => Ok(RoleId::Predefined(n)),
263                other => Err(format!("unexpected role ID variant: {other}")),
264            }
265        }
266        _ => Err("expected string or object".into()),
267    }
268}
269
270/// Converts a catalog JSON-serialized ID value into the appropriate string format.
271///
272/// Supports all of Materialize's various ID types of the form `<prefix><u64>`.
273#[sqlfunc]
274fn parse_catalog_id<'a>(a: JsonbRef<'a>) -> Result<String, EvalError> {
275    let parse = || match a.into_datum() {
276        // Unit variant, e.g. "Public"
277        Datum::String(variant) => match variant {
278            "Explain" => Ok("e".to_string()),
279            "Public" => Ok("p".to_string()),
280            other => Err(format!("unexpected ID variant: {other}")),
281        },
282        // Newtype variant, e.g. {"User": 1}
283        Datum::Map(dict) => {
284            let (key, val) = dict.iter().next().ok_or_else(|| "empty".to_string())?;
285            let prefix = match key {
286                "IntrospectionSourceIndex" => "si",
287                "Predefined" => "g",
288                "System" => "s",
289                "Transient" => "t",
290                "User" => "u",
291                other => return Err(format!("unexpected ID variant: {other}")),
292            };
293            let n = jsonb_datum_to_u64(val)?;
294            Ok(format!("{prefix}{n}"))
295        }
296        _ => Err("expected string or object".into()),
297    };
298
299    parse().map_err(|e| EvalError::InvalidCatalogJson(e.into()))
300}
301
302/// Converts a catalog JSON-serialized privilege array into an `mz_aclitem[]`.
303#[sqlfunc]
304fn parse_catalog_privileges<'a>(a: JsonbRef<'a>) -> Result<ArrayRustType<MzAclItem>, EvalError> {
305    let parse_one = |datum| match datum {
306        Datum::Map(dict) => {
307            let mut grantee = None;
308            let mut grantor = None;
309            let mut acl_mode = None;
310            for (key, val) in dict.iter() {
311                match key {
312                    "grantee" => {
313                        let id = jsonb_datum_to_role_id(val)?;
314                        grantee = Some(id);
315                    }
316                    "grantor" => {
317                        let id = jsonb_datum_to_role_id(val)?;
318                        grantor = Some(id);
319                    }
320                    "acl_mode" => {
321                        let Datum::Map(mode_dict) = val else {
322                            return Err(format!("unexpected acl_mode: {val}"));
323                        };
324                        let (key, val) = mode_dict.iter().next().ok_or("empty acl_mode")?;
325                        if key != "bitflags" {
326                            return Err(format!("unexpected acl_mode field: {key}"));
327                        }
328                        let bits = jsonb_datum_to_u64(val)?;
329                        let Some(mode) = AclMode::from_bits(bits) else {
330                            return Err(format!("invalid acl_mode bitflags: {bits}"));
331                        };
332                        acl_mode = Some(mode);
333                    }
334                    other => return Err(format!("unexpected privilege field: {other}")),
335                }
336            }
337            Ok(MzAclItem {
338                grantee: grantee.ok_or_else(|| format!("missing grantee: {dict:?}"))?,
339                grantor: grantor.ok_or_else(|| "missing grantor in privilege".to_string())?,
340                acl_mode: acl_mode.ok_or_else(|| "missing acl_mode in privilege".to_string())?,
341            })
342        }
343        other => Err(format!("expected object in array, found: {other}")),
344    };
345
346    let parse = || match a.into_datum() {
347        Datum::List(list) => {
348            let mut result = Vec::new();
349            for item in list.iter() {
350                result.push(parse_one(item)?);
351            }
352            Ok(result)
353        }
354        _ => Err("expected array".to_string()),
355    };
356
357    parse()
358        .map(ArrayRustType)
359        .map_err(|e| EvalError::InvalidCatalogJson(e.into()))
360}
361
362/// Parses a catalog `create_sql` string into a JSONB object.
363///
364/// The returned JSONB does not fully reflect the parsed SQL and instead contains only fields
365/// required by current callers.
366///
367// TODO: This function isn't parsing JSONB and therefore shouldn't live in the `jsonb` module.
368//       Consider moving all the `parse_catalog_*` functions into their own module.
369#[sqlfunc]
370fn parse_catalog_create_sql<'a>(a: &'a str) -> Result<Jsonb, EvalError> {
371    fn get_cluster_id(in_cluster: RawClusterName) -> Result<String, &'static str> {
372        match in_cluster {
373            RawClusterName::Resolved(s) => Ok(s),
374            RawClusterName::Unresolved(_) => Err("unresolved cluster name"),
375        }
376    }
377
378    fn get_item_id(item: RawItemName) -> Result<String, &'static str> {
379        match item {
380            RawItemName::Id(id, _, _) => Ok(id),
381            RawItemName::Name(_) => Err("unresolved item name"),
382        }
383    }
384
385    fn format_name<T: AstInfo>(fmt: &Format<T>) -> &'static str {
386        match fmt {
387            Format::Bytes => "bytes",
388            Format::Avro(_) => "avro",
389            Format::Protobuf(_) => "protobuf",
390            Format::Regex(_) => "regex",
391            Format::Csv { .. } => "csv",
392            Format::Json { .. } => "json",
393            Format::Text => "text",
394        }
395    }
396
397    let parse = || -> Result<serde_json::Value, String> {
398        let mut stmts = mz_sql_parser::parser::parse_statements(a)
399            .map_err(|e| format!("failed to parse create_sql: {e}"))?;
400        let stmt = match stmts.len() {
401            1 => stmts.remove(0).ast,
402            n => return Err(format!("expected a single statement, found {n}")),
403        };
404
405        let mut info = BTreeMap::<&str, serde_json::Value>::new();
406
407        use mz_sql_parser::ast::Statement::*;
408        let item_type = match stmt {
409            CreateSecret(_) => "secret",
410            CreateConnection(stmt) => {
411                let connection_type = stmt.connection_type.as_str();
412                info.insert("connection_type", json!(connection_type));
413
414                "connection"
415            }
416            CreateView(_) => "view",
417            CreateMaterializedView(stmt) => {
418                let Some(in_cluster) = stmt.in_cluster else {
419                    return Err("missing IN CLUSTER".into());
420                };
421                let cluster_id = match in_cluster {
422                    RawClusterName::Unresolved(ident) => ident.into_string(),
423                    RawClusterName::Resolved(s) => s,
424                };
425                info.insert("cluster_id", json!(cluster_id));
426
427                let mut definition = stmt.query.to_ast_string_stable();
428                definition.push(';');
429                info.insert("definition", json!(definition));
430
431                "materialized-view"
432            }
433            CreateTable(_) | CreateTableFromSource(_) => "table",
434            CreateSource(stmt) => {
435                let Some(in_cluster) = stmt.in_cluster else {
436                    return Err("missing IN CLUSTER".into());
437                };
438                let cluster_id = get_cluster_id(in_cluster)?;
439                info.insert("cluster_id", json!(cluster_id));
440
441                use mz_sql_parser::ast::CreateSourceConnection::*;
442                let (source_type, connection) = match stmt.connection {
443                    Kafka { connection, .. } => ("kafka", Some(connection)),
444                    Postgres { connection, .. } => ("postgres", Some(connection)),
445                    MySql { connection, .. } => ("mysql", Some(connection)),
446                    SqlServer { connection, .. } => ("sql-server", Some(connection)),
447                    LoadGenerator { .. } => ("load-generator", None),
448                };
449                info.insert("source_type", json!(source_type));
450                if let Some(conn) = connection {
451                    let conn_id = get_item_id(conn)?;
452                    info.insert("connection_id", json!(conn_id));
453                }
454
455                let is_debezium = matches!(
456                    stmt.envelope,
457                    Some(mz_sql_parser::ast::SourceEnvelope::Debezium)
458                );
459
460                if let Some(envelope) = stmt.envelope {
461                    use mz_sql_parser::ast::SourceEnvelope::*;
462                    let envelope_type = match envelope {
463                        None => "none",
464                        Debezium => "debezium",
465                        Upsert { .. } => "upsert",
466                        CdcV2 => "materialize",
467                    };
468                    info.insert("envelope_type", json!(envelope_type));
469                }
470
471                if let Some(format_spec) = stmt.format {
472                    match &format_spec {
473                        FormatSpecifier::Bare(fmt) => {
474                            // Debezium sources with a single format spec implicitly use
475                            // the same format for both key and value.
476                            if is_debezium {
477                                info.insert("key_format", json!(format_name(fmt)));
478                            }
479                            info.insert("value_format", json!(format_name(fmt)));
480                        }
481                        FormatSpecifier::KeyValue { key, value } => {
482                            info.insert("key_format", json!(format_name(key)));
483                            info.insert("value_format", json!(format_name(value)));
484                        }
485                    }
486                }
487
488                "source"
489            }
490            CreateWebhookSource(stmt) => {
491                if stmt.is_table {
492                    "table"
493                } else {
494                    info.insert("source_type", json!("webhook"));
495                    if let Some(in_cluster) = stmt.in_cluster {
496                        let cluster_id = get_cluster_id(in_cluster)?;
497                        info.insert("cluster_id", json!(cluster_id));
498                    }
499                    "source"
500                }
501            }
502            CreateSubsource(stmt) => {
503                use mz_sql_parser::ast::CreateSubsourceOptionName;
504                let is_progress = stmt
505                    .with_options
506                    .iter()
507                    .any(|o| matches!(o.name, CreateSubsourceOptionName::Progress));
508                let source_type = if is_progress { "progress" } else { "subsource" };
509                info.insert("source_type", json!(source_type));
510
511                if let Some(of_source) = stmt.of_source {
512                    let of_source_id = get_item_id(of_source)?;
513                    info.insert("of_source_id", json!(of_source_id));
514                }
515
516                "subsource"
517            }
518            CreateSink(_) => "sink",
519            CreateIndex(stmt) => {
520                let Some(in_cluster) = stmt.in_cluster else {
521                    return Err("missing IN CLUSTER".into());
522                };
523                let cluster_id = get_cluster_id(in_cluster)?;
524                info.insert("cluster_id", json!(cluster_id));
525                let on_id = get_item_id(stmt.on_name)?;
526                info.insert("on_id", json!(on_id));
527                "index"
528            }
529            CreateType(_) => "type",
530            _ => return Err("not a CREATE item statement".into()),
531        };
532        info.insert("type", json!(item_type));
533
534        let info = info.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
535        Ok(info)
536    };
537
538    let val = parse().map_err(|e| EvalError::InvalidCatalogJson(e.into()))?;
539    let jsonb = Jsonb::from_serde_json(val).expect("valid JSONB");
540    Ok(jsonb)
541}