// mz_expr/relation/func.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::{Regex as ReprRegex, RegexCompilationError};
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33    ColumnName, Datum, Diff, ReprColumnType, ReprRelationType, Row, RowArena, RowPacker, SharedRow,
34    SqlColumnType, SqlRelationType, SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44    CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49    ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53// TODO(jamii) be careful about overflow in sum/avg
54// see https://timely.zulipchat.com/#narrow/stream/186635-engineering/topic/additional.20work/near/163507435
55
56fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58    I: IntoIterator<Item = Datum<'a>>,
59{
60    match datums
61        .into_iter()
62        .filter(|d| !d.is_null())
63        .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64    {
65        Some(datum) => datum,
66        None => Datum::Null,
67    }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72    I: IntoIterator<Item = Datum<'a>>,
73    DatumType: TryFrom<Datum<'a>> + Ord,
74    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75    Datum<'a>: From<Option<DatumType>>,
76{
77    let x: Option<DatumType> = datums
78        .into_iter()
79        .filter(|d| !d.is_null())
80        .map(|d| DatumType::try_from(d).expect("unexpected type"))
81        .max();
82
83    x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88    I: IntoIterator<Item = Datum<'a>>,
89    DatumType: TryFrom<Datum<'a>> + Ord,
90    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91    Datum<'a>: From<Option<DatumType>>,
92{
93    let x: Option<DatumType> = datums
94        .into_iter()
95        .filter(|d| !d.is_null())
96        .map(|d| DatumType::try_from(d).expect("unexpected type"))
97        .min();
98
99    x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104    I: IntoIterator<Item = Datum<'a>>,
105{
106    match datums
107        .into_iter()
108        .filter(|d| !d.is_null())
109        .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110    {
111        Some(datum) => datum,
112        None => Datum::Null,
113    }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118    I: IntoIterator<Item = Datum<'a>>,
119    DatumType: TryFrom<Datum<'a>>,
120    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121    ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123    let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124    if datums.peek().is_none() {
125        Datum::Null
126    } else {
127        let x = datums
128            .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129            .sum::<ResultType>();
130        x.into()
131    }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136    I: IntoIterator<Item = Datum<'a>>,
137{
138    let mut cx = numeric::cx_datum();
139    let mut sum = Numeric::zero();
140    let mut empty = true;
141    for d in datums {
142        if !d.is_null() {
143            empty = false;
144            cx.add(&mut sum, &d.unwrap_numeric().0);
145        }
146    }
147    match empty {
148        true => Datum::Null,
149        false => Datum::from(sum),
150    }
151}
152
153// TODO(benesch): remove potentially dangerous usage of `as`.
154#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157    I: IntoIterator<Item = Datum<'a>>,
158{
159    // TODO(jkosh44) This should error when the count can't fit inside of an `i64` instead of returning a negative result.
160    let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161    Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166    I: IntoIterator<Item = Datum<'a>>,
167{
168    datums
169        .into_iter()
170        .fold(Datum::False, |state, next| match (state, next) {
171            (Datum::True, _) | (_, Datum::True) => Datum::True,
172            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173            _ => Datum::False,
174        })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179    I: IntoIterator<Item = Datum<'a>>,
180{
181    datums
182        .into_iter()
183        .fold(Datum::True, |state, next| match (state, next) {
184            (Datum::False, _) | (_, Datum::False) => Datum::False,
185            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186            _ => Datum::True,
187        })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192    I: IntoIterator<Item = Datum<'a>>,
193{
194    const EMPTY_SEP: &str = "";
195
196    let datums = order_aggregate_datums(datums, order_by);
197    let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198        if d.is_null() {
199            return None;
200        }
201        let mut value_sep = d.unwrap_list().iter();
202        match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203            (Datum::Null, _) => None,
204            (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205            (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206            _ => unreachable!(),
207        }
208    });
209
210    let mut s = String::default();
211    match sep_value_pairs.next() {
212        // First value not prefixed by its separator
213        Some((_, value)) => s.push_str(value),
214        // If no non-null values sent, return NULL.
215        None => return Datum::Null,
216    }
217
218    for (sep, value) in sep_value_pairs {
219        s.push_str(sep);
220        s.push_str(value);
221    }
222
223    Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228    I: IntoIterator<Item = Datum<'a>>,
229{
230    let datums = order_aggregate_datums(datums, order_by);
231    temp_storage.make_datum(|packer| {
232        packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233    })
234}
235
236fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238    I: IntoIterator<Item = Datum<'a>>,
239{
240    let datums = order_aggregate_datums(datums, order_by);
241    temp_storage.make_datum(|packer| {
242        let mut datums: Vec<_> = datums
243            .into_iter()
244            .filter_map(|d| {
245                if d.is_null() {
246                    return None;
247                }
248                let mut list = d.unwrap_list().iter();
249                let key = list.next().unwrap();
250                let val = list.next().unwrap();
251                if key.is_null() {
252                    // TODO(benesch): this should produce an error, but
253                    // aggregate functions cannot presently produce errors.
254                    None
255                } else {
256                    Some((key.unwrap_str(), val))
257                }
258            })
259            .collect();
260        // datums are ordered by any ORDER BY clause now, and we want to preserve
261        // the last entry for each key, but we also need to present unique and sorted
262        // keys to push_dict. Use sort_by here, which is stable, and so will preserve
263        // the ORDER BY order. Then reverse and dedup to retain the last of each
264        // key. Reverse again so we're back in push_dict order.
265        datums.sort_by_key(|(k, _v)| *k);
266        datums.reverse();
267        datums.dedup_by_key(|(k, _v)| *k);
268        datums.reverse();
269        packer.push_dict(datums);
270    })
271}
272
/// Assuming datums is a List, sort them by the 2nd through Nth elements
/// corresponding to order_by, then return the 1st element.
///
/// Near the usages of this function, we sometimes want to produce Datums with a shorter lifetime
/// than 'a. We have to actually perform the shortening of the lifetime here, inside this function,
/// because if we were to simply return `impl Iterator<Item = Datum<'a>>`, that wouldn't be
/// covariant in the item type, because opaque types are always invariant. (Contrast this with how
/// we perform the shortening _inside_ this function: the input of the `map` is known to
/// specifically be `std::vec::IntoIter`, which is known to be covariant.)
pub fn order_aggregate_datums<'a: 'b, 'b, I>(
    datums: I,
    order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // The inner helper returns a concrete `Vec`, whose `IntoIter` is covariant,
    // which is what makes the `'a` -> `'b` shortening below legal.
    order_aggregate_datums_with_rank_inner(datums, order_by)
        .into_iter()
        // (`payload` is coerced here to `Datum<'b>` in the argument of the closure)
        .map(|(payload, _order_datums)| payload)
}
294
295/// Assuming datums is a List, sort them by the 2nd through Nth elements
296/// corresponding to order_by, then return the 1st element and computed order by expression.
297fn order_aggregate_datums_with_rank<'a, I>(
298    datums: I,
299    order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302    I: IntoIterator<Item = Datum<'a>>,
303{
304    order_aggregate_datums_with_rank_inner(datums, order_by)
305        .into_iter()
306        .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310    datums: I,
311    order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314    I: IntoIterator<Item = Datum<'a>>,
315{
316    let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317        .into_iter()
318        .map(|d| {
319            let list = d.unwrap_list();
320            let mut list_it = list.iter();
321            let payload = list_it.next().unwrap();
322
323            // We decode the order_by Datums here instead of the comparison function, because the
324            // comparison function is expected to be called `O(log n)` times on each input row.
325            // The only downside is that the decoded data might be bigger, but I think that's fine,
326            // because:
327            // - if we have a window partition so big that this would create a memory problem, then
328            //   the non-incrementalness of window functions will create a serious CPU problem
329            //   anyway,
330            // - and anyhow various other parts of the window function code already do decoding
331            //   upfront.
332            let mut order_by_datums = Vec::with_capacity(order_by.len());
333            for _ in 0..order_by.len() {
334                order_by_datums.push(
335                    list_it
336                        .next()
337                        .expect("must have exactly the same number of Datums as `order_by`"),
338                );
339            }
340
341            (payload, order_by_datums)
342        })
343        .collect();
344
345    let mut sort_by =
346        |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347         (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348            compare_columns(
349                order_by,
350                left_order_by_datums,
351                right_order_by_datums,
352                || payload_left.cmp(payload_right),
353            )
354        };
355    // `sort_unstable_by` can be faster and uses less memory than `sort_by`. An unstable sort is
356    // enough here, because if two elements are equal in our `compare` function, then the elements
357    // are actually binary-equal (because of the `tiebreaker` given to `compare_columns`), so it
358    // doesn't matter what order they end up in.
359    decoded.sort_unstable_by(&mut sort_by);
360    decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365    I: IntoIterator<Item = Datum<'a>>,
366{
367    let datums = order_aggregate_datums(datums, order_by);
368    let datums: Vec<_> = datums
369        .into_iter()
370        .map(|d| d.unwrap_array().elements().iter())
371        .flatten()
372        .collect();
373    let dims = ArrayDimension {
374        lower_bound: 1,
375        length: datums.len(),
376    };
377    temp_storage.make_datum(|packer| {
378        packer.try_push_array(&[dims], datums).unwrap();
379    })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384    I: IntoIterator<Item = Datum<'a>>,
385{
386    let datums = order_aggregate_datums(datums, order_by);
387    temp_storage.make_datum(|packer| {
388        packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389    })
390}
391
392/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
393/// The output is in the format of `[result_value, original_row]`.
394/// See an example at `lag_lead`, where the input-output formats are similar.
395fn row_number<'a, I>(
396    datums: I,
397    callers_temp_storage: &'a RowArena,
398    order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401    I: IntoIterator<Item = Datum<'a>>,
402{
403    // We want to use our own temp_storage here, to avoid flooding `callers_temp_storage` with a
404    // large number of new datums. This is because we don't want to make an assumption about
405    // whether the caller creates a new temp_storage between window partitions.
406    let temp_storage = RowArena::new();
407    let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409    callers_temp_storage.make_datum(|packer| {
410        packer.push_list(datums);
411    })
412}
413
414/// Like `row_number`, but doesn't perform the final wrapping in a list, returning an Iterator
415/// instead.
416fn row_number_no_list<'a: 'b, 'b, I>(
417    datums: I,
418    callers_temp_storage: &'b RowArena,
419    order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422    I: IntoIterator<Item = Datum<'a>>,
423{
424    let datums = order_aggregate_datums(datums, order_by);
425
426    callers_temp_storage.reserve(datums.size_hint().0);
427    #[allow(clippy::disallowed_methods)]
428    datums
429        .into_iter()
430        .map(|d| d.unwrap_list().iter())
431        .flatten()
432        .zip(1i64..)
433        .map(|(d, i)| {
434            callers_temp_storage.make_datum(|packer| {
435                packer.push_list_with(|packer| {
436                    packer.push(Datum::Int64(i));
437                    packer.push(d);
438                });
439            })
440        })
441}
442
443/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
444/// The output is in the format of `[result_value, original_row]`.
445/// See an example at `lag_lead`, where the input-output formats are similar.
446fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448    I: IntoIterator<Item = Datum<'a>>,
449{
450    let temp_storage = RowArena::new();
451    let datums = rank_no_list(datums, &temp_storage, order_by);
452
453    callers_temp_storage.make_datum(|packer| {
454        packer.push_list(datums);
455    })
456}
457
/// Like `rank`, but doesn't perform the final wrapping in a list, returning an Iterator
/// instead.
///
/// Standard SQL `RANK()`: rows with equal ORDER BY keys share a rank, and the
/// rank after a tie group jumps to the group's row number (gaps allowed).
/// Each yielded datum is a `[rank, original_row]` list in `callers_temp_storage`.
fn rank_no_list<'a: 'b, 'b, I>(
    datums: I,
    callers_temp_storage: &'b RowArena,
    order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Keep the row used for ordering around, as it is used to determine the rank
    let datums = order_aggregate_datums_with_rank(datums, order_by);

    // Pair every original-row datum with a clone of its packed order-by key.
    let mut datums = datums
        .into_iter()
        .map(|(d0, order_row)| {
            d0.unwrap_list()
                .iter()
                .map(move |d1| (d1, order_row.clone()))
        })
        .flatten();

    callers_temp_storage.reserve(datums.size_hint().0);
    // Pull the first element out by hand (empty input => empty output), then
    // fold the rest; `.3` below extracts the output vec from the accumulator.
    datums
        .next()
        .map_or(vec![], |(first_datum, first_order_row)| {
            // Folding with (last order_by row, last assigned rank,
            // row number, output vec)
            datums.fold(
                (first_order_row, 1, 1, vec![(first_datum, 1)]),
                |mut acc, (next_datum, next_order_row)| {
                let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
                *acc_row_num += 1;
                // Identity is based on the order_by expression
                if *acc_row != next_order_row {
                    *acc_rank = *acc_row_num;
                    *acc_row = next_order_row;
                }

                (*output).push((next_datum, *acc_rank));
                acc
            })
        }.3).into_iter().map(|(d, i)| {
        callers_temp_storage.make_datum(|packer| {
            packer.push_list_with(|packer| {
                packer.push(Datum::Int64(i));
                packer.push(d);
            });
        })
    })
}
509
510/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
511/// The output is in the format of `[result_value, original_row]`.
512/// See an example at `lag_lead`, where the input-output formats are similar.
513fn dense_rank<'a, I>(
514    datums: I,
515    callers_temp_storage: &'a RowArena,
516    order_by: &[ColumnOrder],
517) -> Datum<'a>
518where
519    I: IntoIterator<Item = Datum<'a>>,
520{
521    let temp_storage = RowArena::new();
522    let datums = dense_rank_no_list(datums, &temp_storage, order_by);
523
524    callers_temp_storage.make_datum(|packer| {
525        packer.push_list(datums);
526    })
527}
528
/// Like `dense_rank`, but doesn't perform the final wrapping in a list, returning an Iterator
/// instead.
///
/// Standard SQL `DENSE_RANK()`: rows with equal ORDER BY keys share a rank,
/// and the next distinct key gets the previous rank + 1 (no gaps).
/// Each yielded datum is a `[rank, original_row]` list in `callers_temp_storage`.
fn dense_rank_no_list<'a: 'b, 'b, I>(
    datums: I,
    callers_temp_storage: &'b RowArena,
    order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Keep the row used for ordering around, as it is used to determine the rank
    let datums = order_aggregate_datums_with_rank(datums, order_by);

    // Pair every original-row datum with a clone of its packed order-by key.
    let mut datums = datums
        .into_iter()
        .map(|(d0, order_row)| {
            d0.unwrap_list()
                .iter()
                .map(move |d1| (d1, order_row.clone()))
        })
        .flatten();

    callers_temp_storage.reserve(datums.size_hint().0);
    // Pull the first element out by hand (empty input => empty output), then
    // fold the rest; `.2` below extracts the output vec from the accumulator.
    datums
        .next()
        .map_or(vec![], |(first_datum, first_order_row)| {
            // Folding with (last order_by row, last assigned rank,
            // output vec)
            datums.fold(
                (first_order_row, 1, vec![(first_datum, 1)]),
                |mut acc, (next_datum, next_order_row)| {
                let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
                // Identity is based on the order_by expression
                if *acc_row != next_order_row {
                    *acc_rank += 1;
                    *acc_row = next_order_row;
                }

                (*output).push((next_datum, *acc_rank));
                acc
            })
        }.2).into_iter().map(|(d, i)| {
        callers_temp_storage.make_datum(|packer| {
            packer.push_list_with(|packer| {
                packer.push(Datum::Int64(i));
                packer.push(d);
            });
        })
    })
}
579
580/// The expected input is in the format of `[((OriginalRow, EncodedArgs), OrderByExprs...)]`
581/// For example,
582///
583/// lag(x*y, 1, null) over (partition by x+y order by x-y, x/y)
584///
585/// list of:
586/// row(
587///   row(
588///     row(#0, #1),
589///     row((#0 * #1), 1, null)
590///   ),
591///   (#0 - #1),
592///   (#0 / #1)
593/// )
594///
595/// The output is in the format of `[result_value, original_row]`, e.g.
596/// list of:
597/// row(
598///   42,
599///   row(7, 8)
600/// )
601fn lag_lead<'a, I>(
602    datums: I,
603    callers_temp_storage: &'a RowArena,
604    order_by: &[ColumnOrder],
605    lag_lead_type: &LagLeadType,
606    ignore_nulls: &bool,
607) -> Datum<'a>
608where
609    I: IntoIterator<Item = Datum<'a>>,
610{
611    let temp_storage = RowArena::new();
612    let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
613    callers_temp_storage.make_datum(|packer| {
614        packer.push_list(iter);
615    })
616}
617
618/// Like `lag_lead`, but doesn't perform the final wrapping in a list, returning an Iterator
619/// instead.
620fn lag_lead_no_list<'a: 'b, 'b, I>(
621    datums: I,
622    callers_temp_storage: &'b RowArena,
623    order_by: &[ColumnOrder],
624    lag_lead_type: &LagLeadType,
625    ignore_nulls: &bool,
626) -> impl Iterator<Item = Datum<'b>>
627where
628    I: IntoIterator<Item = Datum<'a>>,
629{
630    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, EncodedArgs) record
631    let datums = order_aggregate_datums(datums, order_by);
632
633    // Take the (OriginalRow, EncodedArgs) records and unwrap them into separate datums.
634    // EncodedArgs = (InputValue, Offset, DefaultValue) for Lag/Lead
635    // (`OriginalRow` is kept in a record form, as we don't need to look inside that.)
636    let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
637        .into_iter()
638        .map(|d| {
639            let mut iter = d.unwrap_list().iter();
640            let original_row = iter.next().unwrap();
641            let (input_value, offset, default_value) =
642                unwrap_lag_lead_encoded_args(iter.next().unwrap());
643            (original_row, (input_value, offset, default_value))
644        })
645        .unzip();
646
647    let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
648
649    callers_temp_storage.reserve(result.len());
650    result
651        .into_iter()
652        .zip_eq(orig_rows)
653        .map(|(result_value, original_row)| {
654            callers_temp_storage.make_datum(|packer| {
655                packer.push_list_with(|packer| {
656                    packer.push(result_value);
657                    packer.push(original_row);
658                });
659            })
660        })
661}
662
663/// lag/lead's arguments are in a record. This function unwraps this record.
664fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
665    let mut encoded_args_iter = encoded_args.unwrap_list().iter();
666    let (input_value, offset, default_value) = (
667        encoded_args_iter.next().unwrap(),
668        encoded_args_iter.next().unwrap(),
669        encoded_args_iter.next().unwrap(),
670    );
671    (input_value, offset, default_value)
672}
673
674/// Each element of `args` has the 3 arguments evaluated for a single input row.
675/// Returns the results for each input row.
676fn lag_lead_inner<'a>(
677    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
678    lag_lead_type: &LagLeadType,
679    ignore_nulls: &bool,
680) -> Vec<Datum<'a>> {
681    if *ignore_nulls {
682        lag_lead_inner_ignore_nulls(args, lag_lead_type)
683    } else {
684        lag_lead_inner_respect_nulls(args, lag_lead_type)
685    }
686}
687
688fn lag_lead_inner_respect_nulls<'a>(
689    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
690    lag_lead_type: &LagLeadType,
691) -> Vec<Datum<'a>> {
692    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
693    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
694        // Null offsets are acceptable, and always return null
695        if offset.is_null() {
696            result.push(Datum::Null);
697            continue;
698        }
699
700        let idx = i64::try_from(idx).expect("Array index does not fit in i64");
701        let offset = i64::from(offset.unwrap_int32());
702        let offset = match lag_lead_type {
703            LagLeadType::Lag => -offset,
704            LagLeadType::Lead => offset,
705        };
706
707        // Get a Datum from `datums`. Return None if index is out of range.
708        let datums_get = |i: i64| -> Option<Datum> {
709            match u64::try_from(i) {
710                Ok(i) => args
711                    .get(usize::cast_from(i))
712                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
713                    .unwrap_or(None), // overindexing
714                Err(_) => None, // underindexing (negative index)
715            }
716        };
717
718        let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
719
720        result.push(lagged_value);
721    }
722
723    result
724}
725
// `i64` indexes get involved in this function because it's convenient to allow negative indexes and
// have `datums_get` fail on them, and thus handle the beginning and end of the input vector
// uniformly, rather than checking underflow separately during index manipulations.
/// lag/lead in IGNORE NULLS mode: like `lag_lead_inner_respect_nulls`, but the
/// offset counts only non-null input values, so runs of nulls are skipped.
/// Skip tables (built below) make jumping over a null run O(1) per step.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    // We check here once that even the largest index fits in `i64`, and then do silent `as`
    // conversions from `usize` indexes to `i64` indexes throughout this function.
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // Preparation: Make sure we can jump over a run of nulls in constant time, i.e., regardless of
    // how many nulls the run has. The following skip tables will point to the next non-null index.
    // `skip_nulls_backward[i]` is `Some(j)` iff input `i` is null, where `j` is the nearest
    // non-null index before `i` (or -1 if there is none).
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // Same, but forward: nearest non-null index after `i` (or `args.len()` if none).
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    // The actual computation.
    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        // Null offsets are acceptable, and always return null
        if offset.is_null() {
            result.push(Datum::Null);
            continue;
        }

        let idx = idx as i64; // checked at the beginning of the function that len() fits
        let offset = i64::cast_from(offset.unwrap_int32());
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        // -1, 0, or 1: the direction we step in (0 means offset itself is 0).
        let increment = offset.signum();

        // Get a Datum from `datums`. Return None if index is out of range.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
                    .unwrap_or(None), // overindexing
                Err(_) => None, // underindexing (negative index)
            }
        };

        let lagged_value = if increment != 0 {
            // We start j from idx, and step j until we have seen an abs(offset) number of non-null
            // values or reach the beginning or end of the partition.
            //
            // If offset is big, then this is slow: Considering the entire function, it's
            // `O(partition_size * offset)`.
            // However, a common use case is an offset of 1, for which this doesn't matter.
            // TODO: For larger offsets, we could have a completely different implementation
            // that starts the inner loop from the index where we found the previous result:
            // https://github.com/MaterializeInc/materialize/pull/29287#discussion_r1738695174
            let mut j = idx;
            for _ in 0..num::abs(offset) {
                j += increment;
                // Jump over a run of nulls
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    let ju = j as usize; // `j >= 0` because of the above `is_some_and`
                    if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                if datums_get(j).is_none() {
                    break;
                }
            }
            // `j` now points at the target non-null value, or out of range,
            // in which case the row's default value applies.
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // Not clear what should the semantics be here. See
                // https://github.com/MaterializeInc/database-issues/issues/8497
                // (We used to run into an infinite loop in this case, so panicking is
                // better.)
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
846
847/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
848fn first_value<'a, I>(
849    datums: I,
850    callers_temp_storage: &'a RowArena,
851    order_by: &[ColumnOrder],
852    window_frame: &WindowFrame,
853) -> Datum<'a>
854where
855    I: IntoIterator<Item = Datum<'a>>,
856{
857    let temp_storage = RowArena::new();
858    let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
859    callers_temp_storage.make_datum(|packer| {
860        packer.push_list(iter);
861    })
862}
863
864/// Like `first_value`, but doesn't perform the final wrapping in a list, returning an Iterator
865/// instead.
866fn first_value_no_list<'a: 'b, 'b, I>(
867    datums: I,
868    callers_temp_storage: &'b RowArena,
869    order_by: &[ColumnOrder],
870    window_frame: &WindowFrame,
871) -> impl Iterator<Item = Datum<'b>>
872where
873    I: IntoIterator<Item = Datum<'a>>,
874{
875    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, InputValue) record
876    let datums = order_aggregate_datums(datums, order_by);
877
878    // Decode the input (OriginalRow, InputValue) into separate datums
879    let (orig_rows, args): (Vec<_>, Vec<_>) = datums
880        .into_iter()
881        .map(|d| {
882            let mut iter = d.unwrap_list().iter();
883            let original_row = iter.next().unwrap();
884            let arg = iter.next().unwrap();
885
886            (original_row, arg)
887        })
888        .unzip();
889
890    let results = first_value_inner(args, window_frame);
891
892    callers_temp_storage.reserve(results.len());
893    results
894        .into_iter()
895        .zip_eq(orig_rows)
896        .map(|(result_value, original_row)| {
897            callers_temp_storage.make_datum(|packer| {
898                packer.push_list_with(|packer| {
899                    packer.push(result_value);
900                    packer.push(original_row);
901                });
902            })
903        })
904}
905
906fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
907    let length = datums.len();
908    let mut result: Vec<Datum> = Vec::with_capacity(length);
909    for (idx, current_datum) in datums.iter().enumerate() {
910        let first_value = match &window_frame.start_bound {
911            // Always return the current value
912            WindowFrameBound::CurrentRow => *current_datum,
913            WindowFrameBound::UnboundedPreceding => {
914                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
915                    let end_offset = usize::cast_from(*end_offset);
916
917                    // If the frame ends before the first row, return null
918                    if idx < end_offset {
919                        Datum::Null
920                    } else {
921                        datums[0]
922                    }
923                } else {
924                    datums[0]
925                }
926            }
927            WindowFrameBound::OffsetPreceding(offset) => {
928                let start_offset = usize::cast_from(*offset);
929                let start_idx = idx.saturating_sub(start_offset);
930                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
931                    let end_offset = usize::cast_from(*end_offset);
932
933                    // If the frame is empty or ends before the first row, return null
934                    if start_offset < end_offset || idx < end_offset {
935                        Datum::Null
936                    } else {
937                        datums[start_idx]
938                    }
939                } else {
940                    datums[start_idx]
941                }
942            }
943            WindowFrameBound::OffsetFollowing(offset) => {
944                let start_offset = usize::cast_from(*offset);
945                let start_idx = idx.saturating_add(start_offset);
946                if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
947                    // If the frame is empty or starts after the last row, return null
948                    if offset > end_offset || start_idx >= length {
949                        Datum::Null
950                    } else {
951                        datums[start_idx]
952                    }
953                } else {
954                    datums
955                        .get(start_idx)
956                        .map(|d| d.clone())
957                        .unwrap_or(Datum::Null)
958                }
959            }
960            // Forbidden during planning
961            WindowFrameBound::UnboundedFollowing => unreachable!(),
962        };
963        result.push(first_value);
964    }
965    result
966}
967
968/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
969fn last_value<'a, I>(
970    datums: I,
971    callers_temp_storage: &'a RowArena,
972    order_by: &[ColumnOrder],
973    window_frame: &WindowFrame,
974) -> Datum<'a>
975where
976    I: IntoIterator<Item = Datum<'a>>,
977{
978    let temp_storage = RowArena::new();
979    let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
980    callers_temp_storage.make_datum(|packer| {
981        packer.push_list(iter);
982    })
983}
984
985/// Like `last_value`, but doesn't perform the final wrapping in a list, returning an Iterator
986/// instead.
987fn last_value_no_list<'a: 'b, 'b, I>(
988    datums: I,
989    callers_temp_storage: &'b RowArena,
990    order_by: &[ColumnOrder],
991    window_frame: &WindowFrame,
992) -> impl Iterator<Item = Datum<'b>>
993where
994    I: IntoIterator<Item = Datum<'a>>,
995{
996    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
997    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
998    let datums = order_aggregate_datums_with_rank(datums, order_by);
999
1000    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1001    let size_hint = datums.size_hint().0;
1002    let mut args = Vec::with_capacity(size_hint);
1003    let mut original_rows = Vec::with_capacity(size_hint);
1004    let mut order_by_rows = Vec::with_capacity(size_hint);
1005    for (d, order_by_row) in datums.into_iter() {
1006        let mut iter = d.unwrap_list().iter();
1007        let original_row = iter.next().unwrap();
1008        let arg = iter.next().unwrap();
1009        order_by_rows.push(order_by_row);
1010        original_rows.push(original_row);
1011        args.push(arg);
1012    }
1013
1014    let results = last_value_inner(args, &order_by_rows, window_frame);
1015
1016    callers_temp_storage.reserve(results.len());
1017    results
1018        .into_iter()
1019        .zip_eq(original_rows)
1020        .map(|(result_value, original_row)| {
1021            callers_temp_storage.make_datum(|packer| {
1022                packer.push_list_with(|packer| {
1023                    packer.push(result_value);
1024                    packer.push(original_row);
1025                });
1026            })
1027        })
1028}
1029
/// Computes `last_value` for each row of a window partition.
///
/// `args` contains the argument values of the entire partition, already sorted by the
/// window's ORDER BY; `order_by_rows` contains the corresponding ORDER BY rows, which are
/// needed in RANGE frame mode to determine peer groups. Returns one result per input row:
/// the last value of that row's frame, or `Datum::Null` when the frame is empty or lies
/// entirely outside the partition.
fn last_value_inner<'a>(
    args: Vec<Datum<'a>>,
    order_by_rows: &Vec<Row>,
    window_frame: &WindowFrame,
) -> Vec<Datum<'a>> {
    let length = args.len();
    let mut results: Vec<Datum> = Vec::with_capacity(length);
    for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
        // The last value of the frame is determined by the frame's _end_ bound.
        let last_value = match &window_frame.end_bound {
            WindowFrameBound::CurrentRow => match &window_frame.units {
                // Always return the current value when in ROWS mode
                WindowFrameUnits::Rows => *current_datum,
                WindowFrameUnits::Range => {
                    // When in RANGE mode, return the last value of the peer group
                    // The peer group is the group of rows with the same ORDER BY value
                    // Note: Range is only supported for the default window frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
                    // which is why it does not appear in the other branches
                    // Scan forward from `idx` while the OrderByRow stays equal; the last
                    // such position is the end of the current peer group. `unwrap` is safe
                    // because the scan includes `idx` itself.
                    let target_idx = order_by_rows[idx..]
                        .iter()
                        .enumerate()
                        .take_while(|(_, row)| *row == order_by_row)
                        .last()
                        .unwrap()
                        .0
                        + idx;
                    args[target_idx]
                }
                // GROUPS is not supported, and forbidden during planning
                WindowFrameUnits::Groups => unreachable!(),
            },
            WindowFrameBound::UnboundedFollowing => {
                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
                    let start_offset = usize::cast_from(*start_offset);

                    // If the frame starts after the last row of the window, return null
                    if idx + start_offset > length - 1 {
                        Datum::Null
                    } else {
                        args[length - 1]
                    }
                } else {
                    args[length - 1]
                }
            }
            WindowFrameBound::OffsetFollowing(offset) => {
                let end_offset = usize::cast_from(*offset);
                // Saturating: an overflow here would mean reaching past the partition end,
                // which the `get(...).unwrap_or(...)` below clamps to the last element.
                let end_idx = idx.saturating_add(end_offset);
                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
                    let start_offset = usize::cast_from(*start_offset);
                    let start_idx = idx.saturating_add(start_offset);

                    // If the frame is empty or starts after the last row of the window, return null
                    if end_offset < start_offset || start_idx >= length {
                        Datum::Null
                    } else {
                        // Return the last valid element in the window
                        args.get(end_idx).unwrap_or(&args[length - 1]).clone()
                    }
                } else {
                    args.get(end_idx).unwrap_or(&args[length - 1]).clone()
                }
            }
            WindowFrameBound::OffsetPreceding(offset) => {
                let end_offset = usize::cast_from(*offset);
                // Clamped to the partition start; the `idx < end_offset` check below
                // handles the truly out-of-range case.
                let end_idx = idx.saturating_sub(end_offset);
                if idx < end_offset {
                    // If the frame ends before the first row, return null
                    Datum::Null
                } else if let WindowFrameBound::OffsetPreceding(start_offset) =
                    &window_frame.start_bound
                {
                    // If the frame is empty, return null
                    if offset > start_offset {
                        Datum::Null
                    } else {
                        args[end_idx]
                    }
                } else {
                    args[end_idx]
                }
            }
            // Forbidden during planning
            WindowFrameBound::UnboundedPreceding => unreachable!(),
        };
        results.push(last_value);
    }
    results
}
1118
1119/// Executes `FusedValueWindowFunc` on a reduction group.
1120/// The expected input is in the format of `[((OriginalRow, (Args1, Args2, ...)), OrderByExprs...)]`
1121/// where `Args1`, `Args2`, are the arguments of each of the fused functions. For functions that
1122/// have only a single argument (first_value/last_value), these are simple values. For functions
1123/// that have multiple arguments (lag/lead), these are also records.
1124fn fused_value_window_func<'a, I>(
1125    input_datums: I,
1126    callers_temp_storage: &'a RowArena,
1127    funcs: &Vec<AggregateFunc>,
1128    order_by: &Vec<ColumnOrder>,
1129) -> Datum<'a>
1130where
1131    I: IntoIterator<Item = Datum<'a>>,
1132{
1133    let temp_storage = RowArena::new();
1134    let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1135    callers_temp_storage.make_datum(|packer| {
1136        packer.push_list(iter);
1137    })
1138}
1139
/// Like `fused_value_window_func`, but doesn't perform the final wrapping in a list, returning an
/// Iterator instead.
///
/// Sorts the partition once, then runs each fused window function on its own column of
/// arguments, and finally transposes the per-function results into one result record per
/// input row, in the format `((Result1, Result2, ...), OriginalRow)`.
fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
    input_datums: I,
    callers_temp_storage: &'b RowArena,
    funcs: &Vec<AggregateFunc>,
    order_by: &Vec<ColumnOrder>,
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // OrderByRows are only needed by `last_value_inner`, so collect them only when at
    // least one of the fused functions is a LastValue.
    let has_last_value = funcs
        .iter()
        .any(|f| matches!(f, AggregateFunc::LastValue { .. }));

    let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);

    // Decode the input into parallel vectors:
    // - `original_rows[i]` is the original row of input record `i`;
    // - `encoded_argsss[j][i]` is the (still encoded) argument(s) of function `j` for record `i`.
    let size_hint = input_datums_with_ranks.size_hint().0;
    let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
    let mut original_rows = Vec::with_capacity(size_hint);
    let mut order_by_rows = Vec::with_capacity(size_hint);
    for (d, order_by_row) in input_datums_with_ranks {
        let mut iter = d.unwrap_list().iter();
        let original_row = iter.next().unwrap();
        original_rows.push(original_row);
        let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
        for i in 0..funcs.len() {
            let encoded_args = argss_iter.next().unwrap();
            encoded_argsss[i].push(encoded_args);
        }
        if has_last_value {
            order_by_rows.push(order_by_row);
        }
    }

    // Run each fused function over the whole partition, then transpose the results so
    // that `results_per_row[i]` collects every function's result for record `i`.
    let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
    for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
        let results = match func {
            AggregateFunc::LagLead {
                order_by: inner_order_by,
                lag_lead,
                ignore_nulls,
            } => {
                assert_eq!(order_by, inner_order_by);
                // lag/lead has multiple arguments, which are wrapped in a record; unwrap them.
                let unwrapped_argss = encoded_argss
                    .into_iter()
                    .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
                    .collect();
                lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
            }
            AggregateFunc::FirstValue {
                order_by: inner_order_by,
                window_frame,
            } => {
                assert_eq!(order_by, inner_order_by);
                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
                // wrapped into a record.)
                first_value_inner(encoded_argss, window_frame)
            }
            AggregateFunc::LastValue {
                order_by: inner_order_by,
                window_frame,
            } => {
                assert_eq!(order_by, inner_order_by);
                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
                // wrapped into a record.)
                last_value_inner(encoded_argss, &order_by_rows, window_frame)
            }
            _ => panic!("unknown window function in FusedValueWindowFunc"),
        };
        // Append function `j`'s result for each record to that record's result list.
        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
            results.push(result);
        }
    }

    // For each record, emit ((Result1, Result2, ...), OriginalRow).
    callers_temp_storage.reserve(2 * original_rows.len());
    results_per_row
        .into_iter()
        .enumerate()
        .map(move |(i, results)| {
            callers_temp_storage.make_datum(|packer| {
                packer.push_list_with(|packer| {
                    packer
                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
                    packer.push(original_rows[i]);
                });
            })
        })
}
1229
1230/// `input_datums` is an entire window partition.
1231/// The expected input is in the format of `[((OriginalRow, InputValue), OrderByExprs...)]`
1232/// See also in the comment in `window_func_applied_to`.
1233///
1234/// `wrapped_aggregate`: e.g., for `sum(...) OVER (...)`, this is the `sum(...)`.
1235///
1236/// Note that this `order_by` doesn't have expressions, only `ColumnOrder`s. For an explanation,
1237/// see the comment on `WindowExprType`.
1238fn window_aggr<'a, I, A>(
1239    input_datums: I,
1240    callers_temp_storage: &'a RowArena,
1241    wrapped_aggregate: &AggregateFunc,
1242    order_by: &[ColumnOrder],
1243    window_frame: &WindowFrame,
1244) -> Datum<'a>
1245where
1246    I: IntoIterator<Item = Datum<'a>>,
1247    A: OneByOneAggr,
1248{
1249    let temp_storage = RowArena::new();
1250    let iter = window_aggr_no_list::<I, A>(
1251        input_datums,
1252        &temp_storage,
1253        wrapped_aggregate,
1254        order_by,
1255        window_frame,
1256    );
1257    callers_temp_storage.make_datum(|packer| {
1258        packer.push_list(iter);
1259    })
1260}
1261
1262/// Like `window_aggr`, but doesn't perform the final wrapping in a list, returning an Iterator
1263/// instead.
1264fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1265    input_datums: I,
1266    callers_temp_storage: &'b RowArena,
1267    wrapped_aggregate: &AggregateFunc,
1268    order_by: &[ColumnOrder],
1269    window_frame: &WindowFrame,
1270) -> impl Iterator<Item = Datum<'b>>
1271where
1272    I: IntoIterator<Item = Datum<'a>>,
1273    A: OneByOneAggr,
1274{
1275    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1276    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1277    let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1278
1279    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1280    let size_hint = datums.size_hint().0;
1281    let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1282    let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1283    let mut order_by_rows = Vec::with_capacity(size_hint);
1284    for (d, order_by_row) in datums.into_iter() {
1285        let mut iter = d.unwrap_list().iter();
1286        let original_row = iter.next().unwrap();
1287        let arg = iter.next().unwrap();
1288        order_by_rows.push(order_by_row);
1289        original_rows.push(original_row);
1290        args.push(arg);
1291    }
1292
1293    let results = window_aggr_inner::<A>(
1294        args,
1295        &order_by_rows,
1296        wrapped_aggregate,
1297        order_by,
1298        window_frame,
1299        callers_temp_storage,
1300    );
1301
1302    callers_temp_storage.reserve(results.len());
1303    results
1304        .into_iter()
1305        .zip_eq(original_rows)
1306        .map(|(result_value, original_row)| {
1307            callers_temp_storage.make_datum(|packer| {
1308                packer.push_list_with(|packer| {
1309                    packer.push(result_value);
1310                    packer.push(original_row);
1311                });
1312            })
1313        })
1314}
1315
1316fn window_aggr_inner<'a, A>(
1317    mut args: Vec<Datum<'a>>,
1318    order_by_rows: &Vec<Row>,
1319    wrapped_aggregate: &AggregateFunc,
1320    order_by: &[ColumnOrder],
1321    window_frame: &WindowFrame,
1322    temp_storage: &'a RowArena,
1323) -> Vec<Datum<'a>>
1324where
1325    A: OneByOneAggr,
1326{
1327    let length = args.len();
1328    let mut result: Vec<Datum> = Vec::with_capacity(length);
1329
1330    // In this degenerate case, all results would be `wrapped_aggregate.default()` (usually null).
1331    // However, this currently can't happen, because
1332    // - Groups frame mode is currently not supported;
1333    // - Range frame mode is currently supported only for the default frame, which includes the
1334    //   current row.
1335    soft_assert_or_log!(
1336        !((matches!(window_frame.units, WindowFrameUnits::Groups)
1337            || matches!(window_frame.units, WindowFrameUnits::Range))
1338            && !window_frame.includes_current_row()),
1339        "window frame without current row"
1340    );
1341
1342    if (matches!(
1343        window_frame.start_bound,
1344        WindowFrameBound::UnboundedPreceding
1345    ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1346        || (order_by.is_empty()
1347            && (matches!(window_frame.units, WindowFrameUnits::Groups)
1348                || matches!(window_frame.units, WindowFrameUnits::Range))
1349            && window_frame.includes_current_row())
1350    {
1351        // Either
1352        //  - UNBOUNDED frame in both directions, or
1353        //  - There is no ORDER BY and the frame is such that the current peer group is included.
1354        //    (The current peer group will be the whole partition if there is no ORDER BY.)
1355        // We simply need to compute the aggregate once, on the entire partition, and each input
1356        // row will get this one aggregate value as result.
1357        let result_value = wrapped_aggregate.eval(args, temp_storage);
1358        // Every row will get the above aggregate as result.
1359        for _ in 0..length {
1360            result.push(result_value);
1361        }
1362    } else {
1363        fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1364            args: Vec<Datum<'a>>,
1365            result: &mut Vec<Datum<'a>>,
1366            mut one_by_one_aggr: A,
1367            temp_storage: &'a RowArena,
1368        ) where
1369            A: OneByOneAggr,
1370        {
1371            for current_arg in args.into_iter() {
1372                one_by_one_aggr.give(&current_arg);
1373                let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1374                result.push(result_value);
1375            }
1376        }
1377
1378        fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1379            args: Vec<Datum<'a>>,
1380            order_by_rows: &Vec<Row>,
1381            result: &mut Vec<Datum<'a>>,
1382            mut one_by_one_aggr: A,
1383            temp_storage: &'a RowArena,
1384        ) where
1385            A: OneByOneAggr,
1386        {
1387            let mut peer_group_start = 0;
1388            while peer_group_start < args.len() {
1389                // Find the boundaries of the current peer group.
1390                // peer_group_start will point to the first element of the peer group,
1391                // peer_group_end will point to _just after_ the last element of the peer group.
1392                let mut peer_group_end = peer_group_start + 1;
1393                while peer_group_end < args.len()
1394                    && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1395                {
1396                    // The peer group goes on while the OrderByRows not differ.
1397                    peer_group_end += 1;
1398                }
1399                // Let's compute the aggregate (which will be the same for all records in this
1400                // peer group).
1401                for current_arg in args[peer_group_start..peer_group_end].iter() {
1402                    one_by_one_aggr.give(current_arg);
1403                }
1404                let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1405                // Put the above aggregate into each record in the peer group.
1406                for _ in args[peer_group_start..peer_group_end].iter() {
1407                    result.push(agg_for_peer_group);
1408                }
1409                // Point to the start of the next peer group.
1410                peer_group_start = peer_group_end;
1411            }
1412        }
1413
1414        fn rows_between_offset_and_offset<'a>(
1415            args: Vec<Datum<'a>>,
1416            result: &mut Vec<Datum<'a>>,
1417            wrapped_aggregate: &AggregateFunc,
1418            temp_storage: &'a RowArena,
1419            offset_start: i64,
1420            offset_end: i64,
1421        ) {
1422            let len = args
1423                .len()
1424                .to_i64()
1425                .expect("window partition's len should fit into i64");
1426            for i in 0..len {
1427                let i = i.to_i64().expect("window partition shouldn't be super big");
1428                // Trim the start of the frame to make it not reach over the start of the window
1429                // partition.
1430                let frame_start = max(i + offset_start, 0)
1431                    .to_usize()
1432                    .expect("The max made sure it's not negative");
1433                // Trim the end of the frame to make it not reach over the end of the window
1434                // partition.
1435                let frame_end = min(i + offset_end, len - 1).to_usize();
1436                match frame_end {
1437                    Some(frame_end) => {
1438                        if frame_start <= frame_end {
1439                            // Compute the aggregate on the frame.
1440                            // TODO:
1441                            // This implementation is quite slow if the frame is large: we do an
1442                            // inner loop over the entire frame, and compute the aggregate from
1443                            // scratch. We could do better:
1444                            //  - For invertible aggregations we could do a rolling aggregation.
1445                            //  - There are various tricks for min/max as well, making use of either
1446                            //    the fixed size of the window, or that we are not retracting
1447                            //    arbitrary elements but doing queue operations. E.g., see
1448                            //    http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
1449                            let frame_values = args[frame_start..=frame_end].iter().cloned();
1450                            let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1451                            result.push(result_value);
1452                        } else {
1453                            // frame_start > frame_end, so this is an empty frame.
1454                            let result_value = wrapped_aggregate.default();
1455                            result.push(result_value);
1456                        }
1457                    }
1458                    None => {
1459                        // frame_end would be negative, so this is an empty frame.
1460                        let result_value = wrapped_aggregate.default();
1461                        result.push(result_value);
1462                    }
1463                }
1464            }
1465        }
1466
1467        match (
1468            &window_frame.units,
1469            &window_frame.start_bound,
1470            &window_frame.end_bound,
1471        ) {
1472            // Cases where one edge of the frame is CurrentRow.
1473            // Note that these cases could be merged into the more general cases below where one
1474            // edge is some offset (with offset = 0), but the CurrentRow cases probably cover 95%
1475            // of user queries, so let's make this simple and fast.
1476            (Rows, UnboundedPreceding, CurrentRow) => {
1477                rows_between_unbounded_preceding_and_current_row::<A>(
1478                    args,
1479                    &mut result,
1480                    A::new(wrapped_aggregate, false),
1481                    temp_storage,
1482                );
1483            }
1484            (Rows, CurrentRow, UnboundedFollowing) => {
1485                // Same as above, but reverse.
1486                args.reverse();
1487                rows_between_unbounded_preceding_and_current_row::<A>(
1488                    args,
1489                    &mut result,
1490                    A::new(wrapped_aggregate, true),
1491                    temp_storage,
1492                );
1493                result.reverse();
1494            }
1495            (Range, UnboundedPreceding, CurrentRow) => {
1496                // Note that for the default frame, the RANGE frame mode is identical to the GROUPS
1497                // frame mode.
1498                groups_between_unbounded_preceding_and_current_row::<A>(
1499                    args,
1500                    order_by_rows,
1501                    &mut result,
1502                    A::new(wrapped_aggregate, false),
1503                    temp_storage,
1504                );
1505            }
1506            // The next several cases all call `rows_between_offset_and_offset`. Note that the
1507            // offset passed to `rows_between_offset_and_offset` should be negated when it's
1508            // PRECEDING.
1509            (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1510                let start_prec = start_prec.to_i64().expect(
1511                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1512                );
1513                let end_prec = end_prec.to_i64().expect(
1514                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1515                );
1516                rows_between_offset_and_offset(
1517                    args,
1518                    &mut result,
1519                    wrapped_aggregate,
1520                    temp_storage,
1521                    -start_prec,
1522                    -end_prec,
1523                );
1524            }
1525            (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1526                let start_prec = start_prec.to_i64().expect(
1527                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1528                );
1529                let end_fol = end_fol.to_i64().expect(
1530                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1531                );
1532                rows_between_offset_and_offset(
1533                    args,
1534                    &mut result,
1535                    wrapped_aggregate,
1536                    temp_storage,
1537                    -start_prec,
1538                    end_fol,
1539                );
1540            }
1541            (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1542                let start_fol = start_fol.to_i64().expect(
1543                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1544                );
1545                let end_fol = end_fol.to_i64().expect(
1546                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1547                );
1548                rows_between_offset_and_offset(
1549                    args,
1550                    &mut result,
1551                    wrapped_aggregate,
1552                    temp_storage,
1553                    start_fol,
1554                    end_fol,
1555                );
1556            }
1557            (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1558                unreachable!() // The planning ensured that this nonsensical case can't happen
1559            }
1560            (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1561                let start_prec = start_prec.to_i64().expect(
1562                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1563                );
1564                let end_fol = 0;
1565                rows_between_offset_and_offset(
1566                    args,
1567                    &mut result,
1568                    wrapped_aggregate,
1569                    temp_storage,
1570                    -start_prec,
1571                    end_fol,
1572                );
1573            }
1574            (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1575                let start_fol = 0;
1576                let end_fol = end_fol.to_i64().expect(
1577                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1578                );
1579                rows_between_offset_and_offset(
1580                    args,
1581                    &mut result,
1582                    wrapped_aggregate,
1583                    temp_storage,
1584                    start_fol,
1585                    end_fol,
1586                );
1587            }
1588            (Rows, CurrentRow, CurrentRow) => {
1589                // We could have a more efficient implementation for this, but this is probably
1590                // super rare. (Might be more common with RANGE or GROUPS frame mode, though!)
1591                let start_fol = 0;
1592                let end_fol = 0;
1593                rows_between_offset_and_offset(
1594                    args,
1595                    &mut result,
1596                    wrapped_aggregate,
1597                    temp_storage,
1598                    start_fol,
1599                    end_fol,
1600                );
1601            }
1602            (Rows, CurrentRow, OffsetPreceding(_))
1603            | (Rows, UnboundedFollowing, _)
1604            | (Rows, _, UnboundedPreceding)
1605            | (Rows, OffsetFollowing(..), CurrentRow) => {
1606                unreachable!() // The planning ensured that these nonsensical cases can't happen
1607            }
1608            (Rows, UnboundedPreceding, UnboundedFollowing) => {
1609                // This is handled by the complicated if condition near the beginning of this
1610                // function.
1611                unreachable!()
1612            }
1613            (Rows, UnboundedPreceding, OffsetPreceding(_))
1614            | (Rows, UnboundedPreceding, OffsetFollowing(_))
1615            | (Rows, OffsetPreceding(..), UnboundedFollowing)
1616            | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1617                // Unsupported. Bail in the planner.
1618                // https://github.com/MaterializeInc/database-issues/issues/6720
1619                unreachable!()
1620            }
1621            (Range, _, _) => {
1622                // Unsupported.
1623                // The planner doesn't allow Range frame mode for now (except for the default
1624                // frame), see https://github.com/MaterializeInc/database-issues/issues/6585
1625                // Note that it would be easy to handle (Range, CurrentRow, UnboundedFollowing):
1626                // it would be similar to (Rows, CurrentRow, UnboundedFollowing), but would call
1627                // groups_between_unbounded_preceding_current_row.
1628                unreachable!()
1629            }
1630            (Groups, _, _) => {
1631                // Unsupported.
1632                // The planner doesn't allow Groups frame mode for now, see
1633                // https://github.com/MaterializeInc/database-issues/issues/6588
1634                unreachable!()
1635            }
1636        }
1637    }
1638
1639    result
1640}
1641
1642/// Computes a bundle of fused window aggregations.
1643/// The input is similar to `window_aggr`, but `InputValue` is not just a single value, but a record
1644/// where each component is the input to one of the aggregations.
1645fn fused_window_aggr<'a, I, A>(
1646    input_datums: I,
1647    callers_temp_storage: &'a RowArena,
1648    wrapped_aggregates: &Vec<AggregateFunc>,
1649    order_by: &Vec<ColumnOrder>,
1650    window_frame: &WindowFrame,
1651) -> Datum<'a>
1652where
1653    I: IntoIterator<Item = Datum<'a>>,
1654    A: OneByOneAggr,
1655{
1656    let temp_storage = RowArena::new();
1657    let iter = fused_window_aggr_no_list::<_, A>(
1658        input_datums,
1659        &temp_storage,
1660        wrapped_aggregates,
1661        order_by,
1662        window_frame,
1663    );
1664    callers_temp_storage.make_datum(|packer| {
1665        packer.push_list(iter);
1666    })
1667}
1668
1669/// Like `fused_window_aggr`, but doesn't perform the final wrapping in a list, returning an
1670/// Iterator instead.
1671fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1672    input_datums: I,
1673    callers_temp_storage: &'b RowArena,
1674    wrapped_aggregates: &Vec<AggregateFunc>,
1675    order_by: &Vec<ColumnOrder>,
1676    window_frame: &WindowFrame,
1677) -> impl Iterator<Item = Datum<'b>>
1678where
1679    I: IntoIterator<Item = Datum<'a>>,
1680    A: OneByOneAggr,
1681{
1682    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1683    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1684    let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1685
1686    let size_hint = datums.size_hint().0;
1687    let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1688    let mut original_rows = Vec::with_capacity(size_hint);
1689    let mut order_by_rows = Vec::with_capacity(size_hint);
1690    for (d, order_by_row) in datums {
1691        let mut iter = d.unwrap_list().iter();
1692        let original_row = iter.next().unwrap();
1693        original_rows.push(original_row);
1694        let args_iter = iter.next().unwrap().unwrap_list().iter();
1695        // Push each argument into the respective list
1696        for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1697            args.push(arg);
1698        }
1699        order_by_rows.push(order_by_row);
1700    }
1701
1702    let mut results_per_row =
1703        vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1704    for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1705        let results = window_aggr_inner::<A>(
1706            args,
1707            &order_by_rows,
1708            wrapped_aggr,
1709            order_by,
1710            window_frame,
1711            callers_temp_storage,
1712        );
1713        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1714            results.push(result);
1715        }
1716    }
1717
1718    callers_temp_storage.reserve(2 * original_rows.len());
1719    results_per_row
1720        .into_iter()
1721        .enumerate()
1722        .map(move |(i, results)| {
1723            callers_temp_storage.make_datum(|packer| {
1724                packer.push_list_with(|packer| {
1725                    packer
1726                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1727                    packer.push(original_rows[i]);
1728                });
1729            })
1730        })
1731}
1732
/// An implementation of an aggregation where we can send in the input elements one-by-one, and
/// can also ask the current aggregate at any moment. (This just delegates to other aggregation
/// evaluation approaches.)
pub trait OneByOneAggr {
    /// Creates a fresh aggregation state for the given aggregate function.
    ///
    /// The `reverse` parameter makes the aggregations process input elements in reverse order.
    /// This has an effect only for non-commutative aggregations, e.g. `list_agg`. These are
    /// currently only some of the Basic aggregations. (Basic aggregations are handled by
    /// `NaiveOneByOneAggr`).
    fn new(agg: &AggregateFunc, reverse: bool) -> Self;
    /// Pushes one input element into the aggregation.
    fn give(&mut self, d: &Datum);
    /// Returns the value of the aggregate computed on the given values so far.
    ///
    /// The result is allocated in `temp_storage`.
    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
}
1747
/// Naive implementation of [OneByOneAggr], suitable for stuff like const folding, but too slow for
/// rendering. This relies only on infrastructure available in `mz-expr`. It simply saves all the
/// given input, and calls the given [AggregateFunc]'s `eval` method when asked about the current
/// aggregate. (For Accumulable and Hierarchical aggregations, the rendering has more efficient
/// implementations, but for Basic aggregations even the rendering uses this naive implementation.)
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    // The aggregate function to evaluate over the buffered input.
    agg: AggregateFunc,
    // All input datums received so far, each wrapped in its own single-datum `Row`.
    input: Vec<Row>,
    // Whether to feed the buffered input to `agg` in reverse order.
    reverse: bool,
}
1759
1760impl OneByOneAggr for NaiveOneByOneAggr {
1761    fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1762        NaiveOneByOneAggr {
1763            agg: agg.clone(),
1764            input: Vec::new(),
1765            reverse,
1766        }
1767    }
1768
1769    fn give(&mut self, d: &Datum) {
1770        let mut row = Row::default();
1771        row.packer().push(d);
1772        self.input.push(row);
1773    }
1774
1775    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1776        temp_storage.make_datum(|packer| {
1777            packer.push(if !self.reverse {
1778                self.agg
1779                    .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1780            } else {
1781                self.agg.eval(
1782                    self.input.iter().rev().map(|r| r.unpack_first()),
1783                    temp_storage,
1784                )
1785            });
1786        })
1787    }
1788}
1789
/// Identify whether the given aggregate function is Lag or Lead, since they share
/// implementations.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum LagLeadType {
    /// Corresponds to the SQL `lag` window function.
    Lag,
    /// Corresponds to the SQL `lead` window function.
    Lead,
}
1808
/// Aggregate (and window) functions; dispatched on by [`AggregateFunc::eval`].
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum AggregateFunc {
    // `max` aggregations, one variant per supported input type.
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    // `min` aggregations, one variant per supported input type.
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    // `sum` aggregations. Note that the accumulation happens in a wider type than the
    // input type (e.g., `SumInt16` sums `i16`s into an `i64`); see `eval`.
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    Count,
    Any,
    All,
    /// Accumulates `Datum::List`s whose first element is a JSON-typed `Datum`s
    /// into a JSON list. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips `Datum::List`s whose first element is a JSON-typed `Datum`s into a
    /// JSON map. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_object_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbObjectAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips a `Datum::List` whose first element is a `Datum::List` guaranteed
    /// to be non-empty and whose len % 2 == 0 into a `Datum::Map`. The other
    /// elements are columns used by `order_by`.
    MapAgg {
        order_by: Vec<ColumnOrder>,
        value_type: SqlScalarType,
    },
    /// Accumulates `Datum::Array`s of `SqlScalarType::Record` whose first element is a `Datum::Array`
    /// into a single `Datum::Array` (the remaining fields are used by `order_by`).
    ArrayConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Accumulates `Datum::List`s of `SqlScalarType::Record` whose first field is a `Datum::List`
    /// into a single `Datum::List` (the remaining fields are used by `order_by`).
    ListConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// The `string_agg` aggregation; inputs are ordered by `order_by`.
    StringAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// The `row_number` window function.
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    /// The `rank` window function.
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    /// The `dense_rank` window function.
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// The `lag`/`lead` window functions; `lag_lead` selects between the two
    /// (see [`LagLeadType`]).
    LagLead {
        order_by: Vec<ColumnOrder>,
        lag_lead: LagLeadType,
        ignore_nulls: bool,
    },
    /// The `first_value` window function, evaluated over `window_frame`.
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// The `last_value` window function, evaluated over `window_frame`.
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions fused into one function, to amortize overheads.
    FusedValueWindowFunc {
        funcs: Vec<AggregateFunc>,
        /// Currently, all the fused functions must have the same `order_by`. (We can later
        /// eliminate this limitation.)
        order_by: Vec<ColumnOrder>,
    },
    /// An ordinary aggregation (`wrapped_aggregate`) used as a window function,
    /// evaluated over `window_frame`.
    WindowAggregate {
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregations fused into one function, sharing `order_by` and
    /// `window_frame`; see `fused_window_aggr`.
    FusedWindowAggregate {
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Accumulates any number of `Datum::Dummy`s into `Datum::Dummy`.
    ///
    /// Useful for removing an expensive aggregation while maintaining the shape
    /// of a reduce operator.
    Dummy,
}
1951
1952impl AggregateFunc {
    /// Evaluates the aggregate function over `datums`, allocating the result in
    /// `temp_storage` where an allocation is needed.
    pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
    where
        I: IntoIterator<Item = Datum<'a>>,
    {
        match self {
            AggregateFunc::MaxNumeric => {
                max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
            }
            AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
            AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
            AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
            AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
            AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
            AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
            AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
            AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
            AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
            AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
            AggregateFunc::MaxString => max_string(datums),
            AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
            AggregateFunc::MaxTimestamp => {
                max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
            }
            AggregateFunc::MaxTimestampTz => {
                max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
            }
            AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
            AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
            AggregateFunc::MinNumeric => {
                min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
            }
            AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
            AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
            AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
            AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
            AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
            AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
            AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
            AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
            AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
            AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
            AggregateFunc::MinString => min_string(datums),
            AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
            AggregateFunc::MinTimestamp => {
                min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
            }
            AggregateFunc::MinTimestampTz => {
                min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
            }
            AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
            AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
            // Sums accumulate into a wider type than the input to reduce overflow risk.
            AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
            AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
            AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
            AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
            AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
            AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
            AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
            AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
            AggregateFunc::SumNumeric => sum_numeric(datums),
            AggregateFunc::Count => count(datums),
            AggregateFunc::Any => any(datums),
            AggregateFunc::All => all(datums),
            AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
            // `MapAgg`'s `value_type` is only relevant for output typing, not for evaluation,
            // so both map-building aggregations share `dict_agg`.
            AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
                dict_agg(datums, temp_storage, order_by)
            }
            AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
            AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
            AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
            AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
            AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
            AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
            AggregateFunc::LagLead {
                order_by,
                lag_lead: lag_lead_type,
                ignore_nulls,
            } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
            AggregateFunc::FirstValue {
                order_by,
                window_frame,
            } => first_value(datums, temp_storage, order_by, window_frame),
            AggregateFunc::LastValue {
                order_by,
                window_frame,
            } => last_value(datums, temp_storage, order_by, window_frame),
            // Window aggregations use the naive one-by-one evaluation here; callers that
            // have a faster `OneByOneAggr` go through `eval_with_fast_window_agg` instead.
            AggregateFunc::WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => window_aggr::<_, NaiveOneByOneAggr>(
                datums,
                temp_storage,
                wrapped_aggregate,
                order_by,
                window_frame,
            ),
            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
                fused_value_window_func(datums, temp_storage, funcs, order_by)
            }
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates,
                order_by,
                window_frame,
            } => fused_window_aggr::<_, NaiveOneByOneAggr>(
                datums,
                temp_storage,
                wrapped_aggregates,
                order_by,
                window_frame,
            ),
            AggregateFunc::Dummy => Datum::Dummy,
        }
    }
