// mz_expr/relation/func.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::{Regex as ReprRegex, RegexCompilationError};
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33    ColumnName, Datum, Diff, Row, RowArena, RowPacker, SharedRow, SqlColumnType, SqlRelationType,
34    SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44    CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49    ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53// TODO(jamii) be careful about overflow in sum/avg
54// see https://timely.zulipchat.com/#narrow/stream/186635-engineering/topic/additional.20work/near/163507435
55
56fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58    I: IntoIterator<Item = Datum<'a>>,
59{
60    match datums
61        .into_iter()
62        .filter(|d| !d.is_null())
63        .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64    {
65        Some(datum) => datum,
66        None => Datum::Null,
67    }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72    I: IntoIterator<Item = Datum<'a>>,
73    DatumType: TryFrom<Datum<'a>> + Ord,
74    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75    Datum<'a>: From<Option<DatumType>>,
76{
77    let x: Option<DatumType> = datums
78        .into_iter()
79        .filter(|d| !d.is_null())
80        .map(|d| DatumType::try_from(d).expect("unexpected type"))
81        .max();
82
83    x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88    I: IntoIterator<Item = Datum<'a>>,
89    DatumType: TryFrom<Datum<'a>> + Ord,
90    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91    Datum<'a>: From<Option<DatumType>>,
92{
93    let x: Option<DatumType> = datums
94        .into_iter()
95        .filter(|d| !d.is_null())
96        .map(|d| DatumType::try_from(d).expect("unexpected type"))
97        .min();
98
99    x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104    I: IntoIterator<Item = Datum<'a>>,
105{
106    match datums
107        .into_iter()
108        .filter(|d| !d.is_null())
109        .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110    {
111        Some(datum) => datum,
112        None => Datum::Null,
113    }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118    I: IntoIterator<Item = Datum<'a>>,
119    DatumType: TryFrom<Datum<'a>>,
120    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121    ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123    let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124    if datums.peek().is_none() {
125        Datum::Null
126    } else {
127        let x = datums
128            .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129            .sum::<ResultType>();
130        x.into()
131    }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136    I: IntoIterator<Item = Datum<'a>>,
137{
138    let mut cx = numeric::cx_datum();
139    let mut sum = Numeric::zero();
140    let mut empty = true;
141    for d in datums {
142        if !d.is_null() {
143            empty = false;
144            cx.add(&mut sum, &d.unwrap_numeric().0);
145        }
146    }
147    match empty {
148        true => Datum::Null,
149        false => Datum::from(sum),
150    }
151}
152
153// TODO(benesch): remove potentially dangerous usage of `as`.
154#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157    I: IntoIterator<Item = Datum<'a>>,
158{
159    // TODO(jkosh44) This should error when the count can't fit inside of an `i64` instead of returning a negative result.
160    let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161    Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166    I: IntoIterator<Item = Datum<'a>>,
167{
168    datums
169        .into_iter()
170        .fold(Datum::False, |state, next| match (state, next) {
171            (Datum::True, _) | (_, Datum::True) => Datum::True,
172            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173            _ => Datum::False,
174        })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179    I: IntoIterator<Item = Datum<'a>>,
180{
181    datums
182        .into_iter()
183        .fold(Datum::True, |state, next| match (state, next) {
184            (Datum::False, _) | (_, Datum::False) => Datum::False,
185            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186            _ => Datum::True,
187        })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192    I: IntoIterator<Item = Datum<'a>>,
193{
194    const EMPTY_SEP: &str = "";
195
196    let datums = order_aggregate_datums(datums, order_by);
197    let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198        if d.is_null() {
199            return None;
200        }
201        let mut value_sep = d.unwrap_list().iter();
202        match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203            (Datum::Null, _) => None,
204            (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205            (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206            _ => unreachable!(),
207        }
208    });
209
210    let mut s = String::default();
211    match sep_value_pairs.next() {
212        // First value not prefixed by its separator
213        Some((_, value)) => s.push_str(value),
214        // If no non-null values sent, return NULL.
215        None => return Datum::Null,
216    }
217
218    for (sep, value) in sep_value_pairs {
219        s.push_str(sep);
220        s.push_str(value);
221    }
222
223    Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228    I: IntoIterator<Item = Datum<'a>>,
229{
230    let datums = order_aggregate_datums(datums, order_by);
231    temp_storage.make_datum(|packer| {
232        packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233    })
234}
235
236fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238    I: IntoIterator<Item = Datum<'a>>,
239{
240    let datums = order_aggregate_datums(datums, order_by);
241    temp_storage.make_datum(|packer| {
242        let mut datums: Vec<_> = datums
243            .into_iter()
244            .filter_map(|d| {
245                if d.is_null() {
246                    return None;
247                }
248                let mut list = d.unwrap_list().iter();
249                let key = list.next().unwrap();
250                let val = list.next().unwrap();
251                if key.is_null() {
252                    // TODO(benesch): this should produce an error, but
253                    // aggregate functions cannot presently produce errors.
254                    None
255                } else {
256                    Some((key.unwrap_str(), val))
257                }
258            })
259            .collect();
260        // datums are ordered by any ORDER BY clause now, and we want to preserve
261        // the last entry for each key, but we also need to present unique and sorted
262        // keys to push_dict. Use sort_by here, which is stable, and so will preserve
263        // the ORDER BY order. Then reverse and dedup to retain the last of each
264        // key. Reverse again so we're back in push_dict order.
265        datums.sort_by_key(|(k, _v)| *k);
266        datums.reverse();
267        datums.dedup_by_key(|(k, _v)| *k);
268        datums.reverse();
269        packer.push_dict(datums);
270    })
271}
272
273/// Assuming datums is a List, sort them by the 2nd through Nth elements
274/// corresponding to order_by, then return the 1st element.
275///
276/// Near the usages of this function, we sometimes want to produce Datums with a shorter lifetime
277/// than 'a. We have to actually perform the shortening of the lifetime here, inside this function,
278/// because if we were to simply return `impl Iterator<Item = Datum<'a>>`, that wouldn't be
279/// covariant in the item type, because opaque types are always invariant. (Contrast this with how
280/// we perform the shortening _inside_ this function: the input of the `map` is known to
281/// specifically be `std::vec::IntoIter`, which is known to be covariant.)
282pub fn order_aggregate_datums<'a: 'b, 'b, I>(
283    datums: I,
284    order_by: &[ColumnOrder],
285) -> impl Iterator<Item = Datum<'b>>
286where
287    I: IntoIterator<Item = Datum<'a>>,
288{
289    order_aggregate_datums_with_rank_inner(datums, order_by)
290        .into_iter()
291        // (`payload` is coerced here to `Datum<'b>` in the argument of the closure)
292        .map(|(payload, _order_datums)| payload)
293}
294
295/// Assuming datums is a List, sort them by the 2nd through Nth elements
296/// corresponding to order_by, then return the 1st element and computed order by expression.
297fn order_aggregate_datums_with_rank<'a, I>(
298    datums: I,
299    order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302    I: IntoIterator<Item = Datum<'a>>,
303{
304    order_aggregate_datums_with_rank_inner(datums, order_by)
305        .into_iter()
306        .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310    datums: I,
311    order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314    I: IntoIterator<Item = Datum<'a>>,
315{
316    let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317        .into_iter()
318        .map(|d| {
319            let list = d.unwrap_list();
320            let mut list_it = list.iter();
321            let payload = list_it.next().unwrap();
322
323            // We decode the order_by Datums here instead of the comparison function, because the
324            // comparison function is expected to be called `O(log n)` times on each input row.
325            // The only downside is that the decoded data might be bigger, but I think that's fine,
326            // because:
327            // - if we have a window partition so big that this would create a memory problem, then
328            //   the non-incrementalness of window functions will create a serious CPU problem
329            //   anyway,
330            // - and anyhow various other parts of the window function code already do decoding
331            //   upfront.
332            let mut order_by_datums = Vec::with_capacity(order_by.len());
333            for _ in 0..order_by.len() {
334                order_by_datums.push(
335                    list_it
336                        .next()
337                        .expect("must have exactly the same number of Datums as `order_by`"),
338                );
339            }
340
341            (payload, order_by_datums)
342        })
343        .collect();
344
345    let mut sort_by =
346        |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347         (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348            compare_columns(
349                order_by,
350                left_order_by_datums,
351                right_order_by_datums,
352                || payload_left.cmp(payload_right),
353            )
354        };
355    // `sort_unstable_by` can be faster and uses less memory than `sort_by`. An unstable sort is
356    // enough here, because if two elements are equal in our `compare` function, then the elements
357    // are actually binary-equal (because of the `tiebreaker` given to `compare_columns`), so it
358    // doesn't matter what order they end up in.
359    decoded.sort_unstable_by(&mut sort_by);
360    decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365    I: IntoIterator<Item = Datum<'a>>,
366{
367    let datums = order_aggregate_datums(datums, order_by);
368    let datums: Vec<_> = datums
369        .into_iter()
370        .map(|d| d.unwrap_array().elements().iter())
371        .flatten()
372        .collect();
373    let dims = ArrayDimension {
374        lower_bound: 1,
375        length: datums.len(),
376    };
377    temp_storage.make_datum(|packer| {
378        packer.try_push_array(&[dims], datums).unwrap();
379    })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384    I: IntoIterator<Item = Datum<'a>>,
385{
386    let datums = order_aggregate_datums(datums, order_by);
387    temp_storage.make_datum(|packer| {
388        packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389    })
390}
391
392/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
393/// The output is in the format of `[result_value, original_row]`.
394/// See an example at `lag_lead`, where the input-output formats are similar.
395fn row_number<'a, I>(
396    datums: I,
397    callers_temp_storage: &'a RowArena,
398    order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401    I: IntoIterator<Item = Datum<'a>>,
402{
403    // We want to use our own temp_storage here, to avoid flooding `callers_temp_storage` with a
404    // large number of new datums. This is because we don't want to make an assumption about
405    // whether the caller creates a new temp_storage between window partitions.
406    let temp_storage = RowArena::new();
407    let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409    callers_temp_storage.make_datum(|packer| {
410        packer.push_list(datums);
411    })
412}
413
414/// Like `row_number`, but doesn't perform the final wrapping in a list, returning an Iterator
415/// instead.
416fn row_number_no_list<'a: 'b, 'b, I>(
417    datums: I,
418    callers_temp_storage: &'b RowArena,
419    order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422    I: IntoIterator<Item = Datum<'a>>,
423{
424    let datums = order_aggregate_datums(datums, order_by);
425
426    callers_temp_storage.reserve(datums.size_hint().0);
427    #[allow(clippy::disallowed_methods)]
428    datums
429        .into_iter()
430        .map(|d| d.unwrap_list().iter())
431        .flatten()
432        .zip(1i64..)
433        .map(|(d, i)| {
434            callers_temp_storage.make_datum(|packer| {
435                packer.push_list_with(|packer| {
436                    packer.push(Datum::Int64(i));
437                    packer.push(d);
438                });
439            })
440        })
441}
442
443/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
444/// The output is in the format of `[result_value, original_row]`.
445/// See an example at `lag_lead`, where the input-output formats are similar.
446fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448    I: IntoIterator<Item = Datum<'a>>,
449{
450    let temp_storage = RowArena::new();
451    let datums = rank_no_list(datums, &temp_storage, order_by);
452
453    callers_temp_storage.make_datum(|packer| {
454        packer.push_list(datums);
455    })
456}
457
458/// Like `rank`, but doesn't perform the final wrapping in a list, returning an Iterator
459/// instead.
460fn rank_no_list<'a: 'b, 'b, I>(
461    datums: I,
462    callers_temp_storage: &'b RowArena,
463    order_by: &[ColumnOrder],
464) -> impl Iterator<Item = Datum<'b>>
465where
466    I: IntoIterator<Item = Datum<'a>>,
467{
468    // Keep the row used for ordering around, as it is used to determine the rank
469    let datums = order_aggregate_datums_with_rank(datums, order_by);
470
471    let mut datums = datums
472        .into_iter()
473        .map(|(d0, order_row)| {
474            d0.unwrap_list()
475                .iter()
476                .map(move |d1| (d1, order_row.clone()))
477        })
478        .flatten();
479
480    callers_temp_storage.reserve(datums.size_hint().0);
481    datums
482        .next()
483        .map_or(vec![], |(first_datum, first_order_row)| {
484            // Folding with (last order_by row, last assigned rank,
485            // row number, output vec)
486            datums.fold(
487                (first_order_row, 1, 1, vec![(first_datum, 1)]),
488                |mut acc, (next_datum, next_order_row)| {
489                let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
490                *acc_row_num += 1;
491                // Identity is based on the order_by expression
492                if *acc_row != next_order_row {
493                    *acc_rank = *acc_row_num;
494                    *acc_row = next_order_row;
495                }
496
497                (*output).push((next_datum, *acc_rank));
498                acc
499            })
500        }.3).into_iter().map(|(d, i)| {
501        callers_temp_storage.make_datum(|packer| {
502            packer.push_list_with(|packer| {
503                packer.push(Datum::Int64(i));
504                packer.push(d);
505            });
506        })
507    })
508}
509
510/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
511/// The output is in the format of `[result_value, original_row]`.
512/// See an example at `lag_lead`, where the input-output formats are similar.
513fn dense_rank<'a, I>(
514    datums: I,
515    callers_temp_storage: &'a RowArena,
516    order_by: &[ColumnOrder],
517) -> Datum<'a>
518where
519    I: IntoIterator<Item = Datum<'a>>,
520{
521    let temp_storage = RowArena::new();
522    let datums = dense_rank_no_list(datums, &temp_storage, order_by);
523
524    callers_temp_storage.make_datum(|packer| {
525        packer.push_list(datums);
526    })
527}
528
529/// Like `dense_rank`, but doesn't perform the final wrapping in a list, returning an Iterator
530/// instead.
531fn dense_rank_no_list<'a: 'b, 'b, I>(
532    datums: I,
533    callers_temp_storage: &'b RowArena,
534    order_by: &[ColumnOrder],
535) -> impl Iterator<Item = Datum<'b>>
536where
537    I: IntoIterator<Item = Datum<'a>>,
538{
539    // Keep the row used for ordering around, as it is used to determine the rank
540    let datums = order_aggregate_datums_with_rank(datums, order_by);
541
542    let mut datums = datums
543        .into_iter()
544        .map(|(d0, order_row)| {
545            d0.unwrap_list()
546                .iter()
547                .map(move |d1| (d1, order_row.clone()))
548        })
549        .flatten();
550
551    callers_temp_storage.reserve(datums.size_hint().0);
552    datums
553        .next()
554        .map_or(vec![], |(first_datum, first_order_row)| {
555            // Folding with (last order_by row, last assigned rank,
556            // output vec)
557            datums.fold(
558                (first_order_row, 1, vec![(first_datum, 1)]),
559                |mut acc, (next_datum, next_order_row)| {
560                let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
561                // Identity is based on the order_by expression
562                if *acc_row != next_order_row {
563                    *acc_rank += 1;
564                    *acc_row = next_order_row;
565                }
566
567                (*output).push((next_datum, *acc_rank));
568                acc
569            })
570        }.2).into_iter().map(|(d, i)| {
571        callers_temp_storage.make_datum(|packer| {
572            packer.push_list_with(|packer| {
573                packer.push(Datum::Int64(i));
574                packer.push(d);
575            });
576        })
577    })
578}
579
580/// The expected input is in the format of `[((OriginalRow, EncodedArgs), OrderByExprs...)]`
581/// For example,
582///
583/// lag(x*y, 1, null) over (partition by x+y order by x-y, x/y)
584///
585/// list of:
586/// row(
587///   row(
588///     row(#0, #1),
589///     row((#0 * #1), 1, null)
590///   ),
591///   (#0 - #1),
592///   (#0 / #1)
593/// )
594///
595/// The output is in the format of `[result_value, original_row]`, e.g.
596/// list of:
597/// row(
598///   42,
599///   row(7, 8)
600/// )
601fn lag_lead<'a, I>(
602    datums: I,
603    callers_temp_storage: &'a RowArena,
604    order_by: &[ColumnOrder],
605    lag_lead_type: &LagLeadType,
606    ignore_nulls: &bool,
607) -> Datum<'a>
608where
609    I: IntoIterator<Item = Datum<'a>>,
610{
611    let temp_storage = RowArena::new();
612    let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
613    callers_temp_storage.make_datum(|packer| {
614        packer.push_list(iter);
615    })
616}
617
618/// Like `lag_lead`, but doesn't perform the final wrapping in a list, returning an Iterator
619/// instead.
620fn lag_lead_no_list<'a: 'b, 'b, I>(
621    datums: I,
622    callers_temp_storage: &'b RowArena,
623    order_by: &[ColumnOrder],
624    lag_lead_type: &LagLeadType,
625    ignore_nulls: &bool,
626) -> impl Iterator<Item = Datum<'b>>
627where
628    I: IntoIterator<Item = Datum<'a>>,
629{
630    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, EncodedArgs) record
631    let datums = order_aggregate_datums(datums, order_by);
632
633    // Take the (OriginalRow, EncodedArgs) records and unwrap them into separate datums.
634    // EncodedArgs = (InputValue, Offset, DefaultValue) for Lag/Lead
635    // (`OriginalRow` is kept in a record form, as we don't need to look inside that.)
636    let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
637        .into_iter()
638        .map(|d| {
639            let mut iter = d.unwrap_list().iter();
640            let original_row = iter.next().unwrap();
641            let (input_value, offset, default_value) =
642                unwrap_lag_lead_encoded_args(iter.next().unwrap());
643            (original_row, (input_value, offset, default_value))
644        })
645        .unzip();
646
647    let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
648
649    callers_temp_storage.reserve(result.len());
650    result
651        .into_iter()
652        .zip_eq(orig_rows)
653        .map(|(result_value, original_row)| {
654            callers_temp_storage.make_datum(|packer| {
655                packer.push_list_with(|packer| {
656                    packer.push(result_value);
657                    packer.push(original_row);
658                });
659            })
660        })
661}
662
663/// lag/lead's arguments are in a record. This function unwraps this record.
664fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
665    let mut encoded_args_iter = encoded_args.unwrap_list().iter();
666    let (input_value, offset, default_value) = (
667        encoded_args_iter.next().unwrap(),
668        encoded_args_iter.next().unwrap(),
669        encoded_args_iter.next().unwrap(),
670    );
671    (input_value, offset, default_value)
672}
673
674/// Each element of `args` has the 3 arguments evaluated for a single input row.
675/// Returns the results for each input row.
676fn lag_lead_inner<'a>(
677    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
678    lag_lead_type: &LagLeadType,
679    ignore_nulls: &bool,
680) -> Vec<Datum<'a>> {
681    if *ignore_nulls {
682        lag_lead_inner_ignore_nulls(args, lag_lead_type)
683    } else {
684        lag_lead_inner_respect_nulls(args, lag_lead_type)
685    }
686}
687
688fn lag_lead_inner_respect_nulls<'a>(
689    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
690    lag_lead_type: &LagLeadType,
691) -> Vec<Datum<'a>> {
692    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
693    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
694        // Null offsets are acceptable, and always return null
695        if offset.is_null() {
696            result.push(Datum::Null);
697            continue;
698        }
699
700        let idx = i64::try_from(idx).expect("Array index does not fit in i64");
701        let offset = i64::from(offset.unwrap_int32());
702        let offset = match lag_lead_type {
703            LagLeadType::Lag => -offset,
704            LagLeadType::Lead => offset,
705        };
706
707        // Get a Datum from `datums`. Return None if index is out of range.
708        let datums_get = |i: i64| -> Option<Datum> {
709            match u64::try_from(i) {
710                Ok(i) => args
711                    .get(usize::cast_from(i))
712                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
713                    .unwrap_or(None), // overindexing
714                Err(_) => None, // underindexing (negative index)
715            }
716        };
717
718        let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
719
720        result.push(lagged_value);
721    }
722
723    result
724}
725
// `i64` indexes get involved in this function because it's convenient to allow negative indexes and
// have `datums_get` fail on them, and thus handle the beginning and end of the input vector
// uniformly, rather than checking underflow separately during index manipulations.
/// lag/lead with IGNORE NULLS: for each input row, steps over `abs(offset)`
/// _non-null_ input values in the lag/lead direction, returning the value
/// found there, or the row's default value if the partition edge is reached
/// first. A null offset yields null; a zero offset on a null input panics.
/// Each element of `args` is the evaluated (input value, offset, default)
/// triple for one input row.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    // We check here once that even the largest index fits in `i64`, and then do silent `as`
    // conversions from `usize` indexes to `i64` indexes throughout this function.
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // Preparation: Make sure we can jump over a run of nulls in constant time, i.e., regardless of
    // how many nulls the run has. The following skip tables will point to the next non-null index.
    //
    // `skip_nulls_backward[i]` is `Some(j)` iff `args[i]` is null, where `j` is the index of the
    // closest non-null value before `i` (or -1 when there is none); non-null positions hold `None`.
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // `skip_nulls_forward[i]` is the mirror image: the closest non-null index after `i`
    // (or `args.len()` when there is none), built by scanning in reverse.
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    // The actual computation.
    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        // Null offsets are acceptable, and always return null
        if offset.is_null() {
            result.push(Datum::Null);
            continue;
        }

        let idx = idx as i64; // checked at the beginning of the function that len() fits
        let offset = i64::cast_from(offset.unwrap_int32());
        // Lag looks backwards, lead looks forwards; encode the direction in the sign.
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        let increment = offset.signum();

        // Get a Datum from `datums`. Return None if index is out of range.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
                    .unwrap_or(None), // overindexing
                Err(_) => None, // underindexing (negative index)
            }
        };

        let lagged_value = if increment != 0 {
            // We start j from idx, and step j until we have seen an abs(offset) number of non-null
            // values or reach the beginning or end of the partition.
            //
            // If offset is big, then this is slow: Considering the entire function, it's
            // `O(partition_size * offset)`.
            // However, a common use case is an offset of 1, for which this doesn't matter.
            // TODO: For larger offsets, we could have a completely different implementation
            // that starts the inner loop from the index where we found the previous result:
            // https://github.com/MaterializeInc/materialize/pull/29287#discussion_r1738695174
            let mut j = idx;
            for _ in 0..num::abs(offset) {
                j += increment;
                // Jump over a run of nulls
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    let ju = j as usize; // `j >= 0` because of the above `is_some_and`
                    if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                if datums_get(j).is_none() {
                    break;
                }
            }
            // If `j` fell off the partition edge before enough non-null values
            // were seen, the row's default value applies.
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // Not clear what should the semantics be here. See
                // https://github.com/MaterializeInc/database-issues/issues/8497
                // (We used to run into an infinite loop in this case, so panicking is
                // better.)
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
846
847/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
848fn first_value<'a, I>(
849    datums: I,
850    callers_temp_storage: &'a RowArena,
851    order_by: &[ColumnOrder],
852    window_frame: &WindowFrame,
853) -> Datum<'a>
854where
855    I: IntoIterator<Item = Datum<'a>>,
856{
857    let temp_storage = RowArena::new();
858    let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
859    callers_temp_storage.make_datum(|packer| {
860        packer.push_list(iter);
861    })
862}
863
864/// Like `first_value`, but doesn't perform the final wrapping in a list, returning an Iterator
865/// instead.
866fn first_value_no_list<'a: 'b, 'b, I>(
867    datums: I,
868    callers_temp_storage: &'b RowArena,
869    order_by: &[ColumnOrder],
870    window_frame: &WindowFrame,
871) -> impl Iterator<Item = Datum<'b>>
872where
873    I: IntoIterator<Item = Datum<'a>>,
874{
875    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, InputValue) record
876    let datums = order_aggregate_datums(datums, order_by);
877
878    // Decode the input (OriginalRow, InputValue) into separate datums
879    let (orig_rows, args): (Vec<_>, Vec<_>) = datums
880        .into_iter()
881        .map(|d| {
882            let mut iter = d.unwrap_list().iter();
883            let original_row = iter.next().unwrap();
884            let arg = iter.next().unwrap();
885
886            (original_row, arg)
887        })
888        .unzip();
889
890    let results = first_value_inner(args, window_frame);
891
892    callers_temp_storage.reserve(results.len());
893    results
894        .into_iter()
895        .zip_eq(orig_rows)
896        .map(|(result_value, original_row)| {
897            callers_temp_storage.make_datum(|packer| {
898                packer.push_list_with(|packer| {
899                    packer.push(result_value);
900                    packer.push(original_row);
901                });
902            })
903        })
904}
905
906fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
907    let length = datums.len();
908    let mut result: Vec<Datum> = Vec::with_capacity(length);
909    for (idx, current_datum) in datums.iter().enumerate() {
910        let first_value = match &window_frame.start_bound {
911            // Always return the current value
912            WindowFrameBound::CurrentRow => *current_datum,
913            WindowFrameBound::UnboundedPreceding => {
914                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
915                    let end_offset = usize::cast_from(*end_offset);
916
917                    // If the frame ends before the first row, return null
918                    if idx < end_offset {
919                        Datum::Null
920                    } else {
921                        datums[0]
922                    }
923                } else {
924                    datums[0]
925                }
926            }
927            WindowFrameBound::OffsetPreceding(offset) => {
928                let start_offset = usize::cast_from(*offset);
929                let start_idx = idx.saturating_sub(start_offset);
930                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
931                    let end_offset = usize::cast_from(*end_offset);
932
933                    // If the frame is empty or ends before the first row, return null
934                    if start_offset < end_offset || idx < end_offset {
935                        Datum::Null
936                    } else {
937                        datums[start_idx]
938                    }
939                } else {
940                    datums[start_idx]
941                }
942            }
943            WindowFrameBound::OffsetFollowing(offset) => {
944                let start_offset = usize::cast_from(*offset);
945                let start_idx = idx.saturating_add(start_offset);
946                if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
947                    // If the frame is empty or starts after the last row, return null
948                    if offset > end_offset || start_idx >= length {
949                        Datum::Null
950                    } else {
951                        datums[start_idx]
952                    }
953                } else {
954                    datums
955                        .get(start_idx)
956                        .map(|d| d.clone())
957                        .unwrap_or(Datum::Null)
958                }
959            }
960            // Forbidden during planning
961            WindowFrameBound::UnboundedFollowing => unreachable!(),
962        };
963        result.push(first_value);
964    }
965    result
966}
967
968/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
969fn last_value<'a, I>(
970    datums: I,
971    callers_temp_storage: &'a RowArena,
972    order_by: &[ColumnOrder],
973    window_frame: &WindowFrame,
974) -> Datum<'a>
975where
976    I: IntoIterator<Item = Datum<'a>>,
977{
978    let temp_storage = RowArena::new();
979    let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
980    callers_temp_storage.make_datum(|packer| {
981        packer.push_list(iter);
982    })
983}
984
985/// Like `last_value`, but doesn't perform the final wrapping in a list, returning an Iterator
986/// instead.
987fn last_value_no_list<'a: 'b, 'b, I>(
988    datums: I,
989    callers_temp_storage: &'b RowArena,
990    order_by: &[ColumnOrder],
991    window_frame: &WindowFrame,
992) -> impl Iterator<Item = Datum<'b>>
993where
994    I: IntoIterator<Item = Datum<'a>>,
995{
996    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
997    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
998    let datums = order_aggregate_datums_with_rank(datums, order_by);
999
1000    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1001    let size_hint = datums.size_hint().0;
1002    let mut args = Vec::with_capacity(size_hint);
1003    let mut original_rows = Vec::with_capacity(size_hint);
1004    let mut order_by_rows = Vec::with_capacity(size_hint);
1005    for (d, order_by_row) in datums.into_iter() {
1006        let mut iter = d.unwrap_list().iter();
1007        let original_row = iter.next().unwrap();
1008        let arg = iter.next().unwrap();
1009        order_by_rows.push(order_by_row);
1010        original_rows.push(original_row);
1011        args.push(arg);
1012    }
1013
1014    let results = last_value_inner(args, &order_by_rows, window_frame);
1015
1016    callers_temp_storage.reserve(results.len());
1017    results
1018        .into_iter()
1019        .zip_eq(original_rows)
1020        .map(|(result_value, original_row)| {
1021            callers_temp_storage.make_datum(|packer| {
1022                packer.push_list_with(|packer| {
1023                    packer.push(result_value);
1024                    packer.push(original_row);
1025                });
1026            })
1027        })
1028}
1029
1030fn last_value_inner<'a>(
1031    args: Vec<Datum<'a>>,
1032    order_by_rows: &Vec<Row>,
1033    window_frame: &WindowFrame,
1034) -> Vec<Datum<'a>> {
1035    let length = args.len();
1036    let mut results: Vec<Datum> = Vec::with_capacity(length);
1037    for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1038        let last_value = match &window_frame.end_bound {
1039            WindowFrameBound::CurrentRow => match &window_frame.units {
1040                // Always return the current value when in ROWS mode
1041                WindowFrameUnits::Rows => *current_datum,
1042                WindowFrameUnits::Range => {
1043                    // When in RANGE mode, return the last value of the peer group
1044                    // The peer group is the group of rows with the same ORDER BY value
1045                    // Note: Range is only supported for the default window frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
1046                    // which is why it does not appear in the other branches
1047                    let target_idx = order_by_rows[idx..]
1048                        .iter()
1049                        .enumerate()
1050                        .take_while(|(_, row)| *row == order_by_row)
1051                        .last()
1052                        .unwrap()
1053                        .0
1054                        + idx;
1055                    args[target_idx]
1056                }
1057                // GROUPS is not supported, and forbidden during planning
1058                WindowFrameUnits::Groups => unreachable!(),
1059            },
1060            WindowFrameBound::UnboundedFollowing => {
1061                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1062                    let start_offset = usize::cast_from(*start_offset);
1063
1064                    // If the frame starts after the last row of the window, return null
1065                    if idx + start_offset > length - 1 {
1066                        Datum::Null
1067                    } else {
1068                        args[length - 1]
1069                    }
1070                } else {
1071                    args[length - 1]
1072                }
1073            }
1074            WindowFrameBound::OffsetFollowing(offset) => {
1075                let end_offset = usize::cast_from(*offset);
1076                let end_idx = idx.saturating_add(end_offset);
1077                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1078                    let start_offset = usize::cast_from(*start_offset);
1079                    let start_idx = idx.saturating_add(start_offset);
1080
1081                    // If the frame is empty or starts after the last row of the window, return null
1082                    if end_offset < start_offset || start_idx >= length {
1083                        Datum::Null
1084                    } else {
1085                        // Return the last valid element in the window
1086                        args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1087                    }
1088                } else {
1089                    args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1090                }
1091            }
1092            WindowFrameBound::OffsetPreceding(offset) => {
1093                let end_offset = usize::cast_from(*offset);
1094                let end_idx = idx.saturating_sub(end_offset);
1095                if idx < end_offset {
1096                    // If the frame ends before the first row, return null
1097                    Datum::Null
1098                } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1099                    &window_frame.start_bound
1100                {
1101                    // If the frame is empty, return null
1102                    if offset > start_offset {
1103                        Datum::Null
1104                    } else {
1105                        args[end_idx]
1106                    }
1107                } else {
1108                    args[end_idx]
1109                }
1110            }
1111            // Forbidden during planning
1112            WindowFrameBound::UnboundedPreceding => unreachable!(),
1113        };
1114        results.push(last_value);
1115    }
1116    results
1117}
1118
1119/// Executes `FusedValueWindowFunc` on a reduction group.
1120/// The expected input is in the format of `[((OriginalRow, (Args1, Args2, ...)), OrderByExprs...)]`
1121/// where `Args1`, `Args2`, are the arguments of each of the fused functions. For functions that
1122/// have only a single argument (first_value/last_value), these are simple values. For functions
1123/// that have multiple arguments (lag/lead), these are also records.
1124fn fused_value_window_func<'a, I>(
1125    input_datums: I,
1126    callers_temp_storage: &'a RowArena,
1127    funcs: &Vec<AggregateFunc>,
1128    order_by: &Vec<ColumnOrder>,
1129) -> Datum<'a>
1130where
1131    I: IntoIterator<Item = Datum<'a>>,
1132{
1133    let temp_storage = RowArena::new();
1134    let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1135    callers_temp_storage.make_datum(|packer| {
1136        packer.push_list(iter);
1137    })
1138}
1139
/// Like `fused_value_window_func`, but doesn't perform the final wrapping in a list, returning an
/// Iterator instead.
fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
    input_datums: I,
    callers_temp_storage: &'b RowArena,
    funcs: &Vec<AggregateFunc>,
    order_by: &Vec<ColumnOrder>,
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Only `last_value_inner` consumes the ORDER BY rows (to find peer groups in RANGE
    // mode), so collect them only when a LastValue is among the fused functions.
    let has_last_value = funcs
        .iter()
        .any(|f| matches!(f, AggregateFunc::LastValue { .. }));

    let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);

    // Decode each ((OriginalRow, (Args1, Args2, ...)), OrderByRow) record:
    // `encoded_argsss[i]` gathers the (still encoded) argument of the `i`th fused
    // function across all input rows.
    let size_hint = input_datums_with_ranks.size_hint().0;
    let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
    let mut original_rows = Vec::with_capacity(size_hint);
    let mut order_by_rows = Vec::with_capacity(size_hint);
    for (d, order_by_row) in input_datums_with_ranks {
        let mut iter = d.unwrap_list().iter();
        let original_row = iter.next().unwrap();
        original_rows.push(original_row);
        let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
        for i in 0..funcs.len() {
            let encoded_args = argss_iter.next().unwrap();
            encoded_argsss[i].push(encoded_args);
        }
        if has_last_value {
            order_by_rows.push(order_by_row);
        }
    }

    // Evaluate each fused function over the whole partition, transposing the
    // per-function result vectors into one result vector per input row.
    let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
    for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
        let results = match func {
            AggregateFunc::LagLead {
                order_by: inner_order_by,
                lag_lead,
                ignore_nulls,
            } => {
                assert_eq!(order_by, inner_order_by);
                // lag/lead has multiple arguments, which arrive wrapped in a record;
                // unwrap them before handing off to the shared implementation.
                let unwrapped_argss = encoded_argss
                    .into_iter()
                    .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
                    .collect();
                lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
            }
            AggregateFunc::FirstValue {
                order_by: inner_order_by,
                window_frame,
            } => {
                assert_eq!(order_by, inner_order_by);
                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
                // wrapped into a record.)
                first_value_inner(encoded_argss, window_frame)
            }
            AggregateFunc::LastValue {
                order_by: inner_order_by,
                window_frame,
            } => {
                assert_eq!(order_by, inner_order_by);
                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
                // wrapped into a record.)
                last_value_inner(encoded_argss, &order_by_rows, window_frame)
            }
            _ => panic!("unknown window function in FusedValueWindowFunc"),
        };
        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
            results.push(result);
        }
    }

    // Pack each row's results into a ((Results...), OriginalRow) record in the caller's
    // arena. (Two arena datums per row: the inner results list, and the outer record.)
    callers_temp_storage.reserve(2 * original_rows.len());
    results_per_row
        .into_iter()
        .enumerate()
        .map(move |(i, results)| {
            callers_temp_storage.make_datum(|packer| {
                packer.push_list_with(|packer| {
                    packer
                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
                    packer.push(original_rows[i]);
                });
            })
        })
}
1229
1230/// `input_datums` is an entire window partition.
1231/// The expected input is in the format of `[((OriginalRow, InputValue), OrderByExprs...)]`
1232/// See also in the comment in `window_func_applied_to`.
1233///
1234/// `wrapped_aggregate`: e.g., for `sum(...) OVER (...)`, this is the `sum(...)`.
1235///
1236/// Note that this `order_by` doesn't have expressions, only `ColumnOrder`s. For an explanation,
1237/// see the comment on `WindowExprType`.
1238fn window_aggr<'a, I, A>(
1239    input_datums: I,
1240    callers_temp_storage: &'a RowArena,
1241    wrapped_aggregate: &AggregateFunc,
1242    order_by: &[ColumnOrder],
1243    window_frame: &WindowFrame,
1244) -> Datum<'a>
1245where
1246    I: IntoIterator<Item = Datum<'a>>,
1247    A: OneByOneAggr,
1248{
1249    let temp_storage = RowArena::new();
1250    let iter = window_aggr_no_list::<I, A>(
1251        input_datums,
1252        &temp_storage,
1253        wrapped_aggregate,
1254        order_by,
1255        window_frame,
1256    );
1257    callers_temp_storage.make_datum(|packer| {
1258        packer.push_list(iter);
1259    })
1260}
1261
1262/// Like `window_aggr`, but doesn't perform the final wrapping in a list, returning an Iterator
1263/// instead.
1264fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1265    input_datums: I,
1266    callers_temp_storage: &'b RowArena,
1267    wrapped_aggregate: &AggregateFunc,
1268    order_by: &[ColumnOrder],
1269    window_frame: &WindowFrame,
1270) -> impl Iterator<Item = Datum<'b>>
1271where
1272    I: IntoIterator<Item = Datum<'a>>,
1273    A: OneByOneAggr,
1274{
1275    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1276    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1277    let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1278
1279    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1280    let size_hint = datums.size_hint().0;
1281    let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1282    let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1283    let mut order_by_rows = Vec::with_capacity(size_hint);
1284    for (d, order_by_row) in datums.into_iter() {
1285        let mut iter = d.unwrap_list().iter();
1286        let original_row = iter.next().unwrap();
1287        let arg = iter.next().unwrap();
1288        order_by_rows.push(order_by_row);
1289        original_rows.push(original_row);
1290        args.push(arg);
1291    }
1292
1293    let results = window_aggr_inner::<A>(
1294        args,
1295        &order_by_rows,
1296        wrapped_aggregate,
1297        order_by,
1298        window_frame,
1299        callers_temp_storage,
1300    );
1301
1302    callers_temp_storage.reserve(results.len());
1303    results
1304        .into_iter()
1305        .zip_eq(original_rows)
1306        .map(|(result_value, original_row)| {
1307            callers_temp_storage.make_datum(|packer| {
1308                packer.push_list_with(|packer| {
1309                    packer.push(result_value);
1310                    packer.push(original_row);
1311                });
1312            })
1313        })
1314}
1315
1316fn window_aggr_inner<'a, A>(
1317    mut args: Vec<Datum<'a>>,
1318    order_by_rows: &Vec<Row>,
1319    wrapped_aggregate: &AggregateFunc,
1320    order_by: &[ColumnOrder],
1321    window_frame: &WindowFrame,
1322    temp_storage: &'a RowArena,
1323) -> Vec<Datum<'a>>
1324where
1325    A: OneByOneAggr,
1326{
1327    let length = args.len();
1328    let mut result: Vec<Datum> = Vec::with_capacity(length);
1329
1330    // In this degenerate case, all results would be `wrapped_aggregate.default()` (usually null).
1331    // However, this currently can't happen, because
1332    // - Groups frame mode is currently not supported;
1333    // - Range frame mode is currently supported only for the default frame, which includes the
1334    //   current row.
1335    soft_assert_or_log!(
1336        !((matches!(window_frame.units, WindowFrameUnits::Groups)
1337            || matches!(window_frame.units, WindowFrameUnits::Range))
1338            && !window_frame.includes_current_row()),
1339        "window frame without current row"
1340    );
1341
1342    if (matches!(
1343        window_frame.start_bound,
1344        WindowFrameBound::UnboundedPreceding
1345    ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1346        || (order_by.is_empty()
1347            && (matches!(window_frame.units, WindowFrameUnits::Groups)
1348                || matches!(window_frame.units, WindowFrameUnits::Range))
1349            && window_frame.includes_current_row())
1350    {
1351        // Either
1352        //  - UNBOUNDED frame in both directions, or
1353        //  - There is no ORDER BY and the frame is such that the current peer group is included.
1354        //    (The current peer group will be the whole partition if there is no ORDER BY.)
1355        // We simply need to compute the aggregate once, on the entire partition, and each input
1356        // row will get this one aggregate value as result.
1357        let result_value = wrapped_aggregate.eval(args, temp_storage);
1358        // Every row will get the above aggregate as result.
1359        for _ in 0..length {
1360            result.push(result_value);
1361        }
1362    } else {
1363        fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1364            args: Vec<Datum<'a>>,
1365            result: &mut Vec<Datum<'a>>,
1366            mut one_by_one_aggr: A,
1367            temp_storage: &'a RowArena,
1368        ) where
1369            A: OneByOneAggr,
1370        {
1371            for current_arg in args.into_iter() {
1372                one_by_one_aggr.give(&current_arg);
1373                let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1374                result.push(result_value);
1375            }
1376        }
1377
1378        fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1379            args: Vec<Datum<'a>>,
1380            order_by_rows: &Vec<Row>,
1381            result: &mut Vec<Datum<'a>>,
1382            mut one_by_one_aggr: A,
1383            temp_storage: &'a RowArena,
1384        ) where
1385            A: OneByOneAggr,
1386        {
1387            let mut peer_group_start = 0;
1388            while peer_group_start < args.len() {
1389                // Find the boundaries of the current peer group.
1390                // peer_group_start will point to the first element of the peer group,
1391                // peer_group_end will point to _just after_ the last element of the peer group.
1392                let mut peer_group_end = peer_group_start + 1;
1393                while peer_group_end < args.len()
1394                    && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1395                {
1396                    // The peer group goes on while the OrderByRows not differ.
1397                    peer_group_end += 1;
1398                }
1399                // Let's compute the aggregate (which will be the same for all records in this
1400                // peer group).
1401                for current_arg in args[peer_group_start..peer_group_end].iter() {
1402                    one_by_one_aggr.give(current_arg);
1403                }
1404                let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1405                // Put the above aggregate into each record in the peer group.
1406                for _ in args[peer_group_start..peer_group_end].iter() {
1407                    result.push(agg_for_peer_group);
1408                }
1409                // Point to the start of the next peer group.
1410                peer_group_start = peer_group_end;
1411            }
1412        }
1413
1414        fn rows_between_offset_and_offset<'a>(
1415            args: Vec<Datum<'a>>,
1416            result: &mut Vec<Datum<'a>>,
1417            wrapped_aggregate: &AggregateFunc,
1418            temp_storage: &'a RowArena,
1419            offset_start: i64,
1420            offset_end: i64,
1421        ) {
1422            let len = args
1423                .len()
1424                .to_i64()
1425                .expect("window partition's len should fit into i64");
1426            for i in 0..len {
1427                let i = i.to_i64().expect("window partition shouldn't be super big");
1428                // Trim the start of the frame to make it not reach over the start of the window
1429                // partition.
1430                let frame_start = max(i + offset_start, 0)
1431                    .to_usize()
1432                    .expect("The max made sure it's not negative");
1433                // Trim the end of the frame to make it not reach over the end of the window
1434                // partition.
1435                let frame_end = min(i + offset_end, len - 1).to_usize();
1436                match frame_end {
1437                    Some(frame_end) => {
1438                        if frame_start <= frame_end {
1439                            // Compute the aggregate on the frame.
1440                            // TODO:
1441                            // This implementation is quite slow if the frame is large: we do an
1442                            // inner loop over the entire frame, and compute the aggregate from
1443                            // scratch. We could do better:
1444                            //  - For invertible aggregations we could do a rolling aggregation.
1445                            //  - There are various tricks for min/max as well, making use of either
1446                            //    the fixed size of the window, or that we are not retracting
1447                            //    arbitrary elements but doing queue operations. E.g., see
1448                            //    http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
1449                            let frame_values = args[frame_start..=frame_end].iter().cloned();
1450                            let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1451                            result.push(result_value);
1452                        } else {
1453                            // frame_start > frame_end, so this is an empty frame.
1454                            let result_value = wrapped_aggregate.default();
1455                            result.push(result_value);
1456                        }
1457                    }
1458                    None => {
1459                        // frame_end would be negative, so this is an empty frame.
1460                        let result_value = wrapped_aggregate.default();
1461                        result.push(result_value);
1462                    }
1463                }
1464            }
1465        }
1466
1467        match (
1468            &window_frame.units,
1469            &window_frame.start_bound,
1470            &window_frame.end_bound,
1471        ) {
1472            // Cases where one edge of the frame is CurrentRow.
1473            // Note that these cases could be merged into the more general cases below where one
1474            // edge is some offset (with offset = 0), but the CurrentRow cases probably cover 95%
1475            // of user queries, so let's make this simple and fast.
1476            (Rows, UnboundedPreceding, CurrentRow) => {
1477                rows_between_unbounded_preceding_and_current_row::<A>(
1478                    args,
1479                    &mut result,
1480                    A::new(wrapped_aggregate, false),
1481                    temp_storage,
1482                );
1483            }
1484            (Rows, CurrentRow, UnboundedFollowing) => {
1485                // Same as above, but reverse.
1486                args.reverse();
1487                rows_between_unbounded_preceding_and_current_row::<A>(
1488                    args,
1489                    &mut result,
1490                    A::new(wrapped_aggregate, true),
1491                    temp_storage,
1492                );
1493                result.reverse();
1494            }
1495            (Range, UnboundedPreceding, CurrentRow) => {
1496                // Note that for the default frame, the RANGE frame mode is identical to the GROUPS
1497                // frame mode.
1498                groups_between_unbounded_preceding_and_current_row::<A>(
1499                    args,
1500                    order_by_rows,
1501                    &mut result,
1502                    A::new(wrapped_aggregate, false),
1503                    temp_storage,
1504                );
1505            }
1506            // The next several cases all call `rows_between_offset_and_offset`. Note that the
1507            // offset passed to `rows_between_offset_and_offset` should be negated when it's
1508            // PRECEDING.
1509            (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1510                let start_prec = start_prec.to_i64().expect(
1511                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1512                );
1513                let end_prec = end_prec.to_i64().expect(
1514                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1515                );
1516                rows_between_offset_and_offset(
1517                    args,
1518                    &mut result,
1519                    wrapped_aggregate,
1520                    temp_storage,
1521                    -start_prec,
1522                    -end_prec,
1523                );
1524            }
1525            (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1526                let start_prec = start_prec.to_i64().expect(
1527                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1528                );
1529                let end_fol = end_fol.to_i64().expect(
1530                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1531                );
1532                rows_between_offset_and_offset(
1533                    args,
1534                    &mut result,
1535                    wrapped_aggregate,
1536                    temp_storage,
1537                    -start_prec,
1538                    end_fol,
1539                );
1540            }
1541            (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1542                let start_fol = start_fol.to_i64().expect(
1543                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1544                );
1545                let end_fol = end_fol.to_i64().expect(
1546                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1547                );
1548                rows_between_offset_and_offset(
1549                    args,
1550                    &mut result,
1551                    wrapped_aggregate,
1552                    temp_storage,
1553                    start_fol,
1554                    end_fol,
1555                );
1556            }
1557            (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1558                unreachable!() // The planning ensured that this nonsensical case can't happen
1559            }
1560            (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1561                let start_prec = start_prec.to_i64().expect(
1562                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1563                );
1564                let end_fol = 0;
1565                rows_between_offset_and_offset(
1566                    args,
1567                    &mut result,
1568                    wrapped_aggregate,
1569                    temp_storage,
1570                    -start_prec,
1571                    end_fol,
1572                );
1573            }
1574            (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1575                let start_fol = 0;
1576                let end_fol = end_fol.to_i64().expect(
1577                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1578                );
1579                rows_between_offset_and_offset(
1580                    args,
1581                    &mut result,
1582                    wrapped_aggregate,
1583                    temp_storage,
1584                    start_fol,
1585                    end_fol,
1586                );
1587            }
1588            (Rows, CurrentRow, CurrentRow) => {
1589                // We could have a more efficient implementation for this, but this is probably
1590                // super rare. (Might be more common with RANGE or GROUPS frame mode, though!)
1591                let start_fol = 0;
1592                let end_fol = 0;
1593                rows_between_offset_and_offset(
1594                    args,
1595                    &mut result,
1596                    wrapped_aggregate,
1597                    temp_storage,
1598                    start_fol,
1599                    end_fol,
1600                );
1601            }
1602            (Rows, CurrentRow, OffsetPreceding(_))
1603            | (Rows, UnboundedFollowing, _)
1604            | (Rows, _, UnboundedPreceding)
1605            | (Rows, OffsetFollowing(..), CurrentRow) => {
1606                unreachable!() // The planning ensured that these nonsensical cases can't happen
1607            }
1608            (Rows, UnboundedPreceding, UnboundedFollowing) => {
1609                // This is handled by the complicated if condition near the beginning of this
1610                // function.
1611                unreachable!()
1612            }
1613            (Rows, UnboundedPreceding, OffsetPreceding(_))
1614            | (Rows, UnboundedPreceding, OffsetFollowing(_))
1615            | (Rows, OffsetPreceding(..), UnboundedFollowing)
1616            | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1617                // Unsupported. Bail in the planner.
1618                // https://github.com/MaterializeInc/database-issues/issues/6720
1619                unreachable!()
1620            }
1621            (Range, _, _) => {
1622                // Unsupported.
1623                // The planner doesn't allow Range frame mode for now (except for the default
1624                // frame), see https://github.com/MaterializeInc/database-issues/issues/6585
1625                // Note that it would be easy to handle (Range, CurrentRow, UnboundedFollowing):
1626                // it would be similar to (Rows, CurrentRow, UnboundedFollowing), but would call
1627                // groups_between_unbounded_preceding_current_row.
1628                unreachable!()
1629            }
1630            (Groups, _, _) => {
1631                // Unsupported.
1632                // The planner doesn't allow Groups frame mode for now, see
1633                // https://github.com/MaterializeInc/database-issues/issues/6588
1634                unreachable!()
1635            }
1636        }
1637    }
1638
1639    result
1640}
1641
1642/// Computes a bundle of fused window aggregations.
1643/// The input is similar to `window_aggr`, but `InputValue` is not just a single value, but a record
1644/// where each component is the input to one of the aggregations.
1645fn fused_window_aggr<'a, I, A>(
1646    input_datums: I,
1647    callers_temp_storage: &'a RowArena,
1648    wrapped_aggregates: &Vec<AggregateFunc>,
1649    order_by: &Vec<ColumnOrder>,
1650    window_frame: &WindowFrame,
1651) -> Datum<'a>
1652where
1653    I: IntoIterator<Item = Datum<'a>>,
1654    A: OneByOneAggr,
1655{
1656    let temp_storage = RowArena::new();
1657    let iter = fused_window_aggr_no_list::<_, A>(
1658        input_datums,
1659        &temp_storage,
1660        wrapped_aggregates,
1661        order_by,
1662        window_frame,
1663    );
1664    callers_temp_storage.make_datum(|packer| {
1665        packer.push_list(iter);
1666    })
1667}
1668
/// Like `fused_window_aggr`, but doesn't perform the final wrapping in a list, returning an
/// Iterator instead.
///
/// Each input datum is a record of the form `(OriginalRow, InputValue)`, where `InputValue`
/// is itself a record with one component per wrapped aggregation. The returned iterator
/// yields, per input row, a 2-element list: the record of aggregation results, then the
/// original row.
fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
    input_datums: I,
    callers_temp_storage: &'b RowArena,
    wrapped_aggregates: &Vec<AggregateFunc>,
    order_by: &Vec<ColumnOrder>,
    window_frame: &WindowFrame,
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
    A: OneByOneAggr,
{
    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
    let datums = order_aggregate_datums_with_rank(input_datums, order_by);

    // Split the sorted records into parallel vectors: one argument vector per wrapped
    // aggregation (`argss`), plus the original rows and the ORDER BY rows.
    let size_hint = datums.size_hint().0;
    let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
    let mut original_rows = Vec::with_capacity(size_hint);
    let mut order_by_rows = Vec::with_capacity(size_hint);
    for (d, order_by_row) in datums {
        let mut iter = d.unwrap_list().iter();
        let original_row = iter.next().unwrap();
        original_rows.push(original_row);
        let args_iter = iter.next().unwrap().unwrap_list().iter();
        // Push each argument into the respective list
        // (`zip_eq` asserts that each input record has exactly one component per aggregation)
        for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
            args.push(arg);
        }
        order_by_rows.push(order_by_row);
    }

    // Evaluate each wrapped aggregation over its argument vector, then transpose the
    // per-aggregation result vectors into one result vector per input row.
    let mut results_per_row =
        vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
    for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
        let results = window_aggr_inner::<A>(
            args,
            &order_by_rows,
            wrapped_aggr,
            order_by,
            window_frame,
            callers_temp_storage,
        );
        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
            results.push(result);
        }
    }

    // We create 2 datums per row below (the results record and the outer list),
    // hence the `2 *` when reserving arena space.
    callers_temp_storage.reserve(2 * original_rows.len());
    results_per_row
        .into_iter()
        .enumerate()
        .map(move |(i, results)| {
            callers_temp_storage.make_datum(|packer| {
                packer.push_list_with(|packer| {
                    packer
                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
                    packer.push(original_rows[i]);
                });
            })
        })
}
1732
/// An implementation of an aggregation where we can send in the input elements one-by-one, and
/// can also ask the current aggregate at any moment. (This just delegates to other aggregation
/// evaluation approaches.)
pub trait OneByOneAggr {
    /// Creates a new instance for evaluating the given aggregation.
    ///
    /// The `reverse` parameter makes the aggregations process input elements in reverse order.
    /// This has an effect only for non-commutative aggregations, e.g. `list_agg`. These are
    /// currently only some of the Basic aggregations. (Basic aggregations are handled by
    /// `NaiveOneByOneAggr`).
    fn new(agg: &AggregateFunc, reverse: bool) -> Self;
    /// Pushes one input element into the aggregation.
    fn give(&mut self, d: &Datum);
    /// Returns the value of the aggregate computed on the given values so far.
    /// The result is allocated in `temp_storage`.
    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
}
1747
/// Naive implementation of [OneByOneAggr], suitable for stuff like const folding, but too slow for
/// rendering. This relies only on infrastructure available in `mz-expr`. It simply saves all the
/// given input, and calls the given [AggregateFunc]'s `eval` method when asked about the current
/// aggregate. (For Accumulable and Hierarchical aggregations, the rendering has more efficient
/// implementations, but for Basic aggregations even the rendering uses this naive implementation.)
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    /// The aggregation being evaluated.
    agg: AggregateFunc,
    /// All input elements seen so far, each packed into its own `Row`.
    input: Vec<Row>,
    /// If true, `get_current_aggregate` feeds the saved inputs to `agg.eval` in reverse order.
    reverse: bool,
}
1759
1760impl OneByOneAggr for NaiveOneByOneAggr {
1761    fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1762        NaiveOneByOneAggr {
1763            agg: agg.clone(),
1764            input: Vec::new(),
1765            reverse,
1766        }
1767    }
1768
1769    fn give(&mut self, d: &Datum) {
1770        let mut row = Row::default();
1771        row.packer().push(d);
1772        self.input.push(row);
1773    }
1774
1775    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1776        temp_storage.make_datum(|packer| {
1777            packer.push(if !self.reverse {
1778                self.agg
1779                    .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1780            } else {
1781                self.agg.eval(
1782                    self.input.iter().rev().map(|r| r.unpack_first()),
1783                    temp_storage,
1784                )
1785            });
1786        })
1787    }
1788}
1789
/// Identify whether the given aggregate function is Lag or Lead, since they share
/// implementations.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum LagLeadType {
    /// The `lag` window function.
    Lag,
    /// The `lead` window function.
    Lead,
}
1808
/// A function that merges a group of input datums into a single output datum.
///
/// `AggregateFunc::eval` dispatches each variant to a like-named helper function.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum AggregateFunc {
    // `max` aggregations, one variant per input type (see `max_datum` in `eval`).
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    // `min` aggregations, one variant per input type (see `min_datum` in `eval`).
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    // `sum` aggregations. Note that summation happens in a wider type than the
    // input (e.g., `SumInt16` sums `i16`s into an `i64`; see `sum_datum` in `eval`).
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    /// The `count` aggregation; yields `Int64`, and `0` on empty input (see `default`).
    Count,
    /// Boolean disjunction; `false` on empty input (see `default` and `identity_datum`).
    Any,
    /// Boolean conjunction; `true` on empty input (see `default` and `identity_datum`).
    All,
    /// Accumulates `Datum::List`s whose first element is a JSON-typed `Datum`s
    /// into a JSON list. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips `Datum::List`s whose first element is a JSON-typed `Datum`s into a
    /// JSON map. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_object_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbObjectAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips a `Datum::List` whose first element is a `Datum::List` guaranteed
    /// to be non-empty and whose len % 2 == 0 into a `Datum::Map`. The other
    /// elements are columns used by `order_by`.
    MapAgg {
        order_by: Vec<ColumnOrder>,
        value_type: SqlScalarType,
    },
    /// Accumulates `Datum::Array`s of `SqlScalarType::Record` whose first element is a `Datum::Array`
    /// into a single `Datum::Array` (the remaining fields are used by `order_by`).
    ArrayConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Accumulates `Datum::List`s of `SqlScalarType::Record` whose first field is a `Datum::List`
    /// into a single `Datum::List` (the remaining fields are used by `order_by`).
    ListConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// String concatenation aggregate, evaluated by `string_agg`.
    StringAgg {
        order_by: Vec<ColumnOrder>,
    },
    // Window functions. Each of these produces a list datum pairing per-row
    // results with the original rows; see `identity_datum` and
    // `eval_with_unnest_list`.
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// The `lag`/`lead` window function; which one is determined by `lag_lead`.
    LagLead {
        order_by: Vec<ColumnOrder>,
        lag_lead: LagLeadType,
        ignore_nulls: bool,
    },
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions fused into one function, to amortize overheads.
    FusedValueWindowFunc {
        funcs: Vec<AggregateFunc>,
        /// Currently, all the fused functions must have the same `order_by`. (We can later
        /// eliminate this limitation.)
        order_by: Vec<ColumnOrder>,
    },
    /// An aggregate function used as a window function with a frame, e.g.
    /// `sum(...) OVER (...)`. `wrapped_aggregate` is the aggregation being windowed.
    WindowAggregate {
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregations fused into one function, sharing the same
    /// `order_by` and `window_frame`; see `fused_window_aggr`.
    FusedWindowAggregate {
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Accumulates any number of `Datum::Dummy`s into `Datum::Dummy`.
    ///
    /// Useful for removing an expensive aggregation while maintaining the shape
    /// of a reduce operator.
    Dummy,
}
1951
1952impl AggregateFunc {
    /// Applies the aggregation to the given input `datums`, allocating any
    /// non-inline result in `temp_storage`.
    ///
    /// Each variant dispatches to a like-named helper function; the
    /// `order_by`-carrying variants expect their inputs wrapped in records as
    /// described in the variants' documentation.
    pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
    where
        I: IntoIterator<Item = Datum<'a>>,
    {
        match self {
            AggregateFunc::MaxNumeric => {
                max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
            }
            AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
            AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
            AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
            AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
            AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
            AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
            AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
            AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
            AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
            AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
            AggregateFunc::MaxString => max_string(datums),
            AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
            AggregateFunc::MaxTimestamp => {
                max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
            }
            AggregateFunc::MaxTimestampTz => {
                max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
            }
            AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
            AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
            AggregateFunc::MinNumeric => {
                min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
            }
            AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
            AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
            AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
            AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
            AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
            AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
            AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
            AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
            AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
            AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
            AggregateFunc::MinString => min_string(datums),
            AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
            AggregateFunc::MinTimestamp => {
                min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
            }
            AggregateFunc::MinTimestampTz => {
                min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
            }
            AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
            AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
            // Sums accumulate in a wider type than the input (second type parameter).
            AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
            AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
            AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
            AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
            AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
            AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
            AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
            AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
            AggregateFunc::SumNumeric => sum_numeric(datums),
            AggregateFunc::Count => count(datums),
            AggregateFunc::Any => any(datums),
            AggregateFunc::All => all(datums),
            AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
            // `MapAgg` and `JsonbObjectAgg` share the `dict_agg` implementation.
            AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
                dict_agg(datums, temp_storage, order_by)
            }
            AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
            AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
            AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
            AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
            AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
            AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
            AggregateFunc::LagLead {
                order_by,
                lag_lead: lag_lead_type,
                ignore_nulls,
            } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
            AggregateFunc::FirstValue {
                order_by,
                window_frame,
            } => first_value(datums, temp_storage, order_by, window_frame),
            AggregateFunc::LastValue {
                order_by,
                window_frame,
            } => last_value(datums, temp_storage, order_by, window_frame),
            // Window aggregates use the naive one-by-one aggregator here (suitable
            // for, e.g., const folding); the rendering calls
            // `eval_with_fast_window_agg` with a more efficient implementation.
            AggregateFunc::WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => window_aggr::<_, NaiveOneByOneAggr>(
                datums,
                temp_storage,
                wrapped_aggregate,
                order_by,
                window_frame,
            ),
            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
                fused_value_window_func(datums, temp_storage, funcs, order_by)
            }
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates,
                order_by,
                window_frame,
            } => fused_window_aggr::<_, NaiveOneByOneAggr>(
                datums,
                temp_storage,
                wrapped_aggregates,
                order_by,
                window_frame,
            ),
            AggregateFunc::Dummy => Datum::Dummy,
        }
    }
