1use mz_ore::collections::CollectionExt;
20use proc_macro2::LineColumn;
21use syn::Error;
22use syn::parse::{Parse, ParseStream, Parser};
23use syn::spanned::Spanned;
24
25use super::TestCatalog;
26
27use self::util::*;
28
29pub fn try_parse_mir(catalog: &TestCatalog, s: &str) -> Result<mz_expr::MirRelationExpr, String> {
31 let parser = move |input: ParseStream| {
34 let ctx = Ctx { catalog };
35 relation::parse_expr(&ctx, input)
36 };
37 let s = s.replace("// {", ":: {");
40 let mut expr = parser.parse_str(&s).map_err(|err| {
42 let (line, column) = (err.span().start().line, err.span().start().column);
43 format!("parse error at {line}:{column}:\n{err}\n")
44 })?;
45 relation::fix_types(&mut expr, &mut relation::FixTypesCtx::default())?;
48 Ok(expr)
50}
51
52pub fn try_parse_def(catalog: &TestCatalog, s: &str) -> Result<Def, String> {
54 let parser = move |input: ParseStream| {
57 let ctx = Ctx { catalog };
58 def::parse_def(&ctx, input)
59 };
60 let def = parser.parse_str(s).map_err(|err| {
62 let (line, column) = (err.span().start().line, err.span().start().column);
63 format!("parse error at {line}:{column}:\n{err}\n")
64 })?;
65 Ok(def)
67}
68
69mod relation {
71 use std::collections::BTreeMap;
72
73 use mz_expr::{AccessStrategy, Id, JoinImplementation, LocalId, MirRelationExpr};
74 use mz_repr::{Diff, ReprRelationType, Row, SqlScalarType};
75
76 use crate::parser::analyses::Analyses;
77
78 use super::*;
79
80 type Result = syn::Result<MirRelationExpr>;
81
82 pub fn parse_expr(ctx: CtxRef, input: ParseStream) -> Result {
83 let lookahead = input.lookahead1();
84 if lookahead.peek(kw::Constant) {
85 parse_constant(ctx, input)
86 } else if lookahead.peek(kw::Get) {
87 parse_get(ctx, input)
88 } else if lookahead.peek(kw::Return) {
89 parse_let_or_letrec_old(ctx, input)
90 } else if lookahead.peek(kw::With) {
91 parse_let_or_letrec(ctx, input)
92 } else if lookahead.peek(kw::Project) {
93 parse_project(ctx, input)
94 } else if lookahead.peek(kw::Map) {
95 parse_map(ctx, input)
96 } else if lookahead.peek(kw::FlatMap) {
97 parse_flat_map(ctx, input)
98 } else if lookahead.peek(kw::Filter) {
99 parse_filter(ctx, input)
100 } else if lookahead.peek(kw::CrossJoin) {
101 parse_cross_join(ctx, input)
102 } else if lookahead.peek(kw::Join) {
103 parse_join(ctx, input)
104 } else if lookahead.peek(kw::Distinct) {
105 parse_distinct(ctx, input)
106 } else if lookahead.peek(kw::Reduce) {
107 parse_reduce(ctx, input)
108 } else if lookahead.peek(kw::TopK) {
109 parse_top_k(ctx, input)
110 } else if lookahead.peek(kw::Negate) {
111 parse_negate(ctx, input)
112 } else if lookahead.peek(kw::Threshold) {
113 parse_threshold(ctx, input)
114 } else if lookahead.peek(kw::Union) {
115 parse_union(ctx, input)
116 } else if lookahead.peek(kw::ArrangeBy) {
117 parse_arrange_by(ctx, input)
118 } else {
119 Err(lookahead.error())
120 }
121 }
122
123 fn parse_constant(ctx: CtxRef, input: ParseStream) -> Result {
124 let constant = input.parse::<kw::Constant>()?;
125
126 let parse_typ = |input: ParseStream| -> syn::Result<ReprRelationType> {
127 let analyses = analyses::parse_analyses(input)?;
128 let Some(column_types) = analyses.types else {
129 let msg = "Missing expected `types` analyses for Constant line";
130 Err(Error::new(input.span(), msg))?
131 };
132 let keys = analyses.keys.unwrap_or_default();
133 Ok(ReprRelationType::new(column_types).with_keys(keys))
134 };
135 if input.eat3(syn::Token![<], kw::empty, syn::Token![>]) {
136 let typ = parse_typ(input)?;
137 Ok(MirRelationExpr::Constant {
138 rows: Ok(vec![]),
139 typ,
140 })
141 } else {
142 let typ = parse_typ(input)?;
143 let parse_children = ParseChildren::new(input, constant.span().start());
144 let rows = Ok(parse_children.parse_many(ctx, parse_constant_entry)?);
145 Ok(MirRelationExpr::Constant { rows, typ })
146 }
147 }
148
149 fn parse_constant_entry(_ctx: CtxRef, input: ParseStream) -> syn::Result<(Row, Diff)> {
150 input.parse::<syn::Token![-]>()?;
151
152 let (row, diff);
153
154 let inner1;
155 syn::parenthesized!(inner1 in input);
156
157 if inner1.peek(syn::token::Paren) {
158 let inner2;
159 syn::parenthesized!(inner2 in inner1);
160 row = inner2.parse::<Parsed<Row>>()?.into();
161 inner1.parse::<kw::x>()?;
162 diff = match inner1.parse::<syn::Lit>()? {
163 syn::Lit::Int(l) => Ok(l.base10_parse::<Diff>()?),
164 _ => Err(Error::new(inner1.span(), "expected Diff literal")),
165 }?;
166 } else {
167 row = inner1.parse::<Parsed<Row>>()?.into();
168 diff = Diff::ONE;
169 }
170
171 Ok((row, diff))
172 }
173
174 fn parse_get(ctx: CtxRef, input: ParseStream) -> Result {
175 input.parse::<kw::Get>()?;
176
177 let ident = input.parse::<syn::Ident>()?;
178 match ctx.catalog.get(&ident.to_string()) {
179 Some((id, _cols, typ)) => Ok(MirRelationExpr::Get {
180 id: Id::Global(*id),
181 typ: ReprRelationType::from(typ),
182 access_strategy: AccessStrategy::UnknownOrLocal,
183 }),
184 None => Ok(MirRelationExpr::Get {
185 id: Id::Local(parse_local_id(ident)?),
186 typ: ReprRelationType::empty(),
187 access_strategy: AccessStrategy::UnknownOrLocal,
188 }),
189 }
190 }
191
192 fn parse_let_or_letrec_old(ctx: CtxRef, input: ParseStream) -> Result {
194 let return_ = input.parse::<kw::Return>()?;
195 let parse_body = ParseChildren::new(input, return_.span().start());
196 let body = parse_body.parse_one(ctx, parse_expr)?;
197
198 let with = input.parse::<kw::With>()?;
199 let recursive = input.eat2(kw::Mutually, kw::Recursive);
200 let parse_ctes = ParseChildren::new(input, with.span().start());
201 let mut ctes = parse_ctes.parse_many(ctx, parse_cte)?;
202
203 if ctes.is_empty() {
204 let msg = "At least one Let/LetRec cte binding expected";
205 Err(Error::new(input.span(), msg))?
206 }
207
208 ctes.reverse();
209 let cte_ids = ctes.iter().map(|(id, _, _)| id);
210 if !cte_ids.clone().is_sorted() {
211 let msg = format!(
212 "Error parsing Let/LetRec: seen Return before With, but cte ids are not ordered descending: {:?}",
213 cte_ids.collect::<Vec<_>>()
214 );
215 Err(Error::new(input.span(), msg))?
216 }
217 build_let_or_let_rec(ctes, body, recursive, with)
218 }
219
220 fn parse_let_or_letrec(ctx: CtxRef, input: ParseStream) -> Result {
222 let with = input.parse::<kw::With>()?;
223 let recursive = input.eat2(kw::Mutually, kw::Recursive);
224 let parse_ctes = ParseChildren::new(input, with.span().start());
225 let ctes = parse_ctes.parse_many(ctx, parse_cte)?;
226
227 let return_ = input.parse::<kw::Return>()?;
228 let parse_body = ParseChildren::new(input, return_.span().start());
229 let body = parse_body.parse_one(ctx, parse_expr)?;
230
231 if ctes.is_empty() {
232 let msg = "At least one `let cte` binding expected";
233 Err(Error::new(input.span(), msg))?
234 }
235
236 let cte_ids = ctes.iter().map(|(id, _, _)| id);
237 if !cte_ids.clone().is_sorted() {
238 let msg = format!(
239 "Error parsing Let/LetRec: seen With before Return, but cte ids are not ordered ascending: {:?}",
240 cte_ids.collect::<Vec<_>>()
241 );
242 Err(Error::new(input.span(), msg))?
243 }
244 build_let_or_let_rec(ctes, body, recursive, with)
245 }
246
247 fn build_let_or_let_rec(
248 ctes: Vec<(LocalId, Analyses, MirRelationExpr)>,
249 body: MirRelationExpr,
250 recursive: bool,
251 with: kw::With,
252 ) -> Result {
253 if recursive {
254 let (mut ids, mut values, mut limits) = (vec![], vec![], vec![]);
255 for (id, analyses, value) in ctes.into_iter() {
256 let typ = {
257 let Some(column_types) = analyses.types else {
258 let msg = format!("`let {}` needs a `types` analyses", id);
259 Err(Error::new(with.span(), msg))?
260 };
261 let keys = analyses.keys.unwrap_or_default();
262 ReprRelationType::new(column_types).with_keys(keys)
263 };
264 let value = {
267 let get_cte = MirRelationExpr::Get {
268 id: Id::Local(id),
269 typ,
270 access_strategy: AccessStrategy::UnknownOrLocal,
271 };
272 MirRelationExpr::Union {
274 base: Box::new(get_cte),
275 inputs: vec![value],
276 }
277 };
278
279 ids.push(id);
280 values.push(value);
281 limits.push(None); }
283
284 Ok(MirRelationExpr::LetRec {
285 ids,
286 values,
287 limits,
288 body: Box::new(body),
289 })
290 } else {
291 let mut body = body;
292 for (id, _, value) in ctes.into_iter().rev() {
293 body = MirRelationExpr::Let {
294 id,
295 value: Box::new(value),
296 body: Box::new(body),
297 };
298 }
299 Ok(body)
300 }
301 }
302
303 fn parse_cte(
304 ctx: CtxRef,
305 input: ParseStream,
306 ) -> syn::Result<(LocalId, analyses::Analyses, MirRelationExpr)> {
307 let cte = input.parse::<kw::cte>()?;
308
309 let ident = input.parse::<syn::Ident>()?;
310 let id = parse_local_id(ident)?;
311
312 input.parse::<syn::Token![=]>()?;
313
314 let analyses = analyses::parse_analyses(input)?;
315
316 let parse_value = ParseChildren::new(input, cte.span().start());
317 let value = parse_value.parse_one(ctx, parse_expr)?;
318
319 Ok((id, analyses, value))
320 }
321
322 fn parse_project(ctx: CtxRef, input: ParseStream) -> Result {
323 let project = input.parse::<kw::Project>()?;
324
325 let content;
326 syn::parenthesized!(content in input);
327 let outputs = content.parse_comma_sep(scalar::parse_column_index)?;
328 let parse_input = ParseChildren::new(input, project.span().start());
329 let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
330
331 Ok(MirRelationExpr::Project { input, outputs })
332 }
333
334 fn parse_map(ctx: CtxRef, input: ParseStream) -> Result {
335 let map = input.parse::<kw::Map>()?;
336
337 let scalars = {
338 let inner;
339 syn::parenthesized!(inner in input);
340 scalar::parse_exprs(&inner)?
341 };
342
343 let parse_input = ParseChildren::new(input, map.span().start());
344 let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
345
346 Ok(MirRelationExpr::Map { input, scalars })
347 }
348
349 fn parse_flat_map(ctx: CtxRef, input: ParseStream) -> Result {
350 use mz_expr::TableFunc::*;
351
352 let flat_map = input.parse::<kw::FlatMap>()?;
353
354 let ident = input.parse::<syn::Ident>()?;
355 let func = match ident.to_string().to_lowercase().as_str() {
356 "unnest_list" => UnnestList {
357 el_typ: SqlScalarType::Int64, },
359 "unnest_array" => UnnestArray {
360 el_typ: SqlScalarType::Int64, },
362 "wrap1" => Wrap {
363 types: vec![
364 SqlScalarType::Int64.nullable(true), ],
366 width: 1,
367 },
368 "wrap2" => Wrap {
369 types: vec![
370 SqlScalarType::Int64.nullable(true), SqlScalarType::Int64.nullable(true), ],
373 width: 2,
374 },
375 "wrap3" => Wrap {
376 types: vec![
377 SqlScalarType::Int64.nullable(true), SqlScalarType::Int64.nullable(true), SqlScalarType::Int64.nullable(true), ],
381 width: 3,
382 },
383 "generate_series" => GenerateSeriesInt64,
384 "generate_series_i32" => GenerateSeriesInt32,
387 "jsonb_object_keys" => JsonbObjectKeys,
388 _ => Err(Error::new(ident.span(), "unsupported function name"))?,
389 };
390
391 let exprs = {
392 let inner;
393 syn::parenthesized!(inner in input);
394 scalar::parse_exprs(&inner)?
395 };
396
397 let parse_input = ParseChildren::new(input, flat_map.span().start());
398 let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
399
400 Ok(MirRelationExpr::FlatMap { input, func, exprs })
401 }
402
403 fn parse_filter(ctx: CtxRef, input: ParseStream) -> Result {
404 use mz_expr::MirScalarExpr::CallVariadic;
405 use mz_expr::VariadicFunc::And;
406
407 let filter = input.parse::<kw::Filter>()?;
408
409 let predicates = match scalar::parse_expr(input)? {
410 CallVariadic {
411 func: And(_),
412 exprs,
413 } => exprs,
414 expr => vec![expr],
415 };
416
417 let parse_input = ParseChildren::new(input, filter.span().start());
418 let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
419
420 Ok(MirRelationExpr::Filter { input, predicates })
421 }
422
423 fn parse_cross_join(ctx: CtxRef, input: ParseStream) -> Result {
424 let join = input.parse::<kw::CrossJoin>()?;
425
426 let parse_inputs = ParseChildren::new(input, join.span().start());
427 let inputs = parse_inputs.parse_many(ctx, parse_expr)?;
428
429 Ok(MirRelationExpr::Join {
430 inputs,
431 equivalences: vec![],
432 implementation: JoinImplementation::Unimplemented,
433 })
434 }
435
436 fn parse_join(ctx: CtxRef, input: ParseStream) -> Result {
437 let join = input.parse::<kw::Join>()?;
438
439 input.parse::<kw::on>()?;
440 input.parse::<syn::Token![=]>()?;
441 let inner;
442 syn::parenthesized!(inner in input);
443 let equivalences = scalar::parse_join_equivalences(&inner)?;
444
445 let parse_inputs = ParseChildren::new(input, join.span().start());
446 let inputs = parse_inputs.parse_many(ctx, parse_expr)?;
447
448 Ok(MirRelationExpr::Join {
449 inputs,
450 equivalences,
451 implementation: JoinImplementation::Unimplemented,
452 })
453 }
454
455 fn parse_distinct(ctx: CtxRef, input: ParseStream) -> Result {
456 let reduce = input.parse::<kw::Distinct>()?;
457
458 let group_key = if input.eat(kw::project) {
459 input.parse::<syn::Token![=]>()?;
460 let inner;
461 syn::bracketed!(inner in input);
462 inner.parse_comma_sep(scalar::parse_expr)?
463 } else {
464 vec![]
465 };
466
467 let monotonic = input.eat(kw::monotonic);
468
469 let expected_group_size = if input.eat(kw::exp_group_size) {
470 input.parse::<syn::Token![=]>()?;
471 Some(input.parse::<syn::LitInt>()?.base10_parse::<u64>()?)
472 } else {
473 None
474 };
475
476 let parse_inputs = ParseChildren::new(input, reduce.span().start());
477 let input = Box::new(parse_inputs.parse_one(ctx, parse_expr)?);
478
479 Ok(MirRelationExpr::Reduce {
480 input,
481 group_key,
482 aggregates: vec![],
483 monotonic,
484 expected_group_size,
485 })
486 }
487
488 fn parse_reduce(ctx: CtxRef, input: ParseStream) -> Result {
489 let reduce = input.parse::<kw::Reduce>()?;
490
491 let group_key = if input.eat(kw::group_by) {
492 input.parse::<syn::Token![=]>()?;
493 let inner;
494 syn::bracketed!(inner in input);
495 inner.parse_comma_sep(scalar::parse_expr)?
496 } else {
497 vec![]
498 };
499
500 let aggregates = {
501 input.parse::<kw::aggregates>()?;
502 input.parse::<syn::Token![=]>()?;
503 let inner;
504 syn::bracketed!(inner in input);
505 inner.parse_comma_sep(aggregate::parse_expr)?
506 };
507
508 let monotonic = input.eat(kw::monotonic);
509
510 let expected_group_size = if input.eat(kw::exp_group_size) {
511 input.parse::<syn::Token![=]>()?;
512 Some(input.parse::<syn::LitInt>()?.base10_parse::<u64>()?)
513 } else {
514 None
515 };
516
517 let parse_inputs = ParseChildren::new(input, reduce.span().start());
518 let input = Box::new(parse_inputs.parse_one(ctx, parse_expr)?);
519
520 Ok(MirRelationExpr::Reduce {
521 input,
522 group_key,
523 aggregates,
524 monotonic,
525 expected_group_size,
526 })
527 }
528
529 fn parse_top_k(ctx: CtxRef, input: ParseStream) -> Result {
530 let top_k = input.parse::<kw::TopK>()?;
531
532 let group_key = if input.eat(kw::group_by) {
533 input.parse::<syn::Token![=]>()?;
534 let inner;
535 syn::bracketed!(inner in input);
536 inner.parse_comma_sep(scalar::parse_column_index)?
537 } else {
538 vec![]
539 };
540
541 let order_key = if input.eat(kw::order_by) {
542 input.parse::<syn::Token![=]>()?;
543 let inner;
544 syn::bracketed!(inner in input);
545 inner.parse_comma_sep(scalar::parse_column_order)?
546 } else {
547 vec![]
548 };
549
550 let limit = if input.eat(kw::limit) {
551 input.parse::<syn::Token![=]>()?;
552 Some(scalar::parse_expr(input)?)
553 } else {
554 None
555 };
556
557 let offset = if input.eat(kw::offset) {
558 input.parse::<syn::Token![=]>()?;
559 input.parse::<syn::LitInt>()?.base10_parse::<usize>()?
560 } else {
561 0
562 };
563
564 let monotonic = input.eat(kw::monotonic);
565
566 let expected_group_size = if input.eat(kw::exp_group_size) {
567 input.parse::<syn::Token![=]>()?;
568 Some(input.parse::<syn::LitInt>()?.base10_parse::<u64>()?)
569 } else {
570 None
571 };
572
573 let parse_inputs = ParseChildren::new(input, top_k.span().start());
574 let input = Box::new(parse_inputs.parse_one(ctx, parse_expr)?);
575
576 Ok(MirRelationExpr::TopK {
577 input,
578 group_key,
579 order_key,
580 limit,
581 offset,
582 monotonic,
583 expected_group_size,
584 })
585 }
586
587 fn parse_negate(ctx: CtxRef, input: ParseStream) -> Result {
588 let negate = input.parse::<kw::Negate>()?;
589
590 let parse_input = ParseChildren::new(input, negate.span().start());
591 let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
592
593 Ok(MirRelationExpr::Negate { input })
594 }
595
596 fn parse_threshold(ctx: CtxRef, input: ParseStream) -> Result {
597 let threshold = input.parse::<kw::Threshold>()?;
598
599 let parse_input = ParseChildren::new(input, threshold.span().start());
600 let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
601
602 Ok(MirRelationExpr::Threshold { input })
603 }
604
605 fn parse_union(ctx: CtxRef, input: ParseStream) -> Result {
606 let union = input.parse::<kw::Union>()?;
607
608 let parse_inputs = ParseChildren::new(input, union.span().start());
609 let mut children = parse_inputs.parse_many(ctx, parse_expr)?;
610 let inputs = children.split_off(1);
611 let base = Box::new(children.into_element());
612
613 Ok(MirRelationExpr::Union { base, inputs })
614 }
615
616 fn parse_arrange_by(ctx: CtxRef, input: ParseStream) -> Result {
617 let arrange_by = input.parse::<kw::ArrangeBy>()?;
618
619 let keys = {
620 input.parse::<kw::keys>()?;
621 input.parse::<syn::Token![=]>()?;
622 let inner;
623 syn::bracketed!(inner in input);
624 inner.parse_comma_sep(|input| {
625 let inner;
626 syn::bracketed!(inner in input);
627 scalar::parse_exprs(&inner)
628 })?
629 };
630
631 let parse_input = ParseChildren::new(input, arrange_by.span().start());
632 let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
633
634 Ok(MirRelationExpr::ArrangeBy { input, keys })
635 }
636
637 fn parse_local_id(ident: syn::Ident) -> syn::Result<LocalId> {
638 if ident.to_string().starts_with('l') {
639 let n = ident.to_string()[1..]
640 .parse::<u64>()
641 .map_err(|err| Error::new(ident.span(), err.to_string()))?;
642 Ok(mz_expr::LocalId::new(n))
643 } else {
644 Err(Error::new(ident.span(), "invalid LocalId"))
645 }
646 }
647
648 #[derive(Default)]
649 pub struct FixTypesCtx {
650 env: BTreeMap<LocalId, ReprRelationType>,
651 typ: Vec<ReprRelationType>,
652 }
653
654 pub fn fix_types(
655 expr: &mut MirRelationExpr,
656 ctx: &mut FixTypesCtx,
657 ) -> std::result::Result<(), String> {
658 match expr {
659 MirRelationExpr::Let { id, value, body } => {
660 fix_types(value, ctx)?;
661 let value_typ = ctx.typ.pop().expect("value type");
662 let prior_typ = ctx.env.insert(id.clone(), value_typ);
663 fix_types(body, ctx)?;
664 ctx.env.remove(id);
665 if let Some(prior_typ) = prior_typ {
666 ctx.env.insert(id.clone(), prior_typ);
667 }
668 }
669 MirRelationExpr::LetRec {
670 ids,
671 values,
672 body,
673 limits: _,
674 } => {
675 let mut prior_typs = BTreeMap::default();
678 for (id, value) in std::iter::zip(ids.iter_mut(), values.iter_mut()) {
679 let MirRelationExpr::Union { base, mut inputs } = value.take_dangerous() else {
680 unreachable!("ensured by construction");
681 };
682 let MirRelationExpr::Get { id: _, typ, .. } = *base else {
683 unreachable!("ensured by construction");
684 };
685 if let Some(prior_typ) = ctx.env.insert(id.clone(), typ) {
686 prior_typs.insert(id.clone(), prior_typ);
687 }
688 *value = inputs.pop().expect("ensured by construction");
689 }
690 for value in values.iter_mut() {
691 fix_types(value, ctx)?;
692 }
693 fix_types(body, ctx)?;
694 for id in ids.iter() {
695 ctx.env.remove(id);
696 if let Some(prior_typ) = prior_typs.remove(id) {
697 ctx.env.insert(id.clone(), prior_typ);
698 }
699 }
700 }
701 MirRelationExpr::Get {
702 id: Id::Local(id),
703 typ,
704 ..
705 } => {
706 let env_typ = match ctx.env.get(&*id) {
707 Some(env_typ) => env_typ,
708 None => Err(format!("Cannot fix type of unbound CTE {}", id))?,
709 };
710 *typ = env_typ.clone();
711 ctx.typ.push(env_typ.clone());
712 }
713 _ => {
714 for input in expr.children_mut() {
715 fix_types(input, ctx)?;
716 }
717 let input_types = ctx.typ.split_off(ctx.typ.len() - expr.num_inputs());
718 ctx.typ.push(expr.typ_with_input_types(&input_types));
719 }
720 };
721
722 Ok(())
723 }
724}
725
726mod scalar {
728 use mz_expr::{
729 BinaryFunc, ColumnOrder, MirScalarExpr, UnaryFunc, UnmaterializableFunc, VariadicFunc, func,
730 };
731 use mz_repr::{
732 AsColumnType, Datum, ReprColumnType, ReprScalarType, Row, RowArena, SqlScalarType,
733 };
734
735 use super::*;
736
737 type Result = syn::Result<MirScalarExpr>;
738
739 pub fn parse_exprs(input: ParseStream) -> syn::Result<Vec<MirScalarExpr>> {
740 input.parse_comma_sep(parse_expr)
741 }
742
743 pub fn parse_expr(input: ParseStream) -> Result {
759 let line = input.span().start().line;
760
761 #[derive(Debug)]
763 enum Op {
764 Unr(mz_expr::UnaryFunc), Neg(mz_expr::UnaryFunc), Bin(mz_expr::BinaryFunc),
767 Var(mz_expr::VariadicFunc),
768 }
769
770 impl Op {
771 fn precedence(&self) -> Option<usize> {
772 match self {
773 Op::Var(mz_expr::VariadicFunc::Or(_)) => Some(1),
775 Op::Var(mz_expr::VariadicFunc::And(_)) => Some(2),
777 Op::Bin(mz_expr::BinaryFunc::Eq(_)) => Some(4),
779 Op::Bin(mz_expr::BinaryFunc::NotEq(_)) => Some(4),
780 Op::Bin(mz_expr::BinaryFunc::Gt(_)) => Some(5),
782 Op::Bin(mz_expr::BinaryFunc::Gte(_)) => Some(5),
783 Op::Bin(mz_expr::BinaryFunc::Lt(_)) => Some(5),
784 Op::Bin(mz_expr::BinaryFunc::Lte(_)) => Some(5),
785 Op::Unr(mz_expr::UnaryFunc::IsNull(_)) => Some(13),
787 Op::Neg(mz_expr::UnaryFunc::IsNull(_)) => Some(13),
788 Op::Unr(mz_expr::UnaryFunc::IsTrue(_)) => Some(13),
789 Op::Neg(mz_expr::UnaryFunc::IsTrue(_)) => Some(13),
790 Op::Unr(mz_expr::UnaryFunc::IsFalse(_)) => Some(13),
791 Op::Neg(mz_expr::UnaryFunc::IsFalse(_)) => Some(13),
792 Op::Bin(mz_expr::BinaryFunc::AddInt64(_)) => Some(14),
794 Op::Bin(mz_expr::BinaryFunc::MulInt64(_)) => Some(15),
796 Op::Bin(mz_expr::BinaryFunc::DivInt64(_)) => Some(15),
797 Op::Bin(mz_expr::BinaryFunc::ModInt64(_)) => Some(15),
798 _ => None,
800 }
801 }
802 }
803
804 #[derive(Debug)]
806 enum Entry {
807 Operand(MirScalarExpr),
808 Operator(Op),
809 }
810
811 let mut opstack = vec![];
812 let mut postfix = vec![];
813 let mut exp_opd = true; while !input.is_empty() && input.span().start().line == line {
817 if exp_opd {
819 postfix.push(Entry::Operand(parse_operand(input)?));
820 exp_opd = false;
821 } else {
822 let op = if input.eat(syn::Token![=]) {
825 exp_opd = true;
826 Op::Bin(func::Eq.into())
827 } else if input.eat(syn::Token![!=]) {
828 exp_opd = true;
829 Op::Bin(func::NotEq.into())
830 } else if input.eat(syn::Token![>=]) {
831 exp_opd = true;
832 Op::Bin(func::Gte.into())
833 } else if input.eat(syn::Token![>]) {
834 exp_opd = true;
835 Op::Bin(func::Gt.into())
836 } else if input.eat(syn::Token![<=]) {
837 exp_opd = true;
838 Op::Bin(func::Lte.into())
839 } else if input.eat(syn::Token![<]) {
840 exp_opd = true;
841 Op::Bin(func::Lt.into())
842 } else if input.eat(syn::Token![+]) {
843 exp_opd = true;
844 Op::Bin(func::AddInt64.into()) } else if input.eat(syn::Token![*]) {
846 exp_opd = true;
847 Op::Bin(func::MulInt64.into()) } else if input.eat(syn::Token![/]) {
849 exp_opd = true;
850 Op::Bin(func::DivInt64.into()) } else if input.eat(syn::Token![%]) {
852 exp_opd = true;
853 Op::Bin(func::ModInt64.into()) } else if input.eat(kw::AND) {
855 exp_opd = true;
856 Op::Var(VariadicFunc::And(func::variadic::And))
857 } else if input.eat(kw::OR) {
858 exp_opd = true;
859 Op::Var(VariadicFunc::Or(func::variadic::Or))
860 } else if input.eat(kw::coalesce) {
861 exp_opd = true;
862 Op::Var(VariadicFunc::Coalesce(func::variadic::Coalesce))
863 } else if input.eat(kw::IS) {
864 let negate = input.eat(kw::NOT);
865
866 let lookahead = input.lookahead1();
867 let func = if input.look_and_eat(kw::NULL, &lookahead) {
868 mz_expr::func::IsNull.into()
869 } else if input.look_and_eat(kw::TRUE, &lookahead) {
870 mz_expr::func::IsTrue.into()
871 } else if input.look_and_eat(kw::FALSE, &lookahead) {
872 mz_expr::func::IsFalse.into()
873 } else {
874 Err(lookahead.error())?
875 };
876
877 if negate { Op::Neg(func) } else { Op::Unr(func) }
878 } else {
879 break;
883 };
884
885 while opstack
889 .last()
890 .map(|op1: &Op| op1.precedence() >= op.precedence())
891 .unwrap_or(false)
892 {
893 let op1 = opstack.pop().expect("non-empty opstack");
894 postfix.push(Entry::Operator(op1));
895 }
896
897 opstack.push(op);
899 }
900 }
901
902 postfix.extend(opstack.into_iter().rev().map(Entry::Operator));
904
905 if postfix.is_empty() {
906 let msg = "Cannot parse an empty expression";
907 Err(Error::new(input.span(), msg))?
908 }
909
910 let mut stack = vec![];
912 postfix.reverse();
913 while let Some(entry) = postfix.pop() {
914 match entry {
915 Entry::Operand(expr) => {
916 stack.push(expr);
917 }
918 Entry::Operator(Op::Unr(func)) => {
919 let expr = Box::new(stack.pop().expect("non-empty stack"));
920 stack.push(MirScalarExpr::CallUnary { func, expr });
921 }
922 Entry::Operator(Op::Neg(func)) => {
923 let expr = Box::new(stack.pop().expect("non-empty stack"));
924 stack.push(MirScalarExpr::CallUnary { func, expr }.not());
925 }
926 Entry::Operator(Op::Bin(func)) => {
927 let expr2 = Box::new(stack.pop().expect("non-empty stack"));
928 let expr1 = Box::new(stack.pop().expect("non-empty stack"));
929 stack.push(MirScalarExpr::CallBinary { func, expr1, expr2 });
930 }
931 Entry::Operator(Op::Var(func)) => {
932 let expr2 = stack.pop().expect("non-empty stack");
933 let expr1 = stack.pop().expect("non-empty stack");
934 let mut exprs = vec![];
935 for expr in [expr1, expr2] {
936 match expr {
937 MirScalarExpr::CallVariadic { func: f, exprs: es } if f == func => {
938 exprs.extend(es);
939 }
940 expr => {
941 exprs.push(expr);
942 }
943 }
944 }
945 stack.push(MirScalarExpr::CallVariadic { func, exprs });
946 }
947 }
948 }
949
950 if stack.len() != 1 {
951 let msg = "Cannot fold postfix vector into a single MirScalarExpr";
952 Err(Error::new(input.span(), msg))?
953 }
954
955 Ok(stack.pop().unwrap())
956 }
957
958 pub fn parse_operand(input: ParseStream) -> Result {
959 let lookahead = input.lookahead1();
960 if lookahead.peek(syn::Token![#]) {
961 parse_column(input)
962 } else if lookahead.peek(syn::Lit) || lookahead.peek(kw::null) {
963 parse_literal_ok(input)
964 } else if lookahead.peek(kw::error) {
965 parse_literal_err(input)
966 } else if lookahead.peek(kw::array) {
967 parse_array(input)
968 } else if lookahead.peek(kw::list) {
969 parse_list(input)
970 } else if lookahead.peek(kw::case) {
971 parse_case(input)
972 } else if lookahead.peek(syn::Ident) {
973 parse_apply(input)
974 } else if lookahead.peek(syn::token::Brace) {
975 let inner;
976 syn::braced!(inner in input);
977 parse_literal_array(&inner)
978 } else if lookahead.peek(syn::token::Bracket) {
979 let inner;
980 syn::bracketed!(inner in input);
981 parse_literal_list(&inner)
982 } else if lookahead.peek(syn::token::Paren) {
983 let inner;
984 syn::parenthesized!(inner in input);
985 parse_expr(&inner)
986 } else {
987 Err(lookahead.error())
988 }
989 }
990
991 fn parse_case(input: ParseStream) -> Result {
993 input.parse::<kw::case>()?;
994 if input.peek(kw::when) {
995 input.parse::<kw::when>()?;
996 let cond = parse_expr(input)?;
997 input.parse::<kw::then>()?;
998 let then = parse_expr(input)?;
999 input.parse::<syn::Token![else]>()?;
1000 let els = parse_expr(input)?;
1001 input.parse::<kw::end>()?;
1002 Ok(MirScalarExpr::If {
1003 cond: Box::new(cond),
1004 then: Box::new(then),
1005 els: Box::new(els),
1006 })
1007 } else {
1008 Err(Error::new(input.span(), "expected 'when' after 'case'"))
1009 }
1010 }
1011
1012 pub fn parse_column(input: ParseStream) -> Result {
1013 Ok(MirScalarExpr::column(parse_column_index(input)?))
1014 }
1015
1016 pub fn parse_column_index(input: ParseStream) -> syn::Result<usize> {
1017 input.parse::<syn::Token![#]>()?;
1018 input.parse::<syn::LitInt>()?.base10_parse::<usize>()
1019 }
1020
1021 pub fn parse_column_order(input: ParseStream) -> syn::Result<ColumnOrder> {
1022 input.parse::<syn::Token![#]>()?;
1023 let column = input.parse::<syn::LitInt>()?.base10_parse::<usize>()?;
1024 let desc = input.eat(kw::desc) || !input.eat(kw::asc);
1025 let nulls_last = input.eat(kw::nulls_last) || !input.eat(kw::nulls_first);
1026 Ok(ColumnOrder {
1027 column,
1028 desc,
1029 nulls_last,
1030 })
1031 }
1032
1033 fn parse_literal_ok(input: ParseStream) -> Result {
1034 let mut row = Row::default();
1035 let mut packer = row.packer();
1036
1037 let typ = if input.eat(kw::null) {
1038 packer.push(Datum::Null);
1039 input.parse::<syn::Token![::]>()?;
1040 ReprColumnType {
1041 scalar_type: analyses::parse_scalar_type(input)?,
1042 nullable: true,
1043 }
1044 } else {
1045 match input.parse::<syn::Lit>()? {
1046 syn::Lit::Str(l) => {
1047 packer.push(Datum::from(l.value().as_str()));
1048 Ok(ReprColumnType::from(&String::as_column_type()))
1049 }
1050 syn::Lit::Int(l) => {
1051 packer.push(Datum::from(l.base10_parse::<i64>()?));
1052 Ok(ReprColumnType::from(&i64::as_column_type()))
1053 }
1054 syn::Lit::Float(l) => {
1055 packer.push(Datum::from(l.base10_parse::<f64>()?));
1056 Ok(ReprColumnType::from(&f64::as_column_type()))
1057 }
1058 syn::Lit::Bool(l) => {
1059 packer.push(Datum::from(l.value));
1060 Ok(ReprColumnType::from(&bool::as_column_type()))
1061 }
1062 _ => Err(Error::new(input.span(), "cannot parse literal")),
1063 }?
1064 };
1065
1066 Ok(MirScalarExpr::Literal(Ok(row), typ))
1067 }
1068 fn parse_literal_err(input: ParseStream) -> Result {
1069 input.parse::<kw::error>()?;
1070 let mut msg = {
1071 let content;
1072 syn::parenthesized!(content in input);
1073 content.parse::<syn::LitStr>()?.value()
1074 };
1075 let err = if msg.starts_with("internal error: ") {
1076 Ok(mz_expr::EvalError::Internal(msg.split_off(16).into()))
1077 } else {
1078 Err(Error::new(msg.span(), "expected `internal error: $msg`"))
1079 }?;
1080 Ok(MirScalarExpr::literal(Err(err), ReprScalarType::Bool))
1081 }
1082
1083 fn parse_literal_array(input: ParseStream) -> Result {
1084 use mz_expr::func::variadic::ArrayCreate;
1085
1086 let elem_type = SqlScalarType::Int64; let func = VariadicFunc::ArrayCreate(ArrayCreate { elem_type });
1088 let exprs = input.parse_comma_sep(parse_literal_ok)?;
1089
1090 let temp_storage = RowArena::default();
1092 let datum = func.eval(&[], &temp_storage, &exprs).expect("datum");
1093 let typ = ReprScalarType::from(&SqlScalarType::Array(Box::new(SqlScalarType::Int64))); Ok(MirScalarExpr::literal_ok(datum, typ))
1095 }
1096 fn parse_literal_list(input: ParseStream) -> Result {
1097 use mz_expr::func::variadic::ListCreate;
1098
1099 let elem_type = SqlScalarType::Int64; let func = VariadicFunc::ListCreate(ListCreate { elem_type });
1101 let exprs = input.parse_comma_sep(parse_literal_ok)?;
1102
1103 let temp_storage = RowArena::default();
1105 let datum = func.eval(&[], &temp_storage, &exprs).expect("datum");
1106 let typ = ReprScalarType::from(&SqlScalarType::Array(Box::new(SqlScalarType::Int64))); Ok(MirScalarExpr::literal_ok(datum, typ))
1108 }
1109 fn parse_array(input: ParseStream) -> Result {
1110 use mz_expr::func::variadic::ArrayCreate;
1111
1112 input.parse::<kw::array>()?;
1113
1114 let inner;
1116 syn::bracketed!(inner in input);
1117
1118 let elem_type = SqlScalarType::Int64; let func = ArrayCreate { elem_type };
1120 let exprs = inner.parse_comma_sep(parse_expr)?;
1121
1122 Ok(MirScalarExpr::call_variadic(func, exprs))
1123 }
1124
1125 fn parse_list(input: ParseStream) -> Result {
1126 use mz_expr::func::variadic::ListCreate;
1127
1128 input.parse::<kw::list>()?;
1129
1130 let inner;
1132 syn::bracketed!(inner in input);
1133
1134 let elem_type = SqlScalarType::Int64; let func = ListCreate { elem_type };
1136 let exprs = inner.parse_comma_sep(parse_expr)?;
1137
1138 Ok(MirScalarExpr::call_variadic(func, exprs))
1139 }
1140
1141 fn parse_apply(input: ParseStream) -> Result {
1142 let ident = input.parse::<syn::Ident>()?;
1143
1144 let inner;
1146 syn::parenthesized!(inner in input);
1147
1148 let parse_nullary = |func: UnmaterializableFunc| -> Result {
1149 Ok(MirScalarExpr::CallUnmaterializable(func))
1150 };
1151 let parse_unary = |func: UnaryFunc| -> Result {
1152 let expr = Box::new(parse_expr(&inner)?);
1153 Ok(MirScalarExpr::CallUnary { func, expr })
1154 };
1155 let parse_binary = |func: BinaryFunc| -> Result {
1156 let expr1 = Box::new(parse_expr(&inner)?);
1157 inner.parse::<syn::Token![,]>()?;
1158 let expr2 = Box::new(parse_expr(&inner)?);
1159 Ok(MirScalarExpr::CallBinary { func, expr1, expr2 })
1160 };
1161 let parse_variadic = |func: VariadicFunc| -> Result {
1162 let exprs = inner.parse_comma_sep(parse_expr)?;
1163 Ok(MirScalarExpr::call_variadic(func, exprs))
1164 };
1165
1166 match ident.to_string().to_lowercase().as_str() {
1172 "mz_environment_id" => parse_nullary(UnmaterializableFunc::MzEnvironmentId),
1174 "abs" => parse_unary(func::AbsInt64.into()),
1176 "not" => parse_unary(func::Not.into()),
1177 "ltrim" => parse_binary(func::TrimLeading.into()),
1179 "greatest" => parse_variadic(VariadicFunc::Greatest(func::variadic::Greatest)),
1181 "coalesce" => parse_variadic(VariadicFunc::Coalesce(func::variadic::Coalesce)),
1182 _ => Err(Error::new(ident.span(), "unsupported function name")),
1183 }
1184 }
1185
1186 pub fn parse_join_equivalences(input: ParseStream) -> syn::Result<Vec<Vec<MirScalarExpr>>> {
1187 let mut equivalences = vec![];
1188 while !input.is_empty() {
1189 let mut equivalence = vec![];
1190 loop {
1191 let mut worklist = vec![parse_operand(input)?];
1192 while let Some(operand) = worklist.pop() {
1193 if let MirScalarExpr::CallBinary {
1196 func: BinaryFunc::Eq(_),
1197 expr1,
1198 expr2,
1199 } = operand
1200 {
1201 worklist.push(*expr2);
1204 worklist.push(*expr1);
1205 } else {
1206 equivalence.push(operand);
1207 }
1208 }
1209 if !input.eat(syn::Token![=]) {
1210 break;
1211 }
1212 }
1213 equivalences.push(equivalence);
1214 input.eat(kw::AND);
1215 }
1216 Ok(equivalences)
1217 }
1218}
1219
1220mod aggregate {
1222 use mz_expr::{AggregateExpr, MirScalarExpr};
1223
1224 use super::*;
1225
1226 type Result = syn::Result<AggregateExpr>;
1227
1228 pub fn parse_expr(input: ParseStream) -> Result {
1229 use mz_expr::AggregateFunc::*;
1230
1231 let ident = input.parse::<syn::Ident>()?;
1235 let func = match ident.to_string().to_lowercase().as_str() {
1236 "count" => Count,
1237 "any" => Any,
1238 "all" => All,
1239 "max" => MaxInt64,
1240 "min" => MinInt64,
1241 "sum" => SumInt64,
1242 _ => Err(Error::new(ident.span(), "unsupported function name"))?,
1243 };
1244
1245 let inner;
1247 syn::parenthesized!(inner in input);
1248
1249 if func == Count && inner.eat(syn::Token![*]) {
1250 Ok(AggregateExpr {
1251 func,
1252 expr: MirScalarExpr::literal_true(),
1253 distinct: false, })
1255 } else {
1256 let distinct = inner.eat(kw::distinct);
1257 let expr = scalar::parse_expr(&inner)?;
1258 Ok(AggregateExpr {
1259 func,
1260 expr,
1261 distinct,
1262 })
1263 }
1264 }
1265}
1266
1267mod row {
1269 use mz_repr::{Datum, Row, RowPacker};
1270
1271 use super::*;
1272
1273 impl Parse for Parsed<Row> {
1274 fn parse(input: ParseStream) -> syn::Result<Self> {
1275 let mut row = Row::default();
1276 let mut packer = ParseRow::new(&mut row);
1277
1278 loop {
1279 if input.is_empty() {
1280 break;
1281 }
1282 packer.parse_datum(input)?;
1283 if input.is_empty() {
1284 break;
1285 }
1286 input.parse::<syn::Token![,]>()?;
1287 }
1288
1289 Ok(Parsed(row))
1290 }
1291 }
1292
1293 impl From<Parsed<Row>> for Row {
1294 fn from(parsed: Parsed<Row>) -> Self {
1295 parsed.0
1296 }
1297 }
1298
1299 struct ParseRow<'a>(RowPacker<'a>);
1300
1301 impl<'a> ParseRow<'a> {
1302 fn new(row: &'a mut Row) -> Self {
1303 Self(row.packer())
1304 }
1305
1306 fn parse_datum(&mut self, input: ParseStream) -> syn::Result<()> {
1307 if input.eat(kw::null) {
1308 self.0.push(Datum::Null)
1309 } else {
1310 match input.parse::<syn::Lit>()? {
1311 syn::Lit::Str(l) => self.0.push(Datum::from(l.value().as_str())),
1312 syn::Lit::Int(l) => self.0.push(Datum::from(l.base10_parse::<i64>()?)),
1313 syn::Lit::Float(l) => self.0.push(Datum::from(l.base10_parse::<f64>()?)),
1314 syn::Lit::Bool(l) => self.0.push(Datum::from(l.value)),
1315 _ => Err(Error::new(input.span(), "cannot parse literal"))?,
1316 }
1317 }
1318 Ok(())
1319 }
1320 }
1321}
1322
1323mod analyses {
1324 use mz_repr::{ReprColumnType, ReprScalarType};
1325
1326 use super::*;
1327
1328 #[derive(Default)]
1329 pub struct Analyses {
1330 pub types: Option<Vec<ReprColumnType>>,
1331 pub keys: Option<Vec<Vec<usize>>>,
1332 }
1333
1334 pub fn parse_analyses(input: ParseStream) -> syn::Result<Analyses> {
1335 let mut analyses = Analyses::default();
1336
1337 if input.eat(syn::Token![::]) {
1341 let inner;
1342 syn::braced!(inner in input);
1343
1344 let (start, end) = (inner.span().start(), inner.span().end());
1345 if start.line != end.line {
1346 let msg = "analyses should not span more than one line".to_string();
1347 Err(Error::new(inner.span(), msg))?
1348 }
1349
1350 while inner.peek(syn::Ident) {
1351 let ident = inner.parse::<syn::Ident>()?.to_string();
1352 match ident.as_str() {
1353 "types" => {
1354 inner.parse::<syn::Token![:]>()?;
1355 let value = inner.parse::<syn::LitStr>()?.value();
1356 analyses.types = Some(parse_types.parse_str(&value)?);
1357 }
1358 key => {
1360 let msg = format!("unexpected analysis type `{}`", key);
1361 Err(Error::new(inner.span(), msg))?;
1362 }
1363 }
1364 }
1365 }
1366 Ok(analyses)
1367 }
1368
1369 fn parse_types(input: ParseStream) -> syn::Result<Vec<ReprColumnType>> {
1370 let inner;
1371 syn::parenthesized!(inner in input);
1372 inner.parse_comma_sep(parse_column_type)
1373 }
1374
1375 pub fn parse_column_type(input: ParseStream) -> syn::Result<ReprColumnType> {
1376 let scalar_type = parse_scalar_type(input)?;
1377 Ok(ReprColumnType {
1378 scalar_type,
1379 nullable: input.eat(syn::Token![?]),
1380 })
1381 }
1382
1383 pub fn parse_scalar_type(input: ParseStream) -> syn::Result<ReprScalarType> {
1384 let lookahead = input.lookahead1();
1385
1386 let scalar_type = if input.look_and_eat(bigint, &lookahead) {
1387 ReprScalarType::Int64
1388 } else if input.look_and_eat(double, &lookahead) {
1389 input.parse::<precision>()?;
1390 ReprScalarType::Float64
1391 } else if input.look_and_eat(boolean, &lookahead) {
1392 ReprScalarType::Bool
1393 } else if input.look_and_eat(character, &lookahead) {
1394 input.parse::<varying>()?;
1395 ReprScalarType::String
1396 } else if input.look_and_eat(integer, &lookahead) {
1397 ReprScalarType::Int32
1398 } else if input.look_and_eat(smallint, &lookahead) {
1399 ReprScalarType::Int16
1400 } else if input.look_and_eat(text, &lookahead) {
1401 ReprScalarType::String
1402 } else {
1403 Err(lookahead.error())?
1404 };
1405
1406 Ok(scalar_type)
1407 }
1408
1409 syn::custom_keyword!(bigint);
1410 syn::custom_keyword!(boolean);
1411 syn::custom_keyword!(character);
1412 syn::custom_keyword!(double);
1413 syn::custom_keyword!(integer);
1414 syn::custom_keyword!(precision);
1415 syn::custom_keyword!(smallint);
1416 syn::custom_keyword!(text);
1417 syn::custom_keyword!(varying);
1418}
1419
/// A definition produced by [`try_parse_def`].
pub enum Def {
    /// A source definition.
    Source {
        /// Name of the source.
        name: String,
        /// Names of the source's columns.
        cols: Vec<String>,
        /// Relation type of the source.
        typ: mz_repr::SqlRelationType,
    },
}
1427
1428mod def {
1429 use mz_repr::{SqlColumnType, SqlRelationType};
1430
1431 use super::*;
1432
1433 pub fn parse_def(ctx: CtxRef, input: ParseStream) -> syn::Result<Def> {
1434 parse_def_source(ctx, input) }
1436
1437 fn parse_def_source(ctx: CtxRef, input: ParseStream) -> syn::Result<Def> {
1438 let reduce = input.parse::<def::DefSource>()?;
1439
1440 let name = {
1441 input.parse::<def::name>()?;
1442 input.parse::<syn::Token![=]>()?;
1443 input.parse::<syn::Ident>()?.to_string()
1444 };
1445
1446 let keys = if input.eat(kw::keys) {
1447 input.parse::<syn::Token![=]>()?;
1448 let inner;
1449 syn::bracketed!(inner in input);
1450 inner.parse_comma_sep(|input| {
1451 let inner;
1452 syn::bracketed!(inner in input);
1453 inner.parse_comma_sep(scalar::parse_column_index)
1454 })?
1455 } else {
1456 vec![]
1457 };
1458
1459 let parse_inputs = ParseChildren::new(input, reduce.span().start());
1460 let (cols, column_types) = {
1461 let source_columns = parse_inputs.parse_many(ctx, parse_def_source_column)?;
1462 let mut column_names = vec![];
1463 let mut column_types = vec![];
1464 for (column_name, column_type) in source_columns {
1465 column_names.push(column_name);
1466 column_types.push(column_type);
1467 }
1468 (column_names, column_types)
1469 };
1470
1471 let typ = SqlRelationType { column_types, keys };
1472
1473 Ok(Def::Source { name, cols, typ })
1474 }
1475
1476 fn parse_def_source_column(
1477 _ctx: CtxRef,
1478 input: ParseStream,
1479 ) -> syn::Result<(String, SqlColumnType)> {
1480 input.parse::<syn::Token![-]>()?;
1481 let column_name = input.parse::<syn::Ident>()?.to_string();
1482 input.parse::<syn::Token![:]>()?;
1483 let column_type = SqlColumnType::from_repr(&analyses::parse_column_type(input)?);
1484 Ok((column_name, column_type))
1485 }
1486
1487 syn::custom_keyword!(DefSource);
1488 syn::custom_keyword!(name);
1489}
1490
1491mod util {
1493 use syn::parse::{Lookahead1, ParseBuffer, Peek};
1494
1495 use super::*;
1496
1497 pub trait ParseBufferExt<'a> {
1499 fn look_and_eat<T: Eat>(&self, token: T, lookahead: &Lookahead1<'a>) -> bool;
1500
1501 fn eat<T: Eat>(&self, t: T) -> bool;
1503
1504 fn eat2<T1: Eat, T2: Eat>(&self, t1: T1, t2: T2) -> bool;
1506
1507 fn eat3<T1: Eat, T2: Eat, T3: Eat>(&self, t1: T1, t2: T2, t3: T3) -> bool;
1509
1510 fn parse_comma_sep<T>(&self, p: fn(ParseStream) -> syn::Result<T>) -> syn::Result<Vec<T>>;
1512 }
1513
1514 impl<'a> ParseBufferExt<'a> for ParseBuffer<'a> {
1515 fn look_and_eat<T: Eat>(&self, token: T, lookahead: &Lookahead1<'a>) -> bool {
1518 if lookahead.peek(token) {
1519 self.parse::<T::Token>().unwrap();
1520 true
1521 } else {
1522 false
1523 }
1524 }
1525
1526 fn eat<T: Eat>(&self, t: T) -> bool {
1527 if self.peek(t) {
1528 self.parse::<T::Token>().unwrap();
1529 true
1530 } else {
1531 false
1532 }
1533 }
1534
1535 fn eat2<T1: Eat, T2: Eat>(&self, t1: T1, t2: T2) -> bool {
1536 if self.peek(t1) && self.peek2(t2) {
1537 self.parse::<T1::Token>().unwrap();
1538 self.parse::<T2::Token>().unwrap();
1539 true
1540 } else {
1541 false
1542 }
1543 }
1544
1545 fn eat3<T1: Eat, T2: Eat, T3: Eat>(&self, t1: T1, t2: T2, t3: T3) -> bool {
1546 if self.peek(t1) && self.peek2(t2) && self.peek3(t3) {
1547 self.parse::<T1::Token>().unwrap();
1548 self.parse::<T2::Token>().unwrap();
1549 self.parse::<T3::Token>().unwrap();
1550 true
1551 } else {
1552 false
1553 }
1554 }
1555
1556 fn parse_comma_sep<T>(&self, p: fn(ParseStream) -> syn::Result<T>) -> syn::Result<Vec<T>> {
1557 Ok(self
1558 .parse_terminated(p, syn::Token![,])?
1559 .into_iter()
1560 .collect::<Vec<_>>())
1561 }
1562 }
1563
1564 pub trait Eat: Peek<Token = Self::_Token> {
1571 type _Token: Parse;
1572 }
1573
1574 impl<T> Eat for T
1575 where
1576 T: Peek,
1577 T::Token: Parse,
1578 {
1579 type _Token = T::Token;
1580 }
1581
1582 pub struct Ctx<'a> {
1583 pub catalog: &'a TestCatalog,
1584 }
1585
1586 pub type CtxRef<'a> = &'a Ctx<'a>;
1587
1588 pub struct Parsed<T>(pub T);
1590
1591 pub struct ParseChildren<'a> {
1593 stream: ParseStream<'a>,
1594 parent: LineColumn,
1595 }
1596
1597 impl<'a> ParseChildren<'a> {
1598 pub fn new(stream: ParseStream<'a>, parent: LineColumn) -> Self {
1599 Self { stream, parent }
1600 }
1601
1602 pub fn parse_one<C, T>(
1603 &self,
1604 ctx: C,
1605 function: fn(C, ParseStream) -> syn::Result<T>,
1606 ) -> syn::Result<T> {
1607 match self.maybe_child() {
1608 Ok(_) => function(ctx, self.stream),
1609 Err(e) => Err(e),
1610 }
1611 }
1612
1613 pub fn parse_many<C: Copy, T>(
1614 &self,
1615 ctx: C,
1616 function: fn(C, ParseStream) -> syn::Result<T>,
1617 ) -> syn::Result<Vec<T>> {
1618 let mut inputs = vec![self.parse_one(ctx, function)?];
1619 while self.maybe_child().is_ok() {
1620 inputs.push(function(ctx, self.stream)?);
1621 }
1622 Ok(inputs)
1623 }
1624
1625 fn maybe_child(&self) -> syn::Result<()> {
1626 let start = self.stream.span().start();
1627 if start.line <= self.parent.line {
1628 let msg = format!("child expected at line > {}", self.parent.line);
1629 Err(Error::new(self.stream.span(), msg))?
1630 }
1631 if start.column != self.parent.column + 2 {
1632 let msg = format!("child expected at column {}", self.parent.column + 2);
1633 Err(Error::new(self.stream.span(), msg))?
1634 }
1635 Ok(())
1636 }
1637 }
1638}
1639
/// Custom keywords of the parsed syntax.
///
/// Each `syn::custom_keyword!` invocation declares a token type that matches
/// an identifier with exactly the given spelling. Spellings are
/// case-sensitive, which is why some words are declared in more than one case
/// (e.g. `distinct` / `Distinct`, `null` / `NULL`, `project` / `Project`).
mod kw {
    syn::custom_keyword!(aggregates);
    syn::custom_keyword!(AND);
    syn::custom_keyword!(ArrangeBy);
    syn::custom_keyword!(array);
    syn::custom_keyword!(asc);
    syn::custom_keyword!(case);
    syn::custom_keyword!(coalesce);
    syn::custom_keyword!(Constant);
    syn::custom_keyword!(CrossJoin);
    syn::custom_keyword!(cte);
    syn::custom_keyword!(desc);
    syn::custom_keyword!(distinct);
    syn::custom_keyword!(Distinct);
    syn::custom_keyword!(empty);
    syn::custom_keyword!(end);
    syn::custom_keyword!(eq);
    syn::custom_keyword!(error);
    syn::custom_keyword!(exp_group_size);
    syn::custom_keyword!(FALSE);
    syn::custom_keyword!(Filter);
    syn::custom_keyword!(FlatMap);
    syn::custom_keyword!(Get);
    syn::custom_keyword!(group_by);
    syn::custom_keyword!(IS);
    syn::custom_keyword!(Join);
    syn::custom_keyword!(keys);
    syn::custom_keyword!(limit);
    syn::custom_keyword!(list);
    syn::custom_keyword!(Map);
    syn::custom_keyword!(monotonic);
    syn::custom_keyword!(Mutually);
    syn::custom_keyword!(Negate);
    syn::custom_keyword!(NOT);
    syn::custom_keyword!(null);
    syn::custom_keyword!(NULL);
    syn::custom_keyword!(nulls_first);
    syn::custom_keyword!(nulls_last);
    syn::custom_keyword!(offset);
    syn::custom_keyword!(on);
    syn::custom_keyword!(OR);
    syn::custom_keyword!(order_by);
    syn::custom_keyword!(project);
    syn::custom_keyword!(Project);
    syn::custom_keyword!(Recursive);
    syn::custom_keyword!(Reduce);
    syn::custom_keyword!(Return);
    syn::custom_keyword!(then);
    syn::custom_keyword!(Threshold);
    syn::custom_keyword!(TopK);
    syn::custom_keyword!(TRUE);
    syn::custom_keyword!(Union);
    syn::custom_keyword!(when);
    syn::custom_keyword!(With);
    syn::custom_keyword!(x);
}