2067
    /// Like `eval`, but it's given a [OneByOneAggr]. If `self` is a `WindowAggregate`, then
    /// the given [OneByOneAggr] will be used to evaluate the wrapped aggregate inside the
    /// `WindowAggregate`. If `self` is not a `WindowAggregate`, then it simply calls `eval`.
    ///
    /// (`FusedWindowAggregate` is handled analogously: `W` evaluates each of the fused
    /// wrapped aggregates.)
    pub fn eval_with_fast_window_agg<'a, I, W>(
        &self,
        datums: I,
        temp_storage: &'a RowArena,
    ) -> Datum<'a>
    where
        I: IntoIterator<Item = Datum<'a>>,
        W: OneByOneAggr,
    {
        match self {
            AggregateFunc::WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => window_aggr::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregate,
                order_by,
                window_frame,
            ),
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates,
                order_by,
                window_frame,
            } => fused_window_aggr::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregates,
                order_by,
                window_frame,
            ),
            _ => self.eval(datums, temp_storage),
        }
    }
2106
    /// Like `eval_with_fast_window_agg`, but dispatches to the `*_no_list` variants of the
    /// window function evaluations: instead of wrapping all per-row results in a single list
    /// datum, it returns an iterator that yields the per-row result datums directly.
    ///
    /// # Panics
    ///
    /// Panics if `!self.can_fuse_with_unnest_list()`, i.e., if `self` is not one of the
    /// window functions.
    pub fn eval_with_unnest_list<'a, I, W>(
        &self,
        datums: I,
        temp_storage: &'a RowArena,
    ) -> impl Iterator<Item = Datum<'a>>
    where
        I: IntoIterator<Item = Datum<'a>>,
        W: OneByOneAggr,
    {
        // TODO: Use `enum_dispatch` to construct a unified iterator instead of `collect_vec`.
        assert!(self.can_fuse_with_unnest_list());
        match self {
            AggregateFunc::RowNumber { order_by } => {
                row_number_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::Rank { order_by } => {
                rank_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::DenseRank { order_by } => {
                dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::LagLead {
                order_by,
                lag_lead: lag_lead_type,
                ignore_nulls,
            } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
                .collect_vec(),
            AggregateFunc::FirstValue {
                order_by,
                window_frame,
            } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
            AggregateFunc::LastValue {
                order_by,
                window_frame,
            } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
                fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
            }
            AggregateFunc::WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => window_aggr_no_list::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregate,
                order_by,
                window_frame,
            )
            .collect_vec(),
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates,
                order_by,
                window_frame,
            } => fused_window_aggr_no_list::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregates,
                order_by,
                window_frame,
            )
            .collect_vec(),
            _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
        }
        .into_iter()
    }