2067
2068    /// Like `eval`, but it's given a [OneByOneAggr]. If `self` is a `WindowAggregate`, then
2069    /// the given [OneByOneAggr] will be used to evaluate the wrapped aggregate inside the
2070    /// `WindowAggregate`. If `self` is not a `WindowAggregate`, then it simply calls `eval`.
2071    pub fn eval_with_fast_window_agg<'a, I, W>(
2072        &self,
2073        datums: I,
2074        temp_storage: &'a RowArena,
2075    ) -> Datum<'a>
2076    where
2077        I: IntoIterator<Item = Datum<'a>>,
2078        W: OneByOneAggr,
2079    {
2080        match self {
2081            AggregateFunc::WindowAggregate {
2082                wrapped_aggregate,
2083                order_by,
2084                window_frame,
2085            } => window_aggr::<_, W>(
2086                datums,
2087                temp_storage,
2088                wrapped_aggregate,
2089                order_by,
2090                window_frame,
2091            ),
2092            AggregateFunc::FusedWindowAggregate {
2093                wrapped_aggregates,
2094                order_by,
2095                window_frame,
2096            } => fused_window_aggr::<_, W>(
2097                datums,
2098                temp_storage,
2099                wrapped_aggregates,
2100                order_by,
2101                window_frame,
2102            ),
2103            _ => self.eval(datums, temp_storage),
2104        }
2105    }
2106
    /// Like `eval_with_fast_window_agg`, but yields the window function's per-row
    /// results as an iterator instead of wrapping them in a single list datum,
    /// so that a downstream `unnest(list)` can be fused into the evaluation.
    ///
    /// Panics if `self` is not one of the window functions for which
    /// `can_fuse_with_unnest_list` returns true.
    pub fn eval_with_unnest_list<'a, I, W>(
        &self,
        datums: I,
        temp_storage: &'a RowArena,
    ) -> impl Iterator<Item = Datum<'a>>
    where
        I: IntoIterator<Item = Datum<'a>>,
        W: OneByOneAggr,
    {
        // TODO: Use `enum_dispatch` to construct a unified iterator instead of `collect_vec`.
        assert!(self.can_fuse_with_unnest_list());
        // Each arm eagerly collects into a `Vec` so all arms share one concrete
        // iterator type (`vec::IntoIter`); see the TODO above.
        match self {
            AggregateFunc::RowNumber { order_by } => {
                row_number_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::Rank { order_by } => {
                rank_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::DenseRank { order_by } => {
                dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::LagLead {
                order_by,
                lag_lead: lag_lead_type,
                ignore_nulls,
            } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
                .collect_vec(),
            AggregateFunc::FirstValue {
                order_by,
                window_frame,
            } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
            AggregateFunc::LastValue {
                order_by,
                window_frame,
            } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
                fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
            }
            AggregateFunc::WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => window_aggr_no_list::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregate,
                order_by,
                window_frame,
            )
            .collect_vec(),
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates,
                order_by,
                window_frame,
            } => fused_window_aggr_no_list::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregates,
                order_by,
                window_frame,
            )
            .collect_vec(),
            _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
        }
        .into_iter()
    }
