mz_expr/relation/
func.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log};
26use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
27use mz_repr::adt::array::ArrayDimension;
28use mz_repr::adt::date::Date;
29use mz_repr::adt::interval::Interval;
30use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
31use mz_repr::adt::regex::Regex as ReprRegex;
32use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
33use mz_repr::{
34    ColumnName, ColumnType, Datum, Diff, RelationType, Row, RowArena, RowPacker, ScalarType,
35    SharedRow, datum_size,
36};
37use num::{CheckedAdd, Integer, Signed, ToPrimitive};
38use ordered_float::OrderedFloat;
39use proptest::prelude::{Arbitrary, Just};
40use proptest::strategy::{BoxedStrategy, Strategy, Union};
41use proptest_derive::Arbitrary;
42use regex::Regex;
43use serde::{Deserialize, Serialize};
44use smallvec::SmallVec;
45
46use crate::EvalError;
47use crate::WindowFrameBound::{
48    CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
49};
50use crate::WindowFrameUnits::{Groups, Range, Rows};
51use crate::explain::{HumanizedExpr, HumanizerMode};
52use crate::relation::proto_aggregate_func::{
53    self, ProtoColumnOrders, ProtoFusedValueWindowFunc, ProtoFusedWindowAggregate,
54};
55use crate::relation::proto_table_func::{ProtoTabletizedScalar, ProtoWithOrdinality};
56use crate::relation::{
57    ColumnOrder, ProtoAggregateFunc, ProtoTableFunc, WindowFrame, WindowFrameBound,
58    WindowFrameUnits, compare_columns, proto_table_func,
59};
60use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
61
62include!(concat!(env!("OUT_DIR"), "/mz_expr.relation.func.rs"));
63
64// TODO(jamii) be careful about overflow in sum/avg
65// see https://timely.zulipchat.com/#narrow/stream/186635-engineering/topic/additional.20work/near/163507435
66
67fn max_string<'a, I>(datums: I) -> Datum<'a>
68where
69    I: IntoIterator<Item = Datum<'a>>,
70{
71    match datums
72        .into_iter()
73        .filter(|d| !d.is_null())
74        .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
75    {
76        Some(datum) => datum,
77        None => Datum::Null,
78    }
79}
80
81fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
82where
83    I: IntoIterator<Item = Datum<'a>>,
84    DatumType: TryFrom<Datum<'a>> + Ord,
85    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
86    Datum<'a>: From<Option<DatumType>>,
87{
88    let x: Option<DatumType> = datums
89        .into_iter()
90        .filter(|d| !d.is_null())
91        .map(|d| DatumType::try_from(d).expect("unexpected type"))
92        .max();
93
94    x.into()
95}
96
97fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
98where
99    I: IntoIterator<Item = Datum<'a>>,
100    DatumType: TryFrom<Datum<'a>> + Ord,
101    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
102    Datum<'a>: From<Option<DatumType>>,
103{
104    let x: Option<DatumType> = datums
105        .into_iter()
106        .filter(|d| !d.is_null())
107        .map(|d| DatumType::try_from(d).expect("unexpected type"))
108        .min();
109
110    x.into()
111}
112
113fn min_string<'a, I>(datums: I) -> Datum<'a>
114where
115    I: IntoIterator<Item = Datum<'a>>,
116{
117    match datums
118        .into_iter()
119        .filter(|d| !d.is_null())
120        .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
121    {
122        Some(datum) => datum,
123        None => Datum::Null,
124    }
125}
126
127fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
128where
129    I: IntoIterator<Item = Datum<'a>>,
130    DatumType: TryFrom<Datum<'a>>,
131    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
132    ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
133{
134    let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
135    if datums.peek().is_none() {
136        Datum::Null
137    } else {
138        let x = datums
139            .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
140            .sum::<ResultType>();
141        x.into()
142    }
143}
144
145fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
146where
147    I: IntoIterator<Item = Datum<'a>>,
148{
149    let mut cx = numeric::cx_datum();
150    let mut sum = Numeric::zero();
151    let mut empty = true;
152    for d in datums {
153        if !d.is_null() {
154            empty = false;
155            cx.add(&mut sum, &d.unwrap_numeric().0);
156        }
157    }
158    match empty {
159        true => Datum::Null,
160        false => Datum::from(sum),
161    }
162}
163
164// TODO(benesch): remove potentially dangerous usage of `as`.
165#[allow(clippy::as_conversions)]
166fn count<'a, I>(datums: I) -> Datum<'a>
167where
168    I: IntoIterator<Item = Datum<'a>>,
169{
170    // TODO(jkosh44) This should error when the count can't fit inside of an `i64` instead of returning a negative result.
171    let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
172    Datum::from(x)
173}
174
175fn any<'a, I>(datums: I) -> Datum<'a>
176where
177    I: IntoIterator<Item = Datum<'a>>,
178{
179    datums
180        .into_iter()
181        .fold(Datum::False, |state, next| match (state, next) {
182            (Datum::True, _) | (_, Datum::True) => Datum::True,
183            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
184            _ => Datum::False,
185        })
186}
187
188fn all<'a, I>(datums: I) -> Datum<'a>
189where
190    I: IntoIterator<Item = Datum<'a>>,
191{
192    datums
193        .into_iter()
194        .fold(Datum::True, |state, next| match (state, next) {
195            (Datum::False, _) | (_, Datum::False) => Datum::False,
196            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
197            _ => Datum::True,
198        })
199}
200
201fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
202where
203    I: IntoIterator<Item = Datum<'a>>,
204{
205    const EMPTY_SEP: &str = "";
206
207    let datums = order_aggregate_datums(datums, order_by);
208    let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
209        if d.is_null() {
210            return None;
211        }
212        let mut value_sep = d.unwrap_list().iter();
213        match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
214            (Datum::Null, _) => None,
215            (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
216            (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
217            _ => unreachable!(),
218        }
219    });
220
221    let mut s = String::default();
222    match sep_value_pairs.next() {
223        // First value not prefixed by its separator
224        Some((_, value)) => s.push_str(value),
225        // If no non-null values sent, return NULL.
226        None => return Datum::Null,
227    }
228
229    for (sep, value) in sep_value_pairs {
230        s.push_str(sep);
231        s.push_str(value);
232    }
233
234    Datum::String(temp_storage.push_string(s))
235}
236
237fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
238where
239    I: IntoIterator<Item = Datum<'a>>,
240{
241    let datums = order_aggregate_datums(datums, order_by);
242    temp_storage.make_datum(|packer| {
243        packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
244    })
245}
246
247fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
248where
249    I: IntoIterator<Item = Datum<'a>>,
250{
251    let datums = order_aggregate_datums(datums, order_by);
252    temp_storage.make_datum(|packer| {
253        let mut datums: Vec<_> = datums
254            .into_iter()
255            .filter_map(|d| {
256                if d.is_null() {
257                    return None;
258                }
259                let mut list = d.unwrap_list().iter();
260                let key = list.next().unwrap();
261                let val = list.next().unwrap();
262                if key.is_null() {
263                    // TODO(benesch): this should produce an error, but
264                    // aggregate functions cannot presently produce errors.
265                    None
266                } else {
267                    Some((key.unwrap_str(), val))
268                }
269            })
270            .collect();
271        // datums are ordered by any ORDER BY clause now, and we want to preserve
272        // the last entry for each key, but we also need to present unique and sorted
273        // keys to push_dict. Use sort_by here, which is stable, and so will preserve
274        // the ORDER BY order. Then reverse and dedup to retain the last of each
275        // key. Reverse again so we're back in push_dict order.
276        datums.sort_by_key(|(k, _v)| *k);
277        datums.reverse();
278        datums.dedup_by_key(|(k, _v)| *k);
279        datums.reverse();
280        packer.push_dict(datums);
281    })
282}
283
/// Sorts `datums` by their ORDER BY keys and returns only the payloads, in sorted order.
///
/// Each input datum is expected to be a list: the 1st element is the payload, and the 2nd through
/// Nth elements are the values of the `order_by` expressions. The sorting itself is delegated to
/// `order_aggregate_datums_with_rank_inner`; this function then drops the ORDER BY values.
///
/// Near the usages of this function, we sometimes want to produce Datums with a shorter lifetime
/// than 'a. We have to actually perform the shortening of the lifetime here, inside this function,
/// because if we were to simply return `impl Iterator<Item = Datum<'a>>`, that wouldn't be
/// covariant in the item type, because opaque types are always invariant. (Contrast this with how
/// we perform the shortening _inside_ this function: the input of the `map` is known to
/// specifically be `std::vec::IntoIter`, which is known to be covariant.)
pub fn order_aggregate_datums<'a: 'b, 'b, I>(
    datums: I,
    order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    order_aggregate_datums_with_rank_inner(datums, order_by)
        .into_iter()
        // (`payload` is coerced here to `Datum<'b>` in the argument of the closure)
        .map(|(payload, _order_datums)| payload)
}
305
306/// Assuming datums is a List, sort them by the 2nd through Nth elements
307/// corresponding to order_by, then return the 1st element and computed order by expression.
308fn order_aggregate_datums_with_rank<'a, I>(
309    datums: I,
310    order_by: &[ColumnOrder],
311) -> impl Iterator<Item = (Datum<'a>, Row)>
312where
313    I: IntoIterator<Item = Datum<'a>>,
314{
315    order_aggregate_datums_with_rank_inner(datums, order_by)
316        .into_iter()
317        .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
318}
319
320fn order_aggregate_datums_with_rank_inner<'a, I>(
321    datums: I,
322    order_by: &[ColumnOrder],
323) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
324where
325    I: IntoIterator<Item = Datum<'a>>,
326{
327    let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
328        .into_iter()
329        .map(|d| {
330            let list = d.unwrap_list();
331            let mut list_it = list.iter();
332            let payload = list_it.next().unwrap();
333
334            // We decode the order_by Datums here instead of the comparison function, because the
335            // comparison function is expected to be called `O(log n)` times on each input row.
336            // The only downside is that the decoded data might be bigger, but I think that's fine,
337            // because:
338            // - if we have a window partition so big that this would create a memory problem, then
339            //   the non-incrementalness of window functions will create a serious CPU problem
340            //   anyway,
341            // - and anyhow various other parts of the window function code already do decoding
342            //   upfront.
343            let mut order_by_datums = Vec::with_capacity(order_by.len());
344            for _ in 0..order_by.len() {
345                order_by_datums.push(
346                    list_it
347                        .next()
348                        .expect("must have exactly the same number of Datums as `order_by`"),
349                );
350            }
351
352            (payload, order_by_datums)
353        })
354        .collect();
355
356    let mut sort_by =
357        |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
358         (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
359            compare_columns(
360                order_by,
361                left_order_by_datums,
362                right_order_by_datums,
363                || payload_left.cmp(payload_right),
364            )
365        };
366    // `sort_unstable_by` can be faster and uses less memory than `sort_by`. An unstable sort is
367    // enough here, because if two elements are equal in our `compare` function, then the elements
368    // are actually binary-equal (because of the `tiebreaker` given to `compare_columns`), so it
369    // doesn't matter what order they end up in.
370    decoded.sort_unstable_by(&mut sort_by);
371    decoded
372}
373
374fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
375where
376    I: IntoIterator<Item = Datum<'a>>,
377{
378    let datums = order_aggregate_datums(datums, order_by);
379    let datums: Vec<_> = datums
380        .into_iter()
381        .map(|d| d.unwrap_array().elements().iter())
382        .flatten()
383        .collect();
384    let dims = ArrayDimension {
385        lower_bound: 1,
386        length: datums.len(),
387    };
388    temp_storage.make_datum(|packer| {
389        packer.try_push_array(&[dims], datums).unwrap();
390    })
391}
392
393fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
394where
395    I: IntoIterator<Item = Datum<'a>>,
396{
397    let datums = order_aggregate_datums(datums, order_by);
398    temp_storage.make_datum(|packer| {
399        packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
400    })
401}
402
403/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
404/// The output is in the format of `[result_value, original_row]`.
405/// See an example at `lag_lead`, where the input-output formats are similar.
406fn row_number<'a, I>(
407    datums: I,
408    callers_temp_storage: &'a RowArena,
409    order_by: &[ColumnOrder],
410) -> Datum<'a>
411where
412    I: IntoIterator<Item = Datum<'a>>,
413{
414    // We want to use our own temp_storage here, to avoid flooding `callers_temp_storage` with a
415    // large number of new datums. This is because we don't want to make an assumption about
416    // whether the caller creates a new temp_storage between window partitions.
417    let temp_storage = RowArena::new();
418    let datums = row_number_no_list(datums, &temp_storage, order_by);
419
420    callers_temp_storage.make_datum(|packer| {
421        packer.push_list(datums);
422    })
423}
424
425/// Like `row_number`, but doesn't perform the final wrapping in a list, returning an Iterator
426/// instead.
427fn row_number_no_list<'a: 'b, 'b, I>(
428    datums: I,
429    callers_temp_storage: &'b RowArena,
430    order_by: &[ColumnOrder],
431) -> impl Iterator<Item = Datum<'b>>
432where
433    I: IntoIterator<Item = Datum<'a>>,
434{
435    let datums = order_aggregate_datums(datums, order_by);
436
437    callers_temp_storage.reserve(datums.size_hint().0);
438    datums
439        .into_iter()
440        .map(|d| d.unwrap_list().iter())
441        .flatten()
442        .zip(1i64..)
443        .map(|(d, i)| {
444            callers_temp_storage.make_datum(|packer| {
445                packer.push_list_with(|packer| {
446                    packer.push(Datum::Int64(i));
447                    packer.push(d);
448                });
449            })
450        })
451}
452
453/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
454/// The output is in the format of `[result_value, original_row]`.
455/// See an example at `lag_lead`, where the input-output formats are similar.
456fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
457where
458    I: IntoIterator<Item = Datum<'a>>,
459{
460    let temp_storage = RowArena::new();
461    let datums = rank_no_list(datums, &temp_storage, order_by);
462
463    callers_temp_storage.make_datum(|packer| {
464        packer.push_list(datums);
465    })
466}
467
468/// Like `rank`, but doesn't perform the final wrapping in a list, returning an Iterator
469/// instead.
470fn rank_no_list<'a: 'b, 'b, I>(
471    datums: I,
472    callers_temp_storage: &'b RowArena,
473    order_by: &[ColumnOrder],
474) -> impl Iterator<Item = Datum<'b>>
475where
476    I: IntoIterator<Item = Datum<'a>>,
477{
478    // Keep the row used for ordering around, as it is used to determine the rank
479    let datums = order_aggregate_datums_with_rank(datums, order_by);
480
481    let mut datums = datums
482        .into_iter()
483        .map(|(d0, order_row)| {
484            d0.unwrap_list()
485                .iter()
486                .map(move |d1| (d1, order_row.clone()))
487        })
488        .flatten();
489
490    callers_temp_storage.reserve(datums.size_hint().0);
491    datums
492        .next()
493        .map_or(vec![], |(first_datum, first_order_row)| {
494            // Folding with (last order_by row, last assigned rank, row number, output vec)
495            datums.fold((first_order_row, 1, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
496                let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
497                *acc_row_num += 1;
498                // Identity is based on the order_by expression
499                if *acc_row != next_order_row {
500                    *acc_rank = *acc_row_num;
501                    *acc_row = next_order_row;
502                }
503
504                (*output).push((next_datum, *acc_rank));
505                acc
506            })
507        }.3).into_iter().map(|(d, i)| {
508        callers_temp_storage.make_datum(|packer| {
509            packer.push_list_with(|packer| {
510                packer.push(Datum::Int64(i));
511                packer.push(d);
512            });
513        })
514    })
515}
516
517/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
518/// The output is in the format of `[result_value, original_row]`.
519/// See an example at `lag_lead`, where the input-output formats are similar.
520fn dense_rank<'a, I>(
521    datums: I,
522    callers_temp_storage: &'a RowArena,
523    order_by: &[ColumnOrder],
524) -> Datum<'a>
525where
526    I: IntoIterator<Item = Datum<'a>>,
527{
528    let temp_storage = RowArena::new();
529    let datums = dense_rank_no_list(datums, &temp_storage, order_by);
530
531    callers_temp_storage.make_datum(|packer| {
532        packer.push_list(datums);
533    })
534}
535
536/// Like `dense_rank`, but doesn't perform the final wrapping in a list, returning an Iterator
537/// instead.
538fn dense_rank_no_list<'a: 'b, 'b, I>(
539    datums: I,
540    callers_temp_storage: &'b RowArena,
541    order_by: &[ColumnOrder],
542) -> impl Iterator<Item = Datum<'b>>
543where
544    I: IntoIterator<Item = Datum<'a>>,
545{
546    // Keep the row used for ordering around, as it is used to determine the rank
547    let datums = order_aggregate_datums_with_rank(datums, order_by);
548
549    let mut datums = datums
550        .into_iter()
551        .map(|(d0, order_row)| {
552            d0.unwrap_list()
553                .iter()
554                .map(move |d1| (d1, order_row.clone()))
555        })
556        .flatten();
557
558    callers_temp_storage.reserve(datums.size_hint().0);
559    datums
560        .next()
561        .map_or(vec![], |(first_datum, first_order_row)| {
562            // Folding with (last order_by row, last assigned rank, output vec)
563            datums.fold((first_order_row, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
564                let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
565                // Identity is based on the order_by expression
566                if *acc_row != next_order_row {
567                    *acc_rank += 1;
568                    *acc_row = next_order_row;
569                }
570
571                (*output).push((next_datum, *acc_rank));
572                acc
573            })
574        }.2).into_iter().map(|(d, i)| {
575        callers_temp_storage.make_datum(|packer| {
576            packer.push_list_with(|packer| {
577                packer.push(Datum::Int64(i));
578                packer.push(d);
579            });
580        })
581    })
582}
583
584/// The expected input is in the format of `[((OriginalRow, EncodedArgs), OrderByExprs...)]`
585/// For example,
586///
587/// lag(x*y, 1, null) over (partition by x+y order by x-y, x/y)
588///
589/// list of:
590/// row(
591///   row(
592///     row(#0, #1),
593///     row((#0 * #1), 1, null)
594///   ),
595///   (#0 - #1),
596///   (#0 / #1)
597/// )
598///
599/// The output is in the format of `[result_value, original_row]`, e.g.
600/// list of:
601/// row(
602///   42,
603///   row(7, 8)
604/// )
605fn lag_lead<'a, I>(
606    datums: I,
607    callers_temp_storage: &'a RowArena,
608    order_by: &[ColumnOrder],
609    lag_lead_type: &LagLeadType,
610    ignore_nulls: &bool,
611) -> Datum<'a>
612where
613    I: IntoIterator<Item = Datum<'a>>,
614{
615    let temp_storage = RowArena::new();
616    let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
617    callers_temp_storage.make_datum(|packer| {
618        packer.push_list(iter);
619    })
620}
621
622/// Like `lag_lead`, but doesn't perform the final wrapping in a list, returning an Iterator
623/// instead.
624fn lag_lead_no_list<'a: 'b, 'b, I>(
625    datums: I,
626    callers_temp_storage: &'b RowArena,
627    order_by: &[ColumnOrder],
628    lag_lead_type: &LagLeadType,
629    ignore_nulls: &bool,
630) -> impl Iterator<Item = Datum<'b>>
631where
632    I: IntoIterator<Item = Datum<'a>>,
633{
634    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, EncodedArgs) record
635    let datums = order_aggregate_datums(datums, order_by);
636
637    // Take the (OriginalRow, EncodedArgs) records and unwrap them into separate datums.
638    // EncodedArgs = (InputValue, Offset, DefaultValue) for Lag/Lead
639    // (`OriginalRow` is kept in a record form, as we don't need to look inside that.)
640    let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
641        .into_iter()
642        .map(|d| {
643            let mut iter = d.unwrap_list().iter();
644            let original_row = iter.next().unwrap();
645            let (input_value, offset, default_value) =
646                unwrap_lag_lead_encoded_args(iter.next().unwrap());
647            (original_row, (input_value, offset, default_value))
648        })
649        .unzip();
650
651    let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
652
653    callers_temp_storage.reserve(result.len());
654    result
655        .into_iter()
656        .zip_eq(orig_rows)
657        .map(|(result_value, original_row)| {
658            callers_temp_storage.make_datum(|packer| {
659                packer.push_list_with(|packer| {
660                    packer.push(result_value);
661                    packer.push(original_row);
662                });
663            })
664        })
665}
666
667/// lag/lead's arguments are in a record. This function unwraps this record.
668fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
669    let mut encoded_args_iter = encoded_args.unwrap_list().iter();
670    let (input_value, offset, default_value) = (
671        encoded_args_iter.next().unwrap(),
672        encoded_args_iter.next().unwrap(),
673        encoded_args_iter.next().unwrap(),
674    );
675    (input_value, offset, default_value)
676}
677
678/// Each element of `args` has the 3 arguments evaluated for a single input row.
679/// Returns the results for each input row.
680fn lag_lead_inner<'a>(
681    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
682    lag_lead_type: &LagLeadType,
683    ignore_nulls: &bool,
684) -> Vec<Datum<'a>> {
685    if *ignore_nulls {
686        lag_lead_inner_ignore_nulls(args, lag_lead_type)
687    } else {
688        lag_lead_inner_respect_nulls(args, lag_lead_type)
689    }
690}
691
692fn lag_lead_inner_respect_nulls<'a>(
693    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
694    lag_lead_type: &LagLeadType,
695) -> Vec<Datum<'a>> {
696    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
697    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
698        // Null offsets are acceptable, and always return null
699        if offset.is_null() {
700            result.push(Datum::Null);
701            continue;
702        }
703
704        let idx = i64::try_from(idx).expect("Array index does not fit in i64");
705        let offset = i64::from(offset.unwrap_int32());
706        let offset = match lag_lead_type {
707            LagLeadType::Lag => -offset,
708            LagLeadType::Lead => offset,
709        };
710
711        // Get a Datum from `datums`. Return None if index is out of range.
712        let datums_get = |i: i64| -> Option<Datum> {
713            match u64::try_from(i) {
714                Ok(i) => args
715                    .get(usize::cast_from(i))
716                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
717                    .unwrap_or(None), // overindexing
718                Err(_) => None, // underindexing (negative index)
719            }
720        };
721
722        let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
723
724        result.push(lagged_value);
725    }
726
727    result
728}
729
/// Computes lag/lead while skipping null input values (IGNORE NULLS).
///
/// Each element of `args` is `(input_value, offset, default_value)` for one row. For every row,
/// the function walks from the row's own position in the direction given by `lag_lead_type` and
/// the sign of the offset, counting only positions with a non-null input value, until
/// `abs(offset)` of them have been seen or the walk leaves the partition. The result is the
/// value where the walk stopped, or the row's `default_value` if the walk left the partition.
/// A null offset yields a null result.
///
/// # Panics
///
/// - If `args.len()` does not fit in an `i64`.
/// - If a row has offset 0 and a null input value.
// `i64` indexes get involved in this function because it's convenient to allow negative indexes and
// have `datums_get` fail on them, and thus handle the beginning and end of the input vector
// uniformly, rather than checking underflow separately during index manipulations.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    // We check here once that even the largest index fits in `i64`, and then do silent `as`
    // conversions from `usize` indexes to `i64` indexes throughout this function.
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // Preparation: Make sure we can jump over a run of nulls in constant time, i.e., regardless of
    // how many nulls the run has. The following skip tables will point to the next non-null index.
    // Only slots at null positions are filled in; slots at non-null positions stay `None`.
    // Backward table: for a null at index i, the closest non-null index before i, or -1 if there
    // is none.
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // Forward table: for a null at index i, the closest non-null index after i, or `args.len()`
    // if there is none. It is filled in by iterating in reverse.
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    // The actual computation.
    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        // Null offsets are acceptable, and always return null
        if offset.is_null() {
            result.push(Datum::Null);
            continue;
        }

        let idx = idx as i64; // checked at the beginning of the function that len() fits
        let offset = i64::cast_from(offset.unwrap_int32());
        // Lag looks backward, so it negates the offset; lead uses it as is.
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        // Step direction: -1, 0, or 1.
        let increment = offset.signum();

        // Get the input value at index `i` of `args`. Return None if index is out of range.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
                    .unwrap_or(None), // overindexing
                Err(_) => None, // underindexing (negative index)
            }
        };

        let lagged_value = if increment != 0 {
            // We start j from idx, and step j until we have seen an abs(offset) number of non-null
            // values or reach the beginning or end of the partition.
            //
            // If offset is big, then this is slow: Considering the entire function, it's
            // `O(partition_size * offset)`.
            // However, a common use case is an offset of 1, for which this doesn't matter.
            // TODO: For larger offsets, we could have a completely different implementation
            // that starts the inner loop from the index where we found the previous result:
            // https://github.com/MaterializeInc/materialize/pull/29287#discussion_r1738695174
            let mut j = idx;
            for _ in 0..num::abs(offset) {
                j += increment;
                // Jump over a run of nulls
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    let ju = j as usize; // `j >= 0` because of the above `is_some_and`
                    if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                // Stop as soon as we have walked out of the partition.
                if datums_get(j).is_none() {
                    break;
                }
            }
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // I can imagine returning here either `default_value` or `null`.
                // (I'm leaning towards `default_value`.)
                // We used to run into an infinite loop in this case, so panicking is
                // better. Started a SQL Council thread:
                // https://materializeinc.slack.com/archives/C063H5S7NKE/p1724962369706729
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
851
852/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
853fn first_value<'a, I>(
854    datums: I,
855    callers_temp_storage: &'a RowArena,
856    order_by: &[ColumnOrder],
857    window_frame: &WindowFrame,
858) -> Datum<'a>
859where
860    I: IntoIterator<Item = Datum<'a>>,
861{
862    let temp_storage = RowArena::new();
863    let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
864    callers_temp_storage.make_datum(|packer| {
865        packer.push_list(iter);
866    })
867}
868
869/// Like `first_value`, but doesn't perform the final wrapping in a list, returning an Iterator
870/// instead.
871fn first_value_no_list<'a: 'b, 'b, I>(
872    datums: I,
873    callers_temp_storage: &'b RowArena,
874    order_by: &[ColumnOrder],
875    window_frame: &WindowFrame,
876) -> impl Iterator<Item = Datum<'b>>
877where
878    I: IntoIterator<Item = Datum<'a>>,
879{
880    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, InputValue) record
881    let datums = order_aggregate_datums(datums, order_by);
882
883    // Decode the input (OriginalRow, InputValue) into separate datums
884    let (orig_rows, args): (Vec<_>, Vec<_>) = datums
885        .into_iter()
886        .map(|d| {
887            let mut iter = d.unwrap_list().iter();
888            let original_row = iter.next().unwrap();
889            let arg = iter.next().unwrap();
890
891            (original_row, arg)
892        })
893        .unzip();
894
895    let results = first_value_inner(args, window_frame);
896
897    callers_temp_storage.reserve(results.len());
898    results
899        .into_iter()
900        .zip_eq(orig_rows)
901        .map(|(result_value, original_row)| {
902            callers_temp_storage.make_datum(|packer| {
903                packer.push_list_with(|packer| {
904                    packer.push(result_value);
905                    packer.push(original_row);
906                });
907            })
908        })
909}
910
911fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
912    let length = datums.len();
913    let mut result: Vec<Datum> = Vec::with_capacity(length);
914    for (idx, current_datum) in datums.iter().enumerate() {
915        let first_value = match &window_frame.start_bound {
916            // Always return the current value
917            WindowFrameBound::CurrentRow => *current_datum,
918            WindowFrameBound::UnboundedPreceding => {
919                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
920                    let end_offset = usize::cast_from(*end_offset);
921
922                    // If the frame ends before the first row, return null
923                    if idx < end_offset {
924                        Datum::Null
925                    } else {
926                        datums[0]
927                    }
928                } else {
929                    datums[0]
930                }
931            }
932            WindowFrameBound::OffsetPreceding(offset) => {
933                let start_offset = usize::cast_from(*offset);
934                let start_idx = idx.saturating_sub(start_offset);
935                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
936                    let end_offset = usize::cast_from(*end_offset);
937
938                    // If the frame is empty or ends before the first row, return null
939                    if start_offset < end_offset || idx < end_offset {
940                        Datum::Null
941                    } else {
942                        datums[start_idx]
943                    }
944                } else {
945                    datums[start_idx]
946                }
947            }
948            WindowFrameBound::OffsetFollowing(offset) => {
949                let start_offset = usize::cast_from(*offset);
950                let start_idx = idx.saturating_add(start_offset);
951                if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
952                    // If the frame is empty or starts after the last row, return null
953                    if offset > end_offset || start_idx >= length {
954                        Datum::Null
955                    } else {
956                        datums[start_idx]
957                    }
958                } else {
959                    datums
960                        .get(start_idx)
961                        .map(|d| d.clone())
962                        .unwrap_or(Datum::Null)
963                }
964            }
965            // Forbidden during planning
966            WindowFrameBound::UnboundedFollowing => unreachable!(),
967        };
968        result.push(first_value);
969    }
970    result
971}
972
973/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
974fn last_value<'a, I>(
975    datums: I,
976    callers_temp_storage: &'a RowArena,
977    order_by: &[ColumnOrder],
978    window_frame: &WindowFrame,
979) -> Datum<'a>
980where
981    I: IntoIterator<Item = Datum<'a>>,
982{
983    let temp_storage = RowArena::new();
984    let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
985    callers_temp_storage.make_datum(|packer| {
986        packer.push_list(iter);
987    })
988}
989
990/// Like `last_value`, but doesn't perform the final wrapping in a list, returning an Iterator
991/// instead.
992fn last_value_no_list<'a: 'b, 'b, I>(
993    datums: I,
994    callers_temp_storage: &'b RowArena,
995    order_by: &[ColumnOrder],
996    window_frame: &WindowFrame,
997) -> impl Iterator<Item = Datum<'b>>
998where
999    I: IntoIterator<Item = Datum<'a>>,
1000{
1001    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1002    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1003    let datums = order_aggregate_datums_with_rank(datums, order_by);
1004
1005    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1006    let size_hint = datums.size_hint().0;
1007    let mut args = Vec::with_capacity(size_hint);
1008    let mut original_rows = Vec::with_capacity(size_hint);
1009    let mut order_by_rows = Vec::with_capacity(size_hint);
1010    for (d, order_by_row) in datums.into_iter() {
1011        let mut iter = d.unwrap_list().iter();
1012        let original_row = iter.next().unwrap();
1013        let arg = iter.next().unwrap();
1014        order_by_rows.push(order_by_row);
1015        original_rows.push(original_row);
1016        args.push(arg);
1017    }
1018
1019    let results = last_value_inner(args, &order_by_rows, window_frame);
1020
1021    callers_temp_storage.reserve(results.len());
1022    results
1023        .into_iter()
1024        .zip_eq(original_rows)
1025        .map(|(result_value, original_row)| {
1026            callers_temp_storage.make_datum(|packer| {
1027                packer.push_list_with(|packer| {
1028                    packer.push(result_value);
1029                    packer.push(original_row);
1030                });
1031            })
1032        })
1033}
1034
1035fn last_value_inner<'a>(
1036    args: Vec<Datum<'a>>,
1037    order_by_rows: &Vec<Row>,
1038    window_frame: &WindowFrame,
1039) -> Vec<Datum<'a>> {
1040    let length = args.len();
1041    let mut results: Vec<Datum> = Vec::with_capacity(length);
1042    for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1043        let last_value = match &window_frame.end_bound {
1044            WindowFrameBound::CurrentRow => match &window_frame.units {
1045                // Always return the current value when in ROWS mode
1046                WindowFrameUnits::Rows => *current_datum,
1047                WindowFrameUnits::Range => {
1048                    // When in RANGE mode, return the last value of the peer group
1049                    // The peer group is the group of rows with the same ORDER BY value
1050                    // Note: Range is only supported for the default window frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
1051                    // which is why it does not appear in the other branches
1052                    let target_idx = order_by_rows[idx..]
1053                        .iter()
1054                        .enumerate()
1055                        .take_while(|(_, row)| *row == order_by_row)
1056                        .last()
1057                        .unwrap()
1058                        .0
1059                        + idx;
1060                    args[target_idx]
1061                }
1062                // GROUPS is not supported, and forbidden during planning
1063                WindowFrameUnits::Groups => unreachable!(),
1064            },
1065            WindowFrameBound::UnboundedFollowing => {
1066                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1067                    let start_offset = usize::cast_from(*start_offset);
1068
1069                    // If the frame starts after the last row of the window, return null
1070                    if idx + start_offset > length - 1 {
1071                        Datum::Null
1072                    } else {
1073                        args[length - 1]
1074                    }
1075                } else {
1076                    args[length - 1]
1077                }
1078            }
1079            WindowFrameBound::OffsetFollowing(offset) => {
1080                let end_offset = usize::cast_from(*offset);
1081                let end_idx = idx.saturating_add(end_offset);
1082                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1083                    let start_offset = usize::cast_from(*start_offset);
1084                    let start_idx = idx.saturating_add(start_offset);
1085
1086                    // If the frame is empty or starts after the last row of the window, return null
1087                    if end_offset < start_offset || start_idx >= length {
1088                        Datum::Null
1089                    } else {
1090                        // Return the last valid element in the window
1091                        args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1092                    }
1093                } else {
1094                    args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1095                }
1096            }
1097            WindowFrameBound::OffsetPreceding(offset) => {
1098                let end_offset = usize::cast_from(*offset);
1099                let end_idx = idx.saturating_sub(end_offset);
1100                if idx < end_offset {
1101                    // If the frame ends before the first row, return null
1102                    Datum::Null
1103                } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1104                    &window_frame.start_bound
1105                {
1106                    // If the frame is empty, return null
1107                    if offset > start_offset {
1108                        Datum::Null
1109                    } else {
1110                        args[end_idx]
1111                    }
1112                } else {
1113                    args[end_idx]
1114                }
1115            }
1116            // Forbidden during planning
1117            WindowFrameBound::UnboundedPreceding => unreachable!(),
1118        };
1119        results.push(last_value);
1120    }
1121    results
1122}
1123
1124/// Executes `FusedValueWindowFunc` on a reduction group.
1125/// The expected input is in the format of `[((OriginalRow, (Args1, Args2, ...)), OrderByExprs...)]`
1126/// where `Args1`, `Args2`, are the arguments of each of the fused functions. For functions that
1127/// have only a single argument (first_value/last_value), these are simple values. For functions
1128/// that have multiple arguments (lag/lead), these are also records.
1129fn fused_value_window_func<'a, I>(
1130    input_datums: I,
1131    callers_temp_storage: &'a RowArena,
1132    funcs: &Vec<AggregateFunc>,
1133    order_by: &Vec<ColumnOrder>,
1134) -> Datum<'a>
1135where
1136    I: IntoIterator<Item = Datum<'a>>,
1137{
1138    let temp_storage = RowArena::new();
1139    let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1140    callers_temp_storage.make_datum(|packer| {
1141        packer.push_list(iter);
1142    })
1143}
1144
1145/// Like `fused_value_window_func`, but doesn't perform the final wrapping in a list, returning an
1146/// Iterator instead.
1147fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1148    input_datums: I,
1149    callers_temp_storage: &'b RowArena,
1150    funcs: &Vec<AggregateFunc>,
1151    order_by: &Vec<ColumnOrder>,
1152) -> impl Iterator<Item = Datum<'b>>
1153where
1154    I: IntoIterator<Item = Datum<'a>>,
1155{
1156    let has_last_value = funcs
1157        .iter()
1158        .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1159
1160    let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1161
1162    let size_hint = input_datums_with_ranks.size_hint().0;
1163    let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1164    let mut original_rows = Vec::with_capacity(size_hint);
1165    let mut order_by_rows = Vec::with_capacity(size_hint);
1166    for (d, order_by_row) in input_datums_with_ranks {
1167        let mut iter = d.unwrap_list().iter();
1168        let original_row = iter.next().unwrap();
1169        original_rows.push(original_row);
1170        let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1171        for i in 0..funcs.len() {
1172            let encoded_args = argss_iter.next().unwrap();
1173            encoded_argsss[i].push(encoded_args);
1174        }
1175        if has_last_value {
1176            order_by_rows.push(order_by_row);
1177        }
1178    }
1179
1180    let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1181    for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1182        let results = match func {
1183            AggregateFunc::LagLead {
1184                order_by: inner_order_by,
1185                lag_lead,
1186                ignore_nulls,
1187            } => {
1188                assert_eq!(order_by, inner_order_by);
1189                let unwrapped_argss = encoded_argss
1190                    .into_iter()
1191                    .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1192                    .collect();
1193                lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1194            }
1195            AggregateFunc::FirstValue {
1196                order_by: inner_order_by,
1197                window_frame,
1198            } => {
1199                assert_eq!(order_by, inner_order_by);
1200                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
1201                // wrapped into a record.)
1202                first_value_inner(encoded_argss, window_frame)
1203            }
1204            AggregateFunc::LastValue {
1205                order_by: inner_order_by,
1206                window_frame,
1207            } => {
1208                assert_eq!(order_by, inner_order_by);
1209                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
1210                // wrapped into a record.)
1211                last_value_inner(encoded_argss, &order_by_rows, window_frame)
1212            }
1213            _ => panic!("unknown window function in FusedValueWindowFunc"),
1214        };
1215        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1216            results.push(result);
1217        }
1218    }
1219
1220    callers_temp_storage.reserve(2 * original_rows.len());
1221    results_per_row
1222        .into_iter()
1223        .enumerate()
1224        .map(move |(i, results)| {
1225            callers_temp_storage.make_datum(|packer| {
1226                packer.push_list_with(|packer| {
1227                    packer
1228                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1229                    packer.push(original_rows[i]);
1230                });
1231            })
1232        })
1233}
1234
1235/// `input_datums` is an entire window partition.
1236/// The expected input is in the format of `[((OriginalRow, InputValue), OrderByExprs...)]`
1237/// See also in the comment in `window_func_applied_to`.
1238///
1239/// `wrapped_aggregate`: e.g., for `sum(...) OVER (...)`, this is the `sum(...)`.
1240///
1241/// Note that this `order_by` doesn't have expressions, only `ColumnOrder`s. For an explanation,
1242/// see the comment on `WindowExprType`.
1243fn window_aggr<'a, I, A>(
1244    input_datums: I,
1245    callers_temp_storage: &'a RowArena,
1246    wrapped_aggregate: &AggregateFunc,
1247    order_by: &[ColumnOrder],
1248    window_frame: &WindowFrame,
1249) -> Datum<'a>
1250where
1251    I: IntoIterator<Item = Datum<'a>>,
1252    A: OneByOneAggr,
1253{
1254    let temp_storage = RowArena::new();
1255    let iter = window_aggr_no_list::<I, A>(
1256        input_datums,
1257        &temp_storage,
1258        wrapped_aggregate,
1259        order_by,
1260        window_frame,
1261    );
1262    callers_temp_storage.make_datum(|packer| {
1263        packer.push_list(iter);
1264    })
1265}
1266
1267/// Like `window_aggr`, but doesn't perform the final wrapping in a list, returning an Iterator
1268/// instead.
1269fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1270    input_datums: I,
1271    callers_temp_storage: &'b RowArena,
1272    wrapped_aggregate: &AggregateFunc,
1273    order_by: &[ColumnOrder],
1274    window_frame: &WindowFrame,
1275) -> impl Iterator<Item = Datum<'b>>
1276where
1277    I: IntoIterator<Item = Datum<'a>>,
1278    A: OneByOneAggr,
1279{
1280    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1281    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1282    let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1283
1284    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1285    let size_hint = datums.size_hint().0;
1286    let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1287    let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1288    let mut order_by_rows = Vec::with_capacity(size_hint);
1289    for (d, order_by_row) in datums.into_iter() {
1290        let mut iter = d.unwrap_list().iter();
1291        let original_row = iter.next().unwrap();
1292        let arg = iter.next().unwrap();
1293        order_by_rows.push(order_by_row);
1294        original_rows.push(original_row);
1295        args.push(arg);
1296    }
1297
1298    let results = window_aggr_inner::<A>(
1299        args,
1300        &order_by_rows,
1301        wrapped_aggregate,
1302        order_by,
1303        window_frame,
1304        callers_temp_storage,
1305    );
1306
1307    callers_temp_storage.reserve(results.len());
1308    results
1309        .into_iter()
1310        .zip_eq(original_rows)
1311        .map(|(result_value, original_row)| {
1312            callers_temp_storage.make_datum(|packer| {
1313                packer.push_list_with(|packer| {
1314                    packer.push(result_value);
1315                    packer.push(original_row);
1316                });
1317            })
1318        })
1319}
1320
1321fn window_aggr_inner<'a, A>(
1322    mut args: Vec<Datum<'a>>,
1323    order_by_rows: &Vec<Row>,
1324    wrapped_aggregate: &AggregateFunc,
1325    order_by: &[ColumnOrder],
1326    window_frame: &WindowFrame,
1327    temp_storage: &'a RowArena,
1328) -> Vec<Datum<'a>>
1329where
1330    A: OneByOneAggr,
1331{
1332    let length = args.len();
1333    let mut result: Vec<Datum> = Vec::with_capacity(length);
1334
1335    // In this degenerate case, all results would be `wrapped_aggregate.default()` (usually null).
1336    // However, this currently can't happen, because
1337    // - Groups frame mode is currently not supported;
1338    // - Range frame mode is currently supported only for the default frame, which includes the
1339    //   current row.
1340    soft_assert_or_log!(
1341        !((matches!(window_frame.units, WindowFrameUnits::Groups)
1342            || matches!(window_frame.units, WindowFrameUnits::Range))
1343            && !window_frame.includes_current_row()),
1344        "window frame without current row"
1345    );
1346
1347    if (matches!(
1348        window_frame.start_bound,
1349        WindowFrameBound::UnboundedPreceding
1350    ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1351        || (order_by.is_empty()
1352            && (matches!(window_frame.units, WindowFrameUnits::Groups)
1353                || matches!(window_frame.units, WindowFrameUnits::Range))
1354            && window_frame.includes_current_row())
1355    {
1356        // Either
1357        //  - UNBOUNDED frame in both directions, or
1358        //  - There is no ORDER BY and the frame is such that the current peer group is included.
1359        //    (The current peer group will be the whole partition if there is no ORDER BY.)
1360        // We simply need to compute the aggregate once, on the entire partition, and each input
1361        // row will get this one aggregate value as result.
1362        let result_value = wrapped_aggregate.eval(args, temp_storage);
1363        // Every row will get the above aggregate as result.
1364        for _ in 0..length {
1365            result.push(result_value);
1366        }
1367    } else {
1368        fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1369            args: Vec<Datum<'a>>,
1370            result: &mut Vec<Datum<'a>>,
1371            mut one_by_one_aggr: A,
1372            temp_storage: &'a RowArena,
1373        ) where
1374            A: OneByOneAggr,
1375        {
1376            for current_arg in args.into_iter() {
1377                one_by_one_aggr.give(&current_arg);
1378                let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1379                result.push(result_value);
1380            }
1381        }
1382
1383        fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1384            args: Vec<Datum<'a>>,
1385            order_by_rows: &Vec<Row>,
1386            result: &mut Vec<Datum<'a>>,
1387            mut one_by_one_aggr: A,
1388            temp_storage: &'a RowArena,
1389        ) where
1390            A: OneByOneAggr,
1391        {
1392            let mut peer_group_start = 0;
1393            while peer_group_start < args.len() {
1394                // Find the boundaries of the current peer group.
1395                // peer_group_start will point to the first element of the peer group,
1396                // peer_group_end will point to _just after_ the last element of the peer group.
1397                let mut peer_group_end = peer_group_start + 1;
1398                while peer_group_end < args.len()
1399                    && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1400                {
1401                    // The peer group goes on while the OrderByRows not differ.
1402                    peer_group_end += 1;
1403                }
1404                // Let's compute the aggregate (which will be the same for all records in this
1405                // peer group).
1406                for current_arg in args[peer_group_start..peer_group_end].iter() {
1407                    one_by_one_aggr.give(current_arg);
1408                }
1409                let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1410                // Put the above aggregate into each record in the peer group.
1411                for _ in args[peer_group_start..peer_group_end].iter() {
1412                    result.push(agg_for_peer_group);
1413                }
1414                // Point to the start of the next peer group.
1415                peer_group_start = peer_group_end;
1416            }
1417        }
1418
1419        fn rows_between_offset_and_offset<'a>(
1420            args: Vec<Datum<'a>>,
1421            result: &mut Vec<Datum<'a>>,
1422            wrapped_aggregate: &AggregateFunc,
1423            temp_storage: &'a RowArena,
1424            offset_start: i64,
1425            offset_end: i64,
1426        ) {
1427            let len = args
1428                .len()
1429                .to_i64()
1430                .expect("window partition's len should fit into i64");
1431            for i in 0..len {
1432                let i = i.to_i64().expect("window partition shouldn't be super big");
1433                // Trim the start of the frame to make it not reach over the start of the window
1434                // partition.
1435                let frame_start = max(i + offset_start, 0)
1436                    .to_usize()
1437                    .expect("The max made sure it's not negative");
1438                // Trim the end of the frame to make it not reach over the end of the window
1439                // partition.
1440                let frame_end = min(i + offset_end, len - 1).to_usize();
1441                match frame_end {
1442                    Some(frame_end) => {
1443                        if frame_start <= frame_end {
1444                            // Compute the aggregate on the frame.
1445                            // TODO:
1446                            // This implementation is quite slow if the frame is large: we do an
1447                            // inner loop over the entire frame, and compute the aggregate from
1448                            // scratch. We could do better:
1449                            //  - For invertible aggregations we could do a rolling aggregation.
1450                            //  - There are various tricks for min/max as well, making use of either
1451                            //    the fixed size of the window, or that we are not retracting
1452                            //    arbitrary elements but doing queue operations. E.g., see
1453                            //    http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
1454                            let frame_values = args[frame_start..=frame_end].iter().cloned();
1455                            let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1456                            result.push(result_value);
1457                        } else {
1458                            // frame_start > frame_end, so this is an empty frame.
1459                            let result_value = wrapped_aggregate.default();
1460                            result.push(result_value);
1461                        }
1462                    }
1463                    None => {
1464                        // frame_end would be negative, so this is an empty frame.
1465                        let result_value = wrapped_aggregate.default();
1466                        result.push(result_value);
1467                    }
1468                }
1469            }
1470        }
1471
1472        match (
1473            &window_frame.units,
1474            &window_frame.start_bound,
1475            &window_frame.end_bound,
1476        ) {
1477            // Cases where one edge of the frame is CurrentRow.
1478            // Note that these cases could be merged into the more general cases below where one
1479            // edge is some offset (with offset = 0), but the CurrentRow cases probably cover 95%
1480            // of user queries, so let's make this simple and fast.
1481            (Rows, UnboundedPreceding, CurrentRow) => {
1482                rows_between_unbounded_preceding_and_current_row::<A>(
1483                    args,
1484                    &mut result,
1485                    A::new(wrapped_aggregate, false),
1486                    temp_storage,
1487                );
1488            }
1489            (Rows, CurrentRow, UnboundedFollowing) => {
1490                // Same as above, but reverse.
1491                args.reverse();
1492                rows_between_unbounded_preceding_and_current_row::<A>(
1493                    args,
1494                    &mut result,
1495                    A::new(wrapped_aggregate, true),
1496                    temp_storage,
1497                );
1498                result.reverse();
1499            }
1500            (Range, UnboundedPreceding, CurrentRow) => {
1501                // Note that for the default frame, the RANGE frame mode is identical to the GROUPS
1502                // frame mode.
1503                groups_between_unbounded_preceding_and_current_row::<A>(
1504                    args,
1505                    order_by_rows,
1506                    &mut result,
1507                    A::new(wrapped_aggregate, false),
1508                    temp_storage,
1509                );
1510            }
1511            // The next several cases all call `rows_between_offset_and_offset`. Note that the
1512            // offset passed to `rows_between_offset_and_offset` should be negated when it's
1513            // PRECEDING.
1514            (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1515                let start_prec = start_prec.to_i64().expect(
1516                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1517                );
1518                let end_prec = end_prec.to_i64().expect(
1519                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1520                );
1521                rows_between_offset_and_offset(
1522                    args,
1523                    &mut result,
1524                    wrapped_aggregate,
1525                    temp_storage,
1526                    -start_prec,
1527                    -end_prec,
1528                );
1529            }
1530            (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1531                let start_prec = start_prec.to_i64().expect(
1532                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1533                );
1534                let end_fol = end_fol.to_i64().expect(
1535                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1536                );
1537                rows_between_offset_and_offset(
1538                    args,
1539                    &mut result,
1540                    wrapped_aggregate,
1541                    temp_storage,
1542                    -start_prec,
1543                    end_fol,
1544                );
1545            }
1546            (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1547                let start_fol = start_fol.to_i64().expect(
1548                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1549                );
1550                let end_fol = end_fol.to_i64().expect(
1551                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1552                );
1553                rows_between_offset_and_offset(
1554                    args,
1555                    &mut result,
1556                    wrapped_aggregate,
1557                    temp_storage,
1558                    start_fol,
1559                    end_fol,
1560                );
1561            }
1562            (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1563                unreachable!() // The planning ensured that this nonsensical case can't happen
1564            }
1565            (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1566                let start_prec = start_prec.to_i64().expect(
1567                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1568                );
1569                let end_fol = 0;
1570                rows_between_offset_and_offset(
1571                    args,
1572                    &mut result,
1573                    wrapped_aggregate,
1574                    temp_storage,
1575                    -start_prec,
1576                    end_fol,
1577                );
1578            }
1579            (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1580                let start_fol = 0;
1581                let end_fol = end_fol.to_i64().expect(
1582                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1583                );
1584                rows_between_offset_and_offset(
1585                    args,
1586                    &mut result,
1587                    wrapped_aggregate,
1588                    temp_storage,
1589                    start_fol,
1590                    end_fol,
1591                );
1592            }
1593            (Rows, CurrentRow, CurrentRow) => {
1594                // We could have a more efficient implementation for this, but this is probably
1595                // super rare. (Might be more common with RANGE or GROUPS frame mode, though!)
1596                let start_fol = 0;
1597                let end_fol = 0;
1598                rows_between_offset_and_offset(
1599                    args,
1600                    &mut result,
1601                    wrapped_aggregate,
1602                    temp_storage,
1603                    start_fol,
1604                    end_fol,
1605                );
1606            }
1607            (Rows, CurrentRow, OffsetPreceding(_))
1608            | (Rows, UnboundedFollowing, _)
1609            | (Rows, _, UnboundedPreceding)
1610            | (Rows, OffsetFollowing(..), CurrentRow) => {
1611                unreachable!() // The planning ensured that these nonsensical cases can't happen
1612            }
1613            (Rows, UnboundedPreceding, UnboundedFollowing) => {
1614                // This is handled by the complicated if condition near the beginning of this
1615                // function.
1616                unreachable!()
1617            }
1618            (Rows, UnboundedPreceding, OffsetPreceding(_))
1619            | (Rows, UnboundedPreceding, OffsetFollowing(_))
1620            | (Rows, OffsetPreceding(..), UnboundedFollowing)
1621            | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1622                // Unsupported. Bail in the planner.
1623                // https://github.com/MaterializeInc/database-issues/issues/6720
1624                unreachable!()
1625            }
1626            (Range, _, _) => {
1627                // Unsupported.
1628                // The planner doesn't allow Range frame mode for now (except for the default
1629                // frame), see https://github.com/MaterializeInc/database-issues/issues/6585
1630                // Note that it would be easy to handle (Range, CurrentRow, UnboundedFollowing):
1631                // it would be similar to (Rows, CurrentRow, UnboundedFollowing), but would call
1632                // groups_between_unbounded_preceding_current_row.
1633                unreachable!()
1634            }
1635            (Groups, _, _) => {
1636                // Unsupported.
1637                // The planner doesn't allow Groups frame mode for now, see
1638                // https://github.com/MaterializeInc/database-issues/issues/6588
1639                unreachable!()
1640            }
1641        }
1642    }
1643
1644    result
1645}
1646
1647/// Computes a bundle of fused window aggregations.
1648/// The input is similar to `window_aggr`, but `InputValue` is not just a single value, but a record
1649/// where each component is the input to one of the aggregations.
1650fn fused_window_aggr<'a, I, A>(
1651    input_datums: I,
1652    callers_temp_storage: &'a RowArena,
1653    wrapped_aggregates: &Vec<AggregateFunc>,
1654    order_by: &Vec<ColumnOrder>,
1655    window_frame: &WindowFrame,
1656) -> Datum<'a>
1657where
1658    I: IntoIterator<Item = Datum<'a>>,
1659    A: OneByOneAggr,
1660{
1661    let temp_storage = RowArena::new();
1662    let iter = fused_window_aggr_no_list::<_, A>(
1663        input_datums,
1664        &temp_storage,
1665        wrapped_aggregates,
1666        order_by,
1667        window_frame,
1668    );
1669    callers_temp_storage.make_datum(|packer| {
1670        packer.push_list(iter);
1671    })
1672}
1673
1674/// Like `fused_window_aggr`, but doesn't perform the final wrapping in a list, returning an
1675/// Iterator instead.
1676fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1677    input_datums: I,
1678    callers_temp_storage: &'b RowArena,
1679    wrapped_aggregates: &Vec<AggregateFunc>,
1680    order_by: &Vec<ColumnOrder>,
1681    window_frame: &WindowFrame,
1682) -> impl Iterator<Item = Datum<'b>>
1683where
1684    I: IntoIterator<Item = Datum<'a>>,
1685    A: OneByOneAggr,
1686{
1687    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1688    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1689    let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1690
1691    let size_hint = datums.size_hint().0;
1692    let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1693    let mut original_rows = Vec::with_capacity(size_hint);
1694    let mut order_by_rows = Vec::with_capacity(size_hint);
1695    for (d, order_by_row) in datums {
1696        let mut iter = d.unwrap_list().iter();
1697        let original_row = iter.next().unwrap();
1698        original_rows.push(original_row);
1699        let args_iter = iter.next().unwrap().unwrap_list().iter();
1700        // Push each argument into the respective list
1701        for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1702            args.push(arg);
1703        }
1704        order_by_rows.push(order_by_row);
1705    }
1706
1707    let mut results_per_row =
1708        vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1709    for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1710        let results = window_aggr_inner::<A>(
1711            args,
1712            &order_by_rows,
1713            wrapped_aggr,
1714            order_by,
1715            window_frame,
1716            callers_temp_storage,
1717        );
1718        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1719            results.push(result);
1720        }
1721    }
1722
1723    callers_temp_storage.reserve(2 * original_rows.len());
1724    results_per_row
1725        .into_iter()
1726        .enumerate()
1727        .map(move |(i, results)| {
1728            callers_temp_storage.make_datum(|packer| {
1729                packer.push_list_with(|packer| {
1730                    packer
1731                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1732                    packer.push(original_rows[i]);
1733                });
1734            })
1735        })
1736}
1737
/// An implementation of an aggregation where we can send in the input elements one-by-one, and
/// can also ask the current aggregate at any moment. (This just delegates to other aggregation
/// evaluation approaches.)
///
/// The expected usage is to create an instance with `new`, feed it input elements with `give`,
/// and call `get_current_aggregate` whenever the aggregate of the elements given so far is
/// needed.
pub trait OneByOneAggr {
    /// Creates the evaluation state for the aggregate function `agg`.
    ///
    /// The `reverse` parameter makes the aggregations process input elements in reverse order.
    /// This has an effect only for non-commutative aggregations, e.g. `list_agg`. These are
    /// currently only some of the Basic aggregations. (Basic aggregations are handled by
    /// `NaiveOneByOneAggr`).
    fn new(agg: &AggregateFunc, reverse: bool) -> Self;
    /// Pushes one input element into the aggregation.
    fn give(&mut self, d: &Datum);
    /// Returns the value of the aggregate computed on the given values so far.
    ///
    /// The returned datum borrows from `temp_storage`.
    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
}
1752
/// Naive implementation of [OneByOneAggr], suitable for stuff like const folding, but too slow for
/// rendering. This relies only on infrastructure available in `mz-expr`. It simply saves all the
/// given input, and calls the given [AggregateFunc]'s `eval` method when asked about the current
/// aggregate. (For Accumulable and Hierarchical aggregations, the rendering has more efficient
/// implementations, but for Basic aggregations even the rendering uses this naive implementation.)
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    /// The aggregate function that is evaluated on the saved input on demand.
    agg: AggregateFunc,
    /// The input elements given so far, in arrival order. Each element is stored as the only
    /// datum of its own `Row`.
    input: Vec<Row>,
    /// Whether the saved input is handed to `eval` in reverse arrival order.
    reverse: bool,
}
1764
1765impl OneByOneAggr for NaiveOneByOneAggr {
1766    fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1767        NaiveOneByOneAggr {
1768            agg: agg.clone(),
1769            input: Vec::new(),
1770            reverse,
1771        }
1772    }
1773
1774    fn give(&mut self, d: &Datum) {
1775        let mut row = Row::default();
1776        row.packer().push(d);
1777        self.input.push(row);
1778    }
1779
1780    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1781        temp_storage.make_datum(|packer| {
1782            packer.push(if !self.reverse {
1783                self.agg
1784                    .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1785            } else {
1786                self.agg.eval(
1787                    self.input.iter().rev().map(|r| r.unpack_first()),
1788                    temp_storage,
1789                )
1790            });
1791        })
1792    }
1793}
1794
/// Identify whether the given aggregate function is Lag or Lead, since they share
/// implementations.
///
/// This is the type of the `lag_lead` field of `AggregateFunc::LagLead`.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub enum LagLeadType {
    /// The function is Lag.
    Lag,
    /// The function is Lead.
    Lead,
}
1804
/// The aggregate functions: plain aggregations specialized by input type (e.g. `SumInt64`),
/// aggregations that carry an `order_by` (e.g. `StringAgg`), and window functions (e.g.
/// `LagLead`, `WindowAggregate`).
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum AggregateFunc {
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    Count,
    Any,
    All,
    /// Accumulates `Datum::List`s whose first element is a JSON-typed `Datum`
    /// into a JSON list. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips `Datum::List`s whose first element is a JSON-typed `Datum` into a
    /// JSON map. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_object_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbObjectAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips a `Datum::List` whose first element is a `Datum::List` guaranteed
    /// to be non-empty and whose len % 2 == 0 into a `Datum::Map`. The other
    /// elements are columns used by `order_by`.
    MapAgg {
        order_by: Vec<ColumnOrder>,
        value_type: ScalarType,
    },
    /// Accumulates `Datum::Array`s of `ScalarType::Record` whose first element is a `Datum::Array`
    /// into a single `Datum::Array` (the remaining fields are used by `order_by`).
    ArrayConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Accumulates `Datum::List`s of `ScalarType::Record` whose first field is a `Datum::List`
    /// into a single `Datum::List` (the remaining fields are used by `order_by`).
    ListConcat {
        order_by: Vec<ColumnOrder>,
    },
    StringAgg {
        order_by: Vec<ColumnOrder>,
    },
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// Lag and Lead share an implementation; `lag_lead` tells which of the two this is.
    LagLead {
        order_by: Vec<ColumnOrder>,
        lag_lead: LagLeadType,
        ignore_nulls: bool,
    },
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions fused into one function, to amortize overheads.
    FusedValueWindowFunc {
        funcs: Vec<AggregateFunc>,
        /// Currently, all the fused functions must have the same `order_by`. (We can later
        /// eliminate this limitation.)
        order_by: Vec<ColumnOrder>,
    },
    /// An aggregate function (`wrapped_aggregate`) used as a window function, with the given
    /// `order_by` and `window_frame`.
    WindowAggregate {
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregations fused into one function. All of `wrapped_aggregates` share
    /// the same `order_by` and `window_frame`.
    FusedWindowAggregate {
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Accumulates any number of `Datum::Dummy`s into `Datum::Dummy`.
    ///
    /// Useful for removing an expensive aggregation while maintaining the shape
    /// of a reduce operator.
    Dummy,
}
1936
1937/// An explicit [`Arbitrary`] implementation needed here because of a known
1938/// `proptest` issue.
1939///
1940/// Revert to the derive-macro implementation once the issue[^1] is fixed.
1941///
1942/// [^1]: <https://github.com/AltSysrq/proptest/issues/152>
1943impl Arbitrary for AggregateFunc {
1944    type Parameters = ();
1945
1946    type Strategy = Union<BoxedStrategy<Self>>;
1947
1948    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1949        use proptest::collection::vec;
1950        use proptest::prelude::any as proptest_any;
1951        Union::new(vec![
1952            Just(AggregateFunc::MaxNumeric).boxed(),
1953            Just(AggregateFunc::MaxInt16).boxed(),
1954            Just(AggregateFunc::MaxInt32).boxed(),
1955            Just(AggregateFunc::MaxInt64).boxed(),
1956            Just(AggregateFunc::MaxUInt16).boxed(),
1957            Just(AggregateFunc::MaxUInt32).boxed(),
1958            Just(AggregateFunc::MaxUInt64).boxed(),
1959            Just(AggregateFunc::MaxMzTimestamp).boxed(),
1960            Just(AggregateFunc::MaxFloat32).boxed(),
1961            Just(AggregateFunc::MaxFloat64).boxed(),
1962            Just(AggregateFunc::MaxBool).boxed(),
1963            Just(AggregateFunc::MaxString).boxed(),
1964            Just(AggregateFunc::MaxTimestamp).boxed(),
1965            Just(AggregateFunc::MaxDate).boxed(),
1966            Just(AggregateFunc::MaxTimestampTz).boxed(),
1967            Just(AggregateFunc::MaxInterval).boxed(),
1968            Just(AggregateFunc::MaxTime).boxed(),
1969            Just(AggregateFunc::MinNumeric).boxed(),
1970            Just(AggregateFunc::MinInt16).boxed(),
1971            Just(AggregateFunc::MinInt32).boxed(),
1972            Just(AggregateFunc::MinInt64).boxed(),
1973            Just(AggregateFunc::MinUInt16).boxed(),
1974            Just(AggregateFunc::MinUInt32).boxed(),
1975            Just(AggregateFunc::MinUInt64).boxed(),
1976            Just(AggregateFunc::MinMzTimestamp).boxed(),
1977            Just(AggregateFunc::MinFloat32).boxed(),
1978            Just(AggregateFunc::MinFloat64).boxed(),
1979            Just(AggregateFunc::MinBool).boxed(),
1980            Just(AggregateFunc::MinString).boxed(),
1981            Just(AggregateFunc::MinDate).boxed(),
1982            Just(AggregateFunc::MinTimestamp).boxed(),
1983            Just(AggregateFunc::MinTimestampTz).boxed(),
1984            Just(AggregateFunc::MinInterval).boxed(),
1985            Just(AggregateFunc::MinTime).boxed(),
1986            Just(AggregateFunc::SumInt16).boxed(),
1987            Just(AggregateFunc::SumInt32).boxed(),
1988            Just(AggregateFunc::SumInt64).boxed(),
1989            Just(AggregateFunc::SumUInt16).boxed(),
1990            Just(AggregateFunc::SumUInt32).boxed(),
1991            Just(AggregateFunc::SumUInt64).boxed(),
1992            Just(AggregateFunc::SumFloat32).boxed(),
1993            Just(AggregateFunc::SumFloat64).boxed(),
1994            Just(AggregateFunc::SumNumeric).boxed(),
1995            Just(AggregateFunc::Count).boxed(),
1996            Just(AggregateFunc::Any).boxed(),
1997            Just(AggregateFunc::All).boxed(),
1998            vec(proptest_any::<ColumnOrder>(), 1..4)
1999                .prop_map(|order_by| AggregateFunc::JsonbAgg { order_by })
2000                .boxed(),
2001            vec(proptest_any::<ColumnOrder>(), 1..4)
2002                .prop_map(|order_by| AggregateFunc::JsonbObjectAgg { order_by })
2003                .boxed(),
2004            (
2005                vec(proptest_any::<ColumnOrder>(), 1..4),
2006                proptest_any::<ScalarType>(),
2007            )
2008                .prop_map(|(order_by, value_type)| AggregateFunc::MapAgg {
2009                    order_by,
2010                    value_type,
2011                })
2012                .boxed(),
2013            vec(proptest_any::<ColumnOrder>(), 1..4)
2014                .prop_map(|order_by| AggregateFunc::ArrayConcat { order_by })
2015                .boxed(),
2016            vec(proptest_any::<ColumnOrder>(), 1..4)
2017                .prop_map(|order_by| AggregateFunc::ListConcat { order_by })
2018                .boxed(),
2019            vec(proptest_any::<ColumnOrder>(), 1..4)
2020                .prop_map(|order_by| AggregateFunc::StringAgg { order_by })
2021                .boxed(),
2022            vec(proptest_any::<ColumnOrder>(), 1..4)
2023                .prop_map(|order_by| AggregateFunc::RowNumber { order_by })
2024                .boxed(),
2025            vec(proptest_any::<ColumnOrder>(), 1..4)
2026                .prop_map(|order_by| AggregateFunc::DenseRank { order_by })
2027                .boxed(),
2028            (
2029                vec(proptest_any::<ColumnOrder>(), 1..4),
2030                proptest_any::<LagLeadType>(),
2031                proptest_any::<bool>(),
2032            )
2033                .prop_map(
2034                    |(order_by, lag_lead, ignore_nulls)| AggregateFunc::LagLead {
2035                        order_by,
2036                        lag_lead,
2037                        ignore_nulls,
2038                    },
2039                )
2040                .boxed(),
2041            (
2042                vec(proptest_any::<ColumnOrder>(), 1..4),
2043                proptest_any::<WindowFrame>(),
2044            )
2045                .prop_map(|(order_by, window_frame)| AggregateFunc::FirstValue {
2046                    order_by,
2047                    window_frame,
2048                })
2049                .boxed(),
2050            (
2051                vec(proptest_any::<ColumnOrder>(), 1..4),
2052                proptest_any::<WindowFrame>(),
2053            )
2054                .prop_map(|(order_by, window_frame)| AggregateFunc::LastValue {
2055                    order_by,
2056                    window_frame,
2057                })
2058                .boxed(),
2059            Just(AggregateFunc::Dummy).boxed(),
2060        ])
2061    }
2062}
2063
2064impl RustType<ProtoColumnOrders> for Vec<ColumnOrder> {
2065    fn into_proto(&self) -> ProtoColumnOrders {
2066        ProtoColumnOrders {
2067            orders: self.into_proto(),
2068        }
2069    }
2070
2071    fn from_proto(proto: ProtoColumnOrders) -> Result<Self, TryFromProtoError> {
2072        proto.orders.into_rust()
2073    }
2074}
2075
2076impl RustType<ProtoAggregateFunc> for AggregateFunc {
2077    fn into_proto(&self) -> ProtoAggregateFunc {
2078        use proto_aggregate_func::Kind;
2079        ProtoAggregateFunc {
2080            kind: Some(match self {
2081                AggregateFunc::MaxNumeric => Kind::MaxNumeric(()),
2082                AggregateFunc::MaxInt16 => Kind::MaxInt16(()),
2083                AggregateFunc::MaxInt32 => Kind::MaxInt32(()),
2084                AggregateFunc::MaxInt64 => Kind::MaxInt64(()),
2085                AggregateFunc::MaxUInt16 => Kind::MaxUint16(()),
2086                AggregateFunc::MaxUInt32 => Kind::MaxUint32(()),
2087                AggregateFunc::MaxUInt64 => Kind::MaxUint64(()),
2088                AggregateFunc::MaxMzTimestamp => Kind::MaxMzTimestamp(()),
2089                AggregateFunc::MaxFloat32 => Kind::MaxFloat32(()),
2090                AggregateFunc::MaxFloat64 => Kind::MaxFloat64(()),
2091                AggregateFunc::MaxBool => Kind::MaxBool(()),
2092                AggregateFunc::MaxString => Kind::MaxString(()),
2093                AggregateFunc::MaxDate => Kind::MaxDate(()),
2094                AggregateFunc::MaxTimestamp => Kind::MaxTimestamp(()),
2095                AggregateFunc::MaxTimestampTz => Kind::MaxTimestampTz(()),
2096                AggregateFunc::MinNumeric => Kind::MinNumeric(()),
2097                AggregateFunc::MaxInterval => Kind::MaxInterval(()),
2098                AggregateFunc::MaxTime => Kind::MaxTime(()),
2099                AggregateFunc::MinInt16 => Kind::MinInt16(()),
2100                AggregateFunc::MinInt32 => Kind::MinInt32(()),
2101                AggregateFunc::MinInt64 => Kind::MinInt64(()),
2102                AggregateFunc::MinUInt16 => Kind::MinUint16(()),
2103                AggregateFunc::MinUInt32 => Kind::MinUint32(()),
2104                AggregateFunc::MinUInt64 => Kind::MinUint64(()),
2105                AggregateFunc::MinMzTimestamp => Kind::MinMzTimestamp(()),
2106                AggregateFunc::MinFloat32 => Kind::MinFloat32(()),
2107                AggregateFunc::MinFloat64 => Kind::MinFloat64(()),
2108                AggregateFunc::MinBool => Kind::MinBool(()),
2109                AggregateFunc::MinString => Kind::MinString(()),
2110                AggregateFunc::MinDate => Kind::MinDate(()),
2111                AggregateFunc::MinTimestamp => Kind::MinTimestamp(()),
2112                AggregateFunc::MinTimestampTz => Kind::MinTimestampTz(()),
2113                AggregateFunc::MinInterval => Kind::MinInterval(()),
2114                AggregateFunc::MinTime => Kind::MinTime(()),
2115                AggregateFunc::SumInt16 => Kind::SumInt16(()),
2116                AggregateFunc::SumInt32 => Kind::SumInt32(()),
2117                AggregateFunc::SumInt64 => Kind::SumInt64(()),
2118                AggregateFunc::SumUInt16 => Kind::SumUint16(()),
2119                AggregateFunc::SumUInt32 => Kind::SumUint32(()),
2120                AggregateFunc::SumUInt64 => Kind::SumUint64(()),
2121                AggregateFunc::SumFloat32 => Kind::SumFloat32(()),
2122                AggregateFunc::SumFloat64 => Kind::SumFloat64(()),
2123                AggregateFunc::SumNumeric => Kind::SumNumeric(()),
2124                AggregateFunc::Count => Kind::Count(()),
2125                AggregateFunc::Any => Kind::Any(()),
2126                AggregateFunc::All => Kind::All(()),
2127                AggregateFunc::JsonbAgg { order_by } => Kind::JsonbAgg(order_by.into_proto()),
2128                AggregateFunc::JsonbObjectAgg { order_by } => {
2129                    Kind::JsonbObjectAgg(order_by.into_proto())
2130                }
2131                AggregateFunc::MapAgg {
2132                    order_by,
2133                    value_type,
2134                } => Kind::MapAgg(proto_aggregate_func::ProtoMapAgg {
2135                    order_by: Some(order_by.into_proto()),
2136                    value_type: Some(value_type.into_proto()),
2137                }),
2138                AggregateFunc::ArrayConcat { order_by } => Kind::ArrayConcat(order_by.into_proto()),
2139                AggregateFunc::ListConcat { order_by } => Kind::ListConcat(order_by.into_proto()),
2140                AggregateFunc::StringAgg { order_by } => Kind::StringAgg(order_by.into_proto()),
2141                AggregateFunc::RowNumber { order_by } => Kind::RowNumber(order_by.into_proto()),
2142                AggregateFunc::Rank { order_by } => Kind::Rank(order_by.into_proto()),
2143                AggregateFunc::DenseRank { order_by } => Kind::DenseRank(order_by.into_proto()),
2144                AggregateFunc::LagLead {
2145                    order_by,
2146                    lag_lead,
2147                    ignore_nulls,
2148                } => Kind::LagLead(proto_aggregate_func::ProtoLagLead {
2149                    order_by: Some(order_by.into_proto()),
2150                    lag_lead: Some(match lag_lead {
2151                        LagLeadType::Lag => proto_aggregate_func::proto_lag_lead::LagLead::Lag(()),
2152                        LagLeadType::Lead => {
2153                            proto_aggregate_func::proto_lag_lead::LagLead::Lead(())
2154                        }
2155                    }),
2156                    ignore_nulls: *ignore_nulls,
2157                }),
2158                AggregateFunc::FirstValue {
2159                    order_by,
2160                    window_frame,
2161                } => Kind::FirstValue(proto_aggregate_func::ProtoFramedWindowFunc {
2162                    order_by: Some(order_by.into_proto()),
2163                    window_frame: Some(window_frame.into_proto()),
2164                }),
2165                AggregateFunc::LastValue {
2166                    order_by,
2167                    window_frame,
2168                } => Kind::LastValue(proto_aggregate_func::ProtoFramedWindowFunc {
2169                    order_by: Some(order_by.into_proto()),
2170                    window_frame: Some(window_frame.into_proto()),
2171                }),
2172                AggregateFunc::WindowAggregate {
2173                    wrapped_aggregate,
2174                    order_by,
2175                    window_frame,
2176                } => Kind::WindowAggregate(Box::new(proto_aggregate_func::ProtoWindowAggregate {
2177                    wrapped_aggregate: Some(wrapped_aggregate.into_proto()),
2178                    order_by: Some(order_by.into_proto()),
2179                    window_frame: Some(window_frame.into_proto()),
2180                })),
2181                AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2182                    Kind::FusedValueWindowFunc(ProtoFusedValueWindowFunc {
2183                        funcs: funcs.into_proto(),
2184                        order_by: Some(order_by.into_proto()),
2185                    })
2186                }
2187                AggregateFunc::FusedWindowAggregate {
2188                    wrapped_aggregates,
2189                    order_by,
2190                    window_frame,
2191                } => Kind::FusedWindowAggregate(ProtoFusedWindowAggregate {
2192                    wrapped_aggregates: wrapped_aggregates.into_proto(),
2193                    order_by: Some(order_by.into_proto()),
2194                    window_frame: Some(window_frame.into_proto()),
2195                }),
2196                AggregateFunc::Dummy => Kind::Dummy(()),
2197            }),
2198        }
2199    }
2200
2201    fn from_proto(proto: ProtoAggregateFunc) -> Result<Self, TryFromProtoError> {
2202        use proto_aggregate_func::Kind;
2203        let kind = proto
2204            .kind
2205            .ok_or_else(|| TryFromProtoError::missing_field("ProtoAggregateFunc::kind"))?;
2206        Ok(match kind {
2207            Kind::MaxNumeric(()) => AggregateFunc::MaxNumeric,
2208            Kind::MaxInt16(()) => AggregateFunc::MaxInt16,
2209            Kind::MaxInt32(()) => AggregateFunc::MaxInt32,
2210            Kind::MaxInt64(()) => AggregateFunc::MaxInt64,
2211            Kind::MaxUint16(()) => AggregateFunc::MaxUInt16,
2212            Kind::MaxUint32(()) => AggregateFunc::MaxUInt32,
2213            Kind::MaxUint64(()) => AggregateFunc::MaxUInt64,
2214            Kind::MaxMzTimestamp(()) => AggregateFunc::MaxMzTimestamp,
2215            Kind::MaxFloat32(()) => AggregateFunc::MaxFloat32,
2216            Kind::MaxFloat64(()) => AggregateFunc::MaxFloat64,
2217            Kind::MaxBool(()) => AggregateFunc::MaxBool,
2218            Kind::MaxString(()) => AggregateFunc::MaxString,
2219            Kind::MaxDate(()) => AggregateFunc::MaxDate,
2220            Kind::MaxTimestamp(()) => AggregateFunc::MaxTimestamp,
2221            Kind::MaxTimestampTz(()) => AggregateFunc::MaxTimestampTz,
2222            Kind::MaxInterval(()) => AggregateFunc::MaxInterval,
2223            Kind::MaxTime(()) => AggregateFunc::MaxTime,
2224            Kind::MinNumeric(()) => AggregateFunc::MinNumeric,
2225            Kind::MinInt16(()) => AggregateFunc::MinInt16,
2226            Kind::MinInt32(()) => AggregateFunc::MinInt32,
2227            Kind::MinInt64(()) => AggregateFunc::MinInt64,
2228            Kind::MinUint16(()) => AggregateFunc::MinUInt16,
2229            Kind::MinUint32(()) => AggregateFunc::MinUInt32,
2230            Kind::MinUint64(()) => AggregateFunc::MinUInt64,
2231            Kind::MinMzTimestamp(()) => AggregateFunc::MinMzTimestamp,
2232            Kind::MinFloat32(()) => AggregateFunc::MinFloat32,
2233            Kind::MinFloat64(()) => AggregateFunc::MinFloat64,
2234            Kind::MinBool(()) => AggregateFunc::MinBool,
2235            Kind::MinString(()) => AggregateFunc::MinString,
2236            Kind::MinDate(()) => AggregateFunc::MinDate,
2237            Kind::MinTimestamp(()) => AggregateFunc::MinTimestamp,
2238            Kind::MinTimestampTz(()) => AggregateFunc::MinTimestampTz,
2239            Kind::MinInterval(()) => AggregateFunc::MinInterval,
2240            Kind::MinTime(()) => AggregateFunc::MinTime,
2241            Kind::SumInt16(()) => AggregateFunc::SumInt16,
2242            Kind::SumInt32(()) => AggregateFunc::SumInt32,
2243            Kind::SumInt64(()) => AggregateFunc::SumInt64,
2244            Kind::SumUint16(()) => AggregateFunc::SumUInt16,
2245            Kind::SumUint32(()) => AggregateFunc::SumUInt32,
2246            Kind::SumUint64(()) => AggregateFunc::SumUInt64,
2247            Kind::SumFloat32(()) => AggregateFunc::SumFloat32,
2248            Kind::SumFloat64(()) => AggregateFunc::SumFloat64,
2249            Kind::SumNumeric(()) => AggregateFunc::SumNumeric,
2250            Kind::Count(()) => AggregateFunc::Count,
2251            Kind::Any(()) => AggregateFunc::Any,
2252            Kind::All(()) => AggregateFunc::All,
2253            Kind::JsonbAgg(order_by) => AggregateFunc::JsonbAgg {
2254                order_by: order_by.into_rust()?,
2255            },
2256            Kind::JsonbObjectAgg(order_by) => AggregateFunc::JsonbObjectAgg {
2257                order_by: order_by.into_rust()?,
2258            },
2259            Kind::MapAgg(pma) => AggregateFunc::MapAgg {
2260                order_by: pma.order_by.into_rust_if_some("ProtoMapAgg::order_by")?,
2261                value_type: pma
2262                    .value_type
2263                    .into_rust_if_some("ProtoMapAgg::value_type")?,
2264            },
2265            Kind::ArrayConcat(order_by) => AggregateFunc::ArrayConcat {
2266                order_by: order_by.into_rust()?,
2267            },
2268            Kind::ListConcat(order_by) => AggregateFunc::ListConcat {
2269                order_by: order_by.into_rust()?,
2270            },
2271            Kind::StringAgg(order_by) => AggregateFunc::StringAgg {
2272                order_by: order_by.into_rust()?,
2273            },
2274            Kind::RowNumber(order_by) => AggregateFunc::RowNumber {
2275                order_by: order_by.into_rust()?,
2276            },
2277            Kind::Rank(order_by) => AggregateFunc::Rank {
2278                order_by: order_by.into_rust()?,
2279            },
2280            Kind::DenseRank(order_by) => AggregateFunc::DenseRank {
2281                order_by: order_by.into_rust()?,
2282            },
2283            Kind::LagLead(pll) => AggregateFunc::LagLead {
2284                order_by: pll.order_by.into_rust_if_some("ProtoLagLead::order_by")?,
2285                lag_lead: match pll.lag_lead {
2286                    Some(proto_aggregate_func::proto_lag_lead::LagLead::Lag(())) => {
2287                        LagLeadType::Lag
2288                    }
2289                    Some(proto_aggregate_func::proto_lag_lead::LagLead::Lead(())) => {
2290                        LagLeadType::Lead
2291                    }
2292                    None => {
2293                        return Err(TryFromProtoError::MissingField(
2294                            "ProtoLagLead::lag_lead".into(),
2295                        ));
2296                    }
2297                },
2298                ignore_nulls: pll.ignore_nulls,
2299            },
2300            Kind::FirstValue(pfv) => AggregateFunc::FirstValue {
2301                order_by: pfv
2302                    .order_by
2303                    .into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
2304                window_frame: pfv
2305                    .window_frame
2306                    .into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
2307            },
2308            Kind::LastValue(pfv) => AggregateFunc::LastValue {
2309                order_by: pfv
2310                    .order_by
2311                    .into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
2312                window_frame: pfv
2313                    .window_frame
2314                    .into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
2315            },
2316            Kind::WindowAggregate(paf) => AggregateFunc::WindowAggregate {
2317                wrapped_aggregate: paf
2318                    .wrapped_aggregate
2319                    .into_rust_if_some("ProtoWindowAggregate::wrapped_aggregate")?,
2320                order_by: paf
2321                    .order_by
2322                    .into_rust_if_some("ProtoWindowAggregate::order_by")?,
2323                window_frame: paf
2324                    .window_frame
2325                    .into_rust_if_some("ProtoWindowAggregate::window_frame")?,
2326            },
2327            Kind::FusedValueWindowFunc(fvwf) => AggregateFunc::FusedValueWindowFunc {
2328                funcs: fvwf.funcs.into_rust()?,
2329                order_by: fvwf
2330                    .order_by
2331                    .into_rust_if_some("ProtoFusedValueWindowFunc::order_by")?,
2332            },
2333            Kind::FusedWindowAggregate(fwa) => AggregateFunc::FusedWindowAggregate {
2334                wrapped_aggregates: fwa.wrapped_aggregates.into_rust()?,
2335                order_by: fwa
2336                    .order_by
2337                    .into_rust_if_some("ProtoFusedWindowAggregate::order_by")?,
2338                window_frame: fwa
2339                    .window_frame
2340                    .into_rust_if_some("ProtoFusedWindowAggregate::window_frame")?,
2341            },
2342            Kind::Dummy(()) => AggregateFunc::Dummy,
2343        })
2344    }
2345}
2346
2347impl AggregateFunc {
2348    pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
2349    where
2350        I: IntoIterator<Item = Datum<'a>>,
2351    {
2352        match self {
2353            AggregateFunc::MaxNumeric => {
2354                max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
2355            }
2356            AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
2357            AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
2358            AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
2359            AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
2360            AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
2361            AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
2362            AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
2363            AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
2364            AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
2365            AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
2366            AggregateFunc::MaxString => max_string(datums),
2367            AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
2368            AggregateFunc::MaxTimestamp => {
2369                max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
2370            }
2371            AggregateFunc::MaxTimestampTz => {
2372                max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2373            }
2374            AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
2375            AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
2376            AggregateFunc::MinNumeric => {
2377                min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
2378            }
2379            AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
2380            AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
2381            AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
2382            AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
2383            AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
2384            AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
2385            AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
2386            AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
2387            AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
2388            AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
2389            AggregateFunc::MinString => min_string(datums),
2390            AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
2391            AggregateFunc::MinTimestamp => {
2392                min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
2393            }
2394            AggregateFunc::MinTimestampTz => {
2395                min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2396            }
2397            AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
2398            AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
2399            AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
2400            AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
2401            AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
2402            AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
2403            AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
2404            AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
2405            AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
2406            AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
2407            AggregateFunc::SumNumeric => sum_numeric(datums),
2408            AggregateFunc::Count => count(datums),
2409            AggregateFunc::Any => any(datums),
2410            AggregateFunc::All => all(datums),
2411            AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
2412            AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
2413                dict_agg(datums, temp_storage, order_by)
2414            }
2415            AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
2416            AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
2417            AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
2418            AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
2419            AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
2420            AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
2421            AggregateFunc::LagLead {
2422                order_by,
2423                lag_lead: lag_lead_type,
2424                ignore_nulls,
2425            } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2426            AggregateFunc::FirstValue {
2427                order_by,
2428                window_frame,
2429            } => first_value(datums, temp_storage, order_by, window_frame),
2430            AggregateFunc::LastValue {
2431                order_by,
2432                window_frame,
2433            } => last_value(datums, temp_storage, order_by, window_frame),
2434            AggregateFunc::WindowAggregate {
2435                wrapped_aggregate,
2436                order_by,
2437                window_frame,
2438            } => window_aggr::<_, NaiveOneByOneAggr>(
2439                datums,
2440                temp_storage,
2441                wrapped_aggregate,
2442                order_by,
2443                window_frame,
2444            ),
2445            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2446                fused_value_window_func(datums, temp_storage, funcs, order_by)
2447            }
2448            AggregateFunc::FusedWindowAggregate {
2449                wrapped_aggregates,
2450                order_by,
2451                window_frame,
2452            } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2453                datums,
2454                temp_storage,
2455                wrapped_aggregates,
2456                order_by,
2457                window_frame,
2458            ),
2459            AggregateFunc::Dummy => Datum::Dummy,
2460        }
2461    }
2462
2463    /// Like `eval`, but it's given a [OneByOneAggr]. If `self` is a `WindowAggregate`, then
2464    /// the given [OneByOneAggr] will be used to evaluate the wrapped aggregate inside the
2465    /// `WindowAggregate`. If `self` is not a `WindowAggregate`, then it simply calls `eval`.
2466    pub fn eval_with_fast_window_agg<'a, I, W>(
2467        &self,
2468        datums: I,
2469        temp_storage: &'a RowArena,
2470    ) -> Datum<'a>
2471    where
2472        I: IntoIterator<Item = Datum<'a>>,
2473        W: OneByOneAggr,
2474    {
2475        match self {
2476            AggregateFunc::WindowAggregate {
2477                wrapped_aggregate,
2478                order_by,
2479                window_frame,
2480            } => window_aggr::<_, W>(
2481                datums,
2482                temp_storage,
2483                wrapped_aggregate,
2484                order_by,
2485                window_frame,
2486            ),
2487            AggregateFunc::FusedWindowAggregate {
2488                wrapped_aggregates,
2489                order_by,
2490                window_frame,
2491            } => fused_window_aggr::<_, W>(
2492                datums,
2493                temp_storage,
2494                wrapped_aggregates,
2495                order_by,
2496                window_frame,
2497            ),
2498            _ => self.eval(datums, temp_storage),
2499        }
2500    }
2501
2502    pub fn eval_with_unnest_list<'a, I, W>(
2503        &self,
2504        datums: I,
2505        temp_storage: &'a RowArena,
2506    ) -> impl Iterator<Item = Datum<'a>>
2507    where
2508        I: IntoIterator<Item = Datum<'a>>,
2509        W: OneByOneAggr,
2510    {
2511        // TODO: Use `enum_dispatch` to construct a unified iterator instead of `collect_vec`.
2512        assert!(self.can_fuse_with_unnest_list());
2513        match self {
2514            AggregateFunc::RowNumber { order_by } => {
2515                row_number_no_list(datums, temp_storage, order_by).collect_vec()
2516            }
2517            AggregateFunc::Rank { order_by } => {
2518                rank_no_list(datums, temp_storage, order_by).collect_vec()
2519            }
2520            AggregateFunc::DenseRank { order_by } => {
2521                dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2522            }
2523            AggregateFunc::LagLead {
2524                order_by,
2525                lag_lead: lag_lead_type,
2526                ignore_nulls,
2527            } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2528                .collect_vec(),
2529            AggregateFunc::FirstValue {
2530                order_by,
2531                window_frame,
2532            } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2533            AggregateFunc::LastValue {
2534                order_by,
2535                window_frame,
2536            } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2537            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2538                fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2539            }
2540            AggregateFunc::WindowAggregate {
2541                wrapped_aggregate,
2542                order_by,
2543                window_frame,
2544            } => window_aggr_no_list::<_, W>(
2545                datums,
2546                temp_storage,
2547                wrapped_aggregate,
2548                order_by,
2549                window_frame,
2550            )
2551            .collect_vec(),
2552            AggregateFunc::FusedWindowAggregate {
2553                wrapped_aggregates,
2554                order_by,
2555                window_frame,
2556            } => fused_window_aggr_no_list::<_, W>(
2557                datums,
2558                temp_storage,
2559                wrapped_aggregates,
2560                order_by,
2561                window_frame,
2562            )
2563            .collect_vec(),
2564            _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2565        }
2566        .into_iter()
2567    }
2568
2569    /// Returns the output of the aggregation function when applied on an empty
2570    /// input relation.
2571    pub fn default(&self) -> Datum<'static> {
2572        match self {
2573            AggregateFunc::Count => Datum::Int64(0),
2574            AggregateFunc::Any => Datum::False,
2575            AggregateFunc::All => Datum::True,
2576            AggregateFunc::Dummy => Datum::Dummy,
2577            _ => Datum::Null,
2578        }
2579    }
2580
2581    /// Returns a datum whose inclusion in the aggregation will not change its
2582    /// result.
2583    pub fn identity_datum(&self) -> Datum<'static> {
2584        match self {
2585            AggregateFunc::Any => Datum::False,
2586            AggregateFunc::All => Datum::True,
2587            AggregateFunc::Dummy => Datum::Dummy,
2588            AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2589            AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2590            AggregateFunc::RowNumber { .. }
2591            | AggregateFunc::Rank { .. }
2592            | AggregateFunc::DenseRank { .. }
2593            | AggregateFunc::LagLead { .. }
2594            | AggregateFunc::FirstValue { .. }
2595            | AggregateFunc::LastValue { .. }
2596            | AggregateFunc::WindowAggregate { .. }
2597            | AggregateFunc::FusedValueWindowFunc { .. }
2598            | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2599            AggregateFunc::MaxNumeric
2600            | AggregateFunc::MaxInt16
2601            | AggregateFunc::MaxInt32
2602            | AggregateFunc::MaxInt64
2603            | AggregateFunc::MaxUInt16
2604            | AggregateFunc::MaxUInt32
2605            | AggregateFunc::MaxUInt64
2606            | AggregateFunc::MaxMzTimestamp
2607            | AggregateFunc::MaxFloat32
2608            | AggregateFunc::MaxFloat64
2609            | AggregateFunc::MaxBool
2610            | AggregateFunc::MaxString
2611            | AggregateFunc::MaxDate
2612            | AggregateFunc::MaxTimestamp
2613            | AggregateFunc::MaxTimestampTz
2614            | AggregateFunc::MaxInterval
2615            | AggregateFunc::MaxTime
2616            | AggregateFunc::MinNumeric
2617            | AggregateFunc::MinInt16
2618            | AggregateFunc::MinInt32
2619            | AggregateFunc::MinInt64
2620            | AggregateFunc::MinUInt16
2621            | AggregateFunc::MinUInt32
2622            | AggregateFunc::MinUInt64
2623            | AggregateFunc::MinMzTimestamp
2624            | AggregateFunc::MinFloat32
2625            | AggregateFunc::MinFloat64
2626            | AggregateFunc::MinBool
2627            | AggregateFunc::MinString
2628            | AggregateFunc::MinDate
2629            | AggregateFunc::MinTimestamp
2630            | AggregateFunc::MinTimestampTz
2631            | AggregateFunc::MinInterval
2632            | AggregateFunc::MinTime
2633            | AggregateFunc::SumInt16
2634            | AggregateFunc::SumInt32
2635            | AggregateFunc::SumInt64
2636            | AggregateFunc::SumUInt16
2637            | AggregateFunc::SumUInt32
2638            | AggregateFunc::SumUInt64
2639            | AggregateFunc::SumFloat32
2640            | AggregateFunc::SumFloat64
2641            | AggregateFunc::SumNumeric
2642            | AggregateFunc::Count
2643            | AggregateFunc::JsonbAgg { .. }
2644            | AggregateFunc::JsonbObjectAgg { .. }
2645            | AggregateFunc::MapAgg { .. }
2646            | AggregateFunc::StringAgg { .. } => Datum::Null,
2647        }
2648    }
2649
2650    pub fn can_fuse_with_unnest_list(&self) -> bool {
2651        match self {
2652            AggregateFunc::RowNumber { .. }
2653            | AggregateFunc::Rank { .. }
2654            | AggregateFunc::DenseRank { .. }
2655            | AggregateFunc::LagLead { .. }
2656            | AggregateFunc::FirstValue { .. }
2657            | AggregateFunc::LastValue { .. }
2658            | AggregateFunc::WindowAggregate { .. }
2659            | AggregateFunc::FusedValueWindowFunc { .. }
2660            | AggregateFunc::FusedWindowAggregate { .. } => true,
2661            AggregateFunc::ArrayConcat { .. }
2662            | AggregateFunc::ListConcat { .. }
2663            | AggregateFunc::Any
2664            | AggregateFunc::All
2665            | AggregateFunc::Dummy
2666            | AggregateFunc::MaxNumeric
2667            | AggregateFunc::MaxInt16
2668            | AggregateFunc::MaxInt32
2669            | AggregateFunc::MaxInt64
2670            | AggregateFunc::MaxUInt16
2671            | AggregateFunc::MaxUInt32
2672            | AggregateFunc::MaxUInt64
2673            | AggregateFunc::MaxMzTimestamp
2674            | AggregateFunc::MaxFloat32
2675            | AggregateFunc::MaxFloat64
2676            | AggregateFunc::MaxBool
2677            | AggregateFunc::MaxString
2678            | AggregateFunc::MaxDate
2679            | AggregateFunc::MaxTimestamp
2680            | AggregateFunc::MaxTimestampTz
2681            | AggregateFunc::MaxInterval
2682            | AggregateFunc::MaxTime
2683            | AggregateFunc::MinNumeric
2684            | AggregateFunc::MinInt16
2685            | AggregateFunc::MinInt32
2686            | AggregateFunc::MinInt64
2687            | AggregateFunc::MinUInt16
2688            | AggregateFunc::MinUInt32
2689            | AggregateFunc::MinUInt64
2690            | AggregateFunc::MinMzTimestamp
2691            | AggregateFunc::MinFloat32
2692            | AggregateFunc::MinFloat64
2693            | AggregateFunc::MinBool
2694            | AggregateFunc::MinString
2695            | AggregateFunc::MinDate
2696            | AggregateFunc::MinTimestamp
2697            | AggregateFunc::MinTimestampTz
2698            | AggregateFunc::MinInterval
2699            | AggregateFunc::MinTime
2700            | AggregateFunc::SumInt16
2701            | AggregateFunc::SumInt32
2702            | AggregateFunc::SumInt64
2703            | AggregateFunc::SumUInt16
2704            | AggregateFunc::SumUInt32
2705            | AggregateFunc::SumUInt64
2706            | AggregateFunc::SumFloat32
2707            | AggregateFunc::SumFloat64
2708            | AggregateFunc::SumNumeric
2709            | AggregateFunc::Count
2710            | AggregateFunc::JsonbAgg { .. }
2711            | AggregateFunc::JsonbObjectAgg { .. }
2712            | AggregateFunc::MapAgg { .. }
2713            | AggregateFunc::StringAgg { .. } => false,
2714        }
2715    }
2716
2717    /// The output column type for the result of an aggregation.
2718    ///
2719    /// The output column type also contains nullability information, which
2720    /// is (without further information) true for aggregations that are not
2721    /// counts.
2722    pub fn output_type(&self, input_type: ColumnType) -> ColumnType {
2723        let scalar_type = match self {
2724            AggregateFunc::Count => ScalarType::Int64,
2725            AggregateFunc::Any => ScalarType::Bool,
2726            AggregateFunc::All => ScalarType::Bool,
2727            AggregateFunc::JsonbAgg { .. } => ScalarType::Jsonb,
2728            AggregateFunc::JsonbObjectAgg { .. } => ScalarType::Jsonb,
2729            AggregateFunc::SumInt16 => ScalarType::Int64,
2730            AggregateFunc::SumInt32 => ScalarType::Int64,
2731            AggregateFunc::SumInt64 => ScalarType::Numeric {
2732                max_scale: Some(NumericMaxScale::ZERO),
2733            },
2734            AggregateFunc::SumUInt16 => ScalarType::UInt64,
2735            AggregateFunc::SumUInt32 => ScalarType::UInt64,
2736            AggregateFunc::SumUInt64 => ScalarType::Numeric {
2737                max_scale: Some(NumericMaxScale::ZERO),
2738            },
2739            AggregateFunc::MapAgg { value_type, .. } => ScalarType::Map {
2740                value_type: Box::new(value_type.clone()),
2741                custom_id: None,
2742            },
2743            AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2744                match input_type.scalar_type {
2745                    // The input is wrapped in a Record if there's an ORDER BY, so extract it out.
2746                    ScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2747                    _ => unreachable!(),
2748                }
2749            }
2750            AggregateFunc::StringAgg { .. } => ScalarType::String,
2751            AggregateFunc::RowNumber { .. } => {
2752                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2753            }
2754            AggregateFunc::Rank { .. } => {
2755                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2756            }
2757            AggregateFunc::DenseRank { .. } => {
2758                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2759            }
2760            AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2761                // The input type for Lag is ((OriginalRow, EncodedArgs), OrderByExprs...)
2762                let fields = input_type.scalar_type.unwrap_record_element_type();
2763                let original_row_type = fields[0].unwrap_record_element_type()[0]
2764                    .clone()
2765                    .nullable(false);
2766                let output_type_inner = Self::lag_lead_output_type_inner_from_encoded_args(fields[0].unwrap_record_element_type()[1]);
2767                let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2768
2769                ScalarType::List {
2770                    element_type: Box::new(ScalarType::Record {
2771                        fields: [
2772                            (column_name, output_type_inner),
2773                            (ColumnName::from("?orig_row?"), original_row_type),
2774                        ].into(),
2775                        custom_id: None,
2776                    }),
2777                    custom_id: None,
2778                }
2779            }
2780            AggregateFunc::FirstValue { .. } => {
2781                // The input type for FirstValue is ((OriginalRow, Arg), OrderByExprs...)
2782                let fields = input_type.scalar_type.unwrap_record_element_type();
2783                let original_row_type = fields[0].unwrap_record_element_type()[0]
2784                    .clone()
2785                    .nullable(false);
2786                let value_type = fields[0].unwrap_record_element_type()[1]
2787                    .clone()
2788                    .nullable(true); // null when the partition is empty
2789
2790                ScalarType::List {
2791                    element_type: Box::new(ScalarType::Record {
2792                        fields: [
2793                            (ColumnName::from("?first_value?"), value_type),
2794                            (ColumnName::from("?orig_row?"), original_row_type),
2795                        ].into(),
2796                        custom_id: None,
2797                    }),
2798                    custom_id: None,
2799                }
2800            }
2801            AggregateFunc::LastValue { .. } => {
2802                // The input type for LastValue is ((OriginalRow, Arg), OrderByExprs...)
2803                let fields = input_type.scalar_type.unwrap_record_element_type();
2804                let original_row_type = fields[0].unwrap_record_element_type()[0]
2805                    .clone()
2806                    .nullable(false);
2807                let value_type = fields[0].unwrap_record_element_type()[1]
2808                    .clone()
2809                    .nullable(true); // null when the partition is empty
2810
2811                ScalarType::List {
2812                    element_type: Box::new(ScalarType::Record {
2813                        fields: [
2814                            (ColumnName::from("?last_value?"), value_type),
2815                            (ColumnName::from("?orig_row?"), original_row_type),
2816                        ].into(),
2817                        custom_id: None,
2818                    }),
2819                    custom_id: None,
2820                }
2821            }
2822            AggregateFunc::WindowAggregate {
2823                wrapped_aggregate, ..
2824            } => {
2825                // The input type for a window aggregate is ((OriginalRow, Arg), OrderByExprs...)
2826                let fields = input_type.scalar_type.unwrap_record_element_type();
2827                let original_row_type = fields[0].unwrap_record_element_type()[0]
2828                    .clone()
2829                    .nullable(false);
2830                let arg_type = fields[0].unwrap_record_element_type()[1]
2831                    .clone()
2832                    .nullable(true);
2833                let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);
2834
2835                ScalarType::List {
2836                    element_type: Box::new(ScalarType::Record {
2837                        fields: [
2838                            (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2839                            (ColumnName::from("?orig_row?"), original_row_type),
2840                        ].into(),
2841                        custom_id: None,
2842                    }),
2843                    custom_id: None,
2844                }
2845            }
2846            AggregateFunc::FusedWindowAggregate {
2847                wrapped_aggregates, ..
2848            } => {
2849                // The input type for a fused window aggregate is ((OriginalRow, Args), OrderByExprs...)
2850                // where `Args` is a record.
2851                let fields = input_type.scalar_type.unwrap_record_element_type();
2852                let original_row_type = fields[0].unwrap_record_element_type()[0]
2853                    .clone()
2854                    .nullable(false);
2855                let args_type = fields[0].unwrap_record_element_type()[1];
2856                let arg_types = args_type.unwrap_record_element_type();
2857                let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(|(arg_type, wrapped_agg)| {
2858                    (
2859                        ColumnName::from(wrapped_agg.name()),
2860                        wrapped_agg.output_type((**arg_type).clone().nullable(true)),
2861                    )
2862                }).collect_vec();
2863
2864                ScalarType::List {
2865                    element_type: Box::new(ScalarType::Record {
2866                        fields: [
2867                            (ColumnName::from("?fused_window_agg?"), ScalarType::Record {
2868                                fields: out_fields.into(),
2869                                custom_id: None,
2870                            }.nullable(false)),
2871                            (ColumnName::from("?orig_row?"), original_row_type),
2872                        ].into(),
2873                        custom_id: None,
2874                    }),
2875                    custom_id: None,
2876                }
2877            }
2878            AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2879                // The input type is ((OriginalRow, EncodedArgs), OrderByExprs...)
2880                // where EncodedArgs is a record, where each element is the argument to one of the
2881                // function calls that got fused. This is a record for lag/lead, and a simple type
2882                // for first_value/last_value.
2883                let fields = input_type.scalar_type.unwrap_record_element_type();
2884                let original_row_type = fields[0].unwrap_record_element_type()[0]
2885                    .clone()
2886                    .nullable(false);
2887                let encoded_args_type = fields[0].unwrap_record_element_type()[1].unwrap_record_element_type();
2888
2889                ScalarType::List {
2890                    element_type: Box::new(ScalarType::Record {
2891                        fields: [
2892                            (ColumnName::from("?fused_value_window_func?"), ScalarType::Record {
2893                                fields: encoded_args_type.into_iter().zip_eq(funcs).map(|(arg_type, func)| {
2894                                    match func {
2895                                        AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2896                                            (
2897                                                Self::lag_lead_result_column_name(lag_lead_type),
2898                                                Self::lag_lead_output_type_inner_from_encoded_args(arg_type)
2899                                            )
2900                                        },
2901                                        AggregateFunc::FirstValue { .. } => {
2902                                            (
2903                                                ColumnName::from("?first_value?"),
2904                                                arg_type.clone().nullable(true),
2905                                            )
2906                                        }
2907                                        AggregateFunc::LastValue { .. } => {
2908                                            (
2909                                                ColumnName::from("?last_value?"),
2910                                                arg_type.clone().nullable(true),
2911                                            )
2912                                        }
2913                                        _ => panic!("FusedValueWindowFunc has an unknown function"),
2914                                    }
2915                                }).collect(),
2916                                custom_id: None,
2917                            }.nullable(false)),
2918                            (ColumnName::from("?orig_row?"), original_row_type),
2919                        ].into(),
2920                        custom_id: None,
2921                    }),
2922                    custom_id: None,
2923                }
2924            }
2925            AggregateFunc::Dummy
2926            | AggregateFunc::MaxNumeric
2927            | AggregateFunc::MaxInt16
2928            | AggregateFunc::MaxInt32
2929            | AggregateFunc::MaxInt64
2930            | AggregateFunc::MaxUInt16
2931            | AggregateFunc::MaxUInt32
2932            | AggregateFunc::MaxUInt64
2933            | AggregateFunc::MaxMzTimestamp
2934            | AggregateFunc::MaxFloat32
2935            | AggregateFunc::MaxFloat64
2936            | AggregateFunc::MaxBool
2937            // Note AggregateFunc::MaxString, MinString rely on returning input
2938            // type as output type to support the proper return type for
2939            // character input.
2940            | AggregateFunc::MaxString
2941            | AggregateFunc::MaxDate
2942            | AggregateFunc::MaxTimestamp
2943            | AggregateFunc::MaxTimestampTz
2944            | AggregateFunc::MaxInterval
2945            | AggregateFunc::MaxTime
2946            | AggregateFunc::MinNumeric
2947            | AggregateFunc::MinInt16
2948            | AggregateFunc::MinInt32
2949            | AggregateFunc::MinInt64
2950            | AggregateFunc::MinUInt16
2951            | AggregateFunc::MinUInt32
2952            | AggregateFunc::MinUInt64
2953            | AggregateFunc::MinMzTimestamp
2954            | AggregateFunc::MinFloat32
2955            | AggregateFunc::MinFloat64
2956            | AggregateFunc::MinBool
2957            | AggregateFunc::MinString
2958            | AggregateFunc::MinDate
2959            | AggregateFunc::MinTimestamp
2960            | AggregateFunc::MinTimestampTz
2961            | AggregateFunc::MinInterval
2962            | AggregateFunc::MinTime
2963            | AggregateFunc::SumFloat32
2964            | AggregateFunc::SumFloat64
2965            | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2966        };
2967        // Count never produces null, and other aggregations only produce
2968        // null in the presence of null inputs.
2969        let nullable = match self {
2970            AggregateFunc::Count => false,
2971            // Use the nullability of the underlying column being aggregated, not the Records wrapping it
2972            AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2973                // The outer Record wraps the input in the first position, and any ORDER BY expressions afterwards
2974                ScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2975                    // The inner Record is a (value, separator) tuple
2976                    ScalarType::Record { fields, .. } => fields[0].1.nullable,
2977                    _ => unreachable!(),
2978                },
2979                _ => unreachable!(),
2980            },
2981            _ => input_type.nullable,
2982        };
2983        scalar_type.nullable(nullable)
2984    }
2985
2986    /// Compute output type for ROW_NUMBER, RANK, DENSE_RANK
2987    fn output_type_ranking_window_funcs(input_type: &ColumnType, col_name: &str) -> ScalarType {
2988        match input_type.scalar_type {
2989            ScalarType::Record { ref fields, .. } => ScalarType::List {
2990                element_type: Box::new(ScalarType::Record {
2991                    fields: [
2992                        (
2993                            ColumnName::from(col_name),
2994                            ScalarType::Int64.nullable(false),
2995                        ),
2996                        (ColumnName::from("?orig_row?"), {
2997                            let inner = match &fields[0].1.scalar_type {
2998                                ScalarType::List { element_type, .. } => element_type.clone(),
2999                                _ => unreachable!(),
3000                            };
3001                            inner.nullable(false)
3002                        }),
3003                    ]
3004                    .into(),
3005                    custom_id: None,
3006                }),
3007                custom_id: None,
3008            },
3009            _ => unreachable!(),
3010        }
3011    }
3012
3013    /// Given the `EncodedArgs` part of `((OriginalRow, EncodedArgs), OrderByExprs...)`,
3014    /// this computes the type of the first field of the output type. (The first field is the
3015    /// real result, the rest is the original row.)
3016    fn lag_lead_output_type_inner_from_encoded_args(encoded_args_type: &ScalarType) -> ColumnType {
3017        // lag/lead have 3 arguments, and the output type is
3018        // the same as the first of these, but always nullable. (It's null when the
3019        // lag/lead computation reaches over the bounds of the window partition.)
3020        encoded_args_type.unwrap_record_element_type()[0]
3021            .clone()
3022            .nullable(true)
3023    }
3024
3025    fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
3026        ColumnName::from(match lag_lead_type {
3027            LagLeadType::Lag => "?lag?",
3028            LagLeadType::Lead => "?lead?",
3029        })
3030    }
3031
3032    /// Returns true if the non-null constraint on the aggregation can be
3033    /// converted into a non-null constraint on its parameter expression, ie.
3034    /// whether the result of the aggregation is null if all the input values
3035    /// are null.
3036    pub fn propagates_nonnull_constraint(&self) -> bool {
3037        match self {
3038            AggregateFunc::MaxNumeric
3039            | AggregateFunc::MaxInt16
3040            | AggregateFunc::MaxInt32
3041            | AggregateFunc::MaxInt64
3042            | AggregateFunc::MaxUInt16
3043            | AggregateFunc::MaxUInt32
3044            | AggregateFunc::MaxUInt64
3045            | AggregateFunc::MaxMzTimestamp
3046            | AggregateFunc::MaxFloat32
3047            | AggregateFunc::MaxFloat64
3048            | AggregateFunc::MaxBool
3049            | AggregateFunc::MaxString
3050            | AggregateFunc::MaxDate
3051            | AggregateFunc::MaxTimestamp
3052            | AggregateFunc::MaxTimestampTz
3053            | AggregateFunc::MinNumeric
3054            | AggregateFunc::MinInt16
3055            | AggregateFunc::MinInt32
3056            | AggregateFunc::MinInt64
3057            | AggregateFunc::MinUInt16
3058            | AggregateFunc::MinUInt32
3059            | AggregateFunc::MinUInt64
3060            | AggregateFunc::MinMzTimestamp
3061            | AggregateFunc::MinFloat32
3062            | AggregateFunc::MinFloat64
3063            | AggregateFunc::MinBool
3064            | AggregateFunc::MinString
3065            | AggregateFunc::MinDate
3066            | AggregateFunc::MinTimestamp
3067            | AggregateFunc::MinTimestampTz
3068            | AggregateFunc::SumInt16
3069            | AggregateFunc::SumInt32
3070            | AggregateFunc::SumInt64
3071            | AggregateFunc::SumUInt16
3072            | AggregateFunc::SumUInt32
3073            | AggregateFunc::SumUInt64
3074            | AggregateFunc::SumFloat32
3075            | AggregateFunc::SumFloat64
3076            | AggregateFunc::SumNumeric
3077            | AggregateFunc::StringAgg { .. } => true,
3078            // Count is never null
3079            AggregateFunc::Count => false,
3080            _ => false,
3081        }
3082    }
3083}
3084
3085fn jsonb_each<'a>(
3086    a: Datum<'a>,
3087    temp_storage: &'a RowArena,
3088    stringify: bool,
3089) -> impl Iterator<Item = (Row, Diff)> + 'a {
3090    // First produce a map, so that a common iterator can be returned.
3091    let map = match a {
3092        Datum::Map(dict) => dict,
3093        _ => mz_repr::DatumMap::empty(),
3094    };
3095
3096    map.iter().map(move |(k, mut v)| {
3097        if stringify {
3098            v = jsonb_stringify(v, temp_storage);
3099        }
3100        (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
3101    })
3102}
3103
3104fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3105    let map = match a {
3106        Datum::Map(dict) => dict,
3107        _ => mz_repr::DatumMap::empty(),
3108    };
3109
3110    map.iter()
3111        .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
3112}
3113
3114fn jsonb_array_elements<'a>(
3115    a: Datum<'a>,
3116    temp_storage: &'a RowArena,
3117    stringify: bool,
3118) -> impl Iterator<Item = (Row, Diff)> + 'a {
3119    let list = match a {
3120        Datum::List(list) => list,
3121        _ => mz_repr::DatumList::empty(),
3122    };
3123    list.iter().map(move |mut e| {
3124        if stringify {
3125            e = jsonb_stringify(e, temp_storage);
3126        }
3127        (Row::pack_slice(&[e]), Diff::ONE)
3128    })
3129}
3130
3131fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
3132    let r = r.inner();
3133    let a = a.unwrap_str();
3134    let captures = r.captures(a)?;
3135    let datums = captures
3136        .iter()
3137        .skip(1)
3138        .map(|m| Datum::from(m.map(|m| m.as_str())));
3139    Some((Row::pack(datums), Diff::ONE))
3140}
3141
3142fn regexp_matches<'a, 'r: 'a>(
3143    exprs: &[Datum<'a>],
3144) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3145    // There are only two acceptable ways to call this function:
3146    // 1. regexp_matches(string, regex)
3147    // 2. regexp_matches(string, regex, flag)
3148    assert!(exprs.len() == 2 || exprs.len() == 3);
3149    let a = exprs[0].unwrap_str();
3150    let r = exprs[1].unwrap_str();
3151
3152    let (regex, opts) = if exprs.len() == 3 {
3153        let flag = exprs[2].unwrap_str();
3154        let opts = AnalyzedRegexOpts::from_str(flag)?;
3155        (AnalyzedRegex::new(r, opts)?, opts)
3156    } else {
3157        let opts = AnalyzedRegexOpts::default();
3158        (AnalyzedRegex::new(r, opts)?, opts)
3159    };
3160
3161    let regex = regex.inner().clone();
3162
3163    let iter = regex.captures_iter(a).map(move |captures| {
3164        let matches = captures
3165            .iter()
3166            // The first match is the *entire* match, we want the capture groups by themselves.
3167            .skip(1)
3168            .map(|m| Datum::from(m.map(|m| m.as_str())))
3169            .collect::<Vec<_>>();
3170
3171        let mut binding = SharedRow::get();
3172        let mut packer = binding.packer();
3173
3174        let dimension = ArrayDimension {
3175            lower_bound: 1,
3176            length: matches.len(),
3177        };
3178        packer
3179            .try_push_array(&[dimension], matches)
3180            .expect("generated dimensions above");
3181
3182        (binding.clone(), Diff::ONE)
3183    });
3184
3185    // This is slightly unfortunate, but we need to collect the captures into a
3186    // Vec before we can yield them, because we can't return a iter with a
3187    // reference to the local `regex` variable.
3188    // We attempt to minimize the cost of this by using a SmallVec.
3189    let out = iter.collect::<SmallVec<[_; 3]>>();
3190
3191    if opts.global {
3192        Ok(Either::Left(out.into_iter()))
3193    } else {
3194        Ok(Either::Right(out.into_iter().take(1)))
3195    }
3196}
3197
3198fn generate_series<N>(
3199    start: N,
3200    stop: N,
3201    step: N,
3202) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
3203where
3204    N: Integer + Signed + CheckedAdd + Clone,
3205    Datum<'static>: From<N>,
3206{
3207    if step == N::zero() {
3208        return Err(EvalError::InvalidParameterValue(
3209            "step size cannot equal zero".into(),
3210        ));
3211    }
3212    Ok(num::range_step_inclusive(start, stop, step)
3213        .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
3214}
3215
/// Like
/// [`num::range_step_inclusive`](https://github.com/rust-num/num-iter/blob/ddb14c1e796d401014c6c7a727de61d8109ad986/src/lib.rs#L279),
/// but for our timestamp types using [`Interval`] for `step`.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to yield, provided it has not passed `stop`.
    state: CheckedTimestamp<T>,
    /// The inclusive bound at which iteration ends.
    stop: CheckedTimestamp<T>,
    /// The interval added to `state` after each yielded value.
    step: Interval,
    /// When true, iteration continues while `state >= stop`; otherwise while
    /// `state <= stop`.
    rev: bool,
    /// Set once advancing `state` by `step` fails; no further values are
    /// yielded afterwards.
    done: bool,
}
3227
3228impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
3229    type Item = CheckedTimestamp<T>;
3230
3231    #[inline]
3232    fn next(&mut self) -> Option<CheckedTimestamp<T>> {
3233        if !self.done
3234            && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
3235        {
3236            let result = self.state.clone();
3237            match add_timestamp_months(self.state.deref(), self.step.months) {
3238                Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
3239                    Some(v) => match CheckedTimestamp::from_timestamplike(v) {
3240                        Ok(v) => self.state = v,
3241                        Err(_) => self.done = true,
3242                    },
3243                    None => self.done = true,
3244                },
3245                Err(..) => {
3246                    self.done = true;
3247                }
3248            }
3249
3250            Some(result)
3251        } else {
3252            None
3253        }
3254    }
3255}
3256
3257fn generate_series_ts<T: TimestampLike>(
3258    start: CheckedTimestamp<T>,
3259    stop: CheckedTimestamp<T>,
3260    step: Interval,
3261    conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
3262) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
3263    let normalized_step = step.as_microseconds();
3264    if normalized_step == 0 {
3265        return Err(EvalError::InvalidParameterValue(
3266            "step size cannot equal zero".into(),
3267        ));
3268    }
3269    let rev = normalized_step < 0;
3270
3271    let trsi = TimestampRangeStepInclusive {
3272        state: start,
3273        stop,
3274        step,
3275        rev,
3276        done: false,
3277    };
3278
3279    Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
3280}
3281
3282fn generate_subscripts_array(
3283    a: Datum,
3284    dim: i32,
3285) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
3286    if dim <= 0 {
3287        return Ok(Box::new(iter::empty()));
3288    }
3289
3290    match a.unwrap_array().dims().into_iter().nth(
3291        (dim - 1)
3292            .try_into()
3293            .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
3294    ) {
3295        Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
3296            requested_dim.lower_bound.try_into().map_err(|_| {
3297                EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
3298            })?,
3299            requested_dim
3300                .length
3301                .try_into()
3302                .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
3303            1,
3304        )?)),
3305        None => Ok(Box::new(iter::empty())),
3306    }
3307}
3308
3309fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3310    a.unwrap_array()
3311        .elements()
3312        .iter()
3313        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
3314}
3315
3316fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3317    a.unwrap_list()
3318        .iter()
3319        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
3320}
3321
3322fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3323    a.unwrap_map()
3324        .iter()
3325        .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
3326}
3327
3328impl AggregateFunc {
3329    /// The base function name without the `~[...]` suffix used when rendering
3330    /// variants that represent a parameterized function family.
3331    pub fn name(&self) -> &'static str {
3332        match self {
3333            Self::MaxNumeric => "max",
3334            Self::MaxInt16 => "max",
3335            Self::MaxInt32 => "max",
3336            Self::MaxInt64 => "max",
3337            Self::MaxUInt16 => "max",
3338            Self::MaxUInt32 => "max",
3339            Self::MaxUInt64 => "max",
3340            Self::MaxMzTimestamp => "max",
3341            Self::MaxFloat32 => "max",
3342            Self::MaxFloat64 => "max",
3343            Self::MaxBool => "max",
3344            Self::MaxString => "max",
3345            Self::MaxDate => "max",
3346            Self::MaxTimestamp => "max",
3347            Self::MaxTimestampTz => "max",
3348            Self::MaxInterval => "max",
3349            Self::MaxTime => "max",
3350            Self::MinNumeric => "min",
3351            Self::MinInt16 => "min",
3352            Self::MinInt32 => "min",
3353            Self::MinInt64 => "min",
3354            Self::MinUInt16 => "min",
3355            Self::MinUInt32 => "min",
3356            Self::MinUInt64 => "min",
3357            Self::MinMzTimestamp => "min",
3358            Self::MinFloat32 => "min",
3359            Self::MinFloat64 => "min",
3360            Self::MinBool => "min",
3361            Self::MinString => "min",
3362            Self::MinDate => "min",
3363            Self::MinTimestamp => "min",
3364            Self::MinTimestampTz => "min",
3365            Self::MinInterval => "min",
3366            Self::MinTime => "min",
3367            Self::SumInt16 => "sum",
3368            Self::SumInt32 => "sum",
3369            Self::SumInt64 => "sum",
3370            Self::SumUInt16 => "sum",
3371            Self::SumUInt32 => "sum",
3372            Self::SumUInt64 => "sum",
3373            Self::SumFloat32 => "sum",
3374            Self::SumFloat64 => "sum",
3375            Self::SumNumeric => "sum",
3376            Self::Count => "count",
3377            Self::Any => "any",
3378            Self::All => "all",
3379            Self::JsonbAgg { .. } => "jsonb_agg",
3380            Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3381            Self::MapAgg { .. } => "map_agg",
3382            Self::ArrayConcat { .. } => "array_agg",
3383            Self::ListConcat { .. } => "list_agg",
3384            Self::StringAgg { .. } => "string_agg",
3385            Self::RowNumber { .. } => "row_number",
3386            Self::Rank { .. } => "rank",
3387            Self::DenseRank { .. } => "dense_rank",
3388            Self::LagLead {
3389                lag_lead: LagLeadType::Lag,
3390                ..
3391            } => "lag",
3392            Self::LagLead {
3393                lag_lead: LagLeadType::Lead,
3394                ..
3395            } => "lead",
3396            Self::FirstValue { .. } => "first_value",
3397            Self::LastValue { .. } => "last_value",
3398            Self::WindowAggregate { .. } => "window_agg",
3399            Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3400            Self::FusedWindowAggregate { .. } => "fused_window_agg",
3401            Self::Dummy => "dummy",
3402        }
3403    }
3404}
3405
3406impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3407where
3408    M: HumanizerMode,
3409{
3410    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3411        use AggregateFunc::*;
3412        let name = self.expr.name();
3413        match self.expr {
3414            JsonbAgg { order_by }
3415            | JsonbObjectAgg { order_by }
3416            | MapAgg { order_by, .. }
3417            | ArrayConcat { order_by }
3418            | ListConcat { order_by }
3419            | StringAgg { order_by }
3420            | RowNumber { order_by }
3421            | Rank { order_by }
3422            | DenseRank { order_by } => {
3423                let order_by = order_by.iter().map(|col| self.child(col));
3424                write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3425            }
3426            LagLead {
3427                lag_lead: _,
3428                ignore_nulls,
3429                order_by,
3430            } => {
3431                let order_by = order_by.iter().map(|col| self.child(col));
3432                f.write_str(name)?;
3433                f.write_str("[")?;
3434                if *ignore_nulls {
3435                    f.write_str("ignore_nulls=true, ")?;
3436                }
3437                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3438                f.write_str("]")
3439            }
3440            FirstValue {
3441                order_by,
3442                window_frame,
3443            } => {
3444                let order_by = order_by.iter().map(|col| self.child(col));
3445                f.write_str(name)?;
3446                f.write_str("[")?;
3447                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3448                if *window_frame != WindowFrame::default() {
3449                    write!(f, " {}", window_frame)?;
3450                }
3451                f.write_str("]")
3452            }
3453            LastValue {
3454                order_by,
3455                window_frame,
3456            } => {
3457                let order_by = order_by.iter().map(|col| self.child(col));
3458                f.write_str(name)?;
3459                f.write_str("[")?;
3460                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3461                if *window_frame != WindowFrame::default() {
3462                    write!(f, " {}", window_frame)?;
3463                }
3464                f.write_str("]")
3465            }
3466            WindowAggregate {
3467                wrapped_aggregate,
3468                order_by,
3469                window_frame,
3470            } => {
3471                let order_by = order_by.iter().map(|col| self.child(col));
3472                let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3473                f.write_str(name)?;
3474                f.write_str("[")?;
3475                write!(f, "{} ", wrapped_aggregate)?;
3476                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3477                if *window_frame != WindowFrame::default() {
3478                    write!(f, " {}", window_frame)?;
3479                }
3480                f.write_str("]")
3481            }
3482            FusedValueWindowFunc { funcs, order_by } => {
3483                let order_by = order_by.iter().map(|col| self.child(col));
3484                let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3485                f.write_str(name)?;
3486                f.write_str("[")?;
3487                write!(f, "{} ", funcs)?;
3488                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3489                f.write_str("]")
3490            }
3491            _ => f.write_str(name),
3492        }
3493    }
3494}
3495
/// Describes one capture group of an analyzed regex.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct CaptureGroupDesc {
    /// Position of the group among the regex's capture names.
    /// `AnalyzedRegex::new` skips position 0 (the entire match), so the groups
    /// it records start at 1.
    pub index: u32,
    /// The group's name, if it has one.
    pub name: Option<String>,
    /// Whether the group's value may be null. `AnalyzedRegex::new` currently
    /// always sets this to `true`.
    pub nullable: bool,
}
3504
3505impl RustType<ProtoCaptureGroupDesc> for CaptureGroupDesc {
3506    fn into_proto(&self) -> ProtoCaptureGroupDesc {
3507        ProtoCaptureGroupDesc {
3508            index: self.index,
3509            name: self.name.clone(),
3510            nullable: self.nullable,
3511        }
3512    }
3513
3514    fn from_proto(proto: ProtoCaptureGroupDesc) -> Result<Self, TryFromProtoError> {
3515        Ok(Self {
3516            index: proto.index,
3517            name: proto.name,
3518            nullable: proto.nullable,
3519        })
3520    }
3521}
3522
/// Options parsed from a regex flag string; see the `FromStr` implementation
/// for the accepted flags. The default has both options disabled.
#[derive(
    Arbitrary,
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default,
)]
pub struct AnalyzedRegexOpts {
    /// Set by the `i` flag; passed to the regex constructor in
    /// `AnalyzedRegex::new`.
    pub case_insensitive: bool,
    /// Set by the `g` flag; `regexp_matches` returns every match when this is
    /// set and only the first match otherwise.
    pub global: bool,
}
3542
3543impl FromStr for AnalyzedRegexOpts {
3544    type Err = EvalError;
3545
3546    fn from_str(s: &str) -> Result<Self, Self::Err> {
3547        let mut opts = AnalyzedRegexOpts::default();
3548        for c in s.chars() {
3549            match c {
3550                'i' => opts.case_insensitive = true,
3551                'g' => opts.global = true,
3552                _ => return Err(EvalError::InvalidRegexFlag(c)),
3553            }
3554        }
3555        Ok(opts)
3556    }
3557}
3558
3559impl RustType<ProtoAnalyzedRegexOpts> for AnalyzedRegexOpts {
3560    fn into_proto(&self) -> ProtoAnalyzedRegexOpts {
3561        ProtoAnalyzedRegexOpts {
3562            case_insensitive: self.case_insensitive,
3563            global: self.global,
3564        }
3565    }
3566
3567    fn from_proto(proto: ProtoAnalyzedRegexOpts) -> Result<Self, TryFromProtoError> {
3568        Ok(Self {
3569            case_insensitive: proto.case_insensitive,
3570            global: proto.global,
3571        })
3572    }
3573}
3574
/// A compiled regex together with descriptions of its capture groups and the
/// options it was built with.
///
/// The fields are, in order: the regex itself, one [`CaptureGroupDesc`] per
/// capture group (excluding the implicit whole-match group), and the options
/// passed to [`AnalyzedRegex::new`].
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct AnalyzedRegex(
    #[proptest(strategy = "mz_repr::adt::regex::any_regex()")] ReprRegex,
    Vec<CaptureGroupDesc>,
    AnalyzedRegexOpts,
);
3583
3584impl RustType<ProtoAnalyzedRegex> for AnalyzedRegex {
3585    fn into_proto(&self) -> ProtoAnalyzedRegex {
3586        ProtoAnalyzedRegex {
3587            regex: Some(self.0.into_proto()),
3588            groups: self.1.into_proto(),
3589            opts: Some(self.2.into_proto()),
3590        }
3591    }
3592
3593    fn from_proto(proto: ProtoAnalyzedRegex) -> Result<Self, TryFromProtoError> {
3594        Ok(AnalyzedRegex(
3595            proto.regex.into_rust_if_some("ProtoAnalyzedRegex::regex")?,
3596            proto.groups.into_rust()?,
3597            proto.opts.into_rust_if_some("ProtoAnalyzedRegex::opts")?,
3598        ))
3599    }
3600}
3601
3602impl AnalyzedRegex {
3603    pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, regex::Error> {
3604        let r = ReprRegex::new(s, opts.case_insensitive)?;
3605        // TODO(benesch): remove potentially dangerous usage of `as`.
3606        #[allow(clippy::as_conversions)]
3607        let descs: Vec<_> = r
3608            .capture_names()
3609            .enumerate()
3610            // The first capture is the entire matched string.
3611            // This will often not be useful, so skip it.
3612            // If people want it they can just surround their
3613            // entire regex in an explicit capture group.
3614            .skip(1)
3615            .map(|(i, name)| CaptureGroupDesc {
3616                index: i as u32,
3617                name: name.map(String::from),
3618                // TODO -- we can do better.
3619                // https://github.com/MaterializeInc/database-issues/issues/612
3620                nullable: true,
3621            })
3622            .collect();
3623        Ok(Self(r, descs, opts))
3624    }
3625    pub fn capture_groups_len(&self) -> usize {
3626        self.1.len()
3627    }
3628    pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3629        self.1.iter()
3630    }
3631    pub fn inner(&self) -> &Regex {
3632        &(self.0).regex
3633    }
3634    pub fn opts(&self) -> &AnalyzedRegexOpts {
3635        &self.2
3636    }
3637}
3638
3639pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3640    let bytes = a.unwrap_str().as_bytes();
3641    let mut row = Row::default();
3642    let csv_reader = csv::ReaderBuilder::new()
3643        .has_headers(false)
3644        .from_reader(bytes);
3645    csv_reader.into_records().filter_map(move |res| match res {
3646        Ok(sr) if sr.len() == n_cols => {
3647            row.packer().extend(sr.iter().map(Datum::String));
3648            Some((row.clone(), Diff::ONE))
3649        }
3650        _ => None,
3651    })
3652}
3653
3654pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3655    let n = a.unwrap_int64();
3656    if n != 0 {
3657        Some((Row::default(), n.into()))
3658    } else {
3659        None
3660    }
3661}
3662
3663fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3664    datums
3665        .chunks(width)
3666        .map(|chunk| (Row::pack(chunk), Diff::ONE))
3667}
3668
3669fn acl_explode<'a>(
3670    acl_items: Datum<'a>,
3671    temp_storage: &'a RowArena,
3672) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3673    let acl_items = acl_items.unwrap_array();
3674    let mut res = Vec::new();
3675    for acl_item in acl_items.elements().iter() {
3676        if acl_item.is_null() {
3677            return Err(EvalError::AclArrayNullElement);
3678        }
3679        let acl_item = acl_item.unwrap_acl_item();
3680        for privilege in acl_item.acl_mode.explode() {
3681            let row = [
3682                Datum::UInt32(acl_item.grantor.0),
3683                Datum::UInt32(acl_item.grantee.0),
3684                Datum::String(temp_storage.push_string(privilege.to_string())),
3685                // GRANT OPTION is not implemented, so we hardcode false.
3686                Datum::False,
3687            ];
3688            res.push((Row::pack_slice(&row), Diff::ONE));
3689        }
3690    }
3691    Ok(res.into_iter())
3692}
3693
3694fn mz_acl_explode<'a>(
3695    mz_acl_items: Datum<'a>,
3696    temp_storage: &'a RowArena,
3697) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3698    let mz_acl_items = mz_acl_items.unwrap_array();
3699    let mut res = Vec::new();
3700    for mz_acl_item in mz_acl_items.elements().iter() {
3701        if mz_acl_item.is_null() {
3702            return Err(EvalError::MzAclArrayNullElement);
3703        }
3704        let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3705        for privilege in mz_acl_item.acl_mode.explode() {
3706            let row = [
3707                Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3708                Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3709                Datum::String(temp_storage.push_string(privilege.to_string())),
3710                // GRANT OPTION is not implemented, so we hardcode false.
3711                Datum::False,
3712            ];
3713            res.push((Row::pack_slice(&row), Diff::ONE));
3714        }
3715    }
3716    Ok(res.into_iter())
3717}
3718
/// A function that maps one input row to zero or more `(row, diff)` output pairs; see
/// `TableFunc::eval`.
///
/// When adding a new `TableFunc` variant, please consider adding it to
/// `TableFunc::with_ordinality`!
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum TableFunc {
    /// Emits one row per privilege of every ACL item in the input array: grantor, grantee,
    /// privilege name, and a grantable flag that is always false.
    AclExplode,
    /// Like `AclExplode`, but for `mz_aclitem` arrays; grantor and grantee are rendered as
    /// strings.
    MzAclExplode,
    /// Produces a string key column and a value column. With `stringify` the value column is
    /// a nullable string, otherwise it is a non-null jsonb.
    JsonbEach {
        stringify: bool,
    },
    /// Produces a single non-null string column.
    JsonbObjectKeys,
    /// Produces a single column: a nullable string with `stringify`, a non-null jsonb
    /// otherwise.
    JsonbArrayElements {
        stringify: bool,
    },
    /// Produces one string column per capture group of the analyzed regex.
    RegexpExtract(AnalyzedRegex),
    /// Produces the given number of non-null string columns.
    CsvExtract(usize),
    /// `generate_series` over three `int32` inputs.
    GenerateSeriesInt32,
    /// `generate_series` over three `int64` inputs.
    GenerateSeriesInt64,
    /// `generate_series` over two timestamps and an interval.
    GenerateSeriesTimestamp,
    /// `generate_series` over two timestamps with time zone and an interval.
    GenerateSeriesTimestampTz,
    /// Supplied with an input count,
    ///   1. Adds a column as if a typed subquery result,
    ///   2. Filters the row away if the count is exactly one,
    ///   3. Errors if the count is not exactly one.
    /// The intent is that this presents as if a subquery result with too many
    /// records contributing. The error column has the same type as the result
    /// should have, but we only produce it if the count exceeds one.
    ///
    /// This logic could nearly be achieved with map, filter, project logic,
    /// but has been challenging to do in a way that respects the vagaries of
    /// SQL and our semantics. If we reveal a constant value in the column we
    /// risk the optimizer pruning the branch; if we reveal that this will not
    /// produce rows we risk the optimizer pruning the branch; if we reveal that
    /// the only possible value is an error we risk the optimizer propagating that
    /// error without guards.
    ///
    /// Before replacing this by an `MirScalarExpr`, quadruple check that it
    /// would not result in misoptimizations due to expression evaluation order
    /// being utterly undefined, and predicate pushdown trimming any fragments
    /// that might produce columns that will not be needed.
    GuardSubquerySize {
        column_type: ScalarType,
    },
    /// Emits a single empty row whose diff is the `int64` input; nothing when the input is
    /// zero. Displayed as `repeat_row`.
    Repeat,
    /// Produces a single nullable column of type `el_typ`.
    UnnestArray {
        el_typ: ScalarType,
    },
    /// Produces a single nullable column of type `el_typ`.
    UnnestList {
        el_typ: ScalarType,
    },
    /// Produces a non-null string key column, which is a key of the output, and a nullable
    /// value column of type `value_type`.
    UnnestMap {
        value_type: ScalarType,
    },
    /// Given `n` input expressions, wraps them into `n / width` rows, each of
    /// `width` columns.
    ///
    /// This function is not intended to be called directly by end users, but
    /// is useful in the planning of e.g. VALUES clauses.
    Wrap {
        types: Vec<ColumnType>,
        width: usize,
    },
    /// Produces a single non-null `int32` column, which is a key of the output. Displayed as
    /// `generate_subscripts`.
    GenerateSubscriptsArray,
    /// Execute some arbitrary scalar function as a table function.
    ///
    /// The input datums are packed into exactly one output row; `relation` is reported
    /// verbatim as the output type and `name` is what `Display` prints.
    TabletizedScalar {
        name: String,
        relation: RelationType,
    },
    /// Produces a single non-null column holding an array of strings.
    RegexpMatches,
    /// Implements the WITH ORDINALITY clause.
    ///
    /// Don't construct `TableFunc::WithOrdinality` manually! Use the `with_ordinality` constructor
    /// function instead, which checks whether the given table function supports `WithOrdinality`.
    // The payload type is private on purpose (so that outside code has to go through the
    // constructor), which is why the lint about private types in public interfaces is silenced.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3794
/// Evaluates the inner table function, expands its results into unary (repeating each row as
/// many times as the diff indicates), and appends an integer corresponding to the ordinal
/// position (starting from 1). For example, it numbers the elements of a list when calling
/// `unnest_list`.
///
/// Private enum variant of `TableFunc`. Don't construct this directly, but use
/// `TableFunc::with_ordinality` instead.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
struct WithOrdinality {
    /// The table function whose output rows get numbered. Boxed because `TableFunc` itself
    /// contains `WithOrdinality`, which makes the type recursive.
    inner: Box<TableFunc>,
}
3806
3807impl TableFunc {
3808    /// Adds `WITH ORDINALITY` to a table function if it's allowed on the given table function.
3809    pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3810        match inner {
3811            TableFunc::AclExplode
3812            | TableFunc::MzAclExplode
3813            | TableFunc::JsonbEach { .. }
3814            | TableFunc::JsonbObjectKeys
3815            | TableFunc::JsonbArrayElements { .. }
3816            | TableFunc::RegexpExtract(_)
3817            | TableFunc::CsvExtract(_)
3818            | TableFunc::GenerateSeriesInt32
3819            | TableFunc::GenerateSeriesInt64
3820            | TableFunc::GenerateSeriesTimestamp
3821            | TableFunc::GenerateSeriesTimestampTz
3822            | TableFunc::GuardSubquerySize { .. }
3823            | TableFunc::Repeat
3824            | TableFunc::UnnestArray { .. }
3825            | TableFunc::UnnestList { .. }
3826            | TableFunc::UnnestMap { .. }
3827            | TableFunc::Wrap { .. }
3828            | TableFunc::GenerateSubscriptsArray
3829            | TableFunc::TabletizedScalar { .. }
3830            | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3831                inner: Box::new(inner),
3832            })),
3833            // IMPORTANT: Before adding a new table function here, consider negative diffs:
3834            // `WithOrdinality::eval` will panic if the inner table function emits a negative diff.
3835            _ => None,
3836        }
3837    }
3838}
3839
3840/// Manual `Arbitrary`, because proptest-derive is choking on the recursive `WithOrdinality`
3841/// variant.
3842impl Arbitrary for TableFunc {
3843    type Parameters = ();
3844    type Strategy = BoxedStrategy<Self>;
3845
3846    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
3847        let leaf = Union::new(vec![
3848            Just(TableFunc::AclExplode),
3849            Just(TableFunc::MzAclExplode),
3850            Just(TableFunc::JsonbObjectKeys),
3851            Just(TableFunc::GenerateSeriesInt32),
3852            Just(TableFunc::GenerateSeriesInt64),
3853            Just(TableFunc::GenerateSeriesTimestamp),
3854            Just(TableFunc::GenerateSeriesTimestampTz),
3855            Just(TableFunc::Repeat),
3856            Just(TableFunc::GenerateSubscriptsArray),
3857            Just(TableFunc::RegexpMatches),
3858        ])
3859        .boxed();
3860
3861        // recursive WithOrdinality variant
3862        leaf.prop_recursive(2, 256, 2, |inner| {
3863            inner.clone().prop_map(|tf| {
3864                TableFunc::WithOrdinality(WithOrdinality {
3865                    inner: Box::new(tf),
3866                })
3867            })
3868        })
3869        .boxed()
3870    }
3871}
3872
3873impl RustType<ProtoTableFunc> for TableFunc {
3874    fn into_proto(&self) -> ProtoTableFunc {
3875        use proto_table_func::{Kind, ProtoWrap};
3876
3877        ProtoTableFunc {
3878            kind: Some(match self {
3879                TableFunc::AclExplode => Kind::AclExplode(()),
3880                TableFunc::MzAclExplode => Kind::MzAclExplode(()),
3881                TableFunc::JsonbEach { stringify } => Kind::JsonbEach(*stringify),
3882                TableFunc::JsonbObjectKeys => Kind::JsonbObjectKeys(()),
3883                TableFunc::JsonbArrayElements { stringify } => Kind::JsonbArrayElements(*stringify),
3884                TableFunc::RegexpExtract(x) => Kind::RegexpExtract(x.into_proto()),
3885                TableFunc::CsvExtract(x) => Kind::CsvExtract(x.into_proto()),
3886                TableFunc::GenerateSeriesInt32 => Kind::GenerateSeriesInt32(()),
3887                TableFunc::GenerateSeriesInt64 => Kind::GenerateSeriesInt64(()),
3888                TableFunc::GenerateSeriesTimestamp => Kind::GenerateSeriesTimestamp(()),
3889                TableFunc::GenerateSeriesTimestampTz => Kind::GenerateSeriesTimestampTz(()),
3890                TableFunc::GuardSubquerySize { column_type } => {
3891                    Kind::GuardSubquerySize(column_type.into_proto())
3892                }
3893                TableFunc::Repeat => Kind::Repeat(()),
3894                TableFunc::UnnestArray { el_typ } => Kind::UnnestArray(el_typ.into_proto()),
3895                TableFunc::UnnestList { el_typ } => Kind::UnnestList(el_typ.into_proto()),
3896                TableFunc::UnnestMap { value_type } => Kind::UnnestMap(value_type.into_proto()),
3897                TableFunc::Wrap { types, width } => Kind::Wrap(ProtoWrap {
3898                    types: types.into_proto(),
3899                    width: width.into_proto(),
3900                }),
3901                TableFunc::GenerateSubscriptsArray => Kind::GenerateSubscriptsArray(()),
3902                TableFunc::TabletizedScalar { name, relation } => {
3903                    Kind::TabletizedScalar(ProtoTabletizedScalar {
3904                        name: name.into_proto(),
3905                        relation: Some(relation.into_proto()),
3906                    })
3907                }
3908                TableFunc::RegexpMatches => Kind::RegexpMatches(()),
3909                TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3910                    Kind::WithOrdinality(Box::new(ProtoWithOrdinality {
3911                        inner: Some(inner.into_proto()),
3912                    }))
3913                }
3914            }),
3915        }
3916    }
3917
3918    fn from_proto(proto: ProtoTableFunc) -> Result<Self, TryFromProtoError> {
3919        use proto_table_func::Kind;
3920
3921        let kind = proto
3922            .kind
3923            .ok_or_else(|| TryFromProtoError::missing_field("ProtoTableFunc::Kind"))?;
3924
3925        Ok(match kind {
3926            Kind::AclExplode(()) => TableFunc::AclExplode,
3927            Kind::MzAclExplode(()) => TableFunc::MzAclExplode,
3928            Kind::JsonbEach(stringify) => TableFunc::JsonbEach { stringify },
3929            Kind::JsonbObjectKeys(()) => TableFunc::JsonbObjectKeys,
3930            Kind::JsonbArrayElements(stringify) => TableFunc::JsonbArrayElements { stringify },
3931            Kind::RegexpExtract(x) => TableFunc::RegexpExtract(x.into_rust()?),
3932            Kind::CsvExtract(x) => TableFunc::CsvExtract(x.into_rust()?),
3933            Kind::GenerateSeriesInt32(()) => TableFunc::GenerateSeriesInt32,
3934            Kind::GenerateSeriesInt64(()) => TableFunc::GenerateSeriesInt64,
3935            Kind::GenerateSeriesTimestamp(()) => TableFunc::GenerateSeriesTimestamp,
3936            Kind::GenerateSeriesTimestampTz(()) => TableFunc::GenerateSeriesTimestampTz,
3937            Kind::GuardSubquerySize(x) => TableFunc::GuardSubquerySize {
3938                column_type: x.into_rust()?,
3939            },
3940            Kind::Repeat(()) => TableFunc::Repeat,
3941            Kind::UnnestArray(x) => TableFunc::UnnestArray {
3942                el_typ: x.into_rust()?,
3943            },
3944            Kind::UnnestList(x) => TableFunc::UnnestList {
3945                el_typ: x.into_rust()?,
3946            },
3947            Kind::UnnestMap(value_type) => TableFunc::UnnestMap {
3948                value_type: value_type.into_rust()?,
3949            },
3950            Kind::Wrap(x) => TableFunc::Wrap {
3951                width: x.width.into_rust()?,
3952                types: x.types.into_rust()?,
3953            },
3954            Kind::GenerateSubscriptsArray(()) => TableFunc::GenerateSubscriptsArray,
3955            Kind::TabletizedScalar(v) => TableFunc::TabletizedScalar {
3956                name: v.name,
3957                relation: v
3958                    .relation
3959                    .into_rust_if_some("ProtoTabletizedScalar::relation")?,
3960            },
3961            Kind::RegexpMatches(_) => TableFunc::RegexpMatches,
3962            Kind::WithOrdinality(inner) => TableFunc::WithOrdinality(WithOrdinality {
3963                inner: Box::new(
3964                    inner
3965                        .inner
3966                        .map(|inner| *inner)
3967                        .into_rust_if_some("ProtoWithOrdinality::inner")?,
3968                ),
3969            }),
3970        })
3971    }
3972}
3973
3974impl TableFunc {
3975    /// Executes `self` on the given input row (`datums`).
3976    pub fn eval<'a>(
3977        &'a self,
3978        datums: &'a [Datum<'a>],
3979        temp_storage: &'a RowArena,
3980    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3981        if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3982            return Ok(Box::new(vec![].into_iter()));
3983        }
3984        match self {
3985            TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3986            TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3987            TableFunc::JsonbEach { stringify } => {
3988                Ok(Box::new(jsonb_each(datums[0], temp_storage, *stringify)))
3989            }
3990            TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3991            TableFunc::JsonbArrayElements { stringify } => Ok(Box::new(jsonb_array_elements(
3992                datums[0],
3993                temp_storage,
3994                *stringify,
3995            ))),
3996            TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3997            TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3998            TableFunc::GenerateSeriesInt32 => {
3999                let res = generate_series(
4000                    datums[0].unwrap_int32(),
4001                    datums[1].unwrap_int32(),
4002                    datums[2].unwrap_int32(),
4003                )?;
4004                Ok(Box::new(res))
4005            }
4006            TableFunc::GenerateSeriesInt64 => {
4007                let res = generate_series(
4008                    datums[0].unwrap_int64(),
4009                    datums[1].unwrap_int64(),
4010                    datums[2].unwrap_int64(),
4011                )?;
4012                Ok(Box::new(res))
4013            }
4014            TableFunc::GenerateSeriesTimestamp => {
4015                fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
4016                    Datum::from(d)
4017                }
4018                let res = generate_series_ts(
4019                    datums[0].unwrap_timestamp(),
4020                    datums[1].unwrap_timestamp(),
4021                    datums[2].unwrap_interval(),
4022                    pass_through,
4023                )?;
4024                Ok(Box::new(res))
4025            }
4026            TableFunc::GenerateSeriesTimestampTz => {
4027                fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
4028                    Datum::from(d)
4029                }
4030                let res = generate_series_ts(
4031                    datums[0].unwrap_timestamptz(),
4032                    datums[1].unwrap_timestamptz(),
4033                    datums[2].unwrap_interval(),
4034                    gen_ts_tz,
4035                )?;
4036                Ok(Box::new(res))
4037            }
4038            TableFunc::GenerateSubscriptsArray => {
4039                generate_subscripts_array(datums[0], datums[1].unwrap_int32())
4040            }
4041            TableFunc::GuardSubquerySize { column_type: _ } => {
4042                // We error if the count is not one,
4043                // and produce no rows if equal to one.
4044                let count = datums[0].unwrap_int64();
4045                if count != 1 {
4046                    Err(EvalError::MultipleRowsFromSubquery)
4047                } else {
4048                    Ok(Box::new([].into_iter()))
4049                }
4050            }
4051            TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
4052            TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
4053            TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
4054            TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
4055            TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
4056            TableFunc::TabletizedScalar { .. } => {
4057                let r = Row::pack_slice(datums);
4058                Ok(Box::new(std::iter::once((r, Diff::ONE))))
4059            }
4060            TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
4061            TableFunc::WithOrdinality(func_with_ordinality) => {
4062                func_with_ordinality.eval(datums, temp_storage)
4063            }
4064        }
4065    }
4066
4067    pub fn output_type(&self) -> RelationType {
4068        let (column_types, keys) = match self {
4069            TableFunc::AclExplode => {
4070                let column_types = vec![
4071                    ScalarType::Oid.nullable(false),
4072                    ScalarType::Oid.nullable(false),
4073                    ScalarType::String.nullable(false),
4074                    ScalarType::Bool.nullable(false),
4075                ];
4076                let keys = vec![];
4077                (column_types, keys)
4078            }
4079            TableFunc::MzAclExplode => {
4080                let column_types = vec![
4081                    ScalarType::String.nullable(false),
4082                    ScalarType::String.nullable(false),
4083                    ScalarType::String.nullable(false),
4084                    ScalarType::Bool.nullable(false),
4085                ];
4086                let keys = vec![];
4087                (column_types, keys)
4088            }
4089            TableFunc::JsonbEach { stringify: true } => {
4090                let column_types = vec![
4091                    ScalarType::String.nullable(false),
4092                    ScalarType::String.nullable(true),
4093                ];
4094                let keys = vec![];
4095                (column_types, keys)
4096            }
4097            TableFunc::JsonbEach { stringify: false } => {
4098                let column_types = vec![
4099                    ScalarType::String.nullable(false),
4100                    ScalarType::Jsonb.nullable(false),
4101                ];
4102                let keys = vec![];
4103                (column_types, keys)
4104            }
4105            TableFunc::JsonbObjectKeys => {
4106                let column_types = vec![ScalarType::String.nullable(false)];
4107                let keys = vec![];
4108                (column_types, keys)
4109            }
4110            TableFunc::JsonbArrayElements { stringify: true } => {
4111                let column_types = vec![ScalarType::String.nullable(true)];
4112                let keys = vec![];
4113                (column_types, keys)
4114            }
4115            TableFunc::JsonbArrayElements { stringify: false } => {
4116                let column_types = vec![ScalarType::Jsonb.nullable(false)];
4117                let keys = vec![];
4118                (column_types, keys)
4119            }
4120            TableFunc::RegexpExtract(a) => {
4121                let column_types = a
4122                    .capture_groups_iter()
4123                    .map(|cg| ScalarType::String.nullable(cg.nullable))
4124                    .collect();
4125                let keys = vec![];
4126                (column_types, keys)
4127            }
4128            TableFunc::CsvExtract(n_cols) => {
4129                let column_types = iter::repeat(ScalarType::String.nullable(false))
4130                    .take(*n_cols)
4131                    .collect();
4132                let keys = vec![];
4133                (column_types, keys)
4134            }
4135            TableFunc::GenerateSeriesInt32 => {
4136                let column_types = vec![ScalarType::Int32.nullable(false)];
4137                let keys = vec![vec![0]];
4138                (column_types, keys)
4139            }
4140            TableFunc::GenerateSeriesInt64 => {
4141                let column_types = vec![ScalarType::Int64.nullable(false)];
4142                let keys = vec![vec![0]];
4143                (column_types, keys)
4144            }
4145            TableFunc::GenerateSeriesTimestamp => {
4146                let column_types = vec![ScalarType::Timestamp { precision: None }.nullable(false)];
4147                let keys = vec![vec![0]];
4148                (column_types, keys)
4149            }
4150            TableFunc::GenerateSeriesTimestampTz => {
4151                let column_types =
4152                    vec![ScalarType::TimestampTz { precision: None }.nullable(false)];
4153                let keys = vec![vec![0]];
4154                (column_types, keys)
4155            }
4156            TableFunc::GenerateSubscriptsArray => {
4157                let column_types = vec![ScalarType::Int32.nullable(false)];
4158                let keys = vec![vec![0]];
4159                (column_types, keys)
4160            }
4161            TableFunc::GuardSubquerySize { column_type } => {
4162                let column_types = vec![column_type.clone().nullable(false)];
4163                let keys = vec![];
4164                (column_types, keys)
4165            }
4166            TableFunc::Repeat => {
4167                let column_types = vec![];
4168                let keys = vec![];
4169                (column_types, keys)
4170            }
4171            TableFunc::UnnestArray { el_typ } => {
4172                let column_types = vec![el_typ.clone().nullable(true)];
4173                let keys = vec![];
4174                (column_types, keys)
4175            }
4176            TableFunc::UnnestList { el_typ } => {
4177                let column_types = vec![el_typ.clone().nullable(true)];
4178                let keys = vec![];
4179                (column_types, keys)
4180            }
4181            TableFunc::UnnestMap { value_type } => {
4182                let column_types = vec![
4183                    ScalarType::String.nullable(false),
4184                    value_type.clone().nullable(true),
4185                ];
4186                let keys = vec![vec![0]];
4187                (column_types, keys)
4188            }
4189            TableFunc::Wrap { types, .. } => {
4190                let column_types = types.clone();
4191                let keys = vec![];
4192                (column_types, keys)
4193            }
4194            TableFunc::TabletizedScalar { relation, .. } => {
4195                return relation.clone();
4196            }
4197            TableFunc::RegexpMatches => {
4198                let column_types =
4199                    vec![ScalarType::Array(Box::new(ScalarType::String)).nullable(false)];
4200                let keys = vec![];
4201
4202                (column_types, keys)
4203            }
4204            TableFunc::WithOrdinality(WithOrdinality { inner }) => {
4205                let mut typ = inner.output_type();
4206                // Add the ordinality column.
4207                typ.column_types.push(ScalarType::Int64.nullable(false));
4208                // The ordinality column is always a key.
4209                typ.keys.push(vec![typ.column_types.len() - 1]);
4210                (typ.column_types, typ.keys)
4211            }
4212        };
4213
4214        soft_assert_eq_no_log!(column_types.len(), self.output_arity());
4215
4216        if !keys.is_empty() {
4217            RelationType::new(column_types).with_keys(keys)
4218        } else {
4219            RelationType::new(column_types)
4220        }
4221    }
4222
4223    pub fn output_arity(&self) -> usize {
4224        match self {
4225            TableFunc::AclExplode => 4,
4226            TableFunc::MzAclExplode => 4,
4227            TableFunc::JsonbEach { .. } => 2,
4228            TableFunc::JsonbObjectKeys => 1,
4229            TableFunc::JsonbArrayElements { .. } => 1,
4230            TableFunc::RegexpExtract(a) => a.capture_groups_len(),
4231            TableFunc::CsvExtract(n_cols) => *n_cols,
4232            TableFunc::GenerateSeriesInt32 => 1,
4233            TableFunc::GenerateSeriesInt64 => 1,
4234            TableFunc::GenerateSeriesTimestamp => 1,
4235            TableFunc::GenerateSeriesTimestampTz => 1,
4236            TableFunc::GenerateSubscriptsArray => 1,
4237            TableFunc::GuardSubquerySize { .. } => 1,
4238            TableFunc::Repeat => 0,
4239            TableFunc::UnnestArray { .. } => 1,
4240            TableFunc::UnnestList { .. } => 1,
4241            TableFunc::UnnestMap { .. } => 2,
4242            TableFunc::Wrap { width, .. } => *width,
4243            TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
4244            TableFunc::RegexpMatches => 1,
4245            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
4246        }
4247    }
4248
4249    pub fn empty_on_null_input(&self) -> bool {
4250        match self {
4251            TableFunc::AclExplode
4252            | TableFunc::MzAclExplode
4253            | TableFunc::JsonbEach { .. }
4254            | TableFunc::JsonbObjectKeys
4255            | TableFunc::JsonbArrayElements { .. }
4256            | TableFunc::GenerateSeriesInt32
4257            | TableFunc::GenerateSeriesInt64
4258            | TableFunc::GenerateSeriesTimestamp
4259            | TableFunc::GenerateSeriesTimestampTz
4260            | TableFunc::GenerateSubscriptsArray
4261            | TableFunc::RegexpExtract(_)
4262            | TableFunc::CsvExtract(_)
4263            | TableFunc::Repeat
4264            | TableFunc::UnnestArray { .. }
4265            | TableFunc::UnnestList { .. }
4266            | TableFunc::UnnestMap { .. }
4267            | TableFunc::RegexpMatches => true,
4268            TableFunc::GuardSubquerySize { .. } => false,
4269            TableFunc::Wrap { .. } => false,
4270            TableFunc::TabletizedScalar { .. } => false,
4271            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
4272        }
4273    }
4274
4275    /// True iff the table function preserves the append-only property of its input.
4276    pub fn preserves_monotonicity(&self) -> bool {
4277        // Most variants preserve monotonicity, but all variants are enumerated to
4278        // ensure that added variants at least check that this is the case.
4279        match self {
4280            TableFunc::AclExplode => false,
4281            TableFunc::MzAclExplode => false,
4282            TableFunc::JsonbEach { .. } => true,
4283            TableFunc::JsonbObjectKeys => true,
4284            TableFunc::JsonbArrayElements { .. } => true,
4285            TableFunc::RegexpExtract(_) => true,
4286            TableFunc::CsvExtract(_) => true,
4287            TableFunc::GenerateSeriesInt32 => true,
4288            TableFunc::GenerateSeriesInt64 => true,
4289            TableFunc::GenerateSeriesTimestamp => true,
4290            TableFunc::GenerateSeriesTimestampTz => true,
4291            TableFunc::GenerateSubscriptsArray => true,
4292            TableFunc::Repeat => false,
4293            TableFunc::UnnestArray { .. } => true,
4294            TableFunc::UnnestList { .. } => true,
4295            TableFunc::UnnestMap { .. } => true,
4296            TableFunc::Wrap { .. } => true,
4297            TableFunc::TabletizedScalar { .. } => true,
4298            TableFunc::RegexpMatches => true,
4299            TableFunc::GuardSubquerySize { .. } => false,
4300            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
4301        }
4302    }
4303}
4304
4305impl fmt::Display for TableFunc {
4306    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4307        match self {
4308            TableFunc::AclExplode => f.write_str("aclexplode"),
4309            TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
4310            TableFunc::JsonbEach { .. } => f.write_str("jsonb_each"),
4311            TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
4312            TableFunc::JsonbArrayElements { .. } => f.write_str("jsonb_array_elements"),
4313            TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
4314            TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
4315            TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
4316            TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
4317            TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
4318            TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
4319            TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
4320            TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
4321            TableFunc::Repeat => f.write_str("repeat_row"),
4322            TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
4323            TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
4324            TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
4325            TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
4326            TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
4327            TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
4328            TableFunc::WithOrdinality(WithOrdinality { inner }) => {
4329                write!(f, "{}[with_ordinality]", inner)
4330            }
4331        }
4332    }
4333}
4334
4335impl WithOrdinality {
4336    /// Executes the `self.inner` table function on the given input row (`datums`), and zips
4337    /// 1, 2, 3, ... to the result as a new column. We need to expand rows with non-1 diffs into the
4338    /// corresponding number of rows with unit diffs, because the ordinality column will have
4339    /// different values for each copy.
4340    ///
4341    /// # Panics
4342    ///
4343    /// Panics if the `inner` table function emits a negative diff.
4344    fn eval<'a>(
4345        &'a self,
4346        datums: &'a [Datum<'a>],
4347        temp_storage: &'a RowArena,
4348    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
4349        let mut next_ordinal: i64 = 1;
4350        let it = self
4351            .inner
4352            .eval(datums, temp_storage)?
4353            .flat_map(move |(mut row, diff)| {
4354                let diff = diff.into_inner();
4355                // WITH ORDINALITY is not well-defined for negative diffs. This is ok, since the
4356                // only table function that can emit negative diffs is `repeat_row`, which is in
4357                // `mz_unsafe`, so users can never call it.
4358                //
4359                // (We also don't need to worry about negative diffs in FlatMap's input, because
4360                // the diff of the input of the FlatMap is factored in after we return from here.)
4361                assert!(diff >= 0);
4362                // The ordinals that will be associated with this row.
4363                let mut ordinals = next_ordinal..(next_ordinal + diff);
4364                next_ordinal += diff;
4365                // The maximum byte capacity we need for the original row and its ordinal.
4366                let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
4367                iter::from_fn(move || {
4368                    let ordinal = ordinals.next()?;
4369                    let mut row = if ordinals.is_empty() {
4370                        // This is the last row, so no need to clone. (Most table functions emit
4371                        // only 1 diffs, so this completely avoids cloning in most cases.)
4372                        std::mem::take(&mut row)
4373                    } else {
4374                        let mut new_row = Row::with_capacity(cap);
4375                        new_row.clone_from(&row);
4376                        new_row
4377                    };
4378                    RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
4379                    Some((row, Diff::ONE))
4380                })
4381            });
4382        Ok(Box::new(it))
4383    }
4384}
4385
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::{AggregateFunc, ProtoAggregateFunc, ProtoTableFunc, TableFunc};

    proptest! {
        // Encoding an arbitrary `AggregateFunc` to protobuf and decoding it again must give
        // back the original value.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn aggregate_func_protobuf_roundtrip(expect in any::<AggregateFunc>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoAggregateFunc>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }

        // Same round-trip property for `TableFunc`.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn table_func_protobuf_roundtrip(expect in any::<TableFunc>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoTableFunc>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}