2173
2174    /// Returns the output of the aggregation function when applied on an empty
2175    /// input relation.
2176    pub fn default(&self) -> Datum<'static> {
2177        match self {
2178            AggregateFunc::Count => Datum::Int64(0),
2179            AggregateFunc::Any => Datum::False,
2180            AggregateFunc::All => Datum::True,
2181            AggregateFunc::Dummy => Datum::Dummy,
2182            _ => Datum::Null,
2183        }
2184    }
2185
    /// Returns a datum whose inclusion in the aggregation will not change its
    /// result.
    pub fn identity_datum(&self) -> Datum<'static> {
        match self {
            AggregateFunc::Any => Datum::False,
            AggregateFunc::All => Datum::True,
            AggregateFunc::Dummy => Datum::Dummy,
            AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
            AggregateFunc::ListConcat { .. } => Datum::empty_list(),
            // Window functions take lists as input; an empty list leaves the result
            // unchanged.
            AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
            // NULL is the identity for all remaining aggregations. (NOTE(review): this relies
            // on these aggregations ignoring NULL inputs — e.g., `JsonbAgg`'s docs explicitly
            // state that it filters out `Datum::Null`.)
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::StringAgg { .. } => Datum::Null,
        }
    }
2254
    /// Reports whether `eval_with_unnest_list` supports this aggregation, i.e., whether it
    /// has a `*_no_list` evaluation variant. This is true exactly for the window functions.
    pub fn can_fuse_with_unnest_list(&self) -> bool {
        match self {
            AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => true,
            AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::StringAgg { .. } => false,
        }
    }