2173
2174    /// Returns the output of the aggregation function when applied on an empty
2175    /// input relation.
2176    pub fn default(&self) -> Datum<'static> {
2177        match self {
2178            AggregateFunc::Count => Datum::Int64(0),
2179            AggregateFunc::Any => Datum::False,
2180            AggregateFunc::All => Datum::True,
2181            AggregateFunc::Dummy => Datum::Dummy,
2182            _ => Datum::Null,
2183        }
2184    }
2185
    /// Returns a datum whose inclusion in the aggregation will not change its
    /// result.
    ///
    /// For example, `Datum::True` is the identity of `All` and the empty
    /// array/list is the identity of the concatenation aggregates. For the
    /// remaining aggregates `Datum::Null` is used, relying on them ignoring
    /// null inputs (see, e.g., the `JsonbAgg` variant's documentation).
    pub fn identity_datum(&self) -> Datum<'static> {
        match self {
            AggregateFunc::Any => Datum::False,
            AggregateFunc::All => Datum::True,
            AggregateFunc::Dummy => Datum::Dummy,
            AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
            AggregateFunc::ListConcat { .. } => Datum::empty_list(),
            // Window functions produce a list of per-row results, so the empty
            // list is their identity.
            AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::StringAgg { .. } => Datum::Null,
        }
    }
