1use std::collections::BTreeMap;
11use std::fmt;
12
13use mz_expr_derive::sqlfunc;
14use mz_lowertest::MzReflect;
15use mz_repr::adt::jsonb::{Jsonb, JsonbRef};
16use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
17use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
18use mz_repr::role_id::RoleId;
19use mz_repr::{ArrayRustType, Datum, Row, RowPacker, SqlColumnType, SqlScalarType, strconv};
20use mz_sql_parser::ast::display::AstDisplay;
21use mz_sql_parser::ast::{AstInfo, Format, FormatSpecifier, RawClusterName, RawItemName};
22use serde::{Deserialize, Serialize};
23use serde_json::json;
24
25use crate::EvalError;
26use crate::scalar::func::EagerUnaryFunc;
27use crate::scalar::func::impls::numeric::*;
28
29#[sqlfunc(
30 sqlname = "jsonb_to_text",
31 preserves_uniqueness = false,
32 inverse = to_unary!(super::CastStringToJsonb)
33)]
34pub fn cast_jsonb_to_string<'a>(a: JsonbRef<'a>) -> String {
35 let mut buf = String::new();
36 strconv::format_jsonb(&mut buf, a);
37 buf
38}
39
40#[sqlfunc(sqlname = "jsonb_to_smallint", is_monotone = true)]
41fn cast_jsonb_to_int16<'a>(a: JsonbRef<'a>) -> Result<i16, EvalError> {
42 match a.into_datum() {
43 Datum::Numeric(a) => cast_numeric_to_int16(a.into_inner()),
44 datum => Err(EvalError::InvalidJsonbCast {
45 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
46 to: "smallint".into(),
47 }),
48 }
49}
50
51#[sqlfunc(sqlname = "jsonb_to_integer", is_monotone = true)]
52fn cast_jsonb_to_int32<'a>(a: JsonbRef<'a>) -> Result<i32, EvalError> {
53 match a.into_datum() {
54 Datum::Numeric(a) => cast_numeric_to_int32(a.into_inner()),
55 datum => Err(EvalError::InvalidJsonbCast {
56 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
57 to: "integer".into(),
58 }),
59 }
60}
61
62#[sqlfunc(sqlname = "jsonb_to_bigint", is_monotone = true)]
63fn cast_jsonb_to_int64<'a>(a: JsonbRef<'a>) -> Result<i64, EvalError> {
64 match a.into_datum() {
65 Datum::Numeric(a) => cast_numeric_to_int64(a.into_inner()),
66 datum => Err(EvalError::InvalidJsonbCast {
67 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
68 to: "bigint".into(),
69 }),
70 }
71}
72
73#[sqlfunc(sqlname = "jsonb_to_real", is_monotone = true)]
74fn cast_jsonb_to_float32<'a>(a: JsonbRef<'a>) -> Result<f32, EvalError> {
75 match a.into_datum() {
76 Datum::Numeric(a) => cast_numeric_to_float32(a.into_inner()),
77 datum => Err(EvalError::InvalidJsonbCast {
78 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
79 to: "real".into(),
80 }),
81 }
82}
83
84#[sqlfunc(sqlname = "jsonb_to_double", is_monotone = true)]
85fn cast_jsonb_to_float64<'a>(a: JsonbRef<'a>) -> Result<f64, EvalError> {
86 match a.into_datum() {
87 Datum::Numeric(a) => cast_numeric_to_float64(a.into_inner()),
88 datum => Err(EvalError::InvalidJsonbCast {
89 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
90 to: "double precision".into(),
91 }),
92 }
93}
94
95#[derive(
96 Ord,
97 PartialOrd,
98 Clone,
99 Debug,
100 Eq,
101 PartialEq,
102 Serialize,
103 Deserialize,
104 Hash,
105 MzReflect
106)]
107pub struct CastJsonbToNumeric(pub Option<NumericMaxScale>);
108
109impl EagerUnaryFunc for CastJsonbToNumeric {
110 type Input<'a> = JsonbRef<'a>;
111 type Output<'a> = Result<Numeric, EvalError>;
112
113 fn call<'a>(&self, a: Self::Input<'a>) -> Self::Output<'a> {
114 match a.into_datum() {
115 Datum::Numeric(mut num) => match self.0 {
116 None => Ok(num.into_inner()),
117 Some(scale) => {
118 if numeric::rescale(&mut num.0, scale.into_u8()).is_err() {
119 return Err(EvalError::NumericFieldOverflow);
120 };
121 Ok(num.into_inner())
122 }
123 },
124 datum => Err(EvalError::InvalidJsonbCast {
125 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
126 to: "numeric".into(),
127 }),
128 }
129 }
130
131 fn output_sql_type(&self, input: SqlColumnType) -> SqlColumnType {
132 SqlScalarType::Numeric { max_scale: self.0 }.nullable(input.nullable)
133 }
134
135 fn is_monotone(&self) -> bool {
136 true
137 }
138}
139
140impl fmt::Display for CastJsonbToNumeric {
141 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
142 f.write_str("jsonb_to_numeric")
143 }
144}
145
146#[sqlfunc(sqlname = "jsonb_to_boolean", is_monotone = true)]
147fn cast_jsonb_to_bool<'a>(a: JsonbRef<'a>) -> Result<bool, EvalError> {
148 match a.into_datum() {
149 Datum::True => Ok(true),
150 Datum::False => Ok(false),
151 datum => Err(EvalError::InvalidJsonbCast {
152 from: jsonb_typeof(JsonbRef::from_datum(datum)).into(),
153 to: "boolean".into(),
154 }),
155 }
156}
157
158#[sqlfunc(sqlname = "jsonbable_to_jsonb")]
159fn cast_jsonbable_to_jsonb<'a>(a: JsonbRef<'a>) -> JsonbRef<'a> {
160 match a.into_datum() {
161 Datum::Numeric(n) => {
162 let n = n.into_inner();
163 let datum = if n.is_finite() {
164 Datum::from(n)
165 } else if n.is_nan() {
166 Datum::String("NaN")
167 } else if n.is_negative() {
168 Datum::String("-Infinity")
169 } else {
170 Datum::String("Infinity")
171 };
172 JsonbRef::from_datum(datum)
173 }
174 datum => JsonbRef::from_datum(datum),
175 }
176}
177
178#[sqlfunc]
179fn jsonb_array_length<'a>(a: JsonbRef<'a>) -> Result<Option<i32>, EvalError> {
180 match a.into_datum() {
181 Datum::List(list) => {
182 let count = list.iter().count();
183 match i32::try_from(count) {
184 Ok(len) => Ok(Some(len)),
185 Err(_) => Err(EvalError::Int32OutOfRange(count.to_string().into())),
186 }
187 }
188 _ => Ok(None),
189 }
190}
191
192#[sqlfunc]
193fn jsonb_typeof<'a>(a: JsonbRef<'a>) -> &'a str {
194 match a.into_datum() {
195 Datum::Map(_) => "object",
196 Datum::List(_) => "array",
197 Datum::String(_) => "string",
198 Datum::Numeric(_) => "number",
199 Datum::True | Datum::False => "boolean",
200 Datum::JsonNull => "null",
201 d => panic!("Not jsonb: {:?}", d),
202 }
203}
204
205#[sqlfunc]
206fn jsonb_strip_nulls<'a>(a: JsonbRef<'a>) -> Jsonb {
207 fn strip_nulls(a: Datum, row: &mut RowPacker) {
208 match a {
209 Datum::Map(dict) => row.push_dict_with(|row| {
210 for (k, v) in dict.iter() {
211 match v {
212 Datum::JsonNull => (),
213 _ => {
214 row.push(Datum::String(k));
215 strip_nulls(v, row);
216 }
217 }
218 }
219 }),
220 Datum::List(list) => row.push_list_with(|row| {
221 for elem in list.iter() {
222 strip_nulls(elem, row);
223 }
224 }),
225 _ => row.push(a),
226 }
227 }
228 let mut row = Row::default();
229 strip_nulls(a.into_datum(), &mut row.packer());
230 Jsonb::from_row(row)
231}
232
233#[sqlfunc]
234fn jsonb_pretty<'a>(a: JsonbRef<'a>) -> String {
235 let mut buf = String::new();
236 strconv::format_jsonb_pretty(&mut buf, a);
237 buf
238}
239
240fn jsonb_datum_to_u64<'a>(d: Datum<'a>) -> Result<u64, String> {
242 let Datum::Numeric(n) = d else {
243 return Err("expected numeric value".into());
244 };
245
246 let mut cx = numeric::cx_datum();
247 cx.try_into_u64(n.0)
248 .map_err(|_| format!("number out of u64 range: {n}"))
249}
250
251fn jsonb_datum_to_role_id(d: Datum) -> Result<RoleId, String> {
253 match d {
254 Datum::String("Public") => Ok(RoleId::Public),
255 Datum::String(other) => Err(format!("unexpected role ID variant: {other}")),
256 Datum::Map(dict) => {
257 let (key, val) = dict.iter().next().ok_or_else(|| "empty".to_string())?;
258 let n = jsonb_datum_to_u64(val)?;
259 match key {
260 "User" => Ok(RoleId::User(n)),
261 "System" => Ok(RoleId::System(n)),
262 "Predefined" => Ok(RoleId::Predefined(n)),
263 other => Err(format!("unexpected role ID variant: {other}")),
264 }
265 }
266 _ => Err("expected string or object".into()),
267 }
268}
269
270#[sqlfunc]
274fn parse_catalog_id<'a>(a: JsonbRef<'a>) -> Result<String, EvalError> {
275 let parse = || match a.into_datum() {
276 Datum::String(variant) => match variant {
278 "Explain" => Ok("e".to_string()),
279 "Public" => Ok("p".to_string()),
280 other => Err(format!("unexpected ID variant: {other}")),
281 },
282 Datum::Map(dict) => {
284 let (key, val) = dict.iter().next().ok_or_else(|| "empty".to_string())?;
285 let prefix = match key {
286 "IntrospectionSourceIndex" => "si",
287 "Predefined" => "g",
288 "System" => "s",
289 "Transient" => "t",
290 "User" => "u",
291 other => return Err(format!("unexpected ID variant: {other}")),
292 };
293 let n = jsonb_datum_to_u64(val)?;
294 Ok(format!("{prefix}{n}"))
295 }
296 _ => Err("expected string or object".into()),
297 };
298
299 parse().map_err(|e| EvalError::InvalidCatalogJson(e.into()))
300}
301
302#[sqlfunc]
304fn parse_catalog_privileges<'a>(a: JsonbRef<'a>) -> Result<ArrayRustType<MzAclItem>, EvalError> {
305 let parse_one = |datum| match datum {
306 Datum::Map(dict) => {
307 let mut grantee = None;
308 let mut grantor = None;
309 let mut acl_mode = None;
310 for (key, val) in dict.iter() {
311 match key {
312 "grantee" => {
313 let id = jsonb_datum_to_role_id(val)?;
314 grantee = Some(id);
315 }
316 "grantor" => {
317 let id = jsonb_datum_to_role_id(val)?;
318 grantor = Some(id);
319 }
320 "acl_mode" => {
321 let Datum::Map(mode_dict) = val else {
322 return Err(format!("unexpected acl_mode: {val}"));
323 };
324 let (key, val) = mode_dict.iter().next().ok_or("empty acl_mode")?;
325 if key != "bitflags" {
326 return Err(format!("unexpected acl_mode field: {key}"));
327 }
328 let bits = jsonb_datum_to_u64(val)?;
329 let Some(mode) = AclMode::from_bits(bits) else {
330 return Err(format!("invalid acl_mode bitflags: {bits}"));
331 };
332 acl_mode = Some(mode);
333 }
334 other => return Err(format!("unexpected privilege field: {other}")),
335 }
336 }
337 Ok(MzAclItem {
338 grantee: grantee.ok_or_else(|| format!("missing grantee: {dict:?}"))?,
339 grantor: grantor.ok_or_else(|| "missing grantor in privilege".to_string())?,
340 acl_mode: acl_mode.ok_or_else(|| "missing acl_mode in privilege".to_string())?,
341 })
342 }
343 other => Err(format!("expected object in array, found: {other}")),
344 };
345
346 let parse = || match a.into_datum() {
347 Datum::List(list) => {
348 let mut result = Vec::new();
349 for item in list.iter() {
350 result.push(parse_one(item)?);
351 }
352 Ok(result)
353 }
354 _ => Err("expected array".to_string()),
355 };
356
357 parse()
358 .map(ArrayRustType)
359 .map_err(|e| EvalError::InvalidCatalogJson(e.into()))
360}
361
362#[sqlfunc]
370fn parse_catalog_create_sql<'a>(a: &'a str) -> Result<Jsonb, EvalError> {
371 fn get_cluster_id(in_cluster: RawClusterName) -> Result<String, &'static str> {
372 match in_cluster {
373 RawClusterName::Resolved(s) => Ok(s),
374 RawClusterName::Unresolved(_) => Err("unresolved cluster name"),
375 }
376 }
377
378 fn get_item_id(item: RawItemName) -> Result<String, &'static str> {
379 match item {
380 RawItemName::Id(id, _, _) => Ok(id),
381 RawItemName::Name(_) => Err("unresolved item name"),
382 }
383 }
384
385 fn format_name<T: AstInfo>(fmt: &Format<T>) -> &'static str {
386 match fmt {
387 Format::Bytes => "bytes",
388 Format::Avro(_) => "avro",
389 Format::Protobuf(_) => "protobuf",
390 Format::Regex(_) => "regex",
391 Format::Csv { .. } => "csv",
392 Format::Json { .. } => "json",
393 Format::Text => "text",
394 }
395 }
396
397 let parse = || -> Result<serde_json::Value, String> {
398 let mut stmts = mz_sql_parser::parser::parse_statements(a)
399 .map_err(|e| format!("failed to parse create_sql: {e}"))?;
400 let stmt = match stmts.len() {
401 1 => stmts.remove(0).ast,
402 n => return Err(format!("expected a single statement, found {n}")),
403 };
404
405 let mut info = BTreeMap::<&str, serde_json::Value>::new();
406
407 use mz_sql_parser::ast::Statement::*;
408 let item_type = match stmt {
409 CreateSecret(_) => "secret",
410 CreateConnection(stmt) => {
411 let connection_type = stmt.connection_type.as_str();
412 info.insert("connection_type", json!(connection_type));
413
414 "connection"
415 }
416 CreateView(_) => "view",
417 CreateMaterializedView(stmt) => {
418 let Some(in_cluster) = stmt.in_cluster else {
419 return Err("missing IN CLUSTER".into());
420 };
421 let cluster_id = match in_cluster {
422 RawClusterName::Unresolved(ident) => ident.into_string(),
423 RawClusterName::Resolved(s) => s,
424 };
425 info.insert("cluster_id", json!(cluster_id));
426
427 let mut definition = stmt.query.to_ast_string_stable();
428 definition.push(';');
429 info.insert("definition", json!(definition));
430
431 "materialized-view"
432 }
433 CreateTable(_) | CreateTableFromSource(_) => "table",
434 CreateSource(stmt) => {
435 let Some(in_cluster) = stmt.in_cluster else {
436 return Err("missing IN CLUSTER".into());
437 };
438 let cluster_id = get_cluster_id(in_cluster)?;
439 info.insert("cluster_id", json!(cluster_id));
440
441 use mz_sql_parser::ast::CreateSourceConnection::*;
442 let (source_type, connection) = match stmt.connection {
443 Kafka { connection, .. } => ("kafka", Some(connection)),
444 Postgres { connection, .. } => ("postgres", Some(connection)),
445 MySql { connection, .. } => ("mysql", Some(connection)),
446 SqlServer { connection, .. } => ("sql-server", Some(connection)),
447 LoadGenerator { .. } => ("load-generator", None),
448 };
449 info.insert("source_type", json!(source_type));
450 if let Some(conn) = connection {
451 let conn_id = get_item_id(conn)?;
452 info.insert("connection_id", json!(conn_id));
453 }
454
455 let is_debezium = matches!(
456 stmt.envelope,
457 Some(mz_sql_parser::ast::SourceEnvelope::Debezium)
458 );
459
460 if let Some(envelope) = stmt.envelope {
461 use mz_sql_parser::ast::SourceEnvelope::*;
462 let envelope_type = match envelope {
463 None => "none",
464 Debezium => "debezium",
465 Upsert { .. } => "upsert",
466 CdcV2 => "materialize",
467 };
468 info.insert("envelope_type", json!(envelope_type));
469 }
470
471 if let Some(format_spec) = stmt.format {
472 match &format_spec {
473 FormatSpecifier::Bare(fmt) => {
474 if is_debezium {
477 info.insert("key_format", json!(format_name(fmt)));
478 }
479 info.insert("value_format", json!(format_name(fmt)));
480 }
481 FormatSpecifier::KeyValue { key, value } => {
482 info.insert("key_format", json!(format_name(key)));
483 info.insert("value_format", json!(format_name(value)));
484 }
485 }
486 }
487
488 "source"
489 }
490 CreateWebhookSource(stmt) => {
491 if stmt.is_table {
492 "table"
493 } else {
494 info.insert("source_type", json!("webhook"));
495 if let Some(in_cluster) = stmt.in_cluster {
496 let cluster_id = get_cluster_id(in_cluster)?;
497 info.insert("cluster_id", json!(cluster_id));
498 }
499 "source"
500 }
501 }
502 CreateSubsource(stmt) => {
503 use mz_sql_parser::ast::CreateSubsourceOptionName;
504 let is_progress = stmt
505 .with_options
506 .iter()
507 .any(|o| matches!(o.name, CreateSubsourceOptionName::Progress));
508 let source_type = if is_progress { "progress" } else { "subsource" };
509 info.insert("source_type", json!(source_type));
510
511 if let Some(of_source) = stmt.of_source {
512 let of_source_id = get_item_id(of_source)?;
513 info.insert("of_source_id", json!(of_source_id));
514 }
515
516 "subsource"
517 }
518 CreateSink(_) => "sink",
519 CreateIndex(stmt) => {
520 let Some(in_cluster) = stmt.in_cluster else {
521 return Err("missing IN CLUSTER".into());
522 };
523 let cluster_id = get_cluster_id(in_cluster)?;
524 info.insert("cluster_id", json!(cluster_id));
525 let on_id = get_item_id(stmt.on_name)?;
526 info.insert("on_id", json!(on_id));
527 "index"
528 }
529 CreateType(_) => "type",
530 _ => return Err("not a CREATE item statement".into()),
531 };
532 info.insert("type", json!(item_type));
533
534 let info = info.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
535 Ok(info)
536 };
537
538 let val = parse().map_err(|e| EvalError::InvalidCatalogJson(e.into()))?;
539 let jsonb = Jsonb::from_serde_json(val).expect("valid JSONB");
540 Ok(jsonb)
541}