2321
    /// The output column type for the result of an aggregation.
    ///
    /// The output column type also contains nullability information, which
    /// is (without further information) true for aggregations that are not
    /// counts.
    ///
    /// For window functions (ranking, lag/lead, first/last value, window
    /// aggregates, and their fused forms), the result is a list of records
    /// pairing the computed value with the original row, as constructed below.
    pub fn output_sql_type(&self, input_type: SqlColumnType) -> SqlColumnType {
        let scalar_type = match self {
            AggregateFunc::Count => SqlScalarType::Int64,
            AggregateFunc::Any => SqlScalarType::Bool,
            AggregateFunc::All => SqlScalarType::Bool,
            AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
            AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
            // Sums widen the accumulator type: 16/32-bit ints sum into 64-bit
            // ints, and 64-bit ints sum into numeric (scale 0) so large
            // totals do not overflow.
            AggregateFunc::SumInt16 => SqlScalarType::Int64,
            AggregateFunc::SumInt32 => SqlScalarType::Int64,
            AggregateFunc::SumInt64 => SqlScalarType::Numeric {
                max_scale: Some(NumericMaxScale::ZERO),
            },
            AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
            AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
            AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
                max_scale: Some(NumericMaxScale::ZERO),
            },
            AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
                value_type: Box::new(value_type.clone()),
                custom_id: None,
            },
            AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
                match input_type.scalar_type {
                    // The input is wrapped in a Record if there's an ORDER BY, so extract it out.
                    SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
                    _ => unreachable!(),
                }
            }
            AggregateFunc::StringAgg { .. } => SqlScalarType::String,
            AggregateFunc::RowNumber { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
            }
            AggregateFunc::Rank { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
            }
            AggregateFunc::DenseRank { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
            }
            AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
                // The input type for Lag is ((OriginalRow, EncodedArgs), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let encoded_args = fields[0].unwrap_record_element_type()[1];
                let output_type_inner =
                    Self::lag_lead_output_type_inner_from_encoded_args(encoded_args);
                let column_name = Self::lag_lead_result_column_name(lag_lead_type);

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (column_name, output_type_inner),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FirstValue { .. } => {
                // The input type for FirstValue is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let value_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true); // null when the partition is empty

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?first_value?"), value_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::LastValue { .. } => {
                // The input type for LastValue is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let value_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true); // null when the partition is empty

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?last_value?"), value_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::WindowAggregate {
                wrapped_aggregate, ..
            } => {
                // The input type for a window aggregate is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let arg_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true);
                // The result type is determined by the aggregate being windowed.
                let wrapped_aggr_out_type = wrapped_aggregate.output_sql_type(arg_type);

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates, ..
            } => {
                // The input type for a fused window aggregate is ((OriginalRow, Args), OrderByExprs...)
                // where `Args` is a record.
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let args_type = fields[0].unwrap_record_element_type()[1];
                let arg_types = args_type.unwrap_record_element_type();
                // Each fused aggregate contributes one field, typed by pairing
                // it with its argument. `zip_eq` panics if the counts diverge,
                // which would indicate an encoding bug upstream.
                let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(
                    |(arg_type, wrapped_agg)| {
                    (
                        ColumnName::from(wrapped_agg.name()),
                        wrapped_agg.output_sql_type((**arg_type).clone().nullable(true)),
                    )
                }).collect_vec();

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
                                fields: out_fields.into(),
                                custom_id: None,
                            }.nullable(false)),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
                // The input type is ((OriginalRow, EncodedArgs), OrderByExprs...)
                // where EncodedArgs is a record, where each element is the argument to one of the
                // function calls that got fused. This is a record for lag/lead, and a simple type
                // for first_value/last_value.
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let encoded_args_type = fields[0]
                    .unwrap_record_element_type()[1]
                    .unwrap_record_element_type();

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (
                                ColumnName::from("?fused_value_window_func?"),
                                SqlScalarType::Record {
                                // One output field per fused function; only
                                // lag/lead/first_value/last_value can be fused.
                                fields: encoded_args_type.into_iter().zip_eq(funcs).map(
                                    |(arg_type, func)| {
                                    match func {
                                        AggregateFunc::LagLead {
                                            lag_lead: lag_lead_type, ..
                                        } => {
                                            let name = Self::lag_lead_result_column_name(
                                                lag_lead_type,
                                            );
                                            let ty = Self
                                                ::lag_lead_output_type_inner_from_encoded_args(
                                                    arg_type,
                                                );
                                            (name, ty)
                                        },
                                        AggregateFunc::FirstValue { .. } => {
                                            (
                                                ColumnName::from("?first_value?"),
                                                arg_type.clone().nullable(true),
                                            )
                                        }
                                        AggregateFunc::LastValue { .. } => {
                                            (
                                                ColumnName::from("?last_value?"),
                                                arg_type.clone().nullable(true),
                                            )
                                        }
                                        _ => panic!("FusedValueWindowFunc has an unknown function"),
                                    }
                                }).collect(),
                                custom_id: None,
                            }.nullable(false)),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            // All remaining aggregations (min/max/float sums/Dummy) return the
            // same scalar type they consume.
            AggregateFunc::Dummy
            | AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            // Note AggregateFunc::MaxString, MinString rely on returning input
            // type as output type to support the proper return type for
            // character input.
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
        };
        // Count never produces null, and other aggregations only produce
        // null in the presence of null inputs.
        let nullable = match self {
            AggregateFunc::Count => false,
            // Use the nullability of the underlying column being aggregated, not the Records wrapping it
            AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
                // The outer Record wraps the input in the first position, and any ORDER BY expressions afterwards
                SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
                    // The inner Record is a (value, separator) tuple
                    SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
                    _ => unreachable!(),
                },
                _ => unreachable!(),
            },
            _ => input_type.nullable,
        };
        scalar_type.nullable(nullable)
    }