2254
    /// Reports whether `eval_with_unnest_list` can be used for this aggregation,
    /// i.e., whether a subsequent `unnest(list)` can be fused into its evaluation.
    ///
    /// This is true exactly for the window functions, whose results are lists
    /// (see `identity_datum`). The match is deliberately exhaustive so that
    /// adding a new variant forces a decision here.
    pub fn can_fuse_with_unnest_list(&self) -> bool {
        match self {
            AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => true,
            AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::StringAgg { .. } => false,
        }
    }
2321
    /// The output column type for the result of an aggregation.
    ///
    /// The output column type also contains nullability information, which
    /// is (without further information) true for aggregations that are not
    /// counts.
    ///
    /// Window functions (ranking, lag/lead, first/last value, window
    /// aggregates) return a list of records pairing the per-row result with
    /// the original row, since they emit one result per input row.
    pub fn output_type(&self, input_type: SqlColumnType) -> SqlColumnType {
        let scalar_type = match self {
            AggregateFunc::Count => SqlScalarType::Int64,
            AggregateFunc::Any => SqlScalarType::Bool,
            AggregateFunc::All => SqlScalarType::Bool,
            AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
            AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
            // Sums over narrower integers widen the result type.
            AggregateFunc::SumInt16 => SqlScalarType::Int64,
            AggregateFunc::SumInt32 => SqlScalarType::Int64,
            // Sums over 64-bit integers widen to numeric (scale 0) instead of
            // a wider machine integer.
            AggregateFunc::SumInt64 => SqlScalarType::Numeric {
                max_scale: Some(NumericMaxScale::ZERO),
            },
            AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
            AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
            AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
                max_scale: Some(NumericMaxScale::ZERO),
            },
            AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
                value_type: Box::new(value_type.clone()),
                custom_id: None,
            },
            AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
                match input_type.scalar_type {
                    // The input is wrapped in a Record if there's an ORDER BY, so extract it out.
                    SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
                    _ => unreachable!(),
                }
            }
            AggregateFunc::StringAgg { .. } => SqlScalarType::String,
            // The three ranking window functions share one output shape; only
            // the result column name differs.
            AggregateFunc::RowNumber { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
            }
            AggregateFunc::Rank { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
            }
            AggregateFunc::DenseRank { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
            }
            AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
                // The input type for Lag is ((OriginalRow, EncodedArgs), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let encoded_args = fields[0].unwrap_record_element_type()[1];
                let output_type_inner =
                    Self::lag_lead_output_type_inner_from_encoded_args(encoded_args);
                let column_name = Self::lag_lead_result_column_name(lag_lead_type);

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (column_name, output_type_inner),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FirstValue { .. } => {
                // The input type for FirstValue is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let value_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true); // null when the partition is empty

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?first_value?"), value_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::LastValue { .. } => {
                // The input type for LastValue is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let value_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true); // null when the partition is empty

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?last_value?"), value_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::WindowAggregate {
                wrapped_aggregate, ..
            } => {
                // The input type for a window aggregate is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let arg_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true);
                // The result type is whatever the wrapped aggregation
                // (e.g. sum) would produce for the argument type.
                let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates, ..
            } => {
                // The input type for a fused window aggregate is ((OriginalRow, Args), OrderByExprs...)
                // where `Args` is a record.
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let args_type = fields[0].unwrap_record_element_type()[1];
                let arg_types = args_type.unwrap_record_element_type();
                // One output field per fused aggregation, paired with its
                // argument type. `zip_eq` panics on length mismatch, which
                // would indicate an upstream planning bug.
                let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(
                    |(arg_type, wrapped_agg)| {
                    (
                        ColumnName::from(wrapped_agg.name()),
                        wrapped_agg.output_type((**arg_type).clone().nullable(true)),
                    )
                }).collect_vec();

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
                                fields: out_fields.into(),
                                custom_id: None,
                            }.nullable(false)),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
                // The input type is ((OriginalRow, EncodedArgs), OrderByExprs...)
                // where EncodedArgs is a record, where each element is the argument to one of the
                // function calls that got fused. This is a record for lag/lead, and a simple type
                // for first_value/last_value.
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let encoded_args_type = fields[0]
                    .unwrap_record_element_type()[1]
                    .unwrap_record_element_type();

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (
                                ColumnName::from("?fused_value_window_func?"),
                                SqlScalarType::Record {
                                // One result field per fused function, in the
                                // same order as the encoded arguments.
                                fields: encoded_args_type.into_iter().zip_eq(funcs).map(
                                    |(arg_type, func)| {
                                    match func {
                                        AggregateFunc::LagLead {
                                            lag_lead: lag_lead_type, ..
                                        } => {
                                            let name = Self::lag_lead_result_column_name(
                                                lag_lead_type,
                                            );
                                            let ty = Self
                                                ::lag_lead_output_type_inner_from_encoded_args(
                                                    arg_type,
                                                );
                                            (name, ty)
                                        },
                                        AggregateFunc::FirstValue { .. } => {
                                            (
                                                ColumnName::from("?first_value?"),
                                                arg_type.clone().nullable(true),
                                            )
                                        }
                                        AggregateFunc::LastValue { .. } => {
                                            (
                                                ColumnName::from("?last_value?"),
                                                arg_type.clone().nullable(true),
                                            )
                                        }
                                        _ => panic!("FusedValueWindowFunc has an unknown function"),
                                    }
                                }).collect(),
                                custom_id: None,
                            }.nullable(false)),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            // These aggregations echo back the type of their input.
            AggregateFunc::Dummy
            | AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            // Note AggregateFunc::MaxString, MinString rely on returning input
            // type as output type to support the proper return type for
            // character input.
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
        };
        // Count never produces null, and other aggregations only produce
        // null in the presence of null inputs.
        let nullable = match self {
            AggregateFunc::Count => false,
            // Use the nullability of the underlying column being aggregated, not the Records wrapping it
            AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
                // The outer Record wraps the input in the first position, and any ORDER BY expressions afterwards
                SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
                    // The inner Record is a (value, separator) tuple
                    SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
                    _ => unreachable!(),
                },
                _ => unreachable!(),
            },
            _ => input_type.nullable,
        };
        scalar_type.nullable(nullable)
    }
2604
2605    /// Compute output type for ROW_NUMBER, RANK, DENSE_RANK
2606    fn output_type_ranking_window_funcs(
2607        input_type: &SqlColumnType,
2608        col_name: &str,
2609    ) -> SqlScalarType {
2610        match input_type.scalar_type {
2611            SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2612                element_type: Box::new(SqlScalarType::Record {
2613                    fields: [
2614                        (
2615                            ColumnName::from(col_name),
2616                            SqlScalarType::Int64.nullable(false),
2617                        ),
2618                        (ColumnName::from("?orig_row?"), {
2619                            let inner = match &fields[0].1.scalar_type {
2620                                SqlScalarType::List { element_type, .. } => element_type.clone(),
2621                                _ => unreachable!(),
2622                            };
2623                            inner.nullable(false)
2624                        }),
2625                    ]
2626                    .into(),
2627                    custom_id: None,
2628                }),
2629                custom_id: None,
2630            },
2631            _ => unreachable!(),
2632        }
2633    }
2634
2635    /// Given the `EncodedArgs` part of `((OriginalRow, EncodedArgs), OrderByExprs...)`,
2636    /// this computes the type of the first field of the output type. (The first field is the
2637    /// real result, the rest is the original row.)
2638    fn lag_lead_output_type_inner_from_encoded_args(
2639        encoded_args_type: &SqlScalarType,
2640    ) -> SqlColumnType {
2641        // lag/lead have 3 arguments, and the output type is
2642        // the same as the first of these, but always nullable. (It's null when the
2643        // lag/lead computation reaches over the bounds of the window partition.)
2644        encoded_args_type.unwrap_record_element_type()[0]
2645            .clone()
2646            .nullable(true)
2647    }
2648
2649    fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2650        ColumnName::from(match lag_lead_type {
2651            LagLeadType::Lag => "?lag?",
2652            LagLeadType::Lead => "?lead?",
2653        })
2654    }
2655
2656    /// Returns true if the non-null constraint on the aggregation can be
2657    /// converted into a non-null constraint on its parameter expression, ie.
2658    /// whether the result of the aggregation is null if all the input values
2659    /// are null.
2660    pub fn propagates_nonnull_constraint(&self) -> bool {
2661        match self {
2662            AggregateFunc::MaxNumeric
2663            | AggregateFunc::MaxInt16
2664            | AggregateFunc::MaxInt32
2665            | AggregateFunc::MaxInt64
2666            | AggregateFunc::MaxUInt16
2667            | AggregateFunc::MaxUInt32
2668            | AggregateFunc::MaxUInt64
2669            | AggregateFunc::MaxMzTimestamp
2670            | AggregateFunc::MaxFloat32
2671            | AggregateFunc::MaxFloat64
2672            | AggregateFunc::MaxBool
2673            | AggregateFunc::MaxString
2674            | AggregateFunc::MaxDate
2675            | AggregateFunc::MaxTimestamp
2676            | AggregateFunc::MaxTimestampTz
2677            | AggregateFunc::MinNumeric
2678            | AggregateFunc::MinInt16
2679            | AggregateFunc::MinInt32
2680            | AggregateFunc::MinInt64
2681            | AggregateFunc::MinUInt16
2682            | AggregateFunc::MinUInt32
2683            | AggregateFunc::MinUInt64
2684            | AggregateFunc::MinMzTimestamp
2685            | AggregateFunc::MinFloat32
2686            | AggregateFunc::MinFloat64
2687            | AggregateFunc::MinBool
2688            | AggregateFunc::MinString
2689            | AggregateFunc::MinDate
2690            | AggregateFunc::MinTimestamp
2691            | AggregateFunc::MinTimestampTz
2692            | AggregateFunc::SumInt16
2693            | AggregateFunc::SumInt32
2694            | AggregateFunc::SumInt64
2695            | AggregateFunc::SumUInt16
2696            | AggregateFunc::SumUInt32
2697            | AggregateFunc::SumUInt64
2698            | AggregateFunc::SumFloat32
2699            | AggregateFunc::SumFloat64
2700            | AggregateFunc::SumNumeric
2701            | AggregateFunc::StringAgg { .. } => true,
2702            // Count is never null
2703            AggregateFunc::Count => false,
2704            _ => false,
2705        }
2706    }
2707}
2708
2709fn jsonb_each<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2710    // First produce a map, so that a common iterator can be returned.
2711    let map = match a {
2712        Datum::Map(dict) => dict,
2713        _ => mz_repr::DatumMap::empty(),
2714    };
2715
2716    map.iter()
2717        .map(move |(k, v)| (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE))
2718}
2719
2720fn jsonb_each_stringify<'a>(
2721    a: Datum<'a>,
2722    temp_storage: &'a RowArena,
2723) -> impl Iterator<Item = (Row, Diff)> + 'a {
2724    // First produce a map, so that a common iterator can be returned.
2725    let map = match a {
2726        Datum::Map(dict) => dict,
2727        _ => mz_repr::DatumMap::empty(),
2728    };
2729
2730    map.iter().map(move |(k, mut v)| {
2731        v = jsonb_stringify(v, temp_storage)
2732            .map(Datum::String)
2733            .unwrap_or(Datum::Null);
2734        (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2735    })
2736}
2737
2738fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2739    let map = match a {
2740        Datum::Map(dict) => dict,
2741        _ => mz_repr::DatumMap::empty(),
2742    };
2743
2744    map.iter()
2745        .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2746}
2747
2748fn jsonb_array_elements<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2749    let list = match a {
2750        Datum::List(list) => list,
2751        _ => mz_repr::DatumList::empty(),
2752    };
2753    list.iter().map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2754}
2755
2756fn jsonb_array_elements_stringify<'a>(
2757    a: Datum<'a>,
2758    temp_storage: &'a RowArena,
2759) -> impl Iterator<Item = (Row, Diff)> + 'a {
2760    let list = match a {
2761        Datum::List(list) => list,
2762        _ => mz_repr::DatumList::empty(),
2763    };
2764    list.iter().map(move |mut e| {
2765        e = jsonb_stringify(e, temp_storage)
2766            .map(Datum::String)
2767            .unwrap_or(Datum::Null);
2768        (Row::pack_slice(&[e]), Diff::ONE)
2769    })
2770}
2771
2772fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2773    let r = r.inner();
2774    let a = a.unwrap_str();
2775    let captures = r.captures(a)?;
2776    let datums = captures
2777        .iter()
2778        .skip(1)
2779        .map(|m| Datum::from(m.map(|m| m.as_str())));
2780    Some((Row::pack(datums), Diff::ONE))
2781}
2782
2783fn regexp_matches<'a, 'r: 'a>(
2784    exprs: &[Datum<'a>],
2785) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2786    // There are only two acceptable ways to call this function:
2787    // 1. regexp_matches(string, regex)
2788    // 2. regexp_matches(string, regex, flag)
2789    assert!(exprs.len() == 2 || exprs.len() == 3);
2790    let a = exprs[0].unwrap_str();
2791    let r = exprs[1].unwrap_str();
2792
2793    let (regex, opts) = if exprs.len() == 3 {
2794        let flag = exprs[2].unwrap_str();
2795        let opts = AnalyzedRegexOpts::from_str(flag)?;
2796        (AnalyzedRegex::new(r, opts)?, opts)
2797    } else {
2798        let opts = AnalyzedRegexOpts::default();
2799        (AnalyzedRegex::new(r, opts)?, opts)
2800    };
2801
2802    let regex = regex.inner().clone();
2803
2804    let iter = regex.captures_iter(a).map(move |captures| {
2805        let matches = captures
2806            .iter()
2807            // The first match is the *entire* match, we want the capture groups by themselves.
2808            .skip(1)
2809            .map(|m| Datum::from(m.map(|m| m.as_str())))
2810            .collect::<Vec<_>>();
2811
2812        let mut binding = SharedRow::get();
2813        let mut packer = binding.packer();
2814
2815        let dimension = ArrayDimension {
2816            lower_bound: 1,
2817            length: matches.len(),
2818        };
2819        packer
2820            .try_push_array(&[dimension], matches)
2821            .expect("generated dimensions above");
2822
2823        (binding.clone(), Diff::ONE)
2824    });
2825
2826    // This is slightly unfortunate, but we need to collect the captures into a
2827    // Vec before we can yield them, because we can't return a iter with a
2828    // reference to the local `regex` variable.
2829    // We attempt to minimize the cost of this by using a SmallVec.
2830    let out = iter.collect::<SmallVec<[_; 3]>>();
2831
2832    if opts.global {
2833        Ok(Either::Left(out.into_iter()))
2834    } else {
2835        Ok(Either::Right(out.into_iter().take(1)))
2836    }
2837}
2838
2839fn generate_series<N>(
2840    start: N,
2841    stop: N,
2842    step: N,
2843) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2844where
2845    N: Integer + Signed + CheckedAdd + Clone,
2846    Datum<'static>: From<N>,
2847{
2848    if step == N::zero() {
2849        return Err(EvalError::InvalidParameterValue(
2850            "step size cannot equal zero".into(),
2851        ));
2852    }
2853    Ok(num::range_step_inclusive(start, stop, step)
2854        .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2855}
2856
/// Like
/// [`num::range_step_inclusive`](https://github.com/rust-num/num-iter/blob/ddb14c1e796d401014c6c7a727de61d8109ad986/src/lib.rs#L279),
/// but for our timestamp types using [`Interval`] for `step`.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to yield, if it is still within bounds.
    state: CheckedTimestamp<T>,
    /// Inclusive end of the range.
    stop: CheckedTimestamp<T>,
    /// Interval added to `state` after each yielded value.
    rev: bool, // placeholder — see note below
    done: bool,
}
2868
2869impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
2870    type Item = CheckedTimestamp<T>;
2871
2872    #[inline]
2873    fn next(&mut self) -> Option<CheckedTimestamp<T>> {
2874        if !self.done
2875            && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
2876        {
2877            let result = self.state.clone();
2878            match add_timestamp_months(self.state.deref(), self.step.months) {
2879                Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
2880                    Some(v) => match CheckedTimestamp::from_timestamplike(v) {
2881                        Ok(v) => self.state = v,
2882                        Err(_) => self.done = true,
2883                    },
2884                    None => self.done = true,
2885                },
2886                Err(..) => {
2887                    self.done = true;
2888                }
2889            }
2890
2891            Some(result)
2892        } else {
2893            None
2894        }
2895    }
2896}
2897
2898fn generate_series_ts<T: TimestampLike>(
2899    start: CheckedTimestamp<T>,
2900    stop: CheckedTimestamp<T>,
2901    step: Interval,
2902    conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2903) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2904    let normalized_step = step.as_microseconds();
2905    if normalized_step == 0 {
2906        return Err(EvalError::InvalidParameterValue(
2907            "step size cannot equal zero".into(),
2908        ));
2909    }
2910    let rev = normalized_step < 0;
2911
2912    let trsi = TimestampRangeStepInclusive {
2913        state: start,
2914        stop,
2915        step,
2916        rev,
2917        done: false,
2918    };
2919
2920    Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2921}
2922
2923fn generate_subscripts_array(
2924    a: Datum,
2925    dim: i32,
2926) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2927    if dim <= 0 {
2928        return Ok(Box::new(iter::empty()));
2929    }
2930
2931    match a.unwrap_array().dims().into_iter().nth(
2932        (dim - 1)
2933            .try_into()
2934            .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2935    ) {
2936        Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
2937            requested_dim.lower_bound.try_into().map_err(|_| {
2938                EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2939            })?,
2940            requested_dim
2941                .length
2942                .try_into()
2943                .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
2944            1,
2945        )?)),
2946        None => Ok(Box::new(iter::empty())),
2947    }
2948}
2949
2950fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2951    a.unwrap_array()
2952        .elements()
2953        .iter()
2954        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2955}
2956
2957fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2958    a.unwrap_list()
2959        .iter()
2960        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2961}
2962
2963fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2964    a.unwrap_map()
2965        .iter()
2966        .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
2967}
2968
2969impl AggregateFunc {
2970    /// The base function name without the `~[...]` suffix used when rendering
2971    /// variants that represent a parameterized function family.
2972    pub fn name(&self) -> &'static str {
2973        match self {
2974            Self::MaxNumeric => "max",
2975            Self::MaxInt16 => "max",
2976            Self::MaxInt32 => "max",
2977            Self::MaxInt64 => "max",
2978            Self::MaxUInt16 => "max",
2979            Self::MaxUInt32 => "max",
2980            Self::MaxUInt64 => "max",
2981            Self::MaxMzTimestamp => "max",
2982            Self::MaxFloat32 => "max",
2983            Self::MaxFloat64 => "max",
2984            Self::MaxBool => "max",
2985            Self::MaxString => "max",
2986            Self::MaxDate => "max",
2987            Self::MaxTimestamp => "max",
2988            Self::MaxTimestampTz => "max",
2989            Self::MaxInterval => "max",
2990            Self::MaxTime => "max",
2991            Self::MinNumeric => "min",
2992            Self::MinInt16 => "min",
2993            Self::MinInt32 => "min",
2994            Self::MinInt64 => "min",
2995            Self::MinUInt16 => "min",
2996            Self::MinUInt32 => "min",
2997            Self::MinUInt64 => "min",
2998            Self::MinMzTimestamp => "min",
2999            Self::MinFloat32 => "min",
3000            Self::MinFloat64 => "min",
3001            Self::MinBool => "min",
3002            Self::MinString => "min",
3003            Self::MinDate => "min",
3004            Self::MinTimestamp => "min",
3005            Self::MinTimestampTz => "min",
3006            Self::MinInterval => "min",
3007            Self::MinTime => "min",
3008            Self::SumInt16 => "sum",
3009            Self::SumInt32 => "sum",
3010            Self::SumInt64 => "sum",
3011            Self::SumUInt16 => "sum",
3012            Self::SumUInt32 => "sum",
3013            Self::SumUInt64 => "sum",
3014            Self::SumFloat32 => "sum",
3015            Self::SumFloat64 => "sum",
3016            Self::SumNumeric => "sum",
3017            Self::Count => "count",
3018            Self::Any => "any",
3019            Self::All => "all",
3020            Self::JsonbAgg { .. } => "jsonb_agg",
3021            Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3022            Self::MapAgg { .. } => "map_agg",
3023            Self::ArrayConcat { .. } => "array_agg",
3024            Self::ListConcat { .. } => "list_agg",
3025            Self::StringAgg { .. } => "string_agg",
3026            Self::RowNumber { .. } => "row_number",
3027            Self::Rank { .. } => "rank",
3028            Self::DenseRank { .. } => "dense_rank",
3029            Self::LagLead {
3030                lag_lead: LagLeadType::Lag,
3031                ..
3032            } => "lag",
3033            Self::LagLead {
3034                lag_lead: LagLeadType::Lead,
3035                ..
3036            } => "lead",
3037            Self::FirstValue { .. } => "first_value",
3038            Self::LastValue { .. } => "last_value",
3039            Self::WindowAggregate { .. } => "window_agg",
3040            Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3041            Self::FusedWindowAggregate { .. } => "fused_window_agg",
3042            Self::Dummy => "dummy",
3043        }
3044    }
3045}
3046
3047impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3048where
3049    M: HumanizerMode,
3050{
3051    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3052        use AggregateFunc::*;
3053        let name = self.expr.name();
3054        match self.expr {
3055            JsonbAgg { order_by }
3056            | JsonbObjectAgg { order_by }
3057            | MapAgg { order_by, .. }
3058            | ArrayConcat { order_by }
3059            | ListConcat { order_by }
3060            | StringAgg { order_by }
3061            | RowNumber { order_by }
3062            | Rank { order_by }
3063            | DenseRank { order_by } => {
3064                let order_by = order_by.iter().map(|col| self.child(col));
3065                write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3066            }
3067            LagLead {
3068                lag_lead: _,
3069                ignore_nulls,
3070                order_by,
3071            } => {
3072                let order_by = order_by.iter().map(|col| self.child(col));
3073                f.write_str(name)?;
3074                f.write_str("[")?;
3075                if *ignore_nulls {
3076                    f.write_str("ignore_nulls=true, ")?;
3077                }
3078                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3079                f.write_str("]")
3080            }
3081            FirstValue {
3082                order_by,
3083                window_frame,
3084            } => {
3085                let order_by = order_by.iter().map(|col| self.child(col));
3086                f.write_str(name)?;
3087                f.write_str("[")?;
3088                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3089                if *window_frame != WindowFrame::default() {
3090                    write!(f, " {}", window_frame)?;
3091                }
3092                f.write_str("]")
3093            }
3094            LastValue {
3095                order_by,
3096                window_frame,
3097            } => {
3098                let order_by = order_by.iter().map(|col| self.child(col));
3099                f.write_str(name)?;
3100                f.write_str("[")?;
3101                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3102                if *window_frame != WindowFrame::default() {
3103                    write!(f, " {}", window_frame)?;
3104                }
3105                f.write_str("]")
3106            }
3107            WindowAggregate {
3108                wrapped_aggregate,
3109                order_by,
3110                window_frame,
3111            } => {
3112                let order_by = order_by.iter().map(|col| self.child(col));
3113                let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3114                f.write_str(name)?;
3115                f.write_str("[")?;
3116                write!(f, "{} ", wrapped_aggregate)?;
3117                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3118                if *window_frame != WindowFrame::default() {
3119                    write!(f, " {}", window_frame)?;
3120                }
3121                f.write_str("]")
3122            }
3123            FusedValueWindowFunc { funcs, order_by } => {
3124                let order_by = order_by.iter().map(|col| self.child(col));
3125                let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3126                f.write_str(name)?;
3127                f.write_str("[")?;
3128                write!(f, "{} ", funcs)?;
3129                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3130                f.write_str("]")
3131            }
3132            _ => f.write_str(name),
3133        }
3134    }
3135}
3136
/// Describes one explicit capture group of an `AnalyzedRegex`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct CaptureGroupDesc {
    // 1-based index of the capture group within the regex (index 0 is the
    // whole match, which `AnalyzedRegex::new` skips).
    pub index: u32,
    // The group's name, if it was a named capture group.
    pub name: Option<String>,
    // Whether the group's value may be null; as constructed by
    // `AnalyzedRegex::new` this is currently always `true`.
    pub nullable: bool,
}
3154
/// Options controlling regex analysis, parsed from a flag string
/// (see the `FromStr` impl).
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default
)]
pub struct AnalyzedRegexOpts {
    // Set by the `i` flag: compile the regex case-insensitively.
    pub case_insensitive: bool,
    // Set by the `g` ("global") flag; its effect is applied by consumers of
    // these options.
    pub global: bool,
}
3173
3174impl FromStr for AnalyzedRegexOpts {
3175    type Err = EvalError;
3176
3177    fn from_str(s: &str) -> Result<Self, Self::Err> {
3178        let mut opts = AnalyzedRegexOpts::default();
3179        for c in s.chars() {
3180            match c {
3181                'i' => opts.case_insensitive = true,
3182                'g' => opts.global = true,
3183                _ => return Err(EvalError::InvalidRegexFlag(c)),
3184            }
3185        }
3186        Ok(opts)
3187    }
3188}
3189
/// A compiled regex together with descriptors for its explicit capture
/// groups (group 0, the whole match, is not described) and the options it
/// was compiled with.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3203
3204impl AnalyzedRegex {
3205    pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, RegexCompilationError> {
3206        let r = ReprRegex::new(s, opts.case_insensitive)?;
3207        // TODO(benesch): remove potentially dangerous usage of `as`.
3208        #[allow(clippy::as_conversions)]
3209        let descs: Vec<_> = r
3210            .capture_names()
3211            .enumerate()
3212            // The first capture is the entire matched string.
3213            // This will often not be useful, so skip it.
3214            // If people want it they can just surround their
3215            // entire regex in an explicit capture group.
3216            .skip(1)
3217            .map(|(i, name)| CaptureGroupDesc {
3218                index: i as u32,
3219                name: name.map(String::from),
3220                // TODO -- we can do better.
3221                // https://github.com/MaterializeInc/database-issues/issues/612
3222                nullable: true,
3223            })
3224            .collect();
3225        Ok(Self(r, descs, opts))
3226    }
3227    pub fn capture_groups_len(&self) -> usize {
3228        self.1.len()
3229    }
3230    pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3231        self.1.iter()
3232    }
3233    pub fn inner(&self) -> &Regex {
3234        &(self.0).regex
3235    }
3236    pub fn opts(&self) -> &AnalyzedRegexOpts {
3237        &self.2
3238    }
3239}
3240
3241pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3242    let bytes = a.unwrap_str().as_bytes();
3243    let mut row = Row::default();
3244    let csv_reader = csv::ReaderBuilder::new()
3245        .has_headers(false)
3246        .from_reader(bytes);
3247    csv_reader.into_records().filter_map(move |res| match res {
3248        Ok(sr) if sr.len() == n_cols => {
3249            row.packer().extend(sr.iter().map(Datum::String));
3250            Some((row.clone(), Diff::ONE))
3251        }
3252        _ => None,
3253    })
3254}
3255
3256pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3257    let n = a.unwrap_int64();
3258    if n != 0 {
3259        Some((Row::default(), n.into()))
3260    } else {
3261        None
3262    }
3263}
3264
3265fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3266    datums
3267        .chunks(width)
3268        .map(|chunk| (Row::pack(chunk), Diff::ONE))
3269}
3270
3271fn acl_explode<'a>(
3272    acl_items: Datum<'a>,
3273    temp_storage: &'a RowArena,
3274) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3275    let acl_items = acl_items.unwrap_array();
3276    let mut res = Vec::new();
3277    for acl_item in acl_items.elements().iter() {
3278        if acl_item.is_null() {
3279            return Err(EvalError::AclArrayNullElement);
3280        }
3281        let acl_item = acl_item.unwrap_acl_item();
3282        for privilege in acl_item.acl_mode.explode() {
3283            let row = [
3284                Datum::UInt32(acl_item.grantor.0),
3285                Datum::UInt32(acl_item.grantee.0),
3286                Datum::String(temp_storage.push_string(privilege.to_string())),
3287                // GRANT OPTION is not implemented, so we hardcode false.
3288                Datum::False,
3289            ];
3290            res.push((Row::pack_slice(&row), Diff::ONE));
3291        }
3292    }
3293    Ok(res.into_iter())
3294}
3295
3296fn mz_acl_explode<'a>(
3297    mz_acl_items: Datum<'a>,
3298    temp_storage: &'a RowArena,
3299) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3300    let mz_acl_items = mz_acl_items.unwrap_array();
3301    let mut res = Vec::new();
3302    for mz_acl_item in mz_acl_items.elements().iter() {
3303        if mz_acl_item.is_null() {
3304            return Err(EvalError::MzAclArrayNullElement);
3305        }
3306        let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3307        for privilege in mz_acl_item.acl_mode.explode() {
3308            let row = [
3309                Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3310                Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3311                Datum::String(temp_storage.push_string(privilege.to_string())),
3312                // GRANT OPTION is not implemented, so we hardcode false.
3313                Datum::False,
3314            ];
3315            res.push((Row::pack_slice(&row), Diff::ONE));
3316        }
3317    }
3318    Ok(res.into_iter())
3319}
3320
/// When adding a new `TableFunc` variant, please consider adding it to
/// `TableFunc::with_ordinality`!
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum TableFunc {
    /// Expands an array of `aclitem`s into one row per privilege:
    /// `(grantor, grantee, privilege, is_grantable)`.
    AclExplode,
    /// Like `AclExplode`, but for `mz_aclitem`s, with grantor and grantee
    /// rendered as strings.
    MzAclExplode,
    /// Expands a `jsonb` value into `(key, value)` rows.
    JsonbEach,
    /// Like `JsonbEach`, but with the value column stringified.
    JsonbEachStringify,
    /// Produces one row per key of a `jsonb` object.
    JsonbObjectKeys,
    /// Produces one row per element of a `jsonb` array.
    JsonbArrayElements,
    /// Like `JsonbArrayElements`, but with the elements stringified.
    JsonbArrayElementsStringify,
    /// Produces one output column per explicit capture group of the regex.
    RegexpExtract(AnalyzedRegex),
    /// Parses the input string as headerless CSV with the given number of
    /// columns; records of any other width are dropped.
    CsvExtract(usize),
    /// `generate_series` over `int4` arguments `(start, stop, step)`.
    GenerateSeriesInt32,
    /// `generate_series` over `int8` arguments `(start, stop, step)`.
    GenerateSeriesInt64,
    /// `generate_series` over `timestamp` arguments with an interval step.
    GenerateSeriesTimestamp,
    /// `generate_series` over `timestamptz` arguments with an interval step.
    GenerateSeriesTimestampTz,
    /// Supplied with an input count,
    ///   1. Adds a column as if a typed subquery result,
    ///   2. Filters the row away if the count is only one,
    ///   3. Errors if the count is not exactly one.
    /// The intent is that this presents as if a subquery result with too many
    /// records contributing. The error column has the same type as the result
    /// should have, but we only produce it if the count exceeds one.
    ///
    /// This logic could nearly be achieved with map, filter, project logic,
    /// but has been challenging to do in a way that respects the vagaries of
    /// SQL and our semantics. If we reveal a constant value in the column we
    /// risk the optimizer pruning the branch; if we reveal that this will not
    /// produce rows we risk the optimizer pruning the branch; if we reveal that
    /// the only possible value is an error we risk the optimizer propagating that
    /// error without guards.
    ///
    /// Before replacing this by an `MirScalarExpr`, quadruple check that it
    /// would not result in misoptimizations due to expression evaluation order
    /// being utterly undefined, and predicate pushdown trimming any fragments
    /// that might produce columns that will not be needed.
    GuardSubquerySize {
        column_type: SqlScalarType,
    },
    /// Emits the empty row with multiplicity equal to the input count (which
    /// may be negative); a count of zero produces nothing.
    Repeat,
    /// Produces one row per element of the input array.
    UnnestArray {
        el_typ: SqlScalarType,
    },
    /// Produces one row per element of the input list.
    UnnestList {
        el_typ: SqlScalarType,
    },
    /// Produces one `(key, value)` row per entry of the input map.
    UnnestMap {
        value_type: SqlScalarType,
    },
    /// Given `n` input expressions, wraps them into `n / width` rows, each of
    /// `width` columns.
    ///
    /// This function is not intended to be called directly by end users, but
    /// is useful in the planning of e.g. VALUES clauses.
    Wrap {
        types: Vec<SqlColumnType>,
        width: usize,
    },
    /// Produces the subscripts of the requested dimension of the input array.
    GenerateSubscriptsArray,
    /// Execute some arbitrary scalar function as a table function.
    TabletizedScalar {
        name: String,
        relation: SqlRelationType,
    },
    /// `regexp_matches`: produces one row per match, each an array of the
    /// captured strings.
    RegexpMatches,
    /// Implements the WITH ORDINALITY clause.
    ///
    /// Don't construct `TableFunc::WithOrdinality` manually! Use the `with_ordinality` constructor
    /// function instead, which checks whether the given table function supports `WithOrdinality`.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3405
/// Evaluates the inner table function, expands its results into unary (repeating each row as
/// many times as the diff indicates), and appends an integer corresponding to the ordinal
/// position (starting from 1). For example, it numbers the elements of a list when calling
/// `unnest_list`.
///
/// Private enum variant of `TableFunc`. Don't construct this directly, but use
/// `TableFunc::with_ordinality` instead.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
struct WithOrdinality {
    // The wrapped table function whose output rows are numbered.
    inner: Box<TableFunc>,
}
3428
impl TableFunc {
    /// Adds `WITH ORDINALITY` to a table function if it's allowed on the given table function.
    ///
    /// Returns `None` for a function that is already wrapped (nesting is not
    /// supported).
    pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
        // This match is intentionally exhaustive (no `_` arm): adding a new
        // `TableFunc` variant forces a compile error here, so that a decision
        // is made about whether the new variant supports WITH ORDINALITY.
        match inner {
            TableFunc::AclExplode
            | TableFunc::MzAclExplode
            | TableFunc::JsonbEach
            | TableFunc::JsonbEachStringify
            | TableFunc::JsonbObjectKeys
            | TableFunc::JsonbArrayElements
            | TableFunc::JsonbArrayElementsStringify
            | TableFunc::RegexpExtract(_)
            | TableFunc::CsvExtract(_)
            | TableFunc::GenerateSeriesInt32
            | TableFunc::GenerateSeriesInt64
            | TableFunc::GenerateSeriesTimestamp
            | TableFunc::GenerateSeriesTimestampTz
            | TableFunc::GuardSubquerySize { .. }
            | TableFunc::Repeat
            | TableFunc::UnnestArray { .. }
            | TableFunc::UnnestList { .. }
            | TableFunc::UnnestMap { .. }
            | TableFunc::Wrap { .. }
            | TableFunc::GenerateSubscriptsArray
            | TableFunc::TabletizedScalar { .. }
            | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
                inner: Box::new(inner),
            })),
            // IMPORTANT: Before adding a new table function here, consider negative diffs:
            // `WithOrdinality::eval` will panic if the inner table function emits a negative diff.
            TableFunc::WithOrdinality(_) => None,
        }
    }
}
3463
impl TableFunc {
    /// Executes `self` on the given input row (`datums`).
    ///
    /// `temp_storage` owns any string data allocated while producing output
    /// rows. Returns an iterator of `(row, diff)` pairs, or an evaluation
    /// error.
    pub fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Functions declared empty-on-null-input produce no rows at all if
        // any argument is null.
        if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
            return Ok(Box::new(vec![].into_iter()));
        }
        match self {
            TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
            TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
            TableFunc::JsonbEach => Ok(Box::new(jsonb_each(datums[0]))),
            TableFunc::JsonbEachStringify => {
                Ok(Box::new(jsonb_each_stringify(datums[0], temp_storage)))
            }
            TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
            TableFunc::JsonbArrayElements => Ok(Box::new(jsonb_array_elements(datums[0]))),
            TableFunc::JsonbArrayElementsStringify => Ok(Box::new(jsonb_array_elements_stringify(
                datums[0],
                temp_storage,
            ))),
            TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
            TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
            TableFunc::GenerateSeriesInt32 => {
                let res = generate_series(
                    datums[0].unwrap_int32(),
                    datums[1].unwrap_int32(),
                    datums[2].unwrap_int32(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesInt64 => {
                let res = generate_series(
                    datums[0].unwrap_int64(),
                    datums[1].unwrap_int64(),
                    datums[2].unwrap_int64(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestamp => {
                fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamp(),
                    datums[1].unwrap_timestamp(),
                    datums[2].unwrap_interval(),
                    pass_through,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestampTz => {
                fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamptz(),
                    datums[1].unwrap_timestamptz(),
                    datums[2].unwrap_interval(),
                    gen_ts_tz,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSubscriptsArray => {
                generate_subscripts_array(datums[0], datums[1].unwrap_int32())
            }
            TableFunc::GuardSubquerySize { column_type: _ } => {
                // We error if the count is not one,
                // and produce no rows if equal to one.
                let count = datums[0].unwrap_int64();
                if count != 1 {
                    Err(EvalError::MultipleRowsFromSubquery)
                } else {
                    Ok(Box::new([].into_iter()))
                }
            }
            TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
            TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
            TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
            TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
            TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
            TableFunc::TabletizedScalar { .. } => {
                let r = Row::pack_slice(datums);
                Ok(Box::new(std::iter::once((r, Diff::ONE))))
            }
            TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
            TableFunc::WithOrdinality(func_with_ordinality) => {
                func_with_ordinality.eval(datums, temp_storage)
            }
        }
    }

    /// Returns the [`SqlRelationType`] of the rows produced by `self`,
    /// including any unique-key structure (e.g. the `generate_series`
    /// variants emit each value at most once, so their single column is a
    /// key).
    pub fn output_type(&self) -> SqlRelationType {
        let (column_types, keys) = match self {
            TableFunc::AclExplode => {
                let column_types = vec![
                    SqlScalarType::Oid.nullable(false),
                    SqlScalarType::Oid.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Bool.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::MzAclExplode => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Bool.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbEach => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Jsonb.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbEachStringify => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(true),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbObjectKeys => {
                let column_types = vec![SqlScalarType::String.nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbArrayElements => {
                let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbArrayElementsStringify => {
                let column_types = vec![SqlScalarType::String.nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::RegexpExtract(a) => {
                let column_types = a
                    .capture_groups_iter()
                    .map(|cg| SqlScalarType::String.nullable(cg.nullable))
                    .collect();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::CsvExtract(n_cols) => {
                let column_types = iter::repeat(SqlScalarType::String.nullable(false))
                    .take(*n_cols)
                    .collect();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesInt32 => {
                let column_types = vec![SqlScalarType::Int32.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesInt64 => {
                let column_types = vec![SqlScalarType::Int64.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesTimestamp => {
                let column_types =
                    vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesTimestampTz => {
                let column_types =
                    vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSubscriptsArray => {
                let column_types = vec![SqlScalarType::Int32.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GuardSubquerySize { column_type } => {
                let column_types = vec![column_type.clone().nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::Repeat => {
                let column_types = vec![];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestArray { el_typ } => {
                let column_types = vec![el_typ.clone().nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestList { el_typ } => {
                let column_types = vec![el_typ.clone().nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestMap { value_type } => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    value_type.clone().nullable(true),
                ];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::Wrap { types, .. } => {
                let column_types = types.clone();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::TabletizedScalar { relation, .. } => {
                return relation.clone();
            }
            TableFunc::RegexpMatches => {
                let column_types =
                    vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
                let keys = vec![];

                (column_types, keys)
            }
            TableFunc::WithOrdinality(WithOrdinality { inner }) => {
                let mut typ = inner.output_type();
                // Add the ordinality column.
                typ.column_types.push(SqlScalarType::Int64.nullable(false));
                // The ordinality column is always a key.
                typ.keys.push(vec![typ.column_types.len() - 1]);
                (typ.column_types, typ.keys)
            }
        };

        // `output_arity` is maintained separately; make sure they agree.
        soft_assert_eq_no_log!(column_types.len(), self.output_arity());

        if !keys.is_empty() {
            SqlRelationType::new(column_types).with_keys(keys)
        } else {
            SqlRelationType::new(column_types)
        }
    }

    /// Returns the number of columns produced by `self`. Kept in sync with
    /// [`TableFunc::output_type`] (see the soft assertion there).
    pub fn output_arity(&self) -> usize {
        match self {
            TableFunc::AclExplode => 4,
            TableFunc::MzAclExplode => 4,
            TableFunc::JsonbEach => 2,
            TableFunc::JsonbEachStringify => 2,
            TableFunc::JsonbObjectKeys => 1,
            TableFunc::JsonbArrayElements => 1,
            TableFunc::JsonbArrayElementsStringify => 1,
            TableFunc::RegexpExtract(a) => a.capture_groups_len(),
            TableFunc::CsvExtract(n_cols) => *n_cols,
            TableFunc::GenerateSeriesInt32 => 1,
            TableFunc::GenerateSeriesInt64 => 1,
            TableFunc::GenerateSeriesTimestamp => 1,
            TableFunc::GenerateSeriesTimestampTz => 1,
            TableFunc::GenerateSubscriptsArray => 1,
            TableFunc::GuardSubquerySize { .. } => 1,
            TableFunc::Repeat => 0,
            TableFunc::UnnestArray { .. } => 1,
            TableFunc::UnnestList { .. } => 1,
            TableFunc::UnnestMap { .. } => 2,
            TableFunc::Wrap { width, .. } => *width,
            TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
            TableFunc::RegexpMatches => 1,
            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
        }
    }

    /// Whether `eval` should produce no rows at all when any input datum is
    /// null (checked up front in [`TableFunc::eval`]).
    pub fn empty_on_null_input(&self) -> bool {
        match self {
            TableFunc::AclExplode
            | TableFunc::MzAclExplode
            | TableFunc::JsonbEach
            | TableFunc::JsonbEachStringify
            | TableFunc::JsonbObjectKeys
            | TableFunc::JsonbArrayElements
            | TableFunc::JsonbArrayElementsStringify
            | TableFunc::GenerateSeriesInt32
            | TableFunc::GenerateSeriesInt64
            | TableFunc::GenerateSeriesTimestamp
            | TableFunc::GenerateSeriesTimestampTz
            | TableFunc::GenerateSubscriptsArray
            | TableFunc::RegexpExtract(_)
            | TableFunc::CsvExtract(_)
            | TableFunc::Repeat
            | TableFunc::UnnestArray { .. }
            | TableFunc::UnnestList { .. }
            | TableFunc::UnnestMap { .. }
            | TableFunc::RegexpMatches => true,
            TableFunc::GuardSubquerySize { .. } => false,
            TableFunc::Wrap { .. } => false,
            TableFunc::TabletizedScalar { .. } => false,
            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
        }
    }

    /// True iff the table function preserves the append-only property of its input.
    pub fn preserves_monotonicity(&self) -> bool {
        // Most variants preserve monotonicity, but all variants are enumerated to
        // ensure that added variants at least check that this is the case.
        match self {
            TableFunc::AclExplode => false,
            TableFunc::MzAclExplode => false,
            TableFunc::JsonbEach => true,
            TableFunc::JsonbEachStringify => true,
            TableFunc::JsonbObjectKeys => true,
            TableFunc::JsonbArrayElements => true,
            TableFunc::JsonbArrayElementsStringify => true,
            TableFunc::RegexpExtract(_) => true,
            TableFunc::CsvExtract(_) => true,
            TableFunc::GenerateSeriesInt32 => true,
            TableFunc::GenerateSeriesInt64 => true,
            TableFunc::GenerateSeriesTimestamp => true,
            TableFunc::GenerateSeriesTimestampTz => true,
            TableFunc::GenerateSubscriptsArray => true,
            TableFunc::Repeat => false,
            TableFunc::UnnestArray { .. } => true,
            TableFunc::UnnestList { .. } => true,
            TableFunc::UnnestMap { .. } => true,
            TableFunc::Wrap { .. } => true,
            TableFunc::TabletizedScalar { .. } => true,
            TableFunc::RegexpMatches => true,
            TableFunc::GuardSubquerySize { .. } => false,
            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
        }
    }
}
3802
3803impl fmt::Display for TableFunc {
3804    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3805        match self {
3806            TableFunc::AclExplode => f.write_str("aclexplode"),
3807            TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3808            TableFunc::JsonbEach => f.write_str("jsonb_each"),
3809            TableFunc::JsonbEachStringify => f.write_str("jsonb_each_text"),
3810            TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3811            TableFunc::JsonbArrayElements => f.write_str("jsonb_array_elements"),
3812            TableFunc::JsonbArrayElementsStringify => f.write_str("jsonb_array_elements_text"),
3813            TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3814            TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3815            TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3816            TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3817            TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3818            TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3819            TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3820            TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3821            TableFunc::Repeat => f.write_str("repeat_row"),
3822            TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3823            TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3824            TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3825            TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3826            TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3827            TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3828            TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3829                write!(f, "{}[with_ordinality]", inner)
3830            }
3831        }
3832    }
3833}
3834
impl WithOrdinality {
    /// Executes the `self.inner` table function on the given input row (`datums`), and zips
    /// 1, 2, 3, ... to the result as a new column. We need to expand rows with non-1 diffs into the
    /// corresponding number of rows with unit diffs, because the ordinality column will have
    /// different values for each copy.
    ///
    /// # Panics
    ///
    /// Panics if the `inner` table function emits a negative diff.
    fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Ordinals are 1-based, and increase across all rows emitted by `inner`.
        let mut next_ordinal: i64 = 1;
        let it = self
            .inner
            .eval(datums, temp_storage)?
            .flat_map(move |(mut row, diff)| {
                let diff = diff.into_inner();
                // WITH ORDINALITY is not well-defined for negative diffs. This is ok, since the
                // only table function that can emit negative diffs is `repeat_row`, which is in
                // `mz_unsafe`, so users can never call it.
                //
                // (We also don't need to worry about negative diffs in FlatMap's input, because
                // the diff of the input of the FlatMap is factored in after we return from here.)
                assert!(diff >= 0);
                // The ordinals that will be associated with this row.
                let mut ordinals = next_ordinal..(next_ordinal + diff);
                next_ordinal += diff;
                // The maximum byte capacity we need for the original row and its ordinal.
                // (After the update above, `next_ordinal` is an upper bound on every
                // ordinal handed out for this row, so its datum size is the largest.)
                let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
                // Emit one unit-diff copy of `row` per ordinal in `ordinals`; the
                // iterator ends (returns `None`) when `ordinals` is exhausted.
                iter::from_fn(move || {
                    let ordinal = ordinals.next()?;
                    let mut row = if ordinals.is_empty() {
                        // This is the last row, so no need to clone. (Most table functions emit
                        // only 1 diffs, so this completely avoids cloning in most cases.)
                        std::mem::take(&mut row)
                    } else {
                        // More copies still to emit: clone into a buffer pre-sized to
                        // also hold the appended ordinal, avoiding a reallocation below.
                        let mut new_row = Row::with_capacity(cap);
                        new_row.clone_from(&row);
                        new_row
                    };
                    // Append the ordinality column after the row's existing datums.
                    RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
                    Some((row, Diff::ONE))
                })
            });
        Ok(Box::new(it))
    }
}