Skip to main content

mz_expr_parser/
parser.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Copyright Materialize, Inc. and contributors. All rights reserved.
11//
12// Use of this software is governed by the Business Source License
13// included in the LICENSE file.
14//
15// As of the Change Date specified in that file, in accordance with
16// the Business Source License, use of this software will be governed
17// by the Apache License, Version 2.0.
18
19use mz_ore::collections::CollectionExt;
20use proc_macro2::LineColumn;
21use syn::Error;
22use syn::parse::{Parse, ParseStream, Parser};
23use syn::spanned::Spanned;
24
25use super::TestCatalog;
26
27use self::util::*;
28
29/// Builds a [mz_expr::MirRelationExpr] from a string.
30pub fn try_parse_mir(catalog: &TestCatalog, s: &str) -> Result<mz_expr::MirRelationExpr, String> {
31    // Define a Parser that constructs a (read-only) parsing context `ctx` and
32    // delegates to `relation::parse_expr` by passing a `ctx` as a shared ref.
33    let parser = move |input: ParseStream| {
34        let ctx = Ctx { catalog };
35        relation::parse_expr(&ctx, input)
36    };
37    // Since the syn lexer doesn't parse comments, we replace all `// {`
38    // occurrences in the input string with `:: {`.
39    let s = s.replace("// {", ":: {");
40    // Call the parser with the given input string.
41    let mut expr = parser.parse_str(&s).map_err(|err| {
42        let (line, column) = (err.span().start().line, err.span().start().column);
43        format!("parse error at {line}:{column}:\n{err}\n")
44    })?;
45    // Fix the types of the local let bindings of the parsed expression in a
46    // post-processing pass.
47    relation::fix_types(&mut expr, &mut relation::FixTypesCtx::default())?;
48    // Return the parsed, post-processed expression.
49    Ok(expr)
50}
51
52/// Builds a source definition from a string.
53pub fn try_parse_def(catalog: &TestCatalog, s: &str) -> Result<Def, String> {
54    // Define a Parser that constructs a (read-only) parsing context `ctx` and
55    // delegates to `relation::parse_expr` by passing a `ctx` as a shared ref.
56    let parser = move |input: ParseStream| {
57        let ctx = Ctx { catalog };
58        def::parse_def(&ctx, input)
59    };
60    // Call the parser with the given input string.
61    let def = parser.parse_str(s).map_err(|err| {
62        let (line, column) = (err.span().start().line, err.span().start().column);
63        format!("parse error at {line}:{column}:\n{err}\n")
64    })?;
65    // Return the parsed, post-processed expression.
66    Ok(def)
67}
68
69/// Support for parsing [mz_expr::MirRelationExpr].
70mod relation {
71    use std::collections::BTreeMap;
72
73    use mz_expr::{AccessStrategy, Id, JoinImplementation, LocalId, MirRelationExpr};
74    use mz_repr::{Diff, ReprRelationType, Row, SqlScalarType};
75
76    use crate::parser::analyses::Analyses;
77
78    use super::*;
79
    /// Result type returned by the relation parsers of this module.
    type Result = syn::Result<MirRelationExpr>;
81
82    pub fn parse_expr(ctx: CtxRef, input: ParseStream) -> Result {
83        let lookahead = input.lookahead1();
84        if lookahead.peek(kw::Constant) {
85            parse_constant(ctx, input)
86        } else if lookahead.peek(kw::Get) {
87            parse_get(ctx, input)
88        } else if lookahead.peek(kw::Return) {
89            parse_let_or_letrec_old(ctx, input)
90        } else if lookahead.peek(kw::With) {
91            parse_let_or_letrec(ctx, input)
92        } else if lookahead.peek(kw::Project) {
93            parse_project(ctx, input)
94        } else if lookahead.peek(kw::Map) {
95            parse_map(ctx, input)
96        } else if lookahead.peek(kw::FlatMap) {
97            parse_flat_map(ctx, input)
98        } else if lookahead.peek(kw::Filter) {
99            parse_filter(ctx, input)
100        } else if lookahead.peek(kw::CrossJoin) {
101            parse_cross_join(ctx, input)
102        } else if lookahead.peek(kw::Join) {
103            parse_join(ctx, input)
104        } else if lookahead.peek(kw::Distinct) {
105            parse_distinct(ctx, input)
106        } else if lookahead.peek(kw::Reduce) {
107            parse_reduce(ctx, input)
108        } else if lookahead.peek(kw::TopK) {
109            parse_top_k(ctx, input)
110        } else if lookahead.peek(kw::Negate) {
111            parse_negate(ctx, input)
112        } else if lookahead.peek(kw::Threshold) {
113            parse_threshold(ctx, input)
114        } else if lookahead.peek(kw::Union) {
115            parse_union(ctx, input)
116        } else if lookahead.peek(kw::ArrangeBy) {
117            parse_arrange_by(ctx, input)
118        } else {
119            Err(lookahead.error())
120        }
121    }
122
123    fn parse_constant(ctx: CtxRef, input: ParseStream) -> Result {
124        let constant = input.parse::<kw::Constant>()?;
125
126        let parse_typ = |input: ParseStream| -> syn::Result<ReprRelationType> {
127            let analyses = analyses::parse_analyses(input)?;
128            let Some(column_types) = analyses.types else {
129                let msg = "Missing expected `types` analyses for Constant line";
130                Err(Error::new(input.span(), msg))?
131            };
132            let keys = analyses.keys.unwrap_or_default();
133            Ok(ReprRelationType::new(column_types).with_keys(keys))
134        };
135        if input.eat3(syn::Token![<], kw::empty, syn::Token![>]) {
136            let typ = parse_typ(input)?;
137            Ok(MirRelationExpr::Constant {
138                rows: Ok(vec![]),
139                typ,
140            })
141        } else {
142            let typ = parse_typ(input)?;
143            let parse_children = ParseChildren::new(input, constant.span().start());
144            let rows = Ok(parse_children.parse_many(ctx, parse_constant_entry)?);
145            Ok(MirRelationExpr::Constant { rows, typ })
146        }
147    }
148
149    fn parse_constant_entry(_ctx: CtxRef, input: ParseStream) -> syn::Result<(Row, Diff)> {
150        input.parse::<syn::Token![-]>()?;
151
152        let (row, diff);
153
154        let inner1;
155        syn::parenthesized!(inner1 in input);
156
157        if inner1.peek(syn::token::Paren) {
158            let inner2;
159            syn::parenthesized!(inner2 in inner1);
160            row = inner2.parse::<Parsed<Row>>()?.into();
161            inner1.parse::<kw::x>()?;
162            diff = match inner1.parse::<syn::Lit>()? {
163                syn::Lit::Int(l) => Ok(l.base10_parse::<Diff>()?),
164                _ => Err(Error::new(inner1.span(), "expected Diff literal")),
165            }?;
166        } else {
167            row = inner1.parse::<Parsed<Row>>()?.into();
168            diff = Diff::ONE;
169        }
170
171        Ok((row, diff))
172    }
173
174    fn parse_get(ctx: CtxRef, input: ParseStream) -> Result {
175        input.parse::<kw::Get>()?;
176
177        let ident = input.parse::<syn::Ident>()?;
178        match ctx.catalog.get(&ident.to_string()) {
179            Some((id, _cols, typ)) => Ok(MirRelationExpr::Get {
180                id: Id::Global(*id),
181                typ: ReprRelationType::from(typ),
182                access_strategy: AccessStrategy::UnknownOrLocal,
183            }),
184            None => Ok(MirRelationExpr::Get {
185                id: Id::Local(parse_local_id(ident)?),
186                typ: ReprRelationType::empty(),
187                access_strategy: AccessStrategy::UnknownOrLocal,
188            }),
189        }
190    }
191
192    /// Parses a Let or a LetRec with the old order: Return first, and then CTEs in descending order.
193    fn parse_let_or_letrec_old(ctx: CtxRef, input: ParseStream) -> Result {
194        let return_ = input.parse::<kw::Return>()?;
195        let parse_body = ParseChildren::new(input, return_.span().start());
196        let body = parse_body.parse_one(ctx, parse_expr)?;
197
198        let with = input.parse::<kw::With>()?;
199        let recursive = input.eat2(kw::Mutually, kw::Recursive);
200        let parse_ctes = ParseChildren::new(input, with.span().start());
201        let mut ctes = parse_ctes.parse_many(ctx, parse_cte)?;
202
203        if ctes.is_empty() {
204            let msg = "At least one Let/LetRec cte binding expected";
205            Err(Error::new(input.span(), msg))?
206        }
207
208        ctes.reverse();
209        let cte_ids = ctes.iter().map(|(id, _, _)| id);
210        if !cte_ids.clone().is_sorted() {
211            let msg = format!(
212                "Error parsing Let/LetRec: seen Return before With, but cte ids are not ordered descending: {:?}",
213                cte_ids.collect::<Vec<_>>()
214            );
215            Err(Error::new(input.span(), msg))?
216        }
217        build_let_or_let_rec(ctes, body, recursive, with)
218    }
219
220    /// Parses a Let or a LetRec with the new order: CTEs first in ascending order, and then Return.
221    fn parse_let_or_letrec(ctx: CtxRef, input: ParseStream) -> Result {
222        let with = input.parse::<kw::With>()?;
223        let recursive = input.eat2(kw::Mutually, kw::Recursive);
224        let parse_ctes = ParseChildren::new(input, with.span().start());
225        let ctes = parse_ctes.parse_many(ctx, parse_cte)?;
226
227        let return_ = input.parse::<kw::Return>()?;
228        let parse_body = ParseChildren::new(input, return_.span().start());
229        let body = parse_body.parse_one(ctx, parse_expr)?;
230
231        if ctes.is_empty() {
232            let msg = "At least one `let cte` binding expected";
233            Err(Error::new(input.span(), msg))?
234        }
235
236        let cte_ids = ctes.iter().map(|(id, _, _)| id);
237        if !cte_ids.clone().is_sorted() {
238            let msg = format!(
239                "Error parsing Let/LetRec: seen With before Return, but cte ids are not ordered ascending: {:?}",
240                cte_ids.collect::<Vec<_>>()
241            );
242            Err(Error::new(input.span(), msg))?
243        }
244        build_let_or_let_rec(ctes, body, recursive, with)
245    }
246
247    fn build_let_or_let_rec(
248        ctes: Vec<(LocalId, Analyses, MirRelationExpr)>,
249        body: MirRelationExpr,
250        recursive: bool,
251        with: kw::With,
252    ) -> Result {
253        if recursive {
254            let (mut ids, mut values, mut limits) = (vec![], vec![], vec![]);
255            for (id, analyses, value) in ctes.into_iter() {
256                let typ = {
257                    let Some(column_types) = analyses.types else {
258                        let msg = format!("`let {}` needs a `types` analyses", id);
259                        Err(Error::new(with.span(), msg))?
260                    };
261                    let keys = analyses.keys.unwrap_or_default();
262                    ReprRelationType::new(column_types).with_keys(keys)
263                };
264                // An ugly-ugly hack to pass the type information of the WMR CTE
265                // to the `fix_types` pass.
266                let value = {
267                    let get_cte = MirRelationExpr::Get {
268                        id: Id::Local(id),
269                        typ,
270                        access_strategy: AccessStrategy::UnknownOrLocal,
271                    };
272                    // Do not use the `union` smart constructor here!
273                    MirRelationExpr::Union {
274                        base: Box::new(get_cte),
275                        inputs: vec![value],
276                    }
277                };
278
279                ids.push(id);
280                values.push(value);
281                limits.push(None); // TODO: support limits
282            }
283
284            Ok(MirRelationExpr::LetRec {
285                ids,
286                values,
287                limits,
288                body: Box::new(body),
289            })
290        } else {
291            let mut body = body;
292            for (id, _, value) in ctes.into_iter().rev() {
293                body = MirRelationExpr::Let {
294                    id,
295                    value: Box::new(value),
296                    body: Box::new(body),
297                };
298            }
299            Ok(body)
300        }
301    }
302
303    fn parse_cte(
304        ctx: CtxRef,
305        input: ParseStream,
306    ) -> syn::Result<(LocalId, analyses::Analyses, MirRelationExpr)> {
307        let cte = input.parse::<kw::cte>()?;
308
309        let ident = input.parse::<syn::Ident>()?;
310        let id = parse_local_id(ident)?;
311
312        input.parse::<syn::Token![=]>()?;
313
314        let analyses = analyses::parse_analyses(input)?;
315
316        let parse_value = ParseChildren::new(input, cte.span().start());
317        let value = parse_value.parse_one(ctx, parse_expr)?;
318
319        Ok((id, analyses, value))
320    }
321
322    fn parse_project(ctx: CtxRef, input: ParseStream) -> Result {
323        let project = input.parse::<kw::Project>()?;
324
325        let content;
326        syn::parenthesized!(content in input);
327        let outputs = content.parse_comma_sep(scalar::parse_column_index)?;
328        let parse_input = ParseChildren::new(input, project.span().start());
329        let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
330
331        Ok(MirRelationExpr::Project { input, outputs })
332    }
333
334    fn parse_map(ctx: CtxRef, input: ParseStream) -> Result {
335        let map = input.parse::<kw::Map>()?;
336
337        let scalars = {
338            let inner;
339            syn::parenthesized!(inner in input);
340            scalar::parse_exprs(&inner)?
341        };
342
343        let parse_input = ParseChildren::new(input, map.span().start());
344        let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
345
346        Ok(MirRelationExpr::Map { input, scalars })
347    }
348
349    fn parse_flat_map(ctx: CtxRef, input: ParseStream) -> Result {
350        use mz_expr::TableFunc::*;
351
352        let flat_map = input.parse::<kw::FlatMap>()?;
353
354        let ident = input.parse::<syn::Ident>()?;
355        let func = match ident.to_string().to_lowercase().as_str() {
356            "unnest_list" => UnnestList {
357                el_typ: SqlScalarType::Int64, // FIXME
358            },
359            "unnest_array" => UnnestArray {
360                el_typ: SqlScalarType::Int64, // FIXME
361            },
362            "wrap1" => Wrap {
363                types: vec![
364                    SqlScalarType::Int64.nullable(true), // FIXME
365                ],
366                width: 1,
367            },
368            "wrap2" => Wrap {
369                types: vec![
370                    SqlScalarType::Int64.nullable(true), // FIXME
371                    SqlScalarType::Int64.nullable(true), // FIXME
372                ],
373                width: 2,
374            },
375            "wrap3" => Wrap {
376                types: vec![
377                    SqlScalarType::Int64.nullable(true), // FIXME
378                    SqlScalarType::Int64.nullable(true), // FIXME
379                    SqlScalarType::Int64.nullable(true), // FIXME
380                ],
381                width: 3,
382            },
383            "generate_series" => GenerateSeriesInt64,
384            // Both int widths display as `generate_series`; this spelling lets a
385            // test pin the 32-bit variant (e.g. its widening casts) on input.
386            "generate_series_i32" => GenerateSeriesInt32,
387            "jsonb_object_keys" => JsonbObjectKeys,
388            _ => Err(Error::new(ident.span(), "unsupported function name"))?,
389        };
390
391        let exprs = {
392            let inner;
393            syn::parenthesized!(inner in input);
394            scalar::parse_exprs(&inner)?
395        };
396
397        let parse_input = ParseChildren::new(input, flat_map.span().start());
398        let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
399
400        Ok(MirRelationExpr::FlatMap { input, func, exprs })
401    }
402
403    fn parse_filter(ctx: CtxRef, input: ParseStream) -> Result {
404        use mz_expr::MirScalarExpr::CallVariadic;
405        use mz_expr::VariadicFunc::And;
406
407        let filter = input.parse::<kw::Filter>()?;
408
409        let predicates = match scalar::parse_expr(input)? {
410            CallVariadic {
411                func: And(_),
412                exprs,
413            } => exprs,
414            expr => vec![expr],
415        };
416
417        let parse_input = ParseChildren::new(input, filter.span().start());
418        let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
419
420        Ok(MirRelationExpr::Filter { input, predicates })
421    }
422
423    fn parse_cross_join(ctx: CtxRef, input: ParseStream) -> Result {
424        let join = input.parse::<kw::CrossJoin>()?;
425
426        let parse_inputs = ParseChildren::new(input, join.span().start());
427        let inputs = parse_inputs.parse_many(ctx, parse_expr)?;
428
429        Ok(MirRelationExpr::Join {
430            inputs,
431            equivalences: vec![],
432            implementation: JoinImplementation::Unimplemented,
433        })
434    }
435
436    fn parse_join(ctx: CtxRef, input: ParseStream) -> Result {
437        let join = input.parse::<kw::Join>()?;
438
439        input.parse::<kw::on>()?;
440        input.parse::<syn::Token![=]>()?;
441        let inner;
442        syn::parenthesized!(inner in input);
443        let equivalences = scalar::parse_join_equivalences(&inner)?;
444
445        let parse_inputs = ParseChildren::new(input, join.span().start());
446        let inputs = parse_inputs.parse_many(ctx, parse_expr)?;
447
448        Ok(MirRelationExpr::Join {
449            inputs,
450            equivalences,
451            implementation: JoinImplementation::Unimplemented,
452        })
453    }
454
455    fn parse_distinct(ctx: CtxRef, input: ParseStream) -> Result {
456        let reduce = input.parse::<kw::Distinct>()?;
457
458        let group_key = if input.eat(kw::project) {
459            input.parse::<syn::Token![=]>()?;
460            let inner;
461            syn::bracketed!(inner in input);
462            inner.parse_comma_sep(scalar::parse_expr)?
463        } else {
464            vec![]
465        };
466
467        let monotonic = input.eat(kw::monotonic);
468
469        let expected_group_size = if input.eat(kw::exp_group_size) {
470            input.parse::<syn::Token![=]>()?;
471            Some(input.parse::<syn::LitInt>()?.base10_parse::<u64>()?)
472        } else {
473            None
474        };
475
476        let parse_inputs = ParseChildren::new(input, reduce.span().start());
477        let input = Box::new(parse_inputs.parse_one(ctx, parse_expr)?);
478
479        Ok(MirRelationExpr::Reduce {
480            input,
481            group_key,
482            aggregates: vec![],
483            monotonic,
484            expected_group_size,
485        })
486    }
487
488    fn parse_reduce(ctx: CtxRef, input: ParseStream) -> Result {
489        let reduce = input.parse::<kw::Reduce>()?;
490
491        let group_key = if input.eat(kw::group_by) {
492            input.parse::<syn::Token![=]>()?;
493            let inner;
494            syn::bracketed!(inner in input);
495            inner.parse_comma_sep(scalar::parse_expr)?
496        } else {
497            vec![]
498        };
499
500        let aggregates = {
501            input.parse::<kw::aggregates>()?;
502            input.parse::<syn::Token![=]>()?;
503            let inner;
504            syn::bracketed!(inner in input);
505            inner.parse_comma_sep(aggregate::parse_expr)?
506        };
507
508        let monotonic = input.eat(kw::monotonic);
509
510        let expected_group_size = if input.eat(kw::exp_group_size) {
511            input.parse::<syn::Token![=]>()?;
512            Some(input.parse::<syn::LitInt>()?.base10_parse::<u64>()?)
513        } else {
514            None
515        };
516
517        let parse_inputs = ParseChildren::new(input, reduce.span().start());
518        let input = Box::new(parse_inputs.parse_one(ctx, parse_expr)?);
519
520        Ok(MirRelationExpr::Reduce {
521            input,
522            group_key,
523            aggregates,
524            monotonic,
525            expected_group_size,
526        })
527    }
528
529    fn parse_top_k(ctx: CtxRef, input: ParseStream) -> Result {
530        let top_k = input.parse::<kw::TopK>()?;
531
532        let group_key = if input.eat(kw::group_by) {
533            input.parse::<syn::Token![=]>()?;
534            let inner;
535            syn::bracketed!(inner in input);
536            inner.parse_comma_sep(scalar::parse_column_index)?
537        } else {
538            vec![]
539        };
540
541        let order_key = if input.eat(kw::order_by) {
542            input.parse::<syn::Token![=]>()?;
543            let inner;
544            syn::bracketed!(inner in input);
545            inner.parse_comma_sep(scalar::parse_column_order)?
546        } else {
547            vec![]
548        };
549
550        let limit = if input.eat(kw::limit) {
551            input.parse::<syn::Token![=]>()?;
552            Some(scalar::parse_expr(input)?)
553        } else {
554            None
555        };
556
557        let offset = if input.eat(kw::offset) {
558            input.parse::<syn::Token![=]>()?;
559            input.parse::<syn::LitInt>()?.base10_parse::<usize>()?
560        } else {
561            0
562        };
563
564        let monotonic = input.eat(kw::monotonic);
565
566        let expected_group_size = if input.eat(kw::exp_group_size) {
567            input.parse::<syn::Token![=]>()?;
568            Some(input.parse::<syn::LitInt>()?.base10_parse::<u64>()?)
569        } else {
570            None
571        };
572
573        let parse_inputs = ParseChildren::new(input, top_k.span().start());
574        let input = Box::new(parse_inputs.parse_one(ctx, parse_expr)?);
575
576        Ok(MirRelationExpr::TopK {
577            input,
578            group_key,
579            order_key,
580            limit,
581            offset,
582            monotonic,
583            expected_group_size,
584        })
585    }
586
587    fn parse_negate(ctx: CtxRef, input: ParseStream) -> Result {
588        let negate = input.parse::<kw::Negate>()?;
589
590        let parse_input = ParseChildren::new(input, negate.span().start());
591        let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
592
593        Ok(MirRelationExpr::Negate { input })
594    }
595
596    fn parse_threshold(ctx: CtxRef, input: ParseStream) -> Result {
597        let threshold = input.parse::<kw::Threshold>()?;
598
599        let parse_input = ParseChildren::new(input, threshold.span().start());
600        let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
601
602        Ok(MirRelationExpr::Threshold { input })
603    }
604
605    fn parse_union(ctx: CtxRef, input: ParseStream) -> Result {
606        let union = input.parse::<kw::Union>()?;
607
608        let parse_inputs = ParseChildren::new(input, union.span().start());
609        let mut children = parse_inputs.parse_many(ctx, parse_expr)?;
610        let inputs = children.split_off(1);
611        let base = Box::new(children.into_element());
612
613        Ok(MirRelationExpr::Union { base, inputs })
614    }
615
616    fn parse_arrange_by(ctx: CtxRef, input: ParseStream) -> Result {
617        let arrange_by = input.parse::<kw::ArrangeBy>()?;
618
619        let keys = {
620            input.parse::<kw::keys>()?;
621            input.parse::<syn::Token![=]>()?;
622            let inner;
623            syn::bracketed!(inner in input);
624            inner.parse_comma_sep(|input| {
625                let inner;
626                syn::bracketed!(inner in input);
627                scalar::parse_exprs(&inner)
628            })?
629        };
630
631        let parse_input = ParseChildren::new(input, arrange_by.span().start());
632        let input = Box::new(parse_input.parse_one(ctx, parse_expr)?);
633
634        Ok(MirRelationExpr::ArrangeBy { input, keys })
635    }
636
637    fn parse_local_id(ident: syn::Ident) -> syn::Result<LocalId> {
638        if ident.to_string().starts_with('l') {
639            let n = ident.to_string()[1..]
640                .parse::<u64>()
641                .map_err(|err| Error::new(ident.span(), err.to_string()))?;
642            Ok(mz_expr::LocalId::new(n))
643        } else {
644            Err(Error::new(ident.span(), "invalid LocalId"))
645        }
646    }
647
    /// Mutable state of the [fix_types] post-processing pass.
    #[derive(Default)]
    pub struct FixTypesCtx {
        // Types of the local ids that are currently bound by an enclosing
        // `Let` or `LetRec`.
        env: BTreeMap<LocalId, ReprRelationType>,
        // Stack of the types computed so far. Processing an expression pushes
        // its type; a parent takes the types of its inputs from the top.
        typ: Vec<ReprRelationType>,
    }
653
    /// Fixes the types of the local `Get` expressions in `expr`.
    ///
    /// The expression is traversed recursively. The type of every local `Get`
    /// is replaced by the type that `ctx.env` holds for its id, and the type
    /// of every processed expression is pushed onto `ctx.typ`, from where the
    /// parent takes the types of its inputs.
    ///
    /// # Errors
    ///
    /// Returns an error message if a local `Get` refers to an id that is not
    /// bound in `ctx.env`.
    ///
    /// # Panics
    ///
    /// Panics if no value type is on the stack after processing the value of a
    /// `Let`, or if a `LetRec` value does not have the `Union` of a `Get` and
    /// the actual value as its shape.
    pub fn fix_types(
        expr: &mut MirRelationExpr,
        ctx: &mut FixTypesCtx,
    ) -> std::result::Result<(), String> {
        match expr {
            MirRelationExpr::Let { id, value, body } => {
                // Compute the type of the value and take it from the stack.
                fix_types(value, ctx)?;
                let value_typ = ctx.typ.pop().expect("value type");
                // Bind the id while the body is processed, remembering a
                // binding of the same id that gets shadowed.
                let prior_typ = ctx.env.insert(id.clone(), value_typ);
                fix_types(body, ctx)?;
                // Drop the binding and restore the shadowed one, if any.
                ctx.env.remove(id);
                if let Some(prior_typ) = prior_typ {
                    ctx.env.insert(id.clone(), prior_typ);
                }
            }
            MirRelationExpr::LetRec {
                ids,
                values,
                body,
                limits: _,
            } => {
                // An ugly-ugly hack to pass the type information of the WMR CTE
                // to the `fix_types` pass.
                let mut prior_typs = BTreeMap::default();
                // First bind all ids to the types carried by the `Get` at the
                // base of each value and replace each value by the single
                // input of its `Union`.
                for (id, value) in std::iter::zip(ids.iter_mut(), values.iter_mut()) {
                    let MirRelationExpr::Union { base, mut inputs } = value.take_dangerous() else {
                        unreachable!("ensured by construction");
                    };
                    let MirRelationExpr::Get { id: _, typ, .. } = *base else {
                        unreachable!("ensured by construction");
                    };
                    if let Some(prior_typ) = ctx.env.insert(id.clone(), typ) {
                        prior_typs.insert(id.clone(), prior_typ);
                    }
                    *value = inputs.pop().expect("ensured by construction");
                }
                // With all ids bound, process the values and then the body.
                // NOTE(review): nothing is popped from `ctx.typ` for the
                // values in this arm; confirm that leaving their types on the
                // stack below the body type is intended.
                for value in values.iter_mut() {
                    fix_types(value, ctx)?;
                }
                fix_types(body, ctx)?;
                // Drop the bindings and restore the shadowed ones, if any.
                for id in ids.iter() {
                    ctx.env.remove(id);
                    if let Some(prior_typ) = prior_typs.remove(id) {
                        ctx.env.insert(id.clone(), prior_typ);
                    }
                }
            }
            MirRelationExpr::Get {
                id: Id::Local(id),
                typ,
                ..
            } => {
                // A local `Get` takes the type bound in the environment.
                let env_typ = match ctx.env.get(&*id) {
                    Some(env_typ) => env_typ,
                    None => Err(format!("Cannot fix type of unbound CTE {}", id))?,
                };
                *typ = env_typ.clone();
                ctx.typ.push(env_typ.clone());
            }
            _ => {
                // Any other expression: process the children, then replace
                // the types of the inputs on top of the stack by the type of
                // the expression itself.
                for input in expr.children_mut() {
                    fix_types(input, ctx)?;
                }
                let input_types = ctx.typ.split_off(ctx.typ.len() - expr.num_inputs());
                ctx.typ.push(expr.typ_with_input_types(&input_types));
            }
        };

        Ok(())
    }
724}
725
726/// Support for parsing [mz_expr::MirScalarExpr].
727mod scalar {
728    use mz_expr::{
729        BinaryFunc, ColumnOrder, MirScalarExpr, UnaryFunc, UnmaterializableFunc, VariadicFunc, func,
730    };
731    use mz_repr::{
732        AsColumnType, Datum, ReprColumnType, ReprScalarType, Row, RowArena, SqlScalarType,
733    };
734
735    use super::*;
736
    /// Result type returned by the scalar expression parsers of this module.
    type Result = syn::Result<MirScalarExpr>;
738
    /// Parses a comma-separated list of scalar expressions, using
    /// [parse_expr] for each element.
    pub fn parse_exprs(input: ParseStream) -> syn::Result<Vec<MirScalarExpr>> {
        input.parse_comma_sep(parse_expr)
    }
742
743    /// Parses a single expression.
744    ///
745    /// Because in EXPLAIN contexts parentheses might be optional, we need to
746    /// correctly handle operator precedence of infix operators.
747    ///
748    /// Currently, this works in two steps:
749    ///
750    /// 1. Convert the original infix expression to a postfix expression using
751    ///    an adapted variant of this [algorithm] with precedence taken from the
752    ///    Postgres [precedence] docs. Parenthesized operands are parsed in one
753    ///    step, so steps (3-4) from the [algorithm] are not needed here.
754    /// 2. Convert the postfix vector into a single [MirScalarExpr].
755    ///
756    /// [algorithm]: <https://www.prepbytes.com/blog/stacks/infix-to-postfix-conversion-using-stack/>
757    /// [precedence]: <https://www.postgresql.org/docs/7.2/sql-precedence.html>
758    pub fn parse_expr(input: ParseStream) -> Result {
759        let line = input.span().start().line;
760
761        /// Helper struct to keep track of the parsing state.
762        #[derive(Debug)]
763        enum Op {
764            Unr(mz_expr::UnaryFunc), // unary
765            Neg(mz_expr::UnaryFunc), // negated unary (append -.not() on fold)
766            Bin(mz_expr::BinaryFunc),
767            Var(mz_expr::VariadicFunc),
768        }
769
770        impl Op {
771            fn precedence(&self) -> Option<usize> {
772                match self {
773                    // 01: logical disjunction
774                    Op::Var(mz_expr::VariadicFunc::Or(_)) => Some(1),
775                    // 02: logical conjunction
776                    Op::Var(mz_expr::VariadicFunc::And(_)) => Some(2),
777                    // 04: equality, assignment
778                    Op::Bin(mz_expr::BinaryFunc::Eq(_)) => Some(4),
779                    Op::Bin(mz_expr::BinaryFunc::NotEq(_)) => Some(4),
780                    // 05: less than, greater than
781                    Op::Bin(mz_expr::BinaryFunc::Gt(_)) => Some(5),
782                    Op::Bin(mz_expr::BinaryFunc::Gte(_)) => Some(5),
783                    Op::Bin(mz_expr::BinaryFunc::Lt(_)) => Some(5),
784                    Op::Bin(mz_expr::BinaryFunc::Lte(_)) => Some(5),
785                    // 13: test for TRUE, FALSE, UNKNOWN, NULL
786                    Op::Unr(mz_expr::UnaryFunc::IsNull(_)) => Some(13),
787                    Op::Neg(mz_expr::UnaryFunc::IsNull(_)) => Some(13),
788                    Op::Unr(mz_expr::UnaryFunc::IsTrue(_)) => Some(13),
789                    Op::Neg(mz_expr::UnaryFunc::IsTrue(_)) => Some(13),
790                    Op::Unr(mz_expr::UnaryFunc::IsFalse(_)) => Some(13),
791                    Op::Neg(mz_expr::UnaryFunc::IsFalse(_)) => Some(13),
792                    // 14: addition, subtraction
793                    Op::Bin(mz_expr::BinaryFunc::AddInt64(_)) => Some(14),
                    // 15: multiplication, division, modulo
795                    Op::Bin(mz_expr::BinaryFunc::MulInt64(_)) => Some(15),
796                    Op::Bin(mz_expr::BinaryFunc::DivInt64(_)) => Some(15),
797                    Op::Bin(mz_expr::BinaryFunc::ModInt64(_)) => Some(15),
798                    // unsupported
799                    _ => None,
800                }
801            }
802        }
803
804        /// Helper struct for entries in the postfix vector.
805        #[derive(Debug)]
806        enum Entry {
807            Operand(MirScalarExpr),
808            Operator(Op),
809        }
810
811        let mut opstack = vec![];
812        let mut postfix = vec![];
813        let mut exp_opd = true; // expects an argument of an operator
814
815        // Scan the given infix expression from left to right.
816        while !input.is_empty() && input.span().start().line == line {
817            // Operands and operators alternate.
818            if exp_opd {
819                postfix.push(Entry::Operand(parse_operand(input)?));
820                exp_opd = false;
821            } else {
822                // If the current symbol is an operator, then bind it to op.
823                // Else it is an operand - append it to postfix and continue.
824                let op = if input.eat(syn::Token![=]) {
825                    exp_opd = true;
826                    Op::Bin(func::Eq.into())
827                } else if input.eat(syn::Token![!=]) {
828                    exp_opd = true;
829                    Op::Bin(func::NotEq.into())
830                } else if input.eat(syn::Token![>=]) {
831                    exp_opd = true;
832                    Op::Bin(func::Gte.into())
833                } else if input.eat(syn::Token![>]) {
834                    exp_opd = true;
835                    Op::Bin(func::Gt.into())
836                } else if input.eat(syn::Token![<=]) {
837                    exp_opd = true;
838                    Op::Bin(func::Lte.into())
839                } else if input.eat(syn::Token![<]) {
840                    exp_opd = true;
841                    Op::Bin(func::Lt.into())
842                } else if input.eat(syn::Token![+]) {
843                    exp_opd = true;
844                    Op::Bin(func::AddInt64.into()) // TODO: fix placeholder
845                } else if input.eat(syn::Token![*]) {
846                    exp_opd = true;
847                    Op::Bin(func::MulInt64.into()) // TODO: fix placeholder
848                } else if input.eat(syn::Token![/]) {
849                    exp_opd = true;
850                    Op::Bin(func::DivInt64.into()) // TODO: fix placeholder
851                } else if input.eat(syn::Token![%]) {
852                    exp_opd = true;
853                    Op::Bin(func::ModInt64.into()) // TODO: fix placeholder
854                } else if input.eat(kw::AND) {
855                    exp_opd = true;
856                    Op::Var(VariadicFunc::And(func::variadic::And))
857                } else if input.eat(kw::OR) {
858                    exp_opd = true;
859                    Op::Var(VariadicFunc::Or(func::variadic::Or))
860                } else if input.eat(kw::coalesce) {
861                    exp_opd = true;
862                    Op::Var(VariadicFunc::Coalesce(func::variadic::Coalesce))
863                } else if input.eat(kw::IS) {
864                    let negate = input.eat(kw::NOT);
865
866                    let lookahead = input.lookahead1();
867                    let func = if input.look_and_eat(kw::NULL, &lookahead) {
868                        mz_expr::func::IsNull.into()
869                    } else if input.look_and_eat(kw::TRUE, &lookahead) {
870                        mz_expr::func::IsTrue.into()
871                    } else if input.look_and_eat(kw::FALSE, &lookahead) {
872                        mz_expr::func::IsFalse.into()
873                    } else {
874                        Err(lookahead.error())?
875                    };
876
877                    if negate { Op::Neg(func) } else { Op::Unr(func) }
878                } else {
879                    // We were expecting an optional operator but didn't find
880                    // anything. Exit the parsing loop and process the postfix
881                    // vector.
882                    break;
883                };
884
885                // First, pop the operators which are already on the opstack that
886                // have higher or equal precedence than the current operator and
887                // append them to the postfix.
888                while opstack
889                    .last()
890                    .map(|op1: &Op| op1.precedence() >= op.precedence())
891                    .unwrap_or(false)
892                {
893                    let op1 = opstack.pop().expect("non-empty opstack");
894                    postfix.push(Entry::Operator(op1));
895                }
896
897                // Then push the op from this iteration onto the stack.
898                opstack.push(op);
899            }
900        }
901
902        // Pop all remaining symbols from opstack and append them to postfix.
903        postfix.extend(opstack.into_iter().rev().map(Entry::Operator));
904
905        if postfix.is_empty() {
906            let msg = "Cannot parse an empty expression";
907            Err(Error::new(input.span(), msg))?
908        }
909
910        // Flatten the postfix vector into a single MirScalarExpr.
911        let mut stack = vec![];
912        postfix.reverse();
913        while let Some(entry) = postfix.pop() {
914            match entry {
915                Entry::Operand(expr) => {
916                    stack.push(expr);
917                }
918                Entry::Operator(Op::Unr(func)) => {
919                    let expr = Box::new(stack.pop().expect("non-empty stack"));
920                    stack.push(MirScalarExpr::CallUnary { func, expr });
921                }
922                Entry::Operator(Op::Neg(func)) => {
923                    let expr = Box::new(stack.pop().expect("non-empty stack"));
924                    stack.push(MirScalarExpr::CallUnary { func, expr }.not());
925                }
926                Entry::Operator(Op::Bin(func)) => {
927                    let expr2 = Box::new(stack.pop().expect("non-empty stack"));
928                    let expr1 = Box::new(stack.pop().expect("non-empty stack"));
929                    stack.push(MirScalarExpr::CallBinary { func, expr1, expr2 });
930                }
931                Entry::Operator(Op::Var(func)) => {
932                    let expr2 = stack.pop().expect("non-empty stack");
933                    let expr1 = stack.pop().expect("non-empty stack");
934                    let mut exprs = vec![];
935                    for expr in [expr1, expr2] {
936                        match expr {
937                            MirScalarExpr::CallVariadic { func: f, exprs: es } if f == func => {
938                                exprs.extend(es);
939                            }
940                            expr => {
941                                exprs.push(expr);
942                            }
943                        }
944                    }
945                    stack.push(MirScalarExpr::CallVariadic { func, exprs });
946                }
947            }
948        }
949
950        if stack.len() != 1 {
951            let msg = "Cannot fold postfix vector into a single MirScalarExpr";
952            Err(Error::new(input.span(), msg))?
953        }
954
955        Ok(stack.pop().unwrap())
956    }
957
958    pub fn parse_operand(input: ParseStream) -> Result {
959        let lookahead = input.lookahead1();
960        if lookahead.peek(syn::Token![#]) {
961            parse_column(input)
962        } else if lookahead.peek(syn::Lit) || lookahead.peek(kw::null) {
963            parse_literal_ok(input)
964        } else if lookahead.peek(kw::error) {
965            parse_literal_err(input)
966        } else if lookahead.peek(kw::array) {
967            parse_array(input)
968        } else if lookahead.peek(kw::list) {
969            parse_list(input)
970        } else if lookahead.peek(kw::case) {
971            parse_case(input)
972        } else if lookahead.peek(syn::Ident) {
973            parse_apply(input)
974        } else if lookahead.peek(syn::token::Brace) {
975            let inner;
976            syn::braced!(inner in input);
977            parse_literal_array(&inner)
978        } else if lookahead.peek(syn::token::Bracket) {
979            let inner;
980            syn::bracketed!(inner in input);
981            parse_literal_list(&inner)
982        } else if lookahead.peek(syn::token::Paren) {
983            let inner;
984            syn::parenthesized!(inner in input);
985            parse_expr(&inner)
986        } else {
987            Err(lookahead.error())
988        }
989    }
990
991    /// Parses `case when {cond} then {then} else {els} end`.
992    fn parse_case(input: ParseStream) -> Result {
993        input.parse::<kw::case>()?;
994        if input.peek(kw::when) {
995            input.parse::<kw::when>()?;
996            let cond = parse_expr(input)?;
997            input.parse::<kw::then>()?;
998            let then = parse_expr(input)?;
999            input.parse::<syn::Token![else]>()?;
1000            let els = parse_expr(input)?;
1001            input.parse::<kw::end>()?;
1002            Ok(MirScalarExpr::If {
1003                cond: Box::new(cond),
1004                then: Box::new(then),
1005                els: Box::new(els),
1006            })
1007        } else {
1008            Err(Error::new(input.span(), "expected 'when' after 'case'"))
1009        }
1010    }
1011
1012    pub fn parse_column(input: ParseStream) -> Result {
1013        Ok(MirScalarExpr::column(parse_column_index(input)?))
1014    }
1015
1016    pub fn parse_column_index(input: ParseStream) -> syn::Result<usize> {
1017        input.parse::<syn::Token![#]>()?;
1018        input.parse::<syn::LitInt>()?.base10_parse::<usize>()
1019    }
1020
1021    pub fn parse_column_order(input: ParseStream) -> syn::Result<ColumnOrder> {
1022        input.parse::<syn::Token![#]>()?;
1023        let column = input.parse::<syn::LitInt>()?.base10_parse::<usize>()?;
1024        let desc = input.eat(kw::desc) || !input.eat(kw::asc);
1025        let nulls_last = input.eat(kw::nulls_last) || !input.eat(kw::nulls_first);
1026        Ok(ColumnOrder {
1027            column,
1028            desc,
1029            nulls_last,
1030        })
1031    }
1032
1033    fn parse_literal_ok(input: ParseStream) -> Result {
1034        let mut row = Row::default();
1035        let mut packer = row.packer();
1036
1037        let typ = if input.eat(kw::null) {
1038            packer.push(Datum::Null);
1039            input.parse::<syn::Token![::]>()?;
1040            ReprColumnType {
1041                scalar_type: analyses::parse_scalar_type(input)?,
1042                nullable: true,
1043            }
1044        } else {
1045            match input.parse::<syn::Lit>()? {
1046                syn::Lit::Str(l) => {
1047                    packer.push(Datum::from(l.value().as_str()));
1048                    Ok(ReprColumnType::from(&String::as_column_type()))
1049                }
1050                syn::Lit::Int(l) => {
1051                    packer.push(Datum::from(l.base10_parse::<i64>()?));
1052                    Ok(ReprColumnType::from(&i64::as_column_type()))
1053                }
1054                syn::Lit::Float(l) => {
1055                    packer.push(Datum::from(l.base10_parse::<f64>()?));
1056                    Ok(ReprColumnType::from(&f64::as_column_type()))
1057                }
1058                syn::Lit::Bool(l) => {
1059                    packer.push(Datum::from(l.value));
1060                    Ok(ReprColumnType::from(&bool::as_column_type()))
1061                }
1062                _ => Err(Error::new(input.span(), "cannot parse literal")),
1063            }?
1064        };
1065
1066        Ok(MirScalarExpr::Literal(Ok(row), typ))
1067    }
1068    fn parse_literal_err(input: ParseStream) -> Result {
1069        input.parse::<kw::error>()?;
1070        let mut msg = {
1071            let content;
1072            syn::parenthesized!(content in input);
1073            content.parse::<syn::LitStr>()?.value()
1074        };
1075        let err = if msg.starts_with("internal error: ") {
1076            Ok(mz_expr::EvalError::Internal(msg.split_off(16).into()))
1077        } else {
1078            Err(Error::new(msg.span(), "expected `internal error: $msg`"))
1079        }?;
1080        Ok(MirScalarExpr::literal(Err(err), ReprScalarType::Bool))
1081    }
1082
1083    fn parse_literal_array(input: ParseStream) -> Result {
1084        use mz_expr::func::variadic::ArrayCreate;
1085
1086        let elem_type = SqlScalarType::Int64; // FIXME
1087        let func = VariadicFunc::ArrayCreate(ArrayCreate { elem_type });
1088        let exprs = input.parse_comma_sep(parse_literal_ok)?;
1089
1090        // Evaluate into a datum
1091        let temp_storage = RowArena::default();
1092        let datum = func.eval(&[], &temp_storage, &exprs).expect("datum");
1093        let typ = ReprScalarType::from(&SqlScalarType::Array(Box::new(SqlScalarType::Int64))); // FIXME
1094        Ok(MirScalarExpr::literal_ok(datum, typ))
1095    }
1096    fn parse_literal_list(input: ParseStream) -> Result {
1097        use mz_expr::func::variadic::ListCreate;
1098
1099        let elem_type = SqlScalarType::Int64; // FIXME
1100        let func = VariadicFunc::ListCreate(ListCreate { elem_type });
1101        let exprs = input.parse_comma_sep(parse_literal_ok)?;
1102
1103        // Evaluate into a datum
1104        let temp_storage = RowArena::default();
1105        let datum = func.eval(&[], &temp_storage, &exprs).expect("datum");
1106        let typ = ReprScalarType::from(&SqlScalarType::Array(Box::new(SqlScalarType::Int64))); // FIXME
1107        Ok(MirScalarExpr::literal_ok(datum, typ))
1108    }
1109    fn parse_array(input: ParseStream) -> Result {
1110        use mz_expr::func::variadic::ArrayCreate;
1111
1112        input.parse::<kw::array>()?;
1113
1114        // parse brackets
1115        let inner;
1116        syn::bracketed!(inner in input);
1117
1118        let elem_type = SqlScalarType::Int64; // FIXME
1119        let func = ArrayCreate { elem_type };
1120        let exprs = inner.parse_comma_sep(parse_expr)?;
1121
1122        Ok(MirScalarExpr::call_variadic(func, exprs))
1123    }
1124
1125    fn parse_list(input: ParseStream) -> Result {
1126        use mz_expr::func::variadic::ListCreate;
1127
1128        input.parse::<kw::list>()?;
1129
1130        // parse brackets
1131        let inner;
1132        syn::bracketed!(inner in input);
1133
1134        let elem_type = SqlScalarType::Int64; // FIXME
1135        let func = ListCreate { elem_type };
1136        let exprs = inner.parse_comma_sep(parse_expr)?;
1137
1138        Ok(MirScalarExpr::call_variadic(func, exprs))
1139    }
1140
1141    fn parse_apply(input: ParseStream) -> Result {
1142        let ident = input.parse::<syn::Ident>()?;
1143
1144        // parse parentheses
1145        let inner;
1146        syn::parenthesized!(inner in input);
1147
1148        let parse_nullary = |func: UnmaterializableFunc| -> Result {
1149            Ok(MirScalarExpr::CallUnmaterializable(func))
1150        };
1151        let parse_unary = |func: UnaryFunc| -> Result {
1152            let expr = Box::new(parse_expr(&inner)?);
1153            Ok(MirScalarExpr::CallUnary { func, expr })
1154        };
1155        let parse_binary = |func: BinaryFunc| -> Result {
1156            let expr1 = Box::new(parse_expr(&inner)?);
1157            inner.parse::<syn::Token![,]>()?;
1158            let expr2 = Box::new(parse_expr(&inner)?);
1159            Ok(MirScalarExpr::CallBinary { func, expr1, expr2 })
1160        };
1161        let parse_variadic = |func: VariadicFunc| -> Result {
1162            let exprs = inner.parse_comma_sep(parse_expr)?;
1163            Ok(MirScalarExpr::call_variadic(func, exprs))
1164        };
1165
1166        // Infix binary and variadic function calls are handled in `parse_scalar_expr`.
1167        //
1168        // Some restrictions apply with the current state of the code,
1169        // most notably one cannot handle overloaded function names because we don't want to do
1170        // name resolution in the parser.
1171        match ident.to_string().to_lowercase().as_str() {
1172            // Supported unmaterializable (a.k.a. nullary) functions:
1173            "mz_environment_id" => parse_nullary(UnmaterializableFunc::MzEnvironmentId),
1174            // Supported unary functions:
1175            "abs" => parse_unary(func::AbsInt64.into()),
1176            "not" => parse_unary(func::Not.into()),
1177            // Supported binary functions:
1178            "ltrim" => parse_binary(func::TrimLeading.into()),
1179            // Supported variadic functions:
1180            "greatest" => parse_variadic(VariadicFunc::Greatest(func::variadic::Greatest)),
1181            "coalesce" => parse_variadic(VariadicFunc::Coalesce(func::variadic::Coalesce)),
1182            _ => Err(Error::new(ident.span(), "unsupported function name")),
1183        }
1184    }
1185
1186    pub fn parse_join_equivalences(input: ParseStream) -> syn::Result<Vec<Vec<MirScalarExpr>>> {
1187        let mut equivalences = vec![];
1188        while !input.is_empty() {
1189            let mut equivalence = vec![];
1190            loop {
1191                let mut worklist = vec![parse_operand(input)?];
1192                while let Some(operand) = worklist.pop() {
1193                    // Be more lenient and support parenthesized equivalences,
1194                    // e.g. `... AND (x = u + v = z + 1) AND ...`.
1195                    if let MirScalarExpr::CallBinary {
1196                        func: BinaryFunc::Eq(_),
1197                        expr1,
1198                        expr2,
1199                    } = operand
1200                    {
1201                        // We reverse the order in the worklist in order to get
1202                        // the correct order in the equivalence class.
1203                        worklist.push(*expr2);
1204                        worklist.push(*expr1);
1205                    } else {
1206                        equivalence.push(operand);
1207                    }
1208                }
1209                if !input.eat(syn::Token![=]) {
1210                    break;
1211                }
1212            }
1213            equivalences.push(equivalence);
1214            input.eat(kw::AND);
1215        }
1216        Ok(equivalences)
1217    }
1218}
1219
1220/// Support for parsing [mz_expr::AggregateExpr].
1221mod aggregate {
1222    use mz_expr::{AggregateExpr, MirScalarExpr};
1223
1224    use super::*;
1225
1226    type Result = syn::Result<AggregateExpr>;
1227
1228    pub fn parse_expr(input: ParseStream) -> Result {
1229        use mz_expr::AggregateFunc::*;
1230
1231        // Some restrictions apply with the current state of the code,
1232        // most notably one cannot handle overloaded function names because we don't want to do
1233        // name resolution in the parser.
1234        let ident = input.parse::<syn::Ident>()?;
1235        let func = match ident.to_string().to_lowercase().as_str() {
1236            "count" => Count,
1237            "any" => Any,
1238            "all" => All,
1239            "max" => MaxInt64,
1240            "min" => MinInt64,
1241            "sum" => SumInt64,
1242            _ => Err(Error::new(ident.span(), "unsupported function name"))?,
1243        };
1244
1245        // parse parentheses
1246        let inner;
1247        syn::parenthesized!(inner in input);
1248
1249        if func == Count && inner.eat(syn::Token![*]) {
1250            Ok(AggregateExpr {
1251                func,
1252                expr: MirScalarExpr::literal_true(),
1253                distinct: false, // TODO: fix explain output
1254            })
1255        } else {
1256            let distinct = inner.eat(kw::distinct);
1257            let expr = scalar::parse_expr(&inner)?;
1258            Ok(AggregateExpr {
1259                func,
1260                expr,
1261                distinct,
1262            })
1263        }
1264    }
1265}
1266
1267/// Support for parsing [mz_repr::Row].
1268mod row {
1269    use mz_repr::{Datum, Row, RowPacker};
1270
1271    use super::*;
1272
1273    impl Parse for Parsed<Row> {
1274        fn parse(input: ParseStream) -> syn::Result<Self> {
1275            let mut row = Row::default();
1276            let mut packer = ParseRow::new(&mut row);
1277
1278            loop {
1279                if input.is_empty() {
1280                    break;
1281                }
1282                packer.parse_datum(input)?;
1283                if input.is_empty() {
1284                    break;
1285                }
1286                input.parse::<syn::Token![,]>()?;
1287            }
1288
1289            Ok(Parsed(row))
1290        }
1291    }
1292
1293    impl From<Parsed<Row>> for Row {
1294        fn from(parsed: Parsed<Row>) -> Self {
1295            parsed.0
1296        }
1297    }
1298
1299    struct ParseRow<'a>(RowPacker<'a>);
1300
1301    impl<'a> ParseRow<'a> {
1302        fn new(row: &'a mut Row) -> Self {
1303            Self(row.packer())
1304        }
1305
1306        fn parse_datum(&mut self, input: ParseStream) -> syn::Result<()> {
1307            if input.eat(kw::null) {
1308                self.0.push(Datum::Null)
1309            } else {
1310                match input.parse::<syn::Lit>()? {
1311                    syn::Lit::Str(l) => self.0.push(Datum::from(l.value().as_str())),
1312                    syn::Lit::Int(l) => self.0.push(Datum::from(l.base10_parse::<i64>()?)),
1313                    syn::Lit::Float(l) => self.0.push(Datum::from(l.base10_parse::<f64>()?)),
1314                    syn::Lit::Bool(l) => self.0.push(Datum::from(l.value)),
1315                    _ => Err(Error::new(input.span(), "cannot parse literal"))?,
1316                }
1317            }
1318            Ok(())
1319        }
1320    }
1321}
1322
1323mod analyses {
1324    use mz_repr::{ReprColumnType, ReprScalarType};
1325
1326    use super::*;
1327
1328    #[derive(Default)]
1329    pub struct Analyses {
1330        pub types: Option<Vec<ReprColumnType>>,
1331        pub keys: Option<Vec<Vec<usize>>>,
1332    }
1333
1334    pub fn parse_analyses(input: ParseStream) -> syn::Result<Analyses> {
1335        let mut analyses = Analyses::default();
1336
1337        // Analyses are optional, appearing after a `//` at the end of the
1338        // line. However, since the syn lexer eats comments, we assume that `//`
1339        // was replaced with `::` upfront.
1340        if input.eat(syn::Token![::]) {
1341            let inner;
1342            syn::braced!(inner in input);
1343
1344            let (start, end) = (inner.span().start(), inner.span().end());
1345            if start.line != end.line {
1346                let msg = "analyses should not span more than one line".to_string();
1347                Err(Error::new(inner.span(), msg))?
1348            }
1349
1350            while inner.peek(syn::Ident) {
1351                let ident = inner.parse::<syn::Ident>()?.to_string();
1352                match ident.as_str() {
1353                    "types" => {
1354                        inner.parse::<syn::Token![:]>()?;
1355                        let value = inner.parse::<syn::LitStr>()?.value();
1356                        analyses.types = Some(parse_types.parse_str(&value)?);
1357                    }
1358                    // TODO: support keys
1359                    key => {
1360                        let msg = format!("unexpected analysis type `{}`", key);
1361                        Err(Error::new(inner.span(), msg))?;
1362                    }
1363                }
1364            }
1365        }
1366        Ok(analyses)
1367    }
1368
1369    fn parse_types(input: ParseStream) -> syn::Result<Vec<ReprColumnType>> {
1370        let inner;
1371        syn::parenthesized!(inner in input);
1372        inner.parse_comma_sep(parse_column_type)
1373    }
1374
1375    pub fn parse_column_type(input: ParseStream) -> syn::Result<ReprColumnType> {
1376        let scalar_type = parse_scalar_type(input)?;
1377        Ok(ReprColumnType {
1378            scalar_type,
1379            nullable: input.eat(syn::Token![?]),
1380        })
1381    }
1382
1383    pub fn parse_scalar_type(input: ParseStream) -> syn::Result<ReprScalarType> {
1384        let lookahead = input.lookahead1();
1385
1386        let scalar_type = if input.look_and_eat(bigint, &lookahead) {
1387            ReprScalarType::Int64
1388        } else if input.look_and_eat(double, &lookahead) {
1389            input.parse::<precision>()?;
1390            ReprScalarType::Float64
1391        } else if input.look_and_eat(boolean, &lookahead) {
1392            ReprScalarType::Bool
1393        } else if input.look_and_eat(character, &lookahead) {
1394            input.parse::<varying>()?;
1395            ReprScalarType::String
1396        } else if input.look_and_eat(integer, &lookahead) {
1397            ReprScalarType::Int32
1398        } else if input.look_and_eat(smallint, &lookahead) {
1399            ReprScalarType::Int16
1400        } else if input.look_and_eat(text, &lookahead) {
1401            ReprScalarType::String
1402        } else {
1403            Err(lookahead.error())?
1404        };
1405
1406        Ok(scalar_type)
1407    }
1408
1409    syn::custom_keyword!(bigint);
1410    syn::custom_keyword!(boolean);
1411    syn::custom_keyword!(character);
1412    syn::custom_keyword!(double);
1413    syn::custom_keyword!(integer);
1414    syn::custom_keyword!(precision);
1415    syn::custom_keyword!(smallint);
1416    syn::custom_keyword!(text);
1417    syn::custom_keyword!(varying);
1418}
1419
1420pub enum Def {
1421    Source {
1422        name: String,
1423        cols: Vec<String>,
1424        typ: mz_repr::SqlRelationType,
1425    },
1426}
1427
1428mod def {
1429    use mz_repr::{SqlColumnType, SqlRelationType};
1430
1431    use super::*;
1432
1433    pub fn parse_def(ctx: CtxRef, input: ParseStream) -> syn::Result<Def> {
1434        parse_def_source(ctx, input) // only one variant for now
1435    }
1436
1437    fn parse_def_source(ctx: CtxRef, input: ParseStream) -> syn::Result<Def> {
1438        let reduce = input.parse::<def::DefSource>()?;
1439
1440        let name = {
1441            input.parse::<def::name>()?;
1442            input.parse::<syn::Token![=]>()?;
1443            input.parse::<syn::Ident>()?.to_string()
1444        };
1445
1446        let keys = if input.eat(kw::keys) {
1447            input.parse::<syn::Token![=]>()?;
1448            let inner;
1449            syn::bracketed!(inner in input);
1450            inner.parse_comma_sep(|input| {
1451                let inner;
1452                syn::bracketed!(inner in input);
1453                inner.parse_comma_sep(scalar::parse_column_index)
1454            })?
1455        } else {
1456            vec![]
1457        };
1458
1459        let parse_inputs = ParseChildren::new(input, reduce.span().start());
1460        let (cols, column_types) = {
1461            let source_columns = parse_inputs.parse_many(ctx, parse_def_source_column)?;
1462            let mut column_names = vec![];
1463            let mut column_types = vec![];
1464            for (column_name, column_type) in source_columns {
1465                column_names.push(column_name);
1466                column_types.push(column_type);
1467            }
1468            (column_names, column_types)
1469        };
1470
1471        let typ = SqlRelationType { column_types, keys };
1472
1473        Ok(Def::Source { name, cols, typ })
1474    }
1475
1476    fn parse_def_source_column(
1477        _ctx: CtxRef,
1478        input: ParseStream,
1479    ) -> syn::Result<(String, SqlColumnType)> {
1480        input.parse::<syn::Token![-]>()?;
1481        let column_name = input.parse::<syn::Ident>()?.to_string();
1482        input.parse::<syn::Token![:]>()?;
1483        let column_type = SqlColumnType::from_repr(&analyses::parse_column_type(input)?);
1484        Ok((column_name, column_type))
1485    }
1486
1487    syn::custom_keyword!(DefSource);
1488    syn::custom_keyword!(name);
1489}
1490
1491/// Help utilities used by sibling modules.
1492mod util {
1493    use syn::parse::{Lookahead1, ParseBuffer, Peek};
1494
1495    use super::*;
1496
1497    /// Extension methods for [`syn::parse::ParseBuffer`].
1498    pub trait ParseBufferExt<'a> {
1499        fn look_and_eat<T: Eat>(&self, token: T, lookahead: &Lookahead1<'a>) -> bool;
1500
1501        /// Consumes a token `T` if present.
1502        fn eat<T: Eat>(&self, t: T) -> bool;
1503
1504        /// Consumes two tokens `T1 T2` if present in that order.
1505        fn eat2<T1: Eat, T2: Eat>(&self, t1: T1, t2: T2) -> bool;
1506
1507        /// Consumes three tokens `T1 T2 T3` if present in that order.
1508        fn eat3<T1: Eat, T2: Eat, T3: Eat>(&self, t1: T1, t2: T2, t3: T3) -> bool;
1509
1510        // Parse a comma-separated list of items into a vector.
1511        fn parse_comma_sep<T>(&self, p: fn(ParseStream) -> syn::Result<T>) -> syn::Result<Vec<T>>;
1512    }
1513
1514    impl<'a> ParseBufferExt<'a> for ParseBuffer<'a> {
1515        /// Consumes a token `T` if present, looking it up using the provided
1516        /// [`Lookahead1`] instance.
1517        fn look_and_eat<T: Eat>(&self, token: T, lookahead: &Lookahead1<'a>) -> bool {
1518            if lookahead.peek(token) {
1519                self.parse::<T::Token>().unwrap();
1520                true
1521            } else {
1522                false
1523            }
1524        }
1525
1526        fn eat<T: Eat>(&self, t: T) -> bool {
1527            if self.peek(t) {
1528                self.parse::<T::Token>().unwrap();
1529                true
1530            } else {
1531                false
1532            }
1533        }
1534
1535        fn eat2<T1: Eat, T2: Eat>(&self, t1: T1, t2: T2) -> bool {
1536            if self.peek(t1) && self.peek2(t2) {
1537                self.parse::<T1::Token>().unwrap();
1538                self.parse::<T2::Token>().unwrap();
1539                true
1540            } else {
1541                false
1542            }
1543        }
1544
1545        fn eat3<T1: Eat, T2: Eat, T3: Eat>(&self, t1: T1, t2: T2, t3: T3) -> bool {
1546            if self.peek(t1) && self.peek2(t2) && self.peek3(t3) {
1547                self.parse::<T1::Token>().unwrap();
1548                self.parse::<T2::Token>().unwrap();
1549                self.parse::<T3::Token>().unwrap();
1550                true
1551            } else {
1552                false
1553            }
1554        }
1555
1556        fn parse_comma_sep<T>(&self, p: fn(ParseStream) -> syn::Result<T>) -> syn::Result<Vec<T>> {
1557            Ok(self
1558                .parse_terminated(p, syn::Token![,])?
1559                .into_iter()
1560                .collect::<Vec<_>>())
1561        }
1562    }
1563
1564    // Helper trait for types that can be eaten.
1565    //
1566    // Implementing types must also implement [`Peek`], and the associated
1567    // [`Peek::Token`] type should implement [`Parse`]). For some reason the
1568    // latter bound is not present in [`Peek`] even if it makes a lot of sense,
1569    // which is why we need this helper.
1570    pub trait Eat: Peek<Token = Self::_Token> {
1571        type _Token: Parse;
1572    }
1573
1574    impl<T> Eat for T
1575    where
1576        T: Peek,
1577        T::Token: Parse,
1578    {
1579        type _Token = T::Token;
1580    }
1581
1582    pub struct Ctx<'a> {
1583        pub catalog: &'a TestCatalog,
1584    }
1585
1586    pub type CtxRef<'a> = &'a Ctx<'a>;
1587
1588    /// Newtype for external types that need to implement [Parse].
1589    pub struct Parsed<T>(pub T);
1590
1591    /// Provides facilities for parsing
1592    pub struct ParseChildren<'a> {
1593        stream: ParseStream<'a>,
1594        parent: LineColumn,
1595    }
1596
1597    impl<'a> ParseChildren<'a> {
1598        pub fn new(stream: ParseStream<'a>, parent: LineColumn) -> Self {
1599            Self { stream, parent }
1600        }
1601
1602        pub fn parse_one<C, T>(
1603            &self,
1604            ctx: C,
1605            function: fn(C, ParseStream) -> syn::Result<T>,
1606        ) -> syn::Result<T> {
1607            match self.maybe_child() {
1608                Ok(_) => function(ctx, self.stream),
1609                Err(e) => Err(e),
1610            }
1611        }
1612
1613        pub fn parse_many<C: Copy, T>(
1614            &self,
1615            ctx: C,
1616            function: fn(C, ParseStream) -> syn::Result<T>,
1617        ) -> syn::Result<Vec<T>> {
1618            let mut inputs = vec![self.parse_one(ctx, function)?];
1619            while self.maybe_child().is_ok() {
1620                inputs.push(function(ctx, self.stream)?);
1621            }
1622            Ok(inputs)
1623        }
1624
1625        fn maybe_child(&self) -> syn::Result<()> {
1626            let start = self.stream.span().start();
1627            if start.line <= self.parent.line {
1628                let msg = format!("child expected at line > {}", self.parent.line);
1629                Err(Error::new(self.stream.span(), msg))?
1630            }
1631            if start.column != self.parent.column + 2 {
1632                let msg = format!("child expected at column {}", self.parent.column + 2);
1633                Err(Error::new(self.stream.span(), msg))?
1634            }
1635            Ok(())
1636        }
1637    }
1638}
1639
1640/// Custom keywords used while parsing.
1641mod kw {
1642    syn::custom_keyword!(aggregates);
1643    syn::custom_keyword!(AND);
1644    syn::custom_keyword!(ArrangeBy);
1645    syn::custom_keyword!(array);
1646    syn::custom_keyword!(asc);
1647    // case when ... then ... else ... end
1648    syn::custom_keyword!(case);
1649    syn::custom_keyword!(coalesce);
1650    syn::custom_keyword!(Constant);
1651    syn::custom_keyword!(CrossJoin);
1652    syn::custom_keyword!(cte);
1653    syn::custom_keyword!(desc);
1654    syn::custom_keyword!(distinct);
1655    syn::custom_keyword!(Distinct);
1656    syn::custom_keyword!(empty);
1657    syn::custom_keyword!(end);
1658    syn::custom_keyword!(eq);
1659    syn::custom_keyword!(error);
1660    syn::custom_keyword!(exp_group_size);
1661    syn::custom_keyword!(FALSE);
1662    syn::custom_keyword!(Filter);
1663    syn::custom_keyword!(FlatMap);
1664    syn::custom_keyword!(Get);
1665    syn::custom_keyword!(group_by);
1666    syn::custom_keyword!(IS);
1667    syn::custom_keyword!(Join);
1668    syn::custom_keyword!(keys);
1669    syn::custom_keyword!(limit);
1670    syn::custom_keyword!(list);
1671    syn::custom_keyword!(Map);
1672    syn::custom_keyword!(monotonic);
1673    syn::custom_keyword!(Mutually);
1674    syn::custom_keyword!(Negate);
1675    syn::custom_keyword!(NOT);
1676    syn::custom_keyword!(null);
1677    syn::custom_keyword!(NULL);
1678    syn::custom_keyword!(nulls_first);
1679    syn::custom_keyword!(nulls_last);
1680    syn::custom_keyword!(offset);
1681    syn::custom_keyword!(on);
1682    syn::custom_keyword!(OR);
1683    syn::custom_keyword!(order_by);
1684    syn::custom_keyword!(project);
1685    syn::custom_keyword!(Project);
1686    syn::custom_keyword!(Recursive);
1687    syn::custom_keyword!(Reduce);
1688    syn::custom_keyword!(Return);
1689    syn::custom_keyword!(then);
1690    syn::custom_keyword!(Threshold);
1691    syn::custom_keyword!(TopK);
1692    syn::custom_keyword!(TRUE);
1693    syn::custom_keyword!(Union);
1694    syn::custom_keyword!(when);
1695    syn::custom_keyword!(With);
1696    syn::custom_keyword!(x);
1697}