2604
2605    /// Computes the representation type of this aggregate function.
2606    ///
2607    /// This is a wrapper around [`Self::output_sql_type`] that converts the result to a representation type.
2608    pub fn output_type(&self, input_type: ReprColumnType) -> ReprColumnType {
2609        ReprColumnType::from(&self.output_sql_type(SqlColumnType::from_repr(&input_type)))
2610    }
2611
2612    /// Compute output type for ROW_NUMBER, RANK, DENSE_RANK
2613    fn output_type_ranking_window_funcs(
2614        input_type: &SqlColumnType,
2615        col_name: &str,
2616    ) -> SqlScalarType {
2617        match input_type.scalar_type {
2618            SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2619                element_type: Box::new(SqlScalarType::Record {
2620                    fields: [
2621                        (
2622                            ColumnName::from(col_name),
2623                            SqlScalarType::Int64.nullable(false),
2624                        ),
2625                        (ColumnName::from("?orig_row?"), {
2626                            let inner = match &fields[0].1.scalar_type {
2627                                SqlScalarType::List { element_type, .. } => element_type.clone(),
2628                                _ => unreachable!(),
2629                            };
2630                            inner.nullable(false)
2631                        }),
2632                    ]
2633                    .into(),
2634                    custom_id: None,
2635                }),
2636                custom_id: None,
2637            },
2638            _ => unreachable!(),
2639        }
2640    }
2641
2642    /// Given the `EncodedArgs` part of `((OriginalRow, EncodedArgs), OrderByExprs...)`,
2643    /// this computes the type of the first field of the output type. (The first field is the
2644    /// real result, the rest is the original row.)
2645    fn lag_lead_output_type_inner_from_encoded_args(
2646        encoded_args_type: &SqlScalarType,
2647    ) -> SqlColumnType {
2648        // lag/lead have 3 arguments, and the output type is
2649        // the same as the first of these, but always nullable. (It's null when the
2650        // lag/lead computation reaches over the bounds of the window partition.)
2651        encoded_args_type.unwrap_record_element_type()[0]
2652            .clone()
2653            .nullable(true)
2654    }
2655
2656    fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2657        ColumnName::from(match lag_lead_type {
2658            LagLeadType::Lag => "?lag?",
2659            LagLeadType::Lead => "?lead?",
2660        })
2661    }
2662
    /// Returns true if the non-null constraint on the aggregation can be
    /// converted into a non-null constraint on its parameter expression, ie.
    /// whether the result of the aggregation is null if all the input values
    /// are null.
    pub fn propagates_nonnull_constraint(&self) -> bool {
        match self {
            // min/max/sum and string_agg yield NULL exactly when every input
            // is NULL, so a non-null output implies a non-null input.
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::StringAgg { .. } => true,
            // Count is never null
            AggregateFunc::Count
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedWindowAggregate { .. }
            | AggregateFunc::Dummy => false,
        }
    }
2734}
2735
2736fn jsonb_each<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2737    // First produce a map, so that a common iterator can be returned.
2738    let map = match a {
2739        Datum::Map(dict) => dict,
2740        _ => mz_repr::DatumMap::empty(),
2741    };
2742
2743    map.iter()
2744        .map(move |(k, v)| (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE))
2745}
2746
2747fn jsonb_each_stringify<'a>(
2748    a: Datum<'a>,
2749    temp_storage: &'a RowArena,
2750) -> impl Iterator<Item = (Row, Diff)> + 'a {
2751    // First produce a map, so that a common iterator can be returned.
2752    let map = match a {
2753        Datum::Map(dict) => dict,
2754        _ => mz_repr::DatumMap::empty(),
2755    };
2756
2757    map.iter().map(move |(k, mut v)| {
2758        v = jsonb_stringify(v, temp_storage)
2759            .map(Datum::String)
2760            .unwrap_or(Datum::Null);
2761        (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2762    })
2763}
2764
2765fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2766    let map = match a {
2767        Datum::Map(dict) => dict,
2768        _ => mz_repr::DatumMap::empty(),
2769    };
2770
2771    map.iter()
2772        .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2773}
2774
2775fn jsonb_array_elements<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2776    let list = match a {
2777        Datum::List(list) => list,
2778        _ => mz_repr::DatumList::empty(),
2779    };
2780    list.iter().map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2781}
2782
2783fn jsonb_array_elements_stringify<'a>(
2784    a: Datum<'a>,
2785    temp_storage: &'a RowArena,
2786) -> impl Iterator<Item = (Row, Diff)> + 'a {
2787    let list = match a {
2788        Datum::List(list) => list,
2789        _ => mz_repr::DatumList::empty(),
2790    };
2791    list.iter().map(move |mut e| {
2792        e = jsonb_stringify(e, temp_storage)
2793            .map(Datum::String)
2794            .unwrap_or(Datum::Null);
2795        (Row::pack_slice(&[e]), Diff::ONE)
2796    })
2797}
2798
2799fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2800    let r = r.inner();
2801    let a = a.unwrap_str();
2802    let captures = r.captures(a)?;
2803    let datums = captures
2804        .iter()
2805        .skip(1)
2806        .map(|m| Datum::from(m.map(|m| m.as_str())));
2807    Some((Row::pack(datums), Diff::ONE))
2808}
2809
2810fn regexp_matches<'a>(
2811    exprs: &[Datum<'a>],
2812) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2813    // There are only two acceptable ways to call this function:
2814    // 1. regexp_matches(string, regex)
2815    // 2. regexp_matches(string, regex, flag)
2816    assert!(exprs.len() == 2 || exprs.len() == 3);
2817    let a = exprs[0].unwrap_str();
2818    let r = exprs[1].unwrap_str();
2819
2820    let (regex, opts) = if exprs.len() == 3 {
2821        let flag = exprs[2].unwrap_str();
2822        let opts = AnalyzedRegexOpts::from_str(flag)?;
2823        (AnalyzedRegex::new(r, opts)?, opts)
2824    } else {
2825        let opts = AnalyzedRegexOpts::default();
2826        (AnalyzedRegex::new(r, opts)?, opts)
2827    };
2828
2829    let regex = regex.inner().clone();
2830
2831    let iter = regex.captures_iter(a).map(move |captures| {
2832        let matches = captures
2833            .iter()
2834            // The first match is the *entire* match, we want the capture groups by themselves.
2835            .skip(1)
2836            .map(|m| Datum::from(m.map(|m| m.as_str())))
2837            .collect::<Vec<_>>();
2838
2839        let mut binding = SharedRow::get();
2840        let mut packer = binding.packer();
2841
2842        let dimension = ArrayDimension {
2843            lower_bound: 1,
2844            length: matches.len(),
2845        };
2846        packer
2847            .try_push_array(&[dimension], matches)
2848            .expect("generated dimensions above");
2849
2850        (binding.clone(), Diff::ONE)
2851    });
2852
2853    // This is slightly unfortunate, but we need to collect the captures into a
2854    // Vec before we can yield them, because we can't return a iter with a
2855    // reference to the local `regex` variable.
2856    // We attempt to minimize the cost of this by using a SmallVec.
2857    let out = iter.collect::<SmallVec<[_; 3]>>();
2858
2859    if opts.global {
2860        Ok(Either::Left(out.into_iter()))
2861    } else {
2862        Ok(Either::Right(out.into_iter().take(1)))
2863    }
2864}
2865
2866fn generate_series<N>(
2867    start: N,
2868    stop: N,
2869    step: N,
2870) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2871where
2872    N: Integer + Signed + CheckedAdd + Clone,
2873    Datum<'static>: From<N>,
2874{
2875    if step == N::zero() {
2876        return Err(EvalError::InvalidParameterValue(
2877            "step size cannot equal zero".into(),
2878        ));
2879    }
2880    Ok(num::range_step_inclusive(start, stop, step)
2881        .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2882}
2883
/// Like
/// [`num::range_step_inclusive`](https://github.com/rust-num/num-iter/blob/ddb14c1e796d401014c6c7a727de61d8109ad986/src/lib.rs#L279),
/// but for our timestamp types using [`Interval`] for `step`.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to yield; advanced by `step` after each item.
    state: CheckedTimestamp<T>,
    /// Inclusive end of the range.
    stop: CheckedTimestamp<T>,
    /// Amount to advance `state` by between items.
    step: Interval,
    /// Whether iteration runs backwards (i.e., `step` is negative).
    rev: bool,
    /// Set when advancing `state` overflows or leaves the valid range,
    /// permanently ending the iteration.
    done: bool,
}
2895
2896impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
2897    type Item = CheckedTimestamp<T>;
2898
2899    #[inline]
2900    fn next(&mut self) -> Option<CheckedTimestamp<T>> {
2901        if !self.done
2902            && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
2903        {
2904            let result = self.state.clone();
2905            match add_timestamp_months(self.state.deref(), self.step.months) {
2906                Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
2907                    Some(v) => match CheckedTimestamp::from_timestamplike(v) {
2908                        Ok(v) => self.state = v,
2909                        Err(_) => self.done = true,
2910                    },
2911                    None => self.done = true,
2912                },
2913                Err(..) => {
2914                    self.done = true;
2915                }
2916            }
2917
2918            Some(result)
2919        } else {
2920            None
2921        }
2922    }
2923}
2924
2925fn generate_series_ts<T: TimestampLike>(
2926    start: CheckedTimestamp<T>,
2927    stop: CheckedTimestamp<T>,
2928    step: Interval,
2929    conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2930) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2931    let normalized_step = step.as_microseconds();
2932    if normalized_step == 0 {
2933        return Err(EvalError::InvalidParameterValue(
2934            "step size cannot equal zero".into(),
2935        ));
2936    }
2937    let rev = normalized_step < 0;
2938
2939    let trsi = TimestampRangeStepInclusive {
2940        state: start,
2941        stop,
2942        step,
2943        rev,
2944        done: false,
2945    };
2946
2947    Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2948}
2949
2950fn generate_subscripts_array(
2951    a: Datum,
2952    dim: i32,
2953) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2954    if dim <= 0 {
2955        return Ok(Box::new(iter::empty()));
2956    }
2957
2958    match a.unwrap_array().dims().into_iter().nth(
2959        (dim - 1)
2960            .try_into()
2961            .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2962    ) {
2963        Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
2964            requested_dim.lower_bound.try_into().map_err(|_| {
2965                EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2966            })?,
2967            requested_dim
2968                .length
2969                .try_into()
2970                .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
2971            1,
2972        )?)),
2973        None => Ok(Box::new(iter::empty())),
2974    }
2975}
2976
2977fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2978    a.unwrap_array()
2979        .elements()
2980        .iter()
2981        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2982}
2983
2984fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2985    a.unwrap_list()
2986        .iter()
2987        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2988}
2989
2990fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2991    a.unwrap_map()
2992        .iter()
2993        .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
2994}
2995
2996impl AggregateFunc {
2997    /// The base function name without the `~[...]` suffix used when rendering
2998    /// variants that represent a parameterized function family.
2999    pub fn name(&self) -> &'static str {
3000        match self {
3001            Self::MaxNumeric => "max",
3002            Self::MaxInt16 => "max",
3003            Self::MaxInt32 => "max",
3004            Self::MaxInt64 => "max",
3005            Self::MaxUInt16 => "max",
3006            Self::MaxUInt32 => "max",
3007            Self::MaxUInt64 => "max",
3008            Self::MaxMzTimestamp => "max",
3009            Self::MaxFloat32 => "max",
3010            Self::MaxFloat64 => "max",
3011            Self::MaxBool => "max",
3012            Self::MaxString => "max",
3013            Self::MaxDate => "max",
3014            Self::MaxTimestamp => "max",
3015            Self::MaxTimestampTz => "max",
3016            Self::MaxInterval => "max",
3017            Self::MaxTime => "max",
3018            Self::MinNumeric => "min",
3019            Self::MinInt16 => "min",
3020            Self::MinInt32 => "min",
3021            Self::MinInt64 => "min",
3022            Self::MinUInt16 => "min",
3023            Self::MinUInt32 => "min",
3024            Self::MinUInt64 => "min",
3025            Self::MinMzTimestamp => "min",
3026            Self::MinFloat32 => "min",
3027            Self::MinFloat64 => "min",
3028            Self::MinBool => "min",
3029            Self::MinString => "min",
3030            Self::MinDate => "min",
3031            Self::MinTimestamp => "min",
3032            Self::MinTimestampTz => "min",
3033            Self::MinInterval => "min",
3034            Self::MinTime => "min",
3035            Self::SumInt16 => "sum",
3036            Self::SumInt32 => "sum",
3037            Self::SumInt64 => "sum",
3038            Self::SumUInt16 => "sum",
3039            Self::SumUInt32 => "sum",
3040            Self::SumUInt64 => "sum",
3041            Self::SumFloat32 => "sum",
3042            Self::SumFloat64 => "sum",
3043            Self::SumNumeric => "sum",
3044            Self::Count => "count",
3045            Self::Any => "any",
3046            Self::All => "all",
3047            Self::JsonbAgg { .. } => "jsonb_agg",
3048            Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3049            Self::MapAgg { .. } => "map_agg",
3050            Self::ArrayConcat { .. } => "array_agg",
3051            Self::ListConcat { .. } => "list_agg",
3052            Self::StringAgg { .. } => "string_agg",
3053            Self::RowNumber { .. } => "row_number",
3054            Self::Rank { .. } => "rank",
3055            Self::DenseRank { .. } => "dense_rank",
3056            Self::LagLead {
3057                lag_lead: LagLeadType::Lag,
3058                ..
3059            } => "lag",
3060            Self::LagLead {
3061                lag_lead: LagLeadType::Lead,
3062                ..
3063            } => "lead",
3064            Self::FirstValue { .. } => "first_value",
3065            Self::LastValue { .. } => "last_value",
3066            Self::WindowAggregate { .. } => "window_agg",
3067            Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3068            Self::FusedWindowAggregate { .. } => "fused_window_agg",
3069            Self::Dummy => "dummy",
3070        }
3071    }
3072}
3073
3074impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3075where
3076    M: HumanizerMode,
3077{
3078    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3079        use AggregateFunc::*;
3080        let name = self.expr.name();
3081        match self.expr {
3082            JsonbAgg { order_by }
3083            | JsonbObjectAgg { order_by }
3084            | MapAgg { order_by, .. }
3085            | ArrayConcat { order_by }
3086            | ListConcat { order_by }
3087            | StringAgg { order_by }
3088            | RowNumber { order_by }
3089            | Rank { order_by }
3090            | DenseRank { order_by } => {
3091                let order_by = order_by.iter().map(|col| self.child(col));
3092                write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3093            }
3094            LagLead {
3095                lag_lead: _,
3096                ignore_nulls,
3097                order_by,
3098            } => {
3099                let order_by = order_by.iter().map(|col| self.child(col));
3100                f.write_str(name)?;
3101                f.write_str("[")?;
3102                if *ignore_nulls {
3103                    f.write_str("ignore_nulls=true, ")?;
3104                }
3105                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3106                f.write_str("]")
3107            }
3108            FirstValue {
3109                order_by,
3110                window_frame,
3111            } => {
3112                let order_by = order_by.iter().map(|col| self.child(col));
3113                f.write_str(name)?;
3114                f.write_str("[")?;
3115                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3116                if *window_frame != WindowFrame::default() {
3117                    write!(f, " {}", window_frame)?;
3118                }
3119                f.write_str("]")
3120            }
3121            LastValue {
3122                order_by,
3123                window_frame,
3124            } => {
3125                let order_by = order_by.iter().map(|col| self.child(col));
3126                f.write_str(name)?;
3127                f.write_str("[")?;
3128                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3129                if *window_frame != WindowFrame::default() {
3130                    write!(f, " {}", window_frame)?;
3131                }
3132                f.write_str("]")
3133            }
3134            WindowAggregate {
3135                wrapped_aggregate,
3136                order_by,
3137                window_frame,
3138            } => {
3139                let order_by = order_by.iter().map(|col| self.child(col));
3140                let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3141                f.write_str(name)?;
3142                f.write_str("[")?;
3143                write!(f, "{} ", wrapped_aggregate)?;
3144                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3145                if *window_frame != WindowFrame::default() {
3146                    write!(f, " {}", window_frame)?;
3147                }
3148                f.write_str("]")
3149            }
3150            FusedValueWindowFunc { funcs, order_by } => {
3151                let order_by = order_by.iter().map(|col| self.child(col));
3152                let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3153                f.write_str(name)?;
3154                f.write_str("[")?;
3155                write!(f, "{} ", funcs)?;
3156                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3157                f.write_str("]")
3158            }
3159            _ => f.write_str(name),
3160        }
3161    }
3162}
3163
/// Describes one capture group of an [`AnalyzedRegex`].
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct CaptureGroupDesc {
    // 1-based index of the group within the pattern; the implicit whole-match
    // group 0 is skipped by `AnalyzedRegex::new`.
    pub index: u32,
    // The group's name, if the pattern assigned one.
    pub name: Option<String>,
    // Whether the column produced from this group may be null. Currently
    // always `true`; see MaterializeInc/database-issues#612.
    pub nullable: bool,
}
3181
/// Flags modifying regex behavior, parsed from a flag string by the `FromStr`
/// impl below.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default
)]
pub struct AnalyzedRegexOpts {
    // The `i` flag; passed to the regex compiler in `AnalyzedRegex::new`.
    pub case_insensitive: bool,
    // The `g` ("global") flag; exposed via `AnalyzedRegex::opts` — its effect
    // is applied by callers, not here.
    pub global: bool,
}
3200
3201impl FromStr for AnalyzedRegexOpts {
3202    type Err = EvalError;
3203
3204    fn from_str(s: &str) -> Result<Self, Self::Err> {
3205        let mut opts = AnalyzedRegexOpts::default();
3206        for c in s.chars() {
3207            match c {
3208                'i' => opts.case_insensitive = true,
3209                'g' => opts.global = true,
3210                _ => return Err(EvalError::InvalidRegexFlag(c)),
3211            }
3212        }
3213        Ok(opts)
3214    }
3215}
3216
/// A compiled regex together with a description of its capture groups and the
/// flags it was parsed with. Constructed via [`AnalyzedRegex::new`].
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3230
3231impl AnalyzedRegex {
3232    pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, RegexCompilationError> {
3233        let r = ReprRegex::new(s, opts.case_insensitive)?;
3234        // TODO(benesch): remove potentially dangerous usage of `as`.
3235        #[allow(clippy::as_conversions)]
3236        let descs: Vec<_> = r
3237            .capture_names()
3238            .enumerate()
3239            // The first capture is the entire matched string.
3240            // This will often not be useful, so skip it.
3241            // If people want it they can just surround their
3242            // entire regex in an explicit capture group.
3243            .skip(1)
3244            .map(|(i, name)| CaptureGroupDesc {
3245                index: i as u32,
3246                name: name.map(String::from),
3247                // TODO -- we can do better.
3248                // https://github.com/MaterializeInc/database-issues/issues/612
3249                nullable: true,
3250            })
3251            .collect();
3252        Ok(Self(r, descs, opts))
3253    }
3254    pub fn capture_groups_len(&self) -> usize {
3255        self.1.len()
3256    }
3257    pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3258        self.1.iter()
3259    }
3260    pub fn inner(&self) -> &Regex {
3261        &(self.0).regex
3262    }
3263    pub fn opts(&self) -> &AnalyzedRegexOpts {
3264        &self.2
3265    }
3266}
3267
3268pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3269    let bytes = a.unwrap_str().as_bytes();
3270    let mut row = Row::default();
3271    let csv_reader = csv::ReaderBuilder::new()
3272        .has_headers(false)
3273        .from_reader(bytes);
3274    csv_reader.into_records().filter_map(move |res| match res {
3275        Ok(sr) if sr.len() == n_cols => {
3276            row.packer().extend(sr.iter().map(Datum::String));
3277            Some((row.clone(), Diff::ONE))
3278        }
3279        _ => None,
3280    })
3281}
3282
3283pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3284    let n = a.unwrap_int64();
3285    if n != 0 {
3286        Some((Row::default(), n.into()))
3287    } else {
3288        None
3289    }
3290}
3291
3292fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3293    datums
3294        .chunks(width)
3295        .map(|chunk| (Row::pack(chunk), Diff::ONE))
3296}
3297
3298fn acl_explode<'a>(
3299    acl_items: Datum<'a>,
3300    temp_storage: &'a RowArena,
3301) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3302    let acl_items = acl_items.unwrap_array();
3303    let mut res = Vec::new();
3304    for acl_item in acl_items.elements().iter() {
3305        if acl_item.is_null() {
3306            return Err(EvalError::AclArrayNullElement);
3307        }
3308        let acl_item = acl_item.unwrap_acl_item();
3309        for privilege in acl_item.acl_mode.explode() {
3310            let row = [
3311                Datum::UInt32(acl_item.grantor.0),
3312                Datum::UInt32(acl_item.grantee.0),
3313                Datum::String(temp_storage.push_string(privilege.to_string())),
3314                // GRANT OPTION is not implemented, so we hardcode false.
3315                Datum::False,
3316            ];
3317            res.push((Row::pack_slice(&row), Diff::ONE));
3318        }
3319    }
3320    Ok(res.into_iter())
3321}
3322
3323fn mz_acl_explode<'a>(
3324    mz_acl_items: Datum<'a>,
3325    temp_storage: &'a RowArena,
3326) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3327    let mz_acl_items = mz_acl_items.unwrap_array();
3328    let mut res = Vec::new();
3329    for mz_acl_item in mz_acl_items.elements().iter() {
3330        if mz_acl_item.is_null() {
3331            return Err(EvalError::MzAclArrayNullElement);
3332        }
3333        let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3334        for privilege in mz_acl_item.acl_mode.explode() {
3335            let row = [
3336                Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3337                Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3338                Datum::String(temp_storage.push_string(privilege.to_string())),
3339                // GRANT OPTION is not implemented, so we hardcode false.
3340                Datum::False,
3341            ];
3342            res.push((Row::pack_slice(&row), Diff::ONE));
3343        }
3344    }
3345    Ok(res.into_iter())
3346}
3347
/// When adding a new `TableFunc` variant, please consider adding it to
/// `TableFunc::with_ordinality`!
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum TableFunc {
    /// Expands an array of ACL items into one row per held privilege; see
    /// `acl_explode`.
    AclExplode,
    /// Expands an array of `mz_aclitem`s into one row per held privilege; see
    /// `mz_acl_explode`.
    MzAclExplode,
    /// Expands a `jsonb` value into `(key, value)` rows, the value being `jsonb`.
    JsonbEach,
    /// Like `JsonbEach`, but the value column is a (nullable) string.
    JsonbEachStringify,
    /// Produces one string row per key of a `jsonb` object.
    JsonbObjectKeys,
    /// Produces one `jsonb` row per element of a `jsonb` array.
    JsonbArrayElements,
    /// Like `JsonbArrayElements`, but elements are (nullable) strings.
    JsonbArrayElementsStringify,
    /// Produces one row of capture group values per regex match; see
    /// `regexp_extract`.
    RegexpExtract(AnalyzedRegex),
    /// Parses a string as CSV with the given column count; see `csv_extract`.
    CsvExtract(usize),
    /// `generate_series` over `int4` `(start, stop, step)` arguments.
    GenerateSeriesInt32,
    /// `generate_series` over `int8` `(start, stop, step)` arguments.
    GenerateSeriesInt64,
    /// `generate_series` over `timestamp` bounds with an `interval` step.
    GenerateSeriesTimestamp,
    /// `generate_series` over `timestamptz` bounds with an `interval` step.
    GenerateSeriesTimestampTz,
    /// Supplied with an input count,
    ///   1. Adds a column as if a typed subquery result,
    ///   2. Filters the row away if the count is only one,
    ///   3. Errors if the count is not exactly one.
    /// The intent is that this presents as if a subquery result with too many
    /// records contributing. The error column has the same type as the result
    /// should have, but we only produce it if the count exceeds one.
    ///
    /// This logic could nearly be achieved with map, filter, project logic,
    /// but has been challenging to do in a way that respects the vagaries of
    /// SQL and our semantics. If we reveal a constant value in the column we
    /// risk the optimizer pruning the branch; if we reveal that this will not
    /// produce rows we risk the optimizer pruning the branch; if we reveal that
    /// the only possible value is an error we risk the optimizer propagating that
    /// error without guards.
    ///
    /// Before replacing this by an `MirScalarExpr`, quadruple check that it
    /// would not result in misoptimizations due to expression evaluation order
    /// being utterly undefined, and predicate pushdown trimming any fragments
    /// that might produce columns that will not be needed.
    GuardSubquerySize {
        column_type: SqlScalarType,
    },
    /// Produces the empty row with the multiplicity given by its `int8`
    /// argument; see `repeat`.
    Repeat,
    /// Produces one row per array element; see `unnest_array`.
    UnnestArray {
        el_typ: SqlScalarType,
    },
    /// Produces one row per list element; see `unnest_list`.
    UnnestList {
        el_typ: SqlScalarType,
    },
    /// Produces one `(key, value)` row per map entry; see `unnest_map`.
    UnnestMap {
        value_type: SqlScalarType,
    },
    /// Given `n` input expressions, wraps them into `n / width` rows, each of
    /// `width` columns.
    ///
    /// This function is not intended to be called directly by end users, but
    /// is useful in the planning of e.g. VALUES clauses.
    Wrap {
        types: Vec<SqlColumnType>,
        width: usize,
    },
    /// Produces the subscripts of one dimension of an array; see
    /// `generate_subscripts_array`.
    GenerateSubscriptsArray,
    /// Execute some arbitrary scalar function as a table function.
    TabletizedScalar {
        name: String,
        relation: SqlRelationType,
    },
    /// Produces one `text[]` row per regex match; see `regexp_matches`.
    RegexpMatches,
    /// Implements the WITH ORDINALITY clause.
    ///
    /// Don't construct `TableFunc::WithOrdinality` manually! Use the `with_ordinality` constructor
    /// function instead, which checks whether the given table function supports `WithOrdinality`.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3432
/// Evaluates the inner table function, expands its results into unary (repeating each row as
/// many times as the diff indicates), and appends an integer corresponding to the ordinal
/// position (starting from 1). For example, it numbers the elements of a list when calling
/// `unnest_list`.
///
/// Private enum variant of `TableFunc`. Don't construct this directly, but use
/// `TableFunc::with_ordinality` instead.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
struct WithOrdinality {
    // The table function whose output rows are being numbered.
    inner: Box<TableFunc>,
}
3455
impl TableFunc {
    /// Adds `WITH ORDINALITY` to a table function if it's allowed on the given table function.
    ///
    /// Returns `None` when `inner` is already a `WithOrdinality`, which must
    /// not be wrapped again. The match is deliberately exhaustive so that
    /// adding a new `TableFunc` variant forces a decision here.
    pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
        match inner {
            TableFunc::AclExplode
            | TableFunc::MzAclExplode
            | TableFunc::JsonbEach
            | TableFunc::JsonbEachStringify
            | TableFunc::JsonbObjectKeys
            | TableFunc::JsonbArrayElements
            | TableFunc::JsonbArrayElementsStringify
            | TableFunc::RegexpExtract(_)
            | TableFunc::CsvExtract(_)
            | TableFunc::GenerateSeriesInt32
            | TableFunc::GenerateSeriesInt64
            | TableFunc::GenerateSeriesTimestamp
            | TableFunc::GenerateSeriesTimestampTz
            | TableFunc::GuardSubquerySize { .. }
            | TableFunc::Repeat
            | TableFunc::UnnestArray { .. }
            | TableFunc::UnnestList { .. }
            | TableFunc::UnnestMap { .. }
            | TableFunc::Wrap { .. }
            | TableFunc::GenerateSubscriptsArray
            | TableFunc::TabletizedScalar { .. }
            | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
                inner: Box::new(inner),
            })),
            // IMPORTANT: Before adding a new table function here, consider negative diffs:
            // `WithOrdinality::eval` will panic if the inner table function emits a negative diff.
            TableFunc::WithOrdinality(_) => None,
        }
    }
}
3490
3491impl TableFunc {
    /// Executes `self` on the given input row (`datums`).
    ///
    /// Returns an iterator over output rows paired with their multiplicities
    /// (diffs), or an error if evaluation fails. `temp_storage` provides
    /// allocation for datums that must outlive this call.
    pub fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Most variants produce no rows when any input is null; short-circuit
        // before dispatching (see `empty_on_null_input`).
        if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
            return Ok(Box::new(vec![].into_iter()));
        }
        match self {
            TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
            TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
            TableFunc::JsonbEach => Ok(Box::new(jsonb_each(datums[0]))),
            TableFunc::JsonbEachStringify => {
                Ok(Box::new(jsonb_each_stringify(datums[0], temp_storage)))
            }
            TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
            TableFunc::JsonbArrayElements => Ok(Box::new(jsonb_array_elements(datums[0]))),
            TableFunc::JsonbArrayElementsStringify => Ok(Box::new(jsonb_array_elements_stringify(
                datums[0],
                temp_storage,
            ))),
            TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
            TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
            // The `generate_series` variants take (start, stop, step) arguments.
            TableFunc::GenerateSeriesInt32 => {
                let res = generate_series(
                    datums[0].unwrap_int32(),
                    datums[1].unwrap_int32(),
                    datums[2].unwrap_int32(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesInt64 => {
                let res = generate_series(
                    datums[0].unwrap_int64(),
                    datums[1].unwrap_int64(),
                    datums[2].unwrap_int64(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestamp => {
                // Converts each generated timestamp into a `Datum` for `generate_series_ts`.
                fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamp(),
                    datums[1].unwrap_timestamp(),
                    datums[2].unwrap_interval(),
                    pass_through,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestampTz => {
                // Converts each generated timestamp into a `Datum` for `generate_series_ts`.
                fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamptz(),
                    datums[1].unwrap_timestamptz(),
                    datums[2].unwrap_interval(),
                    gen_ts_tz,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSubscriptsArray => {
                generate_subscripts_array(datums[0], datums[1].unwrap_int32())
            }
            TableFunc::GuardSubquerySize { column_type: _ } => {
                // We error if the count is not one,
                // and produce no rows if equal to one.
                let count = datums[0].unwrap_int64();
                if count == 1 {
                    Ok(Box::new([].into_iter()))
                } else if count > 1 {
                    Err(EvalError::MultipleRowsFromSubquery)
                } else if count < 0 {
                    Err(EvalError::NegativeRowsFromSubquery)
                } else {
                    // This shouldn't happen because this is not an SQL `count` but an MIR `count`,
                    // which produces no output on 0 input rows.
                    soft_panic_or_log!("subquery counting unexpectedly produced 0");
                    Err(EvalError::Internal(
                        "subquery counting unexpectedly produced 0".into(),
                    ))
                }
            }
            TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
            TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
            TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
            TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
            TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
            TableFunc::TabletizedScalar { .. } => {
                // The scalar result is already in `datums`; emit it as one row.
                let r = Row::pack_slice(datums);
                Ok(Box::new(std::iter::once((r, Diff::ONE))))
            }
            TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
            TableFunc::WithOrdinality(func_with_ordinality) => {
                func_with_ordinality.eval(datums, temp_storage)
            }
        }
    }
3593
    /// Returns the SQL relation type — column types plus any unique-key sets —
    /// of the rows this table function produces.
    ///
    /// The column count always agrees with [`Self::output_arity`].
    pub fn output_sql_type(&self) -> SqlRelationType {
        let (column_types, keys) = match self {
            TableFunc::AclExplode => {
                // (grantor oid, grantee oid, privilege, is_grantable); see `acl_explode`.
                let column_types = vec![
                    SqlScalarType::Oid.nullable(false),
                    SqlScalarType::Oid.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Bool.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::MzAclExplode => {
                // (grantor, grantee, privilege, is_grantable); see `mz_acl_explode`.
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Bool.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbEach => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Jsonb.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbEachStringify => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(true),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbObjectKeys => {
                let column_types = vec![SqlScalarType::String.nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbArrayElements => {
                let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbArrayElementsStringify => {
                let column_types = vec![SqlScalarType::String.nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::RegexpExtract(a) => {
                // One string column per capture group; nullability recorded at
                // regex analysis time (currently always nullable).
                let column_types = a
                    .capture_groups_iter()
                    .map(|cg| SqlScalarType::String.nullable(cg.nullable))
                    .collect();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::CsvExtract(n_cols) => {
                let column_types = iter::repeat(SqlScalarType::String.nullable(false))
                    .take(*n_cols)
                    .collect();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesInt32 => {
                let column_types = vec![SqlScalarType::Int32.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesInt64 => {
                let column_types = vec![SqlScalarType::Int64.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesTimestamp => {
                let column_types =
                    vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesTimestampTz => {
                let column_types =
                    vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSubscriptsArray => {
                let column_types = vec![SqlScalarType::Int32.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GuardSubquerySize { column_type } => {
                let column_types = vec![column_type.clone().nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::Repeat => {
                // `repeat` produces the empty row, so there are no columns.
                let column_types = vec![];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestArray { el_typ } => {
                let column_types = vec![el_typ.clone().nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestList { el_typ } => {
                let column_types = vec![el_typ.clone().nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestMap { value_type } => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    value_type.clone().nullable(true),
                ];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::Wrap { types, .. } => {
                let column_types = types.clone();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::TabletizedScalar { relation, .. } => {
                // The relation type was computed at planning time; use it directly.
                return relation.clone();
            }
            TableFunc::RegexpMatches => {
                let column_types =
                    vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
                let keys = vec![];

                (column_types, keys)
            }
            TableFunc::WithOrdinality(WithOrdinality { inner }) => {
                let mut typ = inner.output_sql_type();
                // Add the ordinality column.
                typ.column_types.push(SqlScalarType::Int64.nullable(false));
                // The ordinality column is always a key.
                typ.keys.push(vec![typ.column_types.len() - 1]);
                (typ.column_types, typ.keys)
            }
        };

        // The declared column count must agree with `output_arity`.
        soft_assert_eq_no_log!(column_types.len(), self.output_arity());

        if !keys.is_empty() {
            SqlRelationType::new(column_types).with_keys(keys)
        } else {
            SqlRelationType::new(column_types)
        }
    }
3750
3751    /// Computes the representation type of this table function.
3752    ///
3753    /// This is a wrapper around [`Self::output_sql_type`] that converts the result to a representation type.
3754    pub fn output_type(&self) -> ReprRelationType {
3755        ReprRelationType::from(&self.output_sql_type())
3756    }
3757
3758    pub fn output_arity(&self) -> usize {
3759        match self {
3760            TableFunc::AclExplode => 4,
3761            TableFunc::MzAclExplode => 4,
3762            TableFunc::JsonbEach => 2,
3763            TableFunc::JsonbEachStringify => 2,
3764            TableFunc::JsonbObjectKeys => 1,
3765            TableFunc::JsonbArrayElements => 1,
3766            TableFunc::JsonbArrayElementsStringify => 1,
3767            TableFunc::RegexpExtract(a) => a.capture_groups_len(),
3768            TableFunc::CsvExtract(n_cols) => *n_cols,
3769            TableFunc::GenerateSeriesInt32 => 1,
3770            TableFunc::GenerateSeriesInt64 => 1,
3771            TableFunc::GenerateSeriesTimestamp => 1,
3772            TableFunc::GenerateSeriesTimestampTz => 1,
3773            TableFunc::GenerateSubscriptsArray => 1,
3774            TableFunc::GuardSubquerySize { .. } => 1,
3775            TableFunc::Repeat => 0,
3776            TableFunc::UnnestArray { .. } => 1,
3777            TableFunc::UnnestList { .. } => 1,
3778            TableFunc::UnnestMap { .. } => 2,
3779            TableFunc::Wrap { width, .. } => *width,
3780            TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
3781            TableFunc::RegexpMatches => 1,
3782            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
3783        }
3784    }
3785
    /// Whether this table function produces an empty result whenever any of
    /// its arguments is null. `eval` consults this before dispatching and
    /// short-circuits to an empty iterator when it returns true.
    pub fn empty_on_null_input(&self) -> bool {
        match self {
            TableFunc::AclExplode
            | TableFunc::MzAclExplode
            | TableFunc::JsonbEach
            | TableFunc::JsonbEachStringify
            | TableFunc::JsonbObjectKeys
            | TableFunc::JsonbArrayElements
            | TableFunc::JsonbArrayElementsStringify
            | TableFunc::GenerateSeriesInt32
            | TableFunc::GenerateSeriesInt64
            | TableFunc::GenerateSeriesTimestamp
            | TableFunc::GenerateSeriesTimestampTz
            | TableFunc::GenerateSubscriptsArray
            | TableFunc::RegexpExtract(_)
            | TableFunc::CsvExtract(_)
            | TableFunc::Repeat
            | TableFunc::UnnestArray { .. }
            | TableFunc::UnnestList { .. }
            | TableFunc::UnnestMap { .. }
            | TableFunc::RegexpMatches => true,
            TableFunc::GuardSubquerySize { .. } => false,
            TableFunc::Wrap { .. } => false,
            TableFunc::TabletizedScalar { .. } => false,
            // Defer to the wrapped function's behavior.
            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
        }
    }
3813
3814    /// True iff the table function preserves the append-only property of its input.
3815    pub fn preserves_monotonicity(&self) -> bool {
3816        // Most variants preserve monotonicity, but all variants are enumerated to
3817        // ensure that added variants at least check that this is the case.
3818        match self {
3819            TableFunc::AclExplode => false,
3820            TableFunc::MzAclExplode => false,
3821            TableFunc::JsonbEach => true,
3822            TableFunc::JsonbEachStringify => true,
3823            TableFunc::JsonbObjectKeys => true,
3824            TableFunc::JsonbArrayElements => true,
3825            TableFunc::JsonbArrayElementsStringify => true,
3826            TableFunc::RegexpExtract(_) => true,
3827            TableFunc::CsvExtract(_) => true,
3828            TableFunc::GenerateSeriesInt32 => true,
3829            TableFunc::GenerateSeriesInt64 => true,
3830            TableFunc::GenerateSeriesTimestamp => true,
3831            TableFunc::GenerateSeriesTimestampTz => true,
3832            TableFunc::GenerateSubscriptsArray => true,
3833            TableFunc::Repeat => false,
3834            TableFunc::UnnestArray { .. } => true,
3835            TableFunc::UnnestList { .. } => true,
3836            TableFunc::UnnestMap { .. } => true,
3837            TableFunc::Wrap { .. } => true,
3838            TableFunc::TabletizedScalar { .. } => true,
3839            TableFunc::RegexpMatches => true,
3840            TableFunc::GuardSubquerySize { .. } => false,
3841            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
3842        }
3843    }
3844}
3845
3846impl fmt::Display for TableFunc {
3847    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3848        match self {
3849            TableFunc::AclExplode => f.write_str("aclexplode"),
3850            TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3851            TableFunc::JsonbEach => f.write_str("jsonb_each"),
3852            TableFunc::JsonbEachStringify => f.write_str("jsonb_each_text"),
3853            TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3854            TableFunc::JsonbArrayElements => f.write_str("jsonb_array_elements"),
3855            TableFunc::JsonbArrayElementsStringify => f.write_str("jsonb_array_elements_text"),
3856            TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3857            TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3858            TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3859            TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3860            TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3861            TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3862            TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3863            TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3864            TableFunc::Repeat => f.write_str("repeat_row"),
3865            TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3866            TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3867            TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3868            TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3869            TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3870            TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3871            TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3872                write!(f, "{}[with_ordinality]", inner)
3873            }
3874        }
3875    }
3876}
3877
impl WithOrdinality {
    /// Executes the `self.inner` table function on the given input row (`datums`), and zips
    /// 1, 2, 3, ... to the result as a new column. We need to expand rows with non-1 diffs into the
    /// corresponding number of rows with unit diffs, because the ordinality column will have
    /// different values for each copy.
    ///
    /// # Panics
    ///
    /// Panics if the `inner` table function emits a negative diff.
    fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Ordinals start at 1. The counter is moved into the `flat_map` closure
        // below, so it keeps incrementing across all rows emitted by `inner`.
        let mut next_ordinal: i64 = 1;
        let it = self
            .inner
            .eval(datums, temp_storage)?
            .flat_map(move |(mut row, diff)| {
                let diff = diff.into_inner();
                // WITH ORDINALITY is not well-defined for negative diffs. This is ok, since the
                // only table function that can emit negative diffs is `repeat_row`, which is in
                // `mz_unsafe`, so users can never call it.
                //
                // (We also don't need to worry about negative diffs in FlatMap's input, because
                // the diff of the input of the FlatMap is factored in after we return from here.)
                assert!(diff >= 0);
                // The ordinals that will be associated with this row.
                let mut ordinals = next_ordinal..(next_ordinal + diff);
                next_ordinal += diff;
                // The maximum byte capacity we need for the original row and its ordinal.
                // (At this point `next_ordinal` is one past the largest ordinal this row will
                // receive, so its datum size bounds the size of every ordinal pushed below.)
                let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
                // Lazily emit one unit-diff copy of `row` per ordinal in `ordinals`.
                iter::from_fn(move || {
                    let ordinal = ordinals.next()?;
                    let mut row = if ordinals.is_empty() {
                        // This is the last row, so no need to clone. (Most table functions emit
                        // only 1 diffs, so this completely avoids cloning in most cases.)
                        std::mem::take(&mut row)
                    } else {
                        // More copies of this row are still needed, so clone into a buffer
                        // preallocated to hold the row plus its ordinal.
                        let mut new_row = Row::with_capacity(cap);
                        new_row.clone_from(&row);
                        new_row
                    };
                    // Append the ordinality column to the (copied or taken) row.
                    RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
                    Some((row, Diff::ONE))
                })
            });
        Ok(Box::new(it))
    }
}