mz_expr/relation/
func.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::soft_assert_or_log;
25use mz_ore::str::separated;
26use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
27use mz_repr::adt::array::ArrayDimension;
28use mz_repr::adt::date::Date;
29use mz_repr::adt::interval::Interval;
30use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
31use mz_repr::adt::regex::Regex as ReprRegex;
32use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
33use mz_repr::{
34    ColumnName, ColumnType, Datum, Diff, RelationType, Row, RowArena, ScalarType, SharedRow,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use proptest::prelude::{Arbitrary, Just};
39use proptest::strategy::{BoxedStrategy, Strategy, Union};
40use proptest_derive::Arbitrary;
41use regex::Regex;
42use serde::{Deserialize, Serialize};
43use smallvec::SmallVec;
44
45use crate::EvalError;
46use crate::WindowFrameBound::{
47    CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
48};
49use crate::WindowFrameUnits::{Groups, Range, Rows};
50use crate::explain::{HumanizedExpr, HumanizerMode};
51use crate::relation::proto_aggregate_func::{
52    self, ProtoColumnOrders, ProtoFusedValueWindowFunc, ProtoFusedWindowAggregate,
53};
54use crate::relation::proto_table_func::ProtoTabletizedScalar;
55use crate::relation::{
56    ColumnOrder, ProtoAggregateFunc, ProtoTableFunc, WindowFrame, WindowFrameBound,
57    WindowFrameUnits, compare_columns, proto_table_func,
58};
59use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
60
61include!(concat!(env!("OUT_DIR"), "/mz_expr.relation.func.rs"));
62
63// TODO(jamii) be careful about overflow in sum/avg
64// see https://timely.zulipchat.com/#narrow/stream/186635-engineering/topic/additional.20work/near/163507435
65
66fn max_string<'a, I>(datums: I) -> Datum<'a>
67where
68    I: IntoIterator<Item = Datum<'a>>,
69{
70    match datums
71        .into_iter()
72        .filter(|d| !d.is_null())
73        .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
74    {
75        Some(datum) => datum,
76        None => Datum::Null,
77    }
78}
79
80fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
81where
82    I: IntoIterator<Item = Datum<'a>>,
83    DatumType: TryFrom<Datum<'a>> + Ord,
84    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
85    Datum<'a>: From<Option<DatumType>>,
86{
87    let x: Option<DatumType> = datums
88        .into_iter()
89        .filter(|d| !d.is_null())
90        .map(|d| DatumType::try_from(d).expect("unexpected type"))
91        .max();
92
93    x.into()
94}
95
96fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
97where
98    I: IntoIterator<Item = Datum<'a>>,
99    DatumType: TryFrom<Datum<'a>> + Ord,
100    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
101    Datum<'a>: From<Option<DatumType>>,
102{
103    let x: Option<DatumType> = datums
104        .into_iter()
105        .filter(|d| !d.is_null())
106        .map(|d| DatumType::try_from(d).expect("unexpected type"))
107        .min();
108
109    x.into()
110}
111
112fn min_string<'a, I>(datums: I) -> Datum<'a>
113where
114    I: IntoIterator<Item = Datum<'a>>,
115{
116    match datums
117        .into_iter()
118        .filter(|d| !d.is_null())
119        .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
120    {
121        Some(datum) => datum,
122        None => Datum::Null,
123    }
124}
125
126fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
127where
128    I: IntoIterator<Item = Datum<'a>>,
129    DatumType: TryFrom<Datum<'a>>,
130    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
131    ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
132{
133    let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
134    if datums.peek().is_none() {
135        Datum::Null
136    } else {
137        let x = datums
138            .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
139            .sum::<ResultType>();
140        x.into()
141    }
142}
143
144fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
145where
146    I: IntoIterator<Item = Datum<'a>>,
147{
148    let mut cx = numeric::cx_datum();
149    let mut sum = Numeric::zero();
150    let mut empty = true;
151    for d in datums {
152        if !d.is_null() {
153            empty = false;
154            cx.add(&mut sum, &d.unwrap_numeric().0);
155        }
156    }
157    match empty {
158        true => Datum::Null,
159        false => Datum::from(sum),
160    }
161}
162
163// TODO(benesch): remove potentially dangerous usage of `as`.
164#[allow(clippy::as_conversions)]
165fn count<'a, I>(datums: I) -> Datum<'a>
166where
167    I: IntoIterator<Item = Datum<'a>>,
168{
169    // TODO(jkosh44) This should error when the count can't fit inside of an `i64` instead of returning a negative result.
170    let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
171    Datum::from(x)
172}
173
174fn any<'a, I>(datums: I) -> Datum<'a>
175where
176    I: IntoIterator<Item = Datum<'a>>,
177{
178    datums
179        .into_iter()
180        .fold(Datum::False, |state, next| match (state, next) {
181            (Datum::True, _) | (_, Datum::True) => Datum::True,
182            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
183            _ => Datum::False,
184        })
185}
186
187fn all<'a, I>(datums: I) -> Datum<'a>
188where
189    I: IntoIterator<Item = Datum<'a>>,
190{
191    datums
192        .into_iter()
193        .fold(Datum::True, |state, next| match (state, next) {
194            (Datum::False, _) | (_, Datum::False) => Datum::False,
195            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
196            _ => Datum::True,
197        })
198}
199
200fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
201where
202    I: IntoIterator<Item = Datum<'a>>,
203{
204    const EMPTY_SEP: &str = "";
205
206    let datums = order_aggregate_datums(datums, order_by);
207    let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
208        if d.is_null() {
209            return None;
210        }
211        let mut value_sep = d.unwrap_list().iter();
212        match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
213            (Datum::Null, _) => None,
214            (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
215            (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
216            _ => unreachable!(),
217        }
218    });
219
220    let mut s = String::default();
221    match sep_value_pairs.next() {
222        // First value not prefixed by its separator
223        Some((_, value)) => s.push_str(value),
224        // If no non-null values sent, return NULL.
225        None => return Datum::Null,
226    }
227
228    for (sep, value) in sep_value_pairs {
229        s.push_str(sep);
230        s.push_str(value);
231    }
232
233    Datum::String(temp_storage.push_string(s))
234}
235
236fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238    I: IntoIterator<Item = Datum<'a>>,
239{
240    let datums = order_aggregate_datums(datums, order_by);
241    temp_storage.make_datum(|packer| {
242        packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
243    })
244}
245
246fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
247where
248    I: IntoIterator<Item = Datum<'a>>,
249{
250    let datums = order_aggregate_datums(datums, order_by);
251    temp_storage.make_datum(|packer| {
252        let mut datums: Vec<_> = datums
253            .into_iter()
254            .filter_map(|d| {
255                if d.is_null() {
256                    return None;
257                }
258                let mut list = d.unwrap_list().iter();
259                let key = list.next().unwrap();
260                let val = list.next().unwrap();
261                if key.is_null() {
262                    // TODO(benesch): this should produce an error, but
263                    // aggregate functions cannot presently produce errors.
264                    None
265                } else {
266                    Some((key.unwrap_str(), val))
267                }
268            })
269            .collect();
270        // datums are ordered by any ORDER BY clause now, and we want to preserve
271        // the last entry for each key, but we also need to present unique and sorted
272        // keys to push_dict. Use sort_by here, which is stable, and so will preserve
273        // the ORDER BY order. Then reverse and dedup to retain the last of each
274        // key. Reverse again so we're back in push_dict order.
275        datums.sort_by_key(|(k, _v)| *k);
276        datums.reverse();
277        datums.dedup_by_key(|(k, _v)| *k);
278        datums.reverse();
279        packer.push_dict(datums);
280    })
281}
282
283/// Assuming datums is a List, sort them by the 2nd through Nth elements
284/// corresponding to order_by, then return the 1st element.
285///
286/// Near the usages of this function, we sometimes want to produce Datums with a shorter lifetime
287/// than 'a. We have to actually perform the shortening of the lifetime here, inside this function,
288/// because if we were to simply return `impl Iterator<Item = Datum<'a>>`, that wouldn't be
289/// covariant in the item type, because opaque types are always invariant. (Contrast this with how
290/// we perform the shortening _inside_ this function: the input of the `map` is known to
291/// specifically be `std::vec::IntoIter`, which is known to be covariant.)
292pub fn order_aggregate_datums<'a: 'b, 'b, I>(
293    datums: I,
294    order_by: &[ColumnOrder],
295) -> impl Iterator<Item = Datum<'b>>
296where
297    I: IntoIterator<Item = Datum<'a>>,
298{
299    order_aggregate_datums_with_rank_inner(datums, order_by)
300        .into_iter()
301        // (`payload` is coerced here to `Datum<'b>` in the argument of the closure)
302        .map(|(payload, _order_datums)| payload)
303}
304
305/// Assuming datums is a List, sort them by the 2nd through Nth elements
306/// corresponding to order_by, then return the 1st element and computed order by expression.
307fn order_aggregate_datums_with_rank<'a, I>(
308    datums: I,
309    order_by: &[ColumnOrder],
310) -> impl Iterator<Item = (Datum<'a>, Row)>
311where
312    I: IntoIterator<Item = Datum<'a>>,
313{
314    order_aggregate_datums_with_rank_inner(datums, order_by)
315        .into_iter()
316        .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
317}
318
319fn order_aggregate_datums_with_rank_inner<'a, I>(
320    datums: I,
321    order_by: &[ColumnOrder],
322) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
323where
324    I: IntoIterator<Item = Datum<'a>>,
325{
326    let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
327        .into_iter()
328        .map(|d| {
329            let list = d.unwrap_list();
330            let mut list_it = list.iter();
331            let payload = list_it.next().unwrap();
332
333            // We decode the order_by Datums here instead of the comparison function, because the
334            // comparison function is expected to be called `O(log n)` times on each input row.
335            // The only downside is that the decoded data might be bigger, but I think that's fine,
336            // because:
337            // - if we have a window partition so big that this would create a memory problem, then
338            //   the non-incrementalness of window functions will create a serious CPU problem
339            //   anyway,
340            // - and anyhow various other parts of the window function code already do decoding
341            //   upfront.
342            let mut order_by_datums = Vec::with_capacity(order_by.len());
343            for _ in 0..order_by.len() {
344                order_by_datums.push(
345                    list_it
346                        .next()
347                        .expect("must have exactly the same number of Datums as `order_by`"),
348                );
349            }
350
351            (payload, order_by_datums)
352        })
353        .collect();
354
355    let mut sort_by =
356        |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
357         (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
358            compare_columns(
359                order_by,
360                left_order_by_datums,
361                right_order_by_datums,
362                || payload_left.cmp(payload_right),
363            )
364        };
365    // `sort_unstable_by` can be faster and uses less memory than `sort_by`. An unstable sort is
366    // enough here, because if two elements are equal in our `compare` function, then the elements
367    // are actually binary-equal (because of the `tiebreaker` given to `compare_columns`), so it
368    // doesn't matter what order they end up in.
369    decoded.sort_unstable_by(&mut sort_by);
370    decoded
371}
372
373fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
374where
375    I: IntoIterator<Item = Datum<'a>>,
376{
377    let datums = order_aggregate_datums(datums, order_by);
378    let datums: Vec<_> = datums
379        .into_iter()
380        .map(|d| d.unwrap_array().elements().iter())
381        .flatten()
382        .collect();
383    let dims = ArrayDimension {
384        lower_bound: 1,
385        length: datums.len(),
386    };
387    temp_storage.make_datum(|packer| {
388        packer.try_push_array(&[dims], datums).unwrap();
389    })
390}
391
392fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
393where
394    I: IntoIterator<Item = Datum<'a>>,
395{
396    let datums = order_aggregate_datums(datums, order_by);
397    temp_storage.make_datum(|packer| {
398        packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
399    })
400}
401
402/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
403/// The output is in the format of `[result_value, original_row]`.
404/// See an example at `lag_lead`, where the input-output formats are similar.
405fn row_number<'a, I>(
406    datums: I,
407    callers_temp_storage: &'a RowArena,
408    order_by: &[ColumnOrder],
409) -> Datum<'a>
410where
411    I: IntoIterator<Item = Datum<'a>>,
412{
413    // We want to use our own temp_storage here, to avoid flooding `callers_temp_storage` with a
414    // large number of new datums. This is because we don't want to make an assumption about
415    // whether the caller creates a new temp_storage between window partitions.
416    let temp_storage = RowArena::new();
417    let datums = row_number_no_list(datums, &temp_storage, order_by);
418
419    callers_temp_storage.make_datum(|packer| {
420        packer.push_list(datums);
421    })
422}
423
424/// Like `row_number`, but doesn't perform the final wrapping in a list, returning an Iterator
425/// instead.
426fn row_number_no_list<'a: 'b, 'b, I>(
427    datums: I,
428    callers_temp_storage: &'b RowArena,
429    order_by: &[ColumnOrder],
430) -> impl Iterator<Item = Datum<'b>>
431where
432    I: IntoIterator<Item = Datum<'a>>,
433{
434    let datums = order_aggregate_datums(datums, order_by);
435
436    callers_temp_storage.reserve(datums.size_hint().0);
437    datums
438        .into_iter()
439        .map(|d| d.unwrap_list().iter())
440        .flatten()
441        .zip(1i64..)
442        .map(|(d, i)| {
443            callers_temp_storage.make_datum(|packer| {
444                packer.push_list_with(|packer| {
445                    packer.push(Datum::Int64(i));
446                    packer.push(d);
447                });
448            })
449        })
450}
451
452/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
453/// The output is in the format of `[result_value, original_row]`.
454/// See an example at `lag_lead`, where the input-output formats are similar.
455fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
456where
457    I: IntoIterator<Item = Datum<'a>>,
458{
459    let temp_storage = RowArena::new();
460    let datums = rank_no_list(datums, &temp_storage, order_by);
461
462    callers_temp_storage.make_datum(|packer| {
463        packer.push_list(datums);
464    })
465}
466
467/// Like `rank`, but doesn't perform the final wrapping in a list, returning an Iterator
468/// instead.
469fn rank_no_list<'a: 'b, 'b, I>(
470    datums: I,
471    callers_temp_storage: &'b RowArena,
472    order_by: &[ColumnOrder],
473) -> impl Iterator<Item = Datum<'b>>
474where
475    I: IntoIterator<Item = Datum<'a>>,
476{
477    // Keep the row used for ordering around, as it is used to determine the rank
478    let datums = order_aggregate_datums_with_rank(datums, order_by);
479
480    let mut datums = datums
481        .into_iter()
482        .map(|(d0, order_row)| {
483            d0.unwrap_list()
484                .iter()
485                .map(move |d1| (d1, order_row.clone()))
486        })
487        .flatten();
488
489    callers_temp_storage.reserve(datums.size_hint().0);
490    datums
491        .next()
492        .map_or(vec![], |(first_datum, first_order_row)| {
493            // Folding with (last order_by row, last assigned rank, row number, output vec)
494            datums.fold((first_order_row, 1, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
495                let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
496                *acc_row_num += 1;
497                // Identity is based on the order_by expression
498                if *acc_row != next_order_row {
499                    *acc_rank = *acc_row_num;
500                    *acc_row = next_order_row;
501                }
502
503                (*output).push((next_datum, *acc_rank));
504                acc
505            })
506        }.3).into_iter().map(|(d, i)| {
507        callers_temp_storage.make_datum(|packer| {
508            packer.push_list_with(|packer| {
509                packer.push(Datum::Int64(i));
510                packer.push(d);
511            });
512        })
513    })
514}
515
516/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
517/// The output is in the format of `[result_value, original_row]`.
518/// See an example at `lag_lead`, where the input-output formats are similar.
519fn dense_rank<'a, I>(
520    datums: I,
521    callers_temp_storage: &'a RowArena,
522    order_by: &[ColumnOrder],
523) -> Datum<'a>
524where
525    I: IntoIterator<Item = Datum<'a>>,
526{
527    let temp_storage = RowArena::new();
528    let datums = dense_rank_no_list(datums, &temp_storage, order_by);
529
530    callers_temp_storage.make_datum(|packer| {
531        packer.push_list(datums);
532    })
533}
534
535/// Like `dense_rank`, but doesn't perform the final wrapping in a list, returning an Iterator
536/// instead.
537fn dense_rank_no_list<'a: 'b, 'b, I>(
538    datums: I,
539    callers_temp_storage: &'b RowArena,
540    order_by: &[ColumnOrder],
541) -> impl Iterator<Item = Datum<'b>>
542where
543    I: IntoIterator<Item = Datum<'a>>,
544{
545    // Keep the row used for ordering around, as it is used to determine the rank
546    let datums = order_aggregate_datums_with_rank(datums, order_by);
547
548    let mut datums = datums
549        .into_iter()
550        .map(|(d0, order_row)| {
551            d0.unwrap_list()
552                .iter()
553                .map(move |d1| (d1, order_row.clone()))
554        })
555        .flatten();
556
557    callers_temp_storage.reserve(datums.size_hint().0);
558    datums
559        .next()
560        .map_or(vec![], |(first_datum, first_order_row)| {
561            // Folding with (last order_by row, last assigned rank, output vec)
562            datums.fold((first_order_row, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
563                let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
564                // Identity is based on the order_by expression
565                if *acc_row != next_order_row {
566                    *acc_rank += 1;
567                    *acc_row = next_order_row;
568                }
569
570                (*output).push((next_datum, *acc_rank));
571                acc
572            })
573        }.2).into_iter().map(|(d, i)| {
574        callers_temp_storage.make_datum(|packer| {
575            packer.push_list_with(|packer| {
576                packer.push(Datum::Int64(i));
577                packer.push(d);
578            });
579        })
580    })
581}
582
583/// The expected input is in the format of `[((OriginalRow, EncodedArgs), OrderByExprs...)]`
584/// For example,
585///
586/// lag(x*y, 1, null) over (partition by x+y order by x-y, x/y)
587///
588/// list of:
589/// row(
590///   row(
591///     row(#0, #1),
592///     row((#0 * #1), 1, null)
593///   ),
594///   (#0 - #1),
595///   (#0 / #1)
596/// )
597///
598/// The output is in the format of `[result_value, original_row]`, e.g.
599/// list of:
600/// row(
601///   42,
602///   row(7, 8)
603/// )
604fn lag_lead<'a, I>(
605    datums: I,
606    callers_temp_storage: &'a RowArena,
607    order_by: &[ColumnOrder],
608    lag_lead_type: &LagLeadType,
609    ignore_nulls: &bool,
610) -> Datum<'a>
611where
612    I: IntoIterator<Item = Datum<'a>>,
613{
614    let temp_storage = RowArena::new();
615    let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
616    callers_temp_storage.make_datum(|packer| {
617        packer.push_list(iter);
618    })
619}
620
621/// Like `lag_lead`, but doesn't perform the final wrapping in a list, returning an Iterator
622/// instead.
623fn lag_lead_no_list<'a: 'b, 'b, I>(
624    datums: I,
625    callers_temp_storage: &'b RowArena,
626    order_by: &[ColumnOrder],
627    lag_lead_type: &LagLeadType,
628    ignore_nulls: &bool,
629) -> impl Iterator<Item = Datum<'b>>
630where
631    I: IntoIterator<Item = Datum<'a>>,
632{
633    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, EncodedArgs) record
634    let datums = order_aggregate_datums(datums, order_by);
635
636    // Take the (OriginalRow, EncodedArgs) records and unwrap them into separate datums.
637    // EncodedArgs = (InputValue, Offset, DefaultValue) for Lag/Lead
638    // (`OriginalRow` is kept in a record form, as we don't need to look inside that.)
639    let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
640        .into_iter()
641        .map(|d| {
642            let mut iter = d.unwrap_list().iter();
643            let original_row = iter.next().unwrap();
644            let (input_value, offset, default_value) =
645                unwrap_lag_lead_encoded_args(iter.next().unwrap());
646            (original_row, (input_value, offset, default_value))
647        })
648        .unzip();
649
650    let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
651
652    callers_temp_storage.reserve(result.len());
653    result
654        .into_iter()
655        .zip_eq(orig_rows)
656        .map(|(result_value, original_row)| {
657            callers_temp_storage.make_datum(|packer| {
658                packer.push_list_with(|packer| {
659                    packer.push(result_value);
660                    packer.push(original_row);
661                });
662            })
663        })
664}
665
666/// lag/lead's arguments are in a record. This function unwraps this record.
667fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
668    let mut encoded_args_iter = encoded_args.unwrap_list().iter();
669    let (input_value, offset, default_value) = (
670        encoded_args_iter.next().unwrap(),
671        encoded_args_iter.next().unwrap(),
672        encoded_args_iter.next().unwrap(),
673    );
674    (input_value, offset, default_value)
675}
676
677/// Each element of `args` has the 3 arguments evaluated for a single input row.
678/// Returns the results for each input row.
679fn lag_lead_inner<'a>(
680    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
681    lag_lead_type: &LagLeadType,
682    ignore_nulls: &bool,
683) -> Vec<Datum<'a>> {
684    if *ignore_nulls {
685        lag_lead_inner_ignore_nulls(args, lag_lead_type)
686    } else {
687        lag_lead_inner_respect_nulls(args, lag_lead_type)
688    }
689}
690
691fn lag_lead_inner_respect_nulls<'a>(
692    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
693    lag_lead_type: &LagLeadType,
694) -> Vec<Datum<'a>> {
695    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
696    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
697        // Null offsets are acceptable, and always return null
698        if offset.is_null() {
699            result.push(Datum::Null);
700            continue;
701        }
702
703        let idx = i64::try_from(idx).expect("Array index does not fit in i64");
704        let offset = i64::from(offset.unwrap_int32());
705        let offset = match lag_lead_type {
706            LagLeadType::Lag => -offset,
707            LagLeadType::Lead => offset,
708        };
709
710        // Get a Datum from `datums`. Return None if index is out of range.
711        let datums_get = |i: i64| -> Option<Datum> {
712            match u64::try_from(i) {
713                Ok(i) => args
714                    .get(usize::cast_from(i))
715                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
716                    .unwrap_or(None), // overindexing
717                Err(_) => None, // underindexing (negative index)
718            }
719        };
720
721        let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
722
723        result.push(lagged_value);
724    }
725
726    result
727}
728
729// `i64` indexes get involved in this function because it's convenient to allow negative indexes and
730// have `datums_get` fail on them, and thus handle the beginning and end of the input vector
731// uniformly, rather than checking underflow separately during index manipulations.
732#[allow(clippy::as_conversions)]
733fn lag_lead_inner_ignore_nulls<'a>(
734    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
735    lag_lead_type: &LagLeadType,
736) -> Vec<Datum<'a>> {
737    // We check here once that even the largest index fits in `i64`, and then do silent `as`
738    // conversions from `usize` indexes to `i64` indexes throughout this function.
739    if i64::try_from(args.len()).is_err() {
740        panic!("window partition way too big")
741    }
742    // Preparation: Make sure we can jump over a run of nulls in constant time, i.e., regardless of
743    // how many nulls the run has. The following skip tables will point to the next non-null index.
744    let mut skip_nulls_backward = vec![None; args.len()];
745    let mut last_non_null: i64 = -1;
746    let pairs = args
747        .iter()
748        .enumerate()
749        .zip_eq(skip_nulls_backward.iter_mut());
750    for ((i, (d, _, _)), slot) in pairs {
751        if d.is_null() {
752            *slot = Some(last_non_null);
753        } else {
754            last_non_null = i as i64;
755        }
756    }
757    let mut skip_nulls_forward = vec![None; args.len()];
758    let mut last_non_null: i64 = args.len() as i64;
759    let pairs = args
760        .iter()
761        .enumerate()
762        .rev()
763        .zip_eq(skip_nulls_forward.iter_mut().rev());
764    for ((i, (d, _, _)), slot) in pairs {
765        if d.is_null() {
766            *slot = Some(last_non_null);
767        } else {
768            last_non_null = i as i64;
769        }
770    }
771
772    // The actual computation.
773    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
774    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
775        // Null offsets are acceptable, and always return null
776        if offset.is_null() {
777            result.push(Datum::Null);
778            continue;
779        }
780
781        let idx = idx as i64; // checked at the beginning of the function that len() fits
782        let offset = i64::cast_from(offset.unwrap_int32());
783        let offset = match lag_lead_type {
784            LagLeadType::Lag => -offset,
785            LagLeadType::Lead => offset,
786        };
787        let increment = offset.signum();
788
789        // Get a Datum from `datums`. Return None if index is out of range.
790        let datums_get = |i: i64| -> Option<Datum> {
791            match u64::try_from(i) {
792                Ok(i) => args
793                    .get(usize::cast_from(i))
794                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
795                    .unwrap_or(None), // overindexing
796                Err(_) => None, // underindexing (negative index)
797            }
798        };
799
800        let lagged_value = if increment != 0 {
801            // We start j from idx, and step j until we have seen an abs(offset) number of non-null
802            // values or reach the beginning or end of the partition.
803            //
804            // If offset is big, then this is slow: Considering the entire function, it's
805            // `O(partition_size * offset)`.
806            // However, a common use case is an offset of 1, for which this doesn't matter.
807            // TODO: For larger offsets, we could have a completely different implementation
808            // that starts the inner loop from the index where we found the previous result:
809            // https://github.com/MaterializeInc/materialize/pull/29287#discussion_r1738695174
810            let mut j = idx;
811            for _ in 0..num::abs(offset) {
812                j += increment;
813                // Jump over a run of nulls
814                if datums_get(j).is_some_and(|d| d.is_null()) {
815                    let ju = j as usize; // `j >= 0` because of the above `is_some_and`
816                    if increment > 0 {
817                        j = skip_nulls_forward[ju].expect("checked above that it's null");
818                    } else {
819                        j = skip_nulls_backward[ju].expect("checked above that it's null");
820                    }
821                }
822                if datums_get(j).is_none() {
823                    break;
824                }
825            }
826            match datums_get(j) {
827                Some(datum) => datum,
828                None => *default_value,
829            }
830        } else {
831            assert_eq!(offset, 0);
832            let datum = datums_get(idx).expect("known to exist");
833            if !datum.is_null() {
834                datum
835            } else {
836                // I can imagine returning here either `default_value` or `null`.
837                // (I'm leaning towards `default_value`.)
838                // We used to run into an infinite loop in this case, so panicking is
839                // better. Started a SQL Council thread:
840                // https://materializeinc.slack.com/archives/C063H5S7NKE/p1724962369706729
841                panic!("0 offset in lag/lead IGNORE NULLS");
842            }
843        };
844
845        result.push(lagged_value);
846    }
847
848    result
849}
850
851/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
852fn first_value<'a, I>(
853    datums: I,
854    callers_temp_storage: &'a RowArena,
855    order_by: &[ColumnOrder],
856    window_frame: &WindowFrame,
857) -> Datum<'a>
858where
859    I: IntoIterator<Item = Datum<'a>>,
860{
861    let temp_storage = RowArena::new();
862    let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
863    callers_temp_storage.make_datum(|packer| {
864        packer.push_list(iter);
865    })
866}
867
868/// Like `first_value`, but doesn't perform the final wrapping in a list, returning an Iterator
869/// instead.
870fn first_value_no_list<'a: 'b, 'b, I>(
871    datums: I,
872    callers_temp_storage: &'b RowArena,
873    order_by: &[ColumnOrder],
874    window_frame: &WindowFrame,
875) -> impl Iterator<Item = Datum<'b>>
876where
877    I: IntoIterator<Item = Datum<'a>>,
878{
879    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, InputValue) record
880    let datums = order_aggregate_datums(datums, order_by);
881
882    // Decode the input (OriginalRow, InputValue) into separate datums
883    let (orig_rows, args): (Vec<_>, Vec<_>) = datums
884        .into_iter()
885        .map(|d| {
886            let mut iter = d.unwrap_list().iter();
887            let original_row = iter.next().unwrap();
888            let arg = iter.next().unwrap();
889
890            (original_row, arg)
891        })
892        .unzip();
893
894    let results = first_value_inner(args, window_frame);
895
896    callers_temp_storage.reserve(results.len());
897    results
898        .into_iter()
899        .zip_eq(orig_rows)
900        .map(|(result_value, original_row)| {
901            callers_temp_storage.make_datum(|packer| {
902                packer.push_list_with(|packer| {
903                    packer.push(result_value);
904                    packer.push(original_row);
905                });
906            })
907        })
908}
909
910fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
911    let length = datums.len();
912    let mut result: Vec<Datum> = Vec::with_capacity(length);
913    for (idx, current_datum) in datums.iter().enumerate() {
914        let first_value = match &window_frame.start_bound {
915            // Always return the current value
916            WindowFrameBound::CurrentRow => *current_datum,
917            WindowFrameBound::UnboundedPreceding => {
918                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
919                    let end_offset = usize::cast_from(*end_offset);
920
921                    // If the frame ends before the first row, return null
922                    if idx < end_offset {
923                        Datum::Null
924                    } else {
925                        datums[0]
926                    }
927                } else {
928                    datums[0]
929                }
930            }
931            WindowFrameBound::OffsetPreceding(offset) => {
932                let start_offset = usize::cast_from(*offset);
933                let start_idx = idx.saturating_sub(start_offset);
934                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
935                    let end_offset = usize::cast_from(*end_offset);
936
937                    // If the frame is empty or ends before the first row, return null
938                    if start_offset < end_offset || idx < end_offset {
939                        Datum::Null
940                    } else {
941                        datums[start_idx]
942                    }
943                } else {
944                    datums[start_idx]
945                }
946            }
947            WindowFrameBound::OffsetFollowing(offset) => {
948                let start_offset = usize::cast_from(*offset);
949                let start_idx = idx.saturating_add(start_offset);
950                if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
951                    // If the frame is empty or starts after the last row, return null
952                    if offset > end_offset || start_idx >= length {
953                        Datum::Null
954                    } else {
955                        datums[start_idx]
956                    }
957                } else {
958                    datums
959                        .get(start_idx)
960                        .map(|d| d.clone())
961                        .unwrap_or(Datum::Null)
962                }
963            }
964            // Forbidden during planning
965            WindowFrameBound::UnboundedFollowing => unreachable!(),
966        };
967        result.push(first_value);
968    }
969    result
970}
971
972/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
973fn last_value<'a, I>(
974    datums: I,
975    callers_temp_storage: &'a RowArena,
976    order_by: &[ColumnOrder],
977    window_frame: &WindowFrame,
978) -> Datum<'a>
979where
980    I: IntoIterator<Item = Datum<'a>>,
981{
982    let temp_storage = RowArena::new();
983    let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
984    callers_temp_storage.make_datum(|packer| {
985        packer.push_list(iter);
986    })
987}
988
989/// Like `last_value`, but doesn't perform the final wrapping in a list, returning an Iterator
990/// instead.
991fn last_value_no_list<'a: 'b, 'b, I>(
992    datums: I,
993    callers_temp_storage: &'b RowArena,
994    order_by: &[ColumnOrder],
995    window_frame: &WindowFrame,
996) -> impl Iterator<Item = Datum<'b>>
997where
998    I: IntoIterator<Item = Datum<'a>>,
999{
1000    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1001    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1002    let datums = order_aggregate_datums_with_rank(datums, order_by);
1003
1004    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1005    let size_hint = datums.size_hint().0;
1006    let mut args = Vec::with_capacity(size_hint);
1007    let mut original_rows = Vec::with_capacity(size_hint);
1008    let mut order_by_rows = Vec::with_capacity(size_hint);
1009    for (d, order_by_row) in datums.into_iter() {
1010        let mut iter = d.unwrap_list().iter();
1011        let original_row = iter.next().unwrap();
1012        let arg = iter.next().unwrap();
1013        order_by_rows.push(order_by_row);
1014        original_rows.push(original_row);
1015        args.push(arg);
1016    }
1017
1018    let results = last_value_inner(args, &order_by_rows, window_frame);
1019
1020    callers_temp_storage.reserve(results.len());
1021    results
1022        .into_iter()
1023        .zip_eq(original_rows)
1024        .map(|(result_value, original_row)| {
1025            callers_temp_storage.make_datum(|packer| {
1026                packer.push_list_with(|packer| {
1027                    packer.push(result_value);
1028                    packer.push(original_row);
1029                });
1030            })
1031        })
1032}
1033
1034fn last_value_inner<'a>(
1035    args: Vec<Datum<'a>>,
1036    order_by_rows: &Vec<Row>,
1037    window_frame: &WindowFrame,
1038) -> Vec<Datum<'a>> {
1039    let length = args.len();
1040    let mut results: Vec<Datum> = Vec::with_capacity(length);
1041    for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1042        let last_value = match &window_frame.end_bound {
1043            WindowFrameBound::CurrentRow => match &window_frame.units {
1044                // Always return the current value when in ROWS mode
1045                WindowFrameUnits::Rows => *current_datum,
1046                WindowFrameUnits::Range => {
1047                    // When in RANGE mode, return the last value of the peer group
1048                    // The peer group is the group of rows with the same ORDER BY value
1049                    // Note: Range is only supported for the default window frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
1050                    // which is why it does not appear in the other branches
1051                    let target_idx = order_by_rows[idx..]
1052                        .iter()
1053                        .enumerate()
1054                        .take_while(|(_, row)| *row == order_by_row)
1055                        .last()
1056                        .unwrap()
1057                        .0
1058                        + idx;
1059                    args[target_idx]
1060                }
1061                // GROUPS is not supported, and forbidden during planning
1062                WindowFrameUnits::Groups => unreachable!(),
1063            },
1064            WindowFrameBound::UnboundedFollowing => {
1065                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1066                    let start_offset = usize::cast_from(*start_offset);
1067
1068                    // If the frame starts after the last row of the window, return null
1069                    if idx + start_offset > length - 1 {
1070                        Datum::Null
1071                    } else {
1072                        args[length - 1]
1073                    }
1074                } else {
1075                    args[length - 1]
1076                }
1077            }
1078            WindowFrameBound::OffsetFollowing(offset) => {
1079                let end_offset = usize::cast_from(*offset);
1080                let end_idx = idx.saturating_add(end_offset);
1081                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1082                    let start_offset = usize::cast_from(*start_offset);
1083                    let start_idx = idx.saturating_add(start_offset);
1084
1085                    // If the frame is empty or starts after the last row of the window, return null
1086                    if end_offset < start_offset || start_idx >= length {
1087                        Datum::Null
1088                    } else {
1089                        // Return the last valid element in the window
1090                        args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1091                    }
1092                } else {
1093                    args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1094                }
1095            }
1096            WindowFrameBound::OffsetPreceding(offset) => {
1097                let end_offset = usize::cast_from(*offset);
1098                let end_idx = idx.saturating_sub(end_offset);
1099                if idx < end_offset {
1100                    // If the frame ends before the first row, return null
1101                    Datum::Null
1102                } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1103                    &window_frame.start_bound
1104                {
1105                    // If the frame is empty, return null
1106                    if offset > start_offset {
1107                        Datum::Null
1108                    } else {
1109                        args[end_idx]
1110                    }
1111                } else {
1112                    args[end_idx]
1113                }
1114            }
1115            // Forbidden during planning
1116            WindowFrameBound::UnboundedPreceding => unreachable!(),
1117        };
1118        results.push(last_value);
1119    }
1120    results
1121}
1122
1123/// Executes `FusedValueWindowFunc` on a reduction group.
1124/// The expected input is in the format of `[((OriginalRow, (Args1, Args2, ...)), OrderByExprs...)]`
1125/// where `Args1`, `Args2`, are the arguments of each of the fused functions. For functions that
1126/// have only a single argument (first_value/last_value), these are simple values. For functions
1127/// that have multiple arguments (lag/lead), these are also records.
1128fn fused_value_window_func<'a, I>(
1129    input_datums: I,
1130    callers_temp_storage: &'a RowArena,
1131    funcs: &Vec<AggregateFunc>,
1132    order_by: &Vec<ColumnOrder>,
1133) -> Datum<'a>
1134where
1135    I: IntoIterator<Item = Datum<'a>>,
1136{
1137    let temp_storage = RowArena::new();
1138    let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1139    callers_temp_storage.make_datum(|packer| {
1140        packer.push_list(iter);
1141    })
1142}
1143
1144/// Like `fused_value_window_func`, but doesn't perform the final wrapping in a list, returning an
1145/// Iterator instead.
1146fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1147    input_datums: I,
1148    callers_temp_storage: &'b RowArena,
1149    funcs: &Vec<AggregateFunc>,
1150    order_by: &Vec<ColumnOrder>,
1151) -> impl Iterator<Item = Datum<'b>>
1152where
1153    I: IntoIterator<Item = Datum<'a>>,
1154{
1155    let has_last_value = funcs
1156        .iter()
1157        .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1158
1159    let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1160
1161    let size_hint = input_datums_with_ranks.size_hint().0;
1162    let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1163    let mut original_rows = Vec::with_capacity(size_hint);
1164    let mut order_by_rows = Vec::with_capacity(size_hint);
1165    for (d, order_by_row) in input_datums_with_ranks {
1166        let mut iter = d.unwrap_list().iter();
1167        let original_row = iter.next().unwrap();
1168        original_rows.push(original_row);
1169        let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1170        for i in 0..funcs.len() {
1171            let encoded_args = argss_iter.next().unwrap();
1172            encoded_argsss[i].push(encoded_args);
1173        }
1174        if has_last_value {
1175            order_by_rows.push(order_by_row);
1176        }
1177    }
1178
1179    let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1180    for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1181        let results = match func {
1182            AggregateFunc::LagLead {
1183                order_by: inner_order_by,
1184                lag_lead,
1185                ignore_nulls,
1186            } => {
1187                assert_eq!(order_by, inner_order_by);
1188                let unwrapped_argss = encoded_argss
1189                    .into_iter()
1190                    .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1191                    .collect();
1192                lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1193            }
1194            AggregateFunc::FirstValue {
1195                order_by: inner_order_by,
1196                window_frame,
1197            } => {
1198                assert_eq!(order_by, inner_order_by);
1199                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
1200                // wrapped into a record.)
1201                first_value_inner(encoded_argss, window_frame)
1202            }
1203            AggregateFunc::LastValue {
1204                order_by: inner_order_by,
1205                window_frame,
1206            } => {
1207                assert_eq!(order_by, inner_order_by);
1208                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
1209                // wrapped into a record.)
1210                last_value_inner(encoded_argss, &order_by_rows, window_frame)
1211            }
1212            _ => panic!("unknown window function in FusedValueWindowFunc"),
1213        };
1214        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1215            results.push(result);
1216        }
1217    }
1218
1219    callers_temp_storage.reserve(2 * original_rows.len());
1220    results_per_row
1221        .into_iter()
1222        .enumerate()
1223        .map(move |(i, results)| {
1224            callers_temp_storage.make_datum(|packer| {
1225                packer.push_list_with(|packer| {
1226                    packer
1227                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1228                    packer.push(original_rows[i]);
1229                });
1230            })
1231        })
1232}
1233
1234/// `input_datums` is an entire window partition.
1235/// The expected input is in the format of `[((OriginalRow, InputValue), OrderByExprs...)]`
1236/// See also in the comment in `window_func_applied_to`.
1237///
1238/// `wrapped_aggregate`: e.g., for `sum(...) OVER (...)`, this is the `sum(...)`.
1239///
1240/// Note that this `order_by` doesn't have expressions, only `ColumnOrder`s. For an explanation,
1241/// see the comment on `WindowExprType`.
1242fn window_aggr<'a, I, A>(
1243    input_datums: I,
1244    callers_temp_storage: &'a RowArena,
1245    wrapped_aggregate: &AggregateFunc,
1246    order_by: &[ColumnOrder],
1247    window_frame: &WindowFrame,
1248) -> Datum<'a>
1249where
1250    I: IntoIterator<Item = Datum<'a>>,
1251    A: OneByOneAggr,
1252{
1253    let temp_storage = RowArena::new();
1254    let iter = window_aggr_no_list::<I, A>(
1255        input_datums,
1256        &temp_storage,
1257        wrapped_aggregate,
1258        order_by,
1259        window_frame,
1260    );
1261    callers_temp_storage.make_datum(|packer| {
1262        packer.push_list(iter);
1263    })
1264}
1265
1266/// Like `window_aggr`, but doesn't perform the final wrapping in a list, returning an Iterator
1267/// instead.
1268fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1269    input_datums: I,
1270    callers_temp_storage: &'b RowArena,
1271    wrapped_aggregate: &AggregateFunc,
1272    order_by: &[ColumnOrder],
1273    window_frame: &WindowFrame,
1274) -> impl Iterator<Item = Datum<'b>>
1275where
1276    I: IntoIterator<Item = Datum<'a>>,
1277    A: OneByOneAggr,
1278{
1279    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1280    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1281    let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1282
1283    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1284    let size_hint = datums.size_hint().0;
1285    let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1286    let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1287    let mut order_by_rows = Vec::with_capacity(size_hint);
1288    for (d, order_by_row) in datums.into_iter() {
1289        let mut iter = d.unwrap_list().iter();
1290        let original_row = iter.next().unwrap();
1291        let arg = iter.next().unwrap();
1292        order_by_rows.push(order_by_row);
1293        original_rows.push(original_row);
1294        args.push(arg);
1295    }
1296
1297    let results = window_aggr_inner::<A>(
1298        args,
1299        &order_by_rows,
1300        wrapped_aggregate,
1301        order_by,
1302        window_frame,
1303        callers_temp_storage,
1304    );
1305
1306    callers_temp_storage.reserve(results.len());
1307    results
1308        .into_iter()
1309        .zip_eq(original_rows)
1310        .map(|(result_value, original_row)| {
1311            callers_temp_storage.make_datum(|packer| {
1312                packer.push_list_with(|packer| {
1313                    packer.push(result_value);
1314                    packer.push(original_row);
1315                });
1316            })
1317        })
1318}
1319
1320fn window_aggr_inner<'a, A>(
1321    mut args: Vec<Datum<'a>>,
1322    order_by_rows: &Vec<Row>,
1323    wrapped_aggregate: &AggregateFunc,
1324    order_by: &[ColumnOrder],
1325    window_frame: &WindowFrame,
1326    temp_storage: &'a RowArena,
1327) -> Vec<Datum<'a>>
1328where
1329    A: OneByOneAggr,
1330{
1331    let length = args.len();
1332    let mut result: Vec<Datum> = Vec::with_capacity(length);
1333
1334    // In this degenerate case, all results would be `wrapped_aggregate.default()` (usually null).
1335    // However, this currently can't happen, because
1336    // - Groups frame mode is currently not supported;
1337    // - Range frame mode is currently supported only for the default frame, which includes the
1338    //   current row.
1339    soft_assert_or_log!(
1340        !((matches!(window_frame.units, WindowFrameUnits::Groups)
1341            || matches!(window_frame.units, WindowFrameUnits::Range))
1342            && !window_frame.includes_current_row()),
1343        "window frame without current row"
1344    );
1345
1346    if (matches!(
1347        window_frame.start_bound,
1348        WindowFrameBound::UnboundedPreceding
1349    ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1350        || (order_by.is_empty()
1351            && (matches!(window_frame.units, WindowFrameUnits::Groups)
1352                || matches!(window_frame.units, WindowFrameUnits::Range))
1353            && window_frame.includes_current_row())
1354    {
1355        // Either
1356        //  - UNBOUNDED frame in both directions, or
1357        //  - There is no ORDER BY and the frame is such that the current peer group is included.
1358        //    (The current peer group will be the whole partition if there is no ORDER BY.)
1359        // We simply need to compute the aggregate once, on the entire partition, and each input
1360        // row will get this one aggregate value as result.
1361        let result_value = wrapped_aggregate.eval(args, temp_storage);
1362        // Every row will get the above aggregate as result.
1363        for _ in 0..length {
1364            result.push(result_value);
1365        }
1366    } else {
1367        fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1368            args: Vec<Datum<'a>>,
1369            result: &mut Vec<Datum<'a>>,
1370            mut one_by_one_aggr: A,
1371            temp_storage: &'a RowArena,
1372        ) where
1373            A: OneByOneAggr,
1374        {
1375            for current_arg in args.into_iter() {
1376                one_by_one_aggr.give(&current_arg);
1377                let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1378                result.push(result_value);
1379            }
1380        }
1381
1382        fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1383            args: Vec<Datum<'a>>,
1384            order_by_rows: &Vec<Row>,
1385            result: &mut Vec<Datum<'a>>,
1386            mut one_by_one_aggr: A,
1387            temp_storage: &'a RowArena,
1388        ) where
1389            A: OneByOneAggr,
1390        {
1391            let mut peer_group_start = 0;
1392            while peer_group_start < args.len() {
1393                // Find the boundaries of the current peer group.
1394                // peer_group_start will point to the first element of the peer group,
1395                // peer_group_end will point to _just after_ the last element of the peer group.
1396                let mut peer_group_end = peer_group_start + 1;
1397                while peer_group_end < args.len()
1398                    && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1399                {
1400                    // The peer group goes on while the OrderByRows not differ.
1401                    peer_group_end += 1;
1402                }
1403                // Let's compute the aggregate (which will be the same for all records in this
1404                // peer group).
1405                for current_arg in args[peer_group_start..peer_group_end].iter() {
1406                    one_by_one_aggr.give(current_arg);
1407                }
1408                let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1409                // Put the above aggregate into each record in the peer group.
1410                for _ in args[peer_group_start..peer_group_end].iter() {
1411                    result.push(agg_for_peer_group);
1412                }
1413                // Point to the start of the next peer group.
1414                peer_group_start = peer_group_end;
1415            }
1416        }
1417
1418        fn rows_between_offset_and_offset<'a>(
1419            args: Vec<Datum<'a>>,
1420            result: &mut Vec<Datum<'a>>,
1421            wrapped_aggregate: &AggregateFunc,
1422            temp_storage: &'a RowArena,
1423            offset_start: i64,
1424            offset_end: i64,
1425        ) {
1426            let len = args
1427                .len()
1428                .to_i64()
1429                .expect("window partition's len should fit into i64");
1430            for i in 0..len {
1431                let i = i.to_i64().expect("window partition shouldn't be super big");
1432                // Trim the start of the frame to make it not reach over the start of the window
1433                // partition.
1434                let frame_start = max(i + offset_start, 0)
1435                    .to_usize()
1436                    .expect("The max made sure it's not negative");
1437                // Trim the end of the frame to make it not reach over the end of the window
1438                // partition.
1439                let frame_end = min(i + offset_end, len - 1).to_usize();
1440                match frame_end {
1441                    Some(frame_end) => {
1442                        if frame_start <= frame_end {
1443                            // Compute the aggregate on the frame.
1444                            // TODO:
1445                            // This implementation is quite slow if the frame is large: we do an
1446                            // inner loop over the entire frame, and compute the aggregate from
1447                            // scratch. We could do better:
1448                            //  - For invertible aggregations we could do a rolling aggregation.
1449                            //  - There are various tricks for min/max as well, making use of either
1450                            //    the fixed size of the window, or that we are not retracting
1451                            //    arbitrary elements but doing queue operations. E.g., see
1452                            //    http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
1453                            let frame_values = args[frame_start..=frame_end].iter().cloned();
1454                            let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1455                            result.push(result_value);
1456                        } else {
1457                            // frame_start > frame_end, so this is an empty frame.
1458                            let result_value = wrapped_aggregate.default();
1459                            result.push(result_value);
1460                        }
1461                    }
1462                    None => {
1463                        // frame_end would be negative, so this is an empty frame.
1464                        let result_value = wrapped_aggregate.default();
1465                        result.push(result_value);
1466                    }
1467                }
1468            }
1469        }
1470
1471        match (
1472            &window_frame.units,
1473            &window_frame.start_bound,
1474            &window_frame.end_bound,
1475        ) {
1476            // Cases where one edge of the frame is CurrentRow.
1477            // Note that these cases could be merged into the more general cases below where one
1478            // edge is some offset (with offset = 0), but the CurrentRow cases probably cover 95%
1479            // of user queries, so let's make this simple and fast.
1480            (Rows, UnboundedPreceding, CurrentRow) => {
1481                rows_between_unbounded_preceding_and_current_row::<A>(
1482                    args,
1483                    &mut result,
1484                    A::new(wrapped_aggregate, false),
1485                    temp_storage,
1486                );
1487            }
1488            (Rows, CurrentRow, UnboundedFollowing) => {
1489                // Same as above, but reverse.
1490                args.reverse();
1491                rows_between_unbounded_preceding_and_current_row::<A>(
1492                    args,
1493                    &mut result,
1494                    A::new(wrapped_aggregate, true),
1495                    temp_storage,
1496                );
1497                result.reverse();
1498            }
1499            (Range, UnboundedPreceding, CurrentRow) => {
1500                // Note that for the default frame, the RANGE frame mode is identical to the GROUPS
1501                // frame mode.
1502                groups_between_unbounded_preceding_and_current_row::<A>(
1503                    args,
1504                    order_by_rows,
1505                    &mut result,
1506                    A::new(wrapped_aggregate, false),
1507                    temp_storage,
1508                );
1509            }
1510            // The next several cases all call `rows_between_offset_and_offset`. Note that the
1511            // offset passed to `rows_between_offset_and_offset` should be negated when it's
1512            // PRECEDING.
1513            (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1514                let start_prec = start_prec.to_i64().expect(
1515                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1516                );
1517                let end_prec = end_prec.to_i64().expect(
1518                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1519                );
1520                rows_between_offset_and_offset(
1521                    args,
1522                    &mut result,
1523                    wrapped_aggregate,
1524                    temp_storage,
1525                    -start_prec,
1526                    -end_prec,
1527                );
1528            }
1529            (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1530                let start_prec = start_prec.to_i64().expect(
1531                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1532                );
1533                let end_fol = end_fol.to_i64().expect(
1534                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1535                );
1536                rows_between_offset_and_offset(
1537                    args,
1538                    &mut result,
1539                    wrapped_aggregate,
1540                    temp_storage,
1541                    -start_prec,
1542                    end_fol,
1543                );
1544            }
1545            (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1546                let start_fol = start_fol.to_i64().expect(
1547                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1548                );
1549                let end_fol = end_fol.to_i64().expect(
1550                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1551                );
1552                rows_between_offset_and_offset(
1553                    args,
1554                    &mut result,
1555                    wrapped_aggregate,
1556                    temp_storage,
1557                    start_fol,
1558                    end_fol,
1559                );
1560            }
1561            (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1562                unreachable!() // The planning ensured that this nonsensical case can't happen
1563            }
1564            (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1565                let start_prec = start_prec.to_i64().expect(
1566                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1567                );
1568                let end_fol = 0;
1569                rows_between_offset_and_offset(
1570                    args,
1571                    &mut result,
1572                    wrapped_aggregate,
1573                    temp_storage,
1574                    -start_prec,
1575                    end_fol,
1576                );
1577            }
1578            (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1579                let start_fol = 0;
1580                let end_fol = end_fol.to_i64().expect(
1581                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1582                );
1583                rows_between_offset_and_offset(
1584                    args,
1585                    &mut result,
1586                    wrapped_aggregate,
1587                    temp_storage,
1588                    start_fol,
1589                    end_fol,
1590                );
1591            }
1592            (Rows, CurrentRow, CurrentRow) => {
1593                // We could have a more efficient implementation for this, but this is probably
1594                // super rare. (Might be more common with RANGE or GROUPS frame mode, though!)
1595                let start_fol = 0;
1596                let end_fol = 0;
1597                rows_between_offset_and_offset(
1598                    args,
1599                    &mut result,
1600                    wrapped_aggregate,
1601                    temp_storage,
1602                    start_fol,
1603                    end_fol,
1604                );
1605            }
1606            (Rows, CurrentRow, OffsetPreceding(_))
1607            | (Rows, UnboundedFollowing, _)
1608            | (Rows, _, UnboundedPreceding)
1609            | (Rows, OffsetFollowing(..), CurrentRow) => {
1610                unreachable!() // The planning ensured that these nonsensical cases can't happen
1611            }
1612            (Rows, UnboundedPreceding, UnboundedFollowing) => {
1613                // This is handled by the complicated if condition near the beginning of this
1614                // function.
1615                unreachable!()
1616            }
1617            (Rows, UnboundedPreceding, OffsetPreceding(_))
1618            | (Rows, UnboundedPreceding, OffsetFollowing(_))
1619            | (Rows, OffsetPreceding(..), UnboundedFollowing)
1620            | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1621                // Unsupported. Bail in the planner.
1622                // https://github.com/MaterializeInc/database-issues/issues/6720
1623                unreachable!()
1624            }
1625            (Range, _, _) => {
1626                // Unsupported.
1627                // The planner doesn't allow Range frame mode for now (except for the default
1628                // frame), see https://github.com/MaterializeInc/database-issues/issues/6585
1629                // Note that it would be easy to handle (Range, CurrentRow, UnboundedFollowing):
1630                // it would be similar to (Rows, CurrentRow, UnboundedFollowing), but would call
1631                // groups_between_unbounded_preceding_current_row.
1632                unreachable!()
1633            }
1634            (Groups, _, _) => {
1635                // Unsupported.
1636                // The planner doesn't allow Groups frame mode for now, see
1637                // https://github.com/MaterializeInc/database-issues/issues/6588
1638                unreachable!()
1639            }
1640        }
1641    }
1642
1643    result
1644}
1645
1646/// Computes a bundle of fused window aggregations.
1647/// The input is similar to `window_aggr`, but `InputValue` is not just a single value, but a record
1648/// where each component is the input to one of the aggregations.
1649fn fused_window_aggr<'a, I, A>(
1650    input_datums: I,
1651    callers_temp_storage: &'a RowArena,
1652    wrapped_aggregates: &Vec<AggregateFunc>,
1653    order_by: &Vec<ColumnOrder>,
1654    window_frame: &WindowFrame,
1655) -> Datum<'a>
1656where
1657    I: IntoIterator<Item = Datum<'a>>,
1658    A: OneByOneAggr,
1659{
1660    let temp_storage = RowArena::new();
1661    let iter = fused_window_aggr_no_list::<_, A>(
1662        input_datums,
1663        &temp_storage,
1664        wrapped_aggregates,
1665        order_by,
1666        window_frame,
1667    );
1668    callers_temp_storage.make_datum(|packer| {
1669        packer.push_list(iter);
1670    })
1671}
1672
1673/// Like `fused_window_aggr`, but doesn't perform the final wrapping in a list, returning an
1674/// Iterator instead.
1675fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1676    input_datums: I,
1677    callers_temp_storage: &'b RowArena,
1678    wrapped_aggregates: &Vec<AggregateFunc>,
1679    order_by: &Vec<ColumnOrder>,
1680    window_frame: &WindowFrame,
1681) -> impl Iterator<Item = Datum<'b>>
1682where
1683    I: IntoIterator<Item = Datum<'a>>,
1684    A: OneByOneAggr,
1685{
1686    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1687    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1688    let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1689
1690    let size_hint = datums.size_hint().0;
1691    let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1692    let mut original_rows = Vec::with_capacity(size_hint);
1693    let mut order_by_rows = Vec::with_capacity(size_hint);
1694    for (d, order_by_row) in datums {
1695        let mut iter = d.unwrap_list().iter();
1696        let original_row = iter.next().unwrap();
1697        original_rows.push(original_row);
1698        let args_iter = iter.next().unwrap().unwrap_list().iter();
1699        // Push each argument into the respective list
1700        for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1701            args.push(arg);
1702        }
1703        order_by_rows.push(order_by_row);
1704    }
1705
1706    let mut results_per_row =
1707        vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1708    for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1709        let results = window_aggr_inner::<A>(
1710            args,
1711            &order_by_rows,
1712            wrapped_aggr,
1713            order_by,
1714            window_frame,
1715            callers_temp_storage,
1716        );
1717        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1718            results.push(result);
1719        }
1720    }
1721
1722    callers_temp_storage.reserve(2 * original_rows.len());
1723    results_per_row
1724        .into_iter()
1725        .enumerate()
1726        .map(move |(i, results)| {
1727            callers_temp_storage.make_datum(|packer| {
1728                packer.push_list_with(|packer| {
1729                    packer
1730                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1731                    packer.push(original_rows[i]);
1732                });
1733            })
1734        })
1735}
1736
1737/// An implementation of an aggregation where we can send in the input elements one-by-one, and
1738/// can also ask the current aggregate at any moment. (This just delegates to other aggregation
1739/// evaluation approaches.)
1740pub trait OneByOneAggr {
1741    /// The `reverse` parameter makes the aggregations process input elements in reverse order.
1742    /// This has an effect only for non-commutative aggregations, e.g. `list_agg`. These are
1743    /// currently only some of the Basic aggregations. (Basic aggregations are handled by
1744    /// `NaiveOneByOneAggr`).
1745    fn new(agg: &AggregateFunc, reverse: bool) -> Self;
1746    /// Pushes one input element into the aggregation.
1747    fn give(&mut self, d: &Datum);
1748    /// Returns the value of the aggregate computed on the given values so far.
1749    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
1750}
1751
1752/// Naive implementation of [OneByOneAggr], suitable for stuff like const folding, but too slow for
1753/// rendering. This relies only on infrastructure available in `mz-expr`. It simply saves all the
1754/// given input, and calls the given [AggregateFunc]'s `eval` method when asked about the current
1755/// aggregate. (For Accumulable and Hierarchical aggregations, the rendering has more efficient
1756/// implementations, but for Basic aggregations even the rendering uses this naive implementation.)
1757#[derive(Debug)]
1758pub struct NaiveOneByOneAggr {
1759    agg: AggregateFunc,
1760    input: Vec<Row>,
1761    reverse: bool,
1762}
1763
1764impl OneByOneAggr for NaiveOneByOneAggr {
1765    fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1766        NaiveOneByOneAggr {
1767            agg: agg.clone(),
1768            input: Vec::new(),
1769            reverse,
1770        }
1771    }
1772
1773    fn give(&mut self, d: &Datum) {
1774        let mut row = Row::default();
1775        row.packer().push(d);
1776        self.input.push(row);
1777    }
1778
1779    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1780        temp_storage.make_datum(|packer| {
1781            packer.push(if !self.reverse {
1782                self.agg
1783                    .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1784            } else {
1785                self.agg.eval(
1786                    self.input.iter().rev().map(|r| r.unpack_first()),
1787                    temp_storage,
1788                )
1789            });
1790        })
1791    }
1792}
1793
/// Identify whether the given aggregate function is Lag or Lead, since they share
/// implementations.
///
/// NOTE: the variant order is significant, as it determines the derived `Ord`/`PartialOrd`.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub enum LagLeadType {
    /// The `lag` variant of the shared lag/lead implementation.
    Lag,
    /// The `lead` variant of the shared lag/lead implementation.
    Lead,
}
1803
/// An aggregate function, together with whatever parameters it needs (`order_by`,
/// `window_frame`, wrapped aggregates, ...).
///
/// NOTE: the variant order is significant, as it determines the derived `Ord`/`PartialOrd`.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum AggregateFunc {
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    Count,
    Any,
    All,
    /// Accumulates `Datum::List`s whose first element is a JSON-typed `Datum`
    /// into a JSON list. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips `Datum::List`s whose first element is a JSON-typed `Datum` into a
    /// JSON map. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_object_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbObjectAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips a `Datum::List` whose first element is a `Datum::List` guaranteed
    /// to be non-empty and whose len % 2 == 0 into a `Datum::Map`. The other
    /// elements are columns used by `order_by`.
    MapAgg {
        order_by: Vec<ColumnOrder>,
        value_type: ScalarType,
    },
    /// Accumulates `Datum::Array`s of `ScalarType::Record` whose first element is a `Datum::Array`
    /// into a single `Datum::Array` (the remaining fields are used by `order_by`).
    ArrayConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Accumulates `Datum::List`s of `ScalarType::Record` whose first field is a `Datum::List`
    /// into a single `Datum::List` (the remaining fields are used by `order_by`).
    ListConcat {
        order_by: Vec<ColumnOrder>,
    },
    StringAgg {
        order_by: Vec<ColumnOrder>,
    },
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// `lag` or `lead` (selected by `lag_lead`), sharing one implementation.
    LagLead {
        order_by: Vec<ColumnOrder>,
        lag_lead: LagLeadType,
        ignore_nulls: bool,
    },
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions fused into one function, to amortize overheads.
    FusedValueWindowFunc {
        funcs: Vec<AggregateFunc>,
        /// Currently, all the fused functions must have the same `order_by`. (We can later
        /// eliminate this limitation.)
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluates `wrapped_aggregate` as a window aggregation, using `order_by` and `window_frame`.
    WindowAggregate {
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregations fused into one function; all of them share the same
    /// `order_by` and `window_frame`.
    FusedWindowAggregate {
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Accumulates any number of `Datum::Dummy`s into `Datum::Dummy`.
    ///
    /// Useful for removing an expensive aggregation while maintaining the shape
    /// of a reduce operator.
    Dummy,
}
1935
1936/// An explicit [`Arbitrary`] implementation needed here because of a known
1937/// `proptest` issue.
1938///
1939/// Revert to the derive-macro implementation once the issue[^1] is fixed.
1940///
1941/// [^1]: <https://github.com/AltSysrq/proptest/issues/152>
1942impl Arbitrary for AggregateFunc {
1943    type Parameters = ();
1944
1945    type Strategy = Union<BoxedStrategy<Self>>;
1946
1947    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1948        use proptest::collection::vec;
1949        use proptest::prelude::any as proptest_any;
1950        Union::new(vec![
1951            Just(AggregateFunc::MaxNumeric).boxed(),
1952            Just(AggregateFunc::MaxInt16).boxed(),
1953            Just(AggregateFunc::MaxInt32).boxed(),
1954            Just(AggregateFunc::MaxInt64).boxed(),
1955            Just(AggregateFunc::MaxUInt16).boxed(),
1956            Just(AggregateFunc::MaxUInt32).boxed(),
1957            Just(AggregateFunc::MaxUInt64).boxed(),
1958            Just(AggregateFunc::MaxMzTimestamp).boxed(),
1959            Just(AggregateFunc::MaxFloat32).boxed(),
1960            Just(AggregateFunc::MaxFloat64).boxed(),
1961            Just(AggregateFunc::MaxBool).boxed(),
1962            Just(AggregateFunc::MaxString).boxed(),
1963            Just(AggregateFunc::MaxTimestamp).boxed(),
1964            Just(AggregateFunc::MaxDate).boxed(),
1965            Just(AggregateFunc::MaxTimestampTz).boxed(),
1966            Just(AggregateFunc::MaxInterval).boxed(),
1967            Just(AggregateFunc::MaxTime).boxed(),
1968            Just(AggregateFunc::MinNumeric).boxed(),
1969            Just(AggregateFunc::MinInt16).boxed(),
1970            Just(AggregateFunc::MinInt32).boxed(),
1971            Just(AggregateFunc::MinInt64).boxed(),
1972            Just(AggregateFunc::MinUInt16).boxed(),
1973            Just(AggregateFunc::MinUInt32).boxed(),
1974            Just(AggregateFunc::MinUInt64).boxed(),
1975            Just(AggregateFunc::MinMzTimestamp).boxed(),
1976            Just(AggregateFunc::MinFloat32).boxed(),
1977            Just(AggregateFunc::MinFloat64).boxed(),
1978            Just(AggregateFunc::MinBool).boxed(),
1979            Just(AggregateFunc::MinString).boxed(),
1980            Just(AggregateFunc::MinDate).boxed(),
1981            Just(AggregateFunc::MinTimestamp).boxed(),
1982            Just(AggregateFunc::MinTimestampTz).boxed(),
1983            Just(AggregateFunc::MinInterval).boxed(),
1984            Just(AggregateFunc::MinTime).boxed(),
1985            Just(AggregateFunc::SumInt16).boxed(),
1986            Just(AggregateFunc::SumInt32).boxed(),
1987            Just(AggregateFunc::SumInt64).boxed(),
1988            Just(AggregateFunc::SumUInt16).boxed(),
1989            Just(AggregateFunc::SumUInt32).boxed(),
1990            Just(AggregateFunc::SumUInt64).boxed(),
1991            Just(AggregateFunc::SumFloat32).boxed(),
1992            Just(AggregateFunc::SumFloat64).boxed(),
1993            Just(AggregateFunc::SumNumeric).boxed(),
1994            Just(AggregateFunc::Count).boxed(),
1995            Just(AggregateFunc::Any).boxed(),
1996            Just(AggregateFunc::All).boxed(),
1997            vec(proptest_any::<ColumnOrder>(), 1..4)
1998                .prop_map(|order_by| AggregateFunc::JsonbAgg { order_by })
1999                .boxed(),
2000            vec(proptest_any::<ColumnOrder>(), 1..4)
2001                .prop_map(|order_by| AggregateFunc::JsonbObjectAgg { order_by })
2002                .boxed(),
2003            (
2004                vec(proptest_any::<ColumnOrder>(), 1..4),
2005                proptest_any::<ScalarType>(),
2006            )
2007                .prop_map(|(order_by, value_type)| AggregateFunc::MapAgg {
2008                    order_by,
2009                    value_type,
2010                })
2011                .boxed(),
2012            vec(proptest_any::<ColumnOrder>(), 1..4)
2013                .prop_map(|order_by| AggregateFunc::ArrayConcat { order_by })
2014                .boxed(),
2015            vec(proptest_any::<ColumnOrder>(), 1..4)
2016                .prop_map(|order_by| AggregateFunc::ListConcat { order_by })
2017                .boxed(),
2018            vec(proptest_any::<ColumnOrder>(), 1..4)
2019                .prop_map(|order_by| AggregateFunc::StringAgg { order_by })
2020                .boxed(),
2021            vec(proptest_any::<ColumnOrder>(), 1..4)
2022                .prop_map(|order_by| AggregateFunc::RowNumber { order_by })
2023                .boxed(),
2024            vec(proptest_any::<ColumnOrder>(), 1..4)
2025                .prop_map(|order_by| AggregateFunc::DenseRank { order_by })
2026                .boxed(),
2027            (
2028                vec(proptest_any::<ColumnOrder>(), 1..4),
2029                proptest_any::<LagLeadType>(),
2030                proptest_any::<bool>(),
2031            )
2032                .prop_map(
2033                    |(order_by, lag_lead, ignore_nulls)| AggregateFunc::LagLead {
2034                        order_by,
2035                        lag_lead,
2036                        ignore_nulls,
2037                    },
2038                )
2039                .boxed(),
2040            (
2041                vec(proptest_any::<ColumnOrder>(), 1..4),
2042                proptest_any::<WindowFrame>(),
2043            )
2044                .prop_map(|(order_by, window_frame)| AggregateFunc::FirstValue {
2045                    order_by,
2046                    window_frame,
2047                })
2048                .boxed(),
2049            (
2050                vec(proptest_any::<ColumnOrder>(), 1..4),
2051                proptest_any::<WindowFrame>(),
2052            )
2053                .prop_map(|(order_by, window_frame)| AggregateFunc::LastValue {
2054                    order_by,
2055                    window_frame,
2056                })
2057                .boxed(),
2058            Just(AggregateFunc::Dummy).boxed(),
2059        ])
2060    }
2061}
2062
2063impl RustType<ProtoColumnOrders> for Vec<ColumnOrder> {
2064    fn into_proto(&self) -> ProtoColumnOrders {
2065        ProtoColumnOrders {
2066            orders: self.into_proto(),
2067        }
2068    }
2069
2070    fn from_proto(proto: ProtoColumnOrders) -> Result<Self, TryFromProtoError> {
2071        proto.orders.into_rust()
2072    }
2073}
2074
2075impl RustType<ProtoAggregateFunc> for AggregateFunc {
2076    fn into_proto(&self) -> ProtoAggregateFunc {
2077        use proto_aggregate_func::Kind;
2078        ProtoAggregateFunc {
2079            kind: Some(match self {
2080                AggregateFunc::MaxNumeric => Kind::MaxNumeric(()),
2081                AggregateFunc::MaxInt16 => Kind::MaxInt16(()),
2082                AggregateFunc::MaxInt32 => Kind::MaxInt32(()),
2083                AggregateFunc::MaxInt64 => Kind::MaxInt64(()),
2084                AggregateFunc::MaxUInt16 => Kind::MaxUint16(()),
2085                AggregateFunc::MaxUInt32 => Kind::MaxUint32(()),
2086                AggregateFunc::MaxUInt64 => Kind::MaxUint64(()),
2087                AggregateFunc::MaxMzTimestamp => Kind::MaxMzTimestamp(()),
2088                AggregateFunc::MaxFloat32 => Kind::MaxFloat32(()),
2089                AggregateFunc::MaxFloat64 => Kind::MaxFloat64(()),
2090                AggregateFunc::MaxBool => Kind::MaxBool(()),
2091                AggregateFunc::MaxString => Kind::MaxString(()),
2092                AggregateFunc::MaxDate => Kind::MaxDate(()),
2093                AggregateFunc::MaxTimestamp => Kind::MaxTimestamp(()),
2094                AggregateFunc::MaxTimestampTz => Kind::MaxTimestampTz(()),
2095                AggregateFunc::MinNumeric => Kind::MinNumeric(()),
2096                AggregateFunc::MaxInterval => Kind::MaxInterval(()),
2097                AggregateFunc::MaxTime => Kind::MaxTime(()),
2098                AggregateFunc::MinInt16 => Kind::MinInt16(()),
2099                AggregateFunc::MinInt32 => Kind::MinInt32(()),
2100                AggregateFunc::MinInt64 => Kind::MinInt64(()),
2101                AggregateFunc::MinUInt16 => Kind::MinUint16(()),
2102                AggregateFunc::MinUInt32 => Kind::MinUint32(()),
2103                AggregateFunc::MinUInt64 => Kind::MinUint64(()),
2104                AggregateFunc::MinMzTimestamp => Kind::MinMzTimestamp(()),
2105                AggregateFunc::MinFloat32 => Kind::MinFloat32(()),
2106                AggregateFunc::MinFloat64 => Kind::MinFloat64(()),
2107                AggregateFunc::MinBool => Kind::MinBool(()),
2108                AggregateFunc::MinString => Kind::MinString(()),
2109                AggregateFunc::MinDate => Kind::MinDate(()),
2110                AggregateFunc::MinTimestamp => Kind::MinTimestamp(()),
2111                AggregateFunc::MinTimestampTz => Kind::MinTimestampTz(()),
2112                AggregateFunc::MinInterval => Kind::MinInterval(()),
2113                AggregateFunc::MinTime => Kind::MinTime(()),
2114                AggregateFunc::SumInt16 => Kind::SumInt16(()),
2115                AggregateFunc::SumInt32 => Kind::SumInt32(()),
2116                AggregateFunc::SumInt64 => Kind::SumInt64(()),
2117                AggregateFunc::SumUInt16 => Kind::SumUint16(()),
2118                AggregateFunc::SumUInt32 => Kind::SumUint32(()),
2119                AggregateFunc::SumUInt64 => Kind::SumUint64(()),
2120                AggregateFunc::SumFloat32 => Kind::SumFloat32(()),
2121                AggregateFunc::SumFloat64 => Kind::SumFloat64(()),
2122                AggregateFunc::SumNumeric => Kind::SumNumeric(()),
2123                AggregateFunc::Count => Kind::Count(()),
2124                AggregateFunc::Any => Kind::Any(()),
2125                AggregateFunc::All => Kind::All(()),
2126                AggregateFunc::JsonbAgg { order_by } => Kind::JsonbAgg(order_by.into_proto()),
2127                AggregateFunc::JsonbObjectAgg { order_by } => {
2128                    Kind::JsonbObjectAgg(order_by.into_proto())
2129                }
2130                AggregateFunc::MapAgg {
2131                    order_by,
2132                    value_type,
2133                } => Kind::MapAgg(proto_aggregate_func::ProtoMapAgg {
2134                    order_by: Some(order_by.into_proto()),
2135                    value_type: Some(value_type.into_proto()),
2136                }),
2137                AggregateFunc::ArrayConcat { order_by } => Kind::ArrayConcat(order_by.into_proto()),
2138                AggregateFunc::ListConcat { order_by } => Kind::ListConcat(order_by.into_proto()),
2139                AggregateFunc::StringAgg { order_by } => Kind::StringAgg(order_by.into_proto()),
2140                AggregateFunc::RowNumber { order_by } => Kind::RowNumber(order_by.into_proto()),
2141                AggregateFunc::Rank { order_by } => Kind::Rank(order_by.into_proto()),
2142                AggregateFunc::DenseRank { order_by } => Kind::DenseRank(order_by.into_proto()),
2143                AggregateFunc::LagLead {
2144                    order_by,
2145                    lag_lead,
2146                    ignore_nulls,
2147                } => Kind::LagLead(proto_aggregate_func::ProtoLagLead {
2148                    order_by: Some(order_by.into_proto()),
2149                    lag_lead: Some(match lag_lead {
2150                        LagLeadType::Lag => proto_aggregate_func::proto_lag_lead::LagLead::Lag(()),
2151                        LagLeadType::Lead => {
2152                            proto_aggregate_func::proto_lag_lead::LagLead::Lead(())
2153                        }
2154                    }),
2155                    ignore_nulls: *ignore_nulls,
2156                }),
2157                AggregateFunc::FirstValue {
2158                    order_by,
2159                    window_frame,
2160                } => Kind::FirstValue(proto_aggregate_func::ProtoFramedWindowFunc {
2161                    order_by: Some(order_by.into_proto()),
2162                    window_frame: Some(window_frame.into_proto()),
2163                }),
2164                AggregateFunc::LastValue {
2165                    order_by,
2166                    window_frame,
2167                } => Kind::LastValue(proto_aggregate_func::ProtoFramedWindowFunc {
2168                    order_by: Some(order_by.into_proto()),
2169                    window_frame: Some(window_frame.into_proto()),
2170                }),
2171                AggregateFunc::WindowAggregate {
2172                    wrapped_aggregate,
2173                    order_by,
2174                    window_frame,
2175                } => Kind::WindowAggregate(Box::new(proto_aggregate_func::ProtoWindowAggregate {
2176                    wrapped_aggregate: Some(wrapped_aggregate.into_proto()),
2177                    order_by: Some(order_by.into_proto()),
2178                    window_frame: Some(window_frame.into_proto()),
2179                })),
2180                AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2181                    Kind::FusedValueWindowFunc(ProtoFusedValueWindowFunc {
2182                        funcs: funcs.into_proto(),
2183                        order_by: Some(order_by.into_proto()),
2184                    })
2185                }
2186                AggregateFunc::FusedWindowAggregate {
2187                    wrapped_aggregates,
2188                    order_by,
2189                    window_frame,
2190                } => Kind::FusedWindowAggregate(ProtoFusedWindowAggregate {
2191                    wrapped_aggregates: wrapped_aggregates.into_proto(),
2192                    order_by: Some(order_by.into_proto()),
2193                    window_frame: Some(window_frame.into_proto()),
2194                }),
2195                AggregateFunc::Dummy => Kind::Dummy(()),
2196            }),
2197        }
2198    }
2199
2200    fn from_proto(proto: ProtoAggregateFunc) -> Result<Self, TryFromProtoError> {
2201        use proto_aggregate_func::Kind;
2202        let kind = proto
2203            .kind
2204            .ok_or_else(|| TryFromProtoError::missing_field("ProtoAggregateFunc::kind"))?;
2205        Ok(match kind {
2206            Kind::MaxNumeric(()) => AggregateFunc::MaxNumeric,
2207            Kind::MaxInt16(()) => AggregateFunc::MaxInt16,
2208            Kind::MaxInt32(()) => AggregateFunc::MaxInt32,
2209            Kind::MaxInt64(()) => AggregateFunc::MaxInt64,
2210            Kind::MaxUint16(()) => AggregateFunc::MaxUInt16,
2211            Kind::MaxUint32(()) => AggregateFunc::MaxUInt32,
2212            Kind::MaxUint64(()) => AggregateFunc::MaxUInt64,
2213            Kind::MaxMzTimestamp(()) => AggregateFunc::MaxMzTimestamp,
2214            Kind::MaxFloat32(()) => AggregateFunc::MaxFloat32,
2215            Kind::MaxFloat64(()) => AggregateFunc::MaxFloat64,
2216            Kind::MaxBool(()) => AggregateFunc::MaxBool,
2217            Kind::MaxString(()) => AggregateFunc::MaxString,
2218            Kind::MaxDate(()) => AggregateFunc::MaxDate,
2219            Kind::MaxTimestamp(()) => AggregateFunc::MaxTimestamp,
2220            Kind::MaxTimestampTz(()) => AggregateFunc::MaxTimestampTz,
2221            Kind::MaxInterval(()) => AggregateFunc::MaxInterval,
2222            Kind::MaxTime(()) => AggregateFunc::MaxTime,
2223            Kind::MinNumeric(()) => AggregateFunc::MinNumeric,
2224            Kind::MinInt16(()) => AggregateFunc::MinInt16,
2225            Kind::MinInt32(()) => AggregateFunc::MinInt32,
2226            Kind::MinInt64(()) => AggregateFunc::MinInt64,
2227            Kind::MinUint16(()) => AggregateFunc::MinUInt16,
2228            Kind::MinUint32(()) => AggregateFunc::MinUInt32,
2229            Kind::MinUint64(()) => AggregateFunc::MinUInt64,
2230            Kind::MinMzTimestamp(()) => AggregateFunc::MinMzTimestamp,
2231            Kind::MinFloat32(()) => AggregateFunc::MinFloat32,
2232            Kind::MinFloat64(()) => AggregateFunc::MinFloat64,
2233            Kind::MinBool(()) => AggregateFunc::MinBool,
2234            Kind::MinString(()) => AggregateFunc::MinString,
2235            Kind::MinDate(()) => AggregateFunc::MinDate,
2236            Kind::MinTimestamp(()) => AggregateFunc::MinTimestamp,
2237            Kind::MinTimestampTz(()) => AggregateFunc::MinTimestampTz,
2238            Kind::MinInterval(()) => AggregateFunc::MinInterval,
2239            Kind::MinTime(()) => AggregateFunc::MinTime,
2240            Kind::SumInt16(()) => AggregateFunc::SumInt16,
2241            Kind::SumInt32(()) => AggregateFunc::SumInt32,
2242            Kind::SumInt64(()) => AggregateFunc::SumInt64,
2243            Kind::SumUint16(()) => AggregateFunc::SumUInt16,
2244            Kind::SumUint32(()) => AggregateFunc::SumUInt32,
2245            Kind::SumUint64(()) => AggregateFunc::SumUInt64,
2246            Kind::SumFloat32(()) => AggregateFunc::SumFloat32,
2247            Kind::SumFloat64(()) => AggregateFunc::SumFloat64,
2248            Kind::SumNumeric(()) => AggregateFunc::SumNumeric,
2249            Kind::Count(()) => AggregateFunc::Count,
2250            Kind::Any(()) => AggregateFunc::Any,
2251            Kind::All(()) => AggregateFunc::All,
2252            Kind::JsonbAgg(order_by) => AggregateFunc::JsonbAgg {
2253                order_by: order_by.into_rust()?,
2254            },
2255            Kind::JsonbObjectAgg(order_by) => AggregateFunc::JsonbObjectAgg {
2256                order_by: order_by.into_rust()?,
2257            },
2258            Kind::MapAgg(pma) => AggregateFunc::MapAgg {
2259                order_by: pma.order_by.into_rust_if_some("ProtoMapAgg::order_by")?,
2260                value_type: pma
2261                    .value_type
2262                    .into_rust_if_some("ProtoMapAgg::value_type")?,
2263            },
2264            Kind::ArrayConcat(order_by) => AggregateFunc::ArrayConcat {
2265                order_by: order_by.into_rust()?,
2266            },
2267            Kind::ListConcat(order_by) => AggregateFunc::ListConcat {
2268                order_by: order_by.into_rust()?,
2269            },
2270            Kind::StringAgg(order_by) => AggregateFunc::StringAgg {
2271                order_by: order_by.into_rust()?,
2272            },
2273            Kind::RowNumber(order_by) => AggregateFunc::RowNumber {
2274                order_by: order_by.into_rust()?,
2275            },
2276            Kind::Rank(order_by) => AggregateFunc::Rank {
2277                order_by: order_by.into_rust()?,
2278            },
2279            Kind::DenseRank(order_by) => AggregateFunc::DenseRank {
2280                order_by: order_by.into_rust()?,
2281            },
2282            Kind::LagLead(pll) => AggregateFunc::LagLead {
2283                order_by: pll.order_by.into_rust_if_some("ProtoLagLead::order_by")?,
2284                lag_lead: match pll.lag_lead {
2285                    Some(proto_aggregate_func::proto_lag_lead::LagLead::Lag(())) => {
2286                        LagLeadType::Lag
2287                    }
2288                    Some(proto_aggregate_func::proto_lag_lead::LagLead::Lead(())) => {
2289                        LagLeadType::Lead
2290                    }
2291                    None => {
2292                        return Err(TryFromProtoError::MissingField(
2293                            "ProtoLagLead::lag_lead".into(),
2294                        ));
2295                    }
2296                },
2297                ignore_nulls: pll.ignore_nulls,
2298            },
2299            Kind::FirstValue(pfv) => AggregateFunc::FirstValue {
2300                order_by: pfv
2301                    .order_by
2302                    .into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
2303                window_frame: pfv
2304                    .window_frame
2305                    .into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
2306            },
2307            Kind::LastValue(pfv) => AggregateFunc::LastValue {
2308                order_by: pfv
2309                    .order_by
2310                    .into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
2311                window_frame: pfv
2312                    .window_frame
2313                    .into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
2314            },
2315            Kind::WindowAggregate(paf) => AggregateFunc::WindowAggregate {
2316                wrapped_aggregate: paf
2317                    .wrapped_aggregate
2318                    .into_rust_if_some("ProtoWindowAggregate::wrapped_aggregate")?,
2319                order_by: paf
2320                    .order_by
2321                    .into_rust_if_some("ProtoWindowAggregate::order_by")?,
2322                window_frame: paf
2323                    .window_frame
2324                    .into_rust_if_some("ProtoWindowAggregate::window_frame")?,
2325            },
2326            Kind::FusedValueWindowFunc(fvwf) => AggregateFunc::FusedValueWindowFunc {
2327                funcs: fvwf.funcs.into_rust()?,
2328                order_by: fvwf
2329                    .order_by
2330                    .into_rust_if_some("ProtoFusedValueWindowFunc::order_by")?,
2331            },
2332            Kind::FusedWindowAggregate(fwa) => AggregateFunc::FusedWindowAggregate {
2333                wrapped_aggregates: fwa.wrapped_aggregates.into_rust()?,
2334                order_by: fwa
2335                    .order_by
2336                    .into_rust_if_some("ProtoFusedWindowAggregate::order_by")?,
2337                window_frame: fwa
2338                    .window_frame
2339                    .into_rust_if_some("ProtoFusedWindowAggregate::window_frame")?,
2340            },
2341            Kind::Dummy(()) => AggregateFunc::Dummy,
2342        })
2343    }
2344}
2345
2346impl AggregateFunc {
2347    pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
2348    where
2349        I: IntoIterator<Item = Datum<'a>>,
2350    {
2351        match self {
2352            AggregateFunc::MaxNumeric => {
2353                max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
2354            }
2355            AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
2356            AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
2357            AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
2358            AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
2359            AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
2360            AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
2361            AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
2362            AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
2363            AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
2364            AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
2365            AggregateFunc::MaxString => max_string(datums),
2366            AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
2367            AggregateFunc::MaxTimestamp => {
2368                max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
2369            }
2370            AggregateFunc::MaxTimestampTz => {
2371                max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2372            }
2373            AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
2374            AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
2375            AggregateFunc::MinNumeric => {
2376                min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
2377            }
2378            AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
2379            AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
2380            AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
2381            AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
2382            AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
2383            AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
2384            AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
2385            AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
2386            AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
2387            AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
2388            AggregateFunc::MinString => min_string(datums),
2389            AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
2390            AggregateFunc::MinTimestamp => {
2391                min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
2392            }
2393            AggregateFunc::MinTimestampTz => {
2394                min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2395            }
2396            AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
2397            AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
2398            AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
2399            AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
2400            AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
2401            AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
2402            AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
2403            AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
2404            AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
2405            AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
2406            AggregateFunc::SumNumeric => sum_numeric(datums),
2407            AggregateFunc::Count => count(datums),
2408            AggregateFunc::Any => any(datums),
2409            AggregateFunc::All => all(datums),
2410            AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
2411            AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
2412                dict_agg(datums, temp_storage, order_by)
2413            }
2414            AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
2415            AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
2416            AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
2417            AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
2418            AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
2419            AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
2420            AggregateFunc::LagLead {
2421                order_by,
2422                lag_lead: lag_lead_type,
2423                ignore_nulls,
2424            } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2425            AggregateFunc::FirstValue {
2426                order_by,
2427                window_frame,
2428            } => first_value(datums, temp_storage, order_by, window_frame),
2429            AggregateFunc::LastValue {
2430                order_by,
2431                window_frame,
2432            } => last_value(datums, temp_storage, order_by, window_frame),
2433            AggregateFunc::WindowAggregate {
2434                wrapped_aggregate,
2435                order_by,
2436                window_frame,
2437            } => window_aggr::<_, NaiveOneByOneAggr>(
2438                datums,
2439                temp_storage,
2440                wrapped_aggregate,
2441                order_by,
2442                window_frame,
2443            ),
2444            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2445                fused_value_window_func(datums, temp_storage, funcs, order_by)
2446            }
2447            AggregateFunc::FusedWindowAggregate {
2448                wrapped_aggregates,
2449                order_by,
2450                window_frame,
2451            } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2452                datums,
2453                temp_storage,
2454                wrapped_aggregates,
2455                order_by,
2456                window_frame,
2457            ),
2458            AggregateFunc::Dummy => Datum::Dummy,
2459        }
2460    }
2461
2462    /// Like `eval`, but it's given a [OneByOneAggr]. If `self` is a `WindowAggregate`, then
2463    /// the given [OneByOneAggr] will be used to evaluate the wrapped aggregate inside the
2464    /// `WindowAggregate`. If `self` is not a `WindowAggregate`, then it simply calls `eval`.
2465    pub fn eval_with_fast_window_agg<'a, I, W>(
2466        &self,
2467        datums: I,
2468        temp_storage: &'a RowArena,
2469    ) -> Datum<'a>
2470    where
2471        I: IntoIterator<Item = Datum<'a>>,
2472        W: OneByOneAggr,
2473    {
2474        match self {
2475            AggregateFunc::WindowAggregate {
2476                wrapped_aggregate,
2477                order_by,
2478                window_frame,
2479            } => window_aggr::<_, W>(
2480                datums,
2481                temp_storage,
2482                wrapped_aggregate,
2483                order_by,
2484                window_frame,
2485            ),
2486            AggregateFunc::FusedWindowAggregate {
2487                wrapped_aggregates,
2488                order_by,
2489                window_frame,
2490            } => fused_window_aggr::<_, W>(
2491                datums,
2492                temp_storage,
2493                wrapped_aggregates,
2494                order_by,
2495                window_frame,
2496            ),
2497            _ => self.eval(datums, temp_storage),
2498        }
2499    }
2500
2501    pub fn eval_with_unnest_list<'a, I, W>(
2502        &self,
2503        datums: I,
2504        temp_storage: &'a RowArena,
2505    ) -> impl Iterator<Item = Datum<'a>>
2506    where
2507        I: IntoIterator<Item = Datum<'a>>,
2508        W: OneByOneAggr,
2509    {
2510        // TODO: Use `enum_dispatch` to construct a unified iterator instead of `collect_vec`.
2511        assert!(self.can_fuse_with_unnest_list());
2512        match self {
2513            AggregateFunc::RowNumber { order_by } => {
2514                row_number_no_list(datums, temp_storage, order_by).collect_vec()
2515            }
2516            AggregateFunc::Rank { order_by } => {
2517                rank_no_list(datums, temp_storage, order_by).collect_vec()
2518            }
2519            AggregateFunc::DenseRank { order_by } => {
2520                dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2521            }
2522            AggregateFunc::LagLead {
2523                order_by,
2524                lag_lead: lag_lead_type,
2525                ignore_nulls,
2526            } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2527                .collect_vec(),
2528            AggregateFunc::FirstValue {
2529                order_by,
2530                window_frame,
2531            } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2532            AggregateFunc::LastValue {
2533                order_by,
2534                window_frame,
2535            } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2536            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2537                fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2538            }
2539            AggregateFunc::WindowAggregate {
2540                wrapped_aggregate,
2541                order_by,
2542                window_frame,
2543            } => window_aggr_no_list::<_, W>(
2544                datums,
2545                temp_storage,
2546                wrapped_aggregate,
2547                order_by,
2548                window_frame,
2549            )
2550            .collect_vec(),
2551            AggregateFunc::FusedWindowAggregate {
2552                wrapped_aggregates,
2553                order_by,
2554                window_frame,
2555            } => fused_window_aggr_no_list::<_, W>(
2556                datums,
2557                temp_storage,
2558                wrapped_aggregates,
2559                order_by,
2560                window_frame,
2561            )
2562            .collect_vec(),
2563            _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2564        }
2565        .into_iter()
2566    }
2567
2568    /// Returns the output of the aggregation function when applied on an empty
2569    /// input relation.
2570    pub fn default(&self) -> Datum<'static> {
2571        match self {
2572            AggregateFunc::Count => Datum::Int64(0),
2573            AggregateFunc::Any => Datum::False,
2574            AggregateFunc::All => Datum::True,
2575            AggregateFunc::Dummy => Datum::Dummy,
2576            _ => Datum::Null,
2577        }
2578    }
2579
2580    /// Returns a datum whose inclusion in the aggregation will not change its
2581    /// result.
2582    pub fn identity_datum(&self) -> Datum<'static> {
2583        match self {
2584            AggregateFunc::Any => Datum::False,
2585            AggregateFunc::All => Datum::True,
2586            AggregateFunc::Dummy => Datum::Dummy,
2587            AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2588            AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2589            AggregateFunc::RowNumber { .. }
2590            | AggregateFunc::Rank { .. }
2591            | AggregateFunc::DenseRank { .. }
2592            | AggregateFunc::LagLead { .. }
2593            | AggregateFunc::FirstValue { .. }
2594            | AggregateFunc::LastValue { .. }
2595            | AggregateFunc::WindowAggregate { .. }
2596            | AggregateFunc::FusedValueWindowFunc { .. }
2597            | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2598            AggregateFunc::MaxNumeric
2599            | AggregateFunc::MaxInt16
2600            | AggregateFunc::MaxInt32
2601            | AggregateFunc::MaxInt64
2602            | AggregateFunc::MaxUInt16
2603            | AggregateFunc::MaxUInt32
2604            | AggregateFunc::MaxUInt64
2605            | AggregateFunc::MaxMzTimestamp
2606            | AggregateFunc::MaxFloat32
2607            | AggregateFunc::MaxFloat64
2608            | AggregateFunc::MaxBool
2609            | AggregateFunc::MaxString
2610            | AggregateFunc::MaxDate
2611            | AggregateFunc::MaxTimestamp
2612            | AggregateFunc::MaxTimestampTz
2613            | AggregateFunc::MaxInterval
2614            | AggregateFunc::MaxTime
2615            | AggregateFunc::MinNumeric
2616            | AggregateFunc::MinInt16
2617            | AggregateFunc::MinInt32
2618            | AggregateFunc::MinInt64
2619            | AggregateFunc::MinUInt16
2620            | AggregateFunc::MinUInt32
2621            | AggregateFunc::MinUInt64
2622            | AggregateFunc::MinMzTimestamp
2623            | AggregateFunc::MinFloat32
2624            | AggregateFunc::MinFloat64
2625            | AggregateFunc::MinBool
2626            | AggregateFunc::MinString
2627            | AggregateFunc::MinDate
2628            | AggregateFunc::MinTimestamp
2629            | AggregateFunc::MinTimestampTz
2630            | AggregateFunc::MinInterval
2631            | AggregateFunc::MinTime
2632            | AggregateFunc::SumInt16
2633            | AggregateFunc::SumInt32
2634            | AggregateFunc::SumInt64
2635            | AggregateFunc::SumUInt16
2636            | AggregateFunc::SumUInt32
2637            | AggregateFunc::SumUInt64
2638            | AggregateFunc::SumFloat32
2639            | AggregateFunc::SumFloat64
2640            | AggregateFunc::SumNumeric
2641            | AggregateFunc::Count
2642            | AggregateFunc::JsonbAgg { .. }
2643            | AggregateFunc::JsonbObjectAgg { .. }
2644            | AggregateFunc::MapAgg { .. }
2645            | AggregateFunc::StringAgg { .. } => Datum::Null,
2646        }
2647    }
2648
2649    pub fn can_fuse_with_unnest_list(&self) -> bool {
2650        match self {
2651            AggregateFunc::RowNumber { .. }
2652            | AggregateFunc::Rank { .. }
2653            | AggregateFunc::DenseRank { .. }
2654            | AggregateFunc::LagLead { .. }
2655            | AggregateFunc::FirstValue { .. }
2656            | AggregateFunc::LastValue { .. }
2657            | AggregateFunc::WindowAggregate { .. }
2658            | AggregateFunc::FusedValueWindowFunc { .. }
2659            | AggregateFunc::FusedWindowAggregate { .. } => true,
2660            AggregateFunc::ArrayConcat { .. }
2661            | AggregateFunc::ListConcat { .. }
2662            | AggregateFunc::Any
2663            | AggregateFunc::All
2664            | AggregateFunc::Dummy
2665            | AggregateFunc::MaxNumeric
2666            | AggregateFunc::MaxInt16
2667            | AggregateFunc::MaxInt32
2668            | AggregateFunc::MaxInt64
2669            | AggregateFunc::MaxUInt16
2670            | AggregateFunc::MaxUInt32
2671            | AggregateFunc::MaxUInt64
2672            | AggregateFunc::MaxMzTimestamp
2673            | AggregateFunc::MaxFloat32
2674            | AggregateFunc::MaxFloat64
2675            | AggregateFunc::MaxBool
2676            | AggregateFunc::MaxString
2677            | AggregateFunc::MaxDate
2678            | AggregateFunc::MaxTimestamp
2679            | AggregateFunc::MaxTimestampTz
2680            | AggregateFunc::MaxInterval
2681            | AggregateFunc::MaxTime
2682            | AggregateFunc::MinNumeric
2683            | AggregateFunc::MinInt16
2684            | AggregateFunc::MinInt32
2685            | AggregateFunc::MinInt64
2686            | AggregateFunc::MinUInt16
2687            | AggregateFunc::MinUInt32
2688            | AggregateFunc::MinUInt64
2689            | AggregateFunc::MinMzTimestamp
2690            | AggregateFunc::MinFloat32
2691            | AggregateFunc::MinFloat64
2692            | AggregateFunc::MinBool
2693            | AggregateFunc::MinString
2694            | AggregateFunc::MinDate
2695            | AggregateFunc::MinTimestamp
2696            | AggregateFunc::MinTimestampTz
2697            | AggregateFunc::MinInterval
2698            | AggregateFunc::MinTime
2699            | AggregateFunc::SumInt16
2700            | AggregateFunc::SumInt32
2701            | AggregateFunc::SumInt64
2702            | AggregateFunc::SumUInt16
2703            | AggregateFunc::SumUInt32
2704            | AggregateFunc::SumUInt64
2705            | AggregateFunc::SumFloat32
2706            | AggregateFunc::SumFloat64
2707            | AggregateFunc::SumNumeric
2708            | AggregateFunc::Count
2709            | AggregateFunc::JsonbAgg { .. }
2710            | AggregateFunc::JsonbObjectAgg { .. }
2711            | AggregateFunc::MapAgg { .. }
2712            | AggregateFunc::StringAgg { .. } => false,
2713        }
2714    }
2715
2716    /// The output column type for the result of an aggregation.
2717    ///
2718    /// The output column type also contains nullability information, which
2719    /// is (without further information) true for aggregations that are not
2720    /// counts.
2721    pub fn output_type(&self, input_type: ColumnType) -> ColumnType {
2722        let scalar_type = match self {
2723            AggregateFunc::Count => ScalarType::Int64,
2724            AggregateFunc::Any => ScalarType::Bool,
2725            AggregateFunc::All => ScalarType::Bool,
2726            AggregateFunc::JsonbAgg { .. } => ScalarType::Jsonb,
2727            AggregateFunc::JsonbObjectAgg { .. } => ScalarType::Jsonb,
2728            AggregateFunc::SumInt16 => ScalarType::Int64,
2729            AggregateFunc::SumInt32 => ScalarType::Int64,
2730            AggregateFunc::SumInt64 => ScalarType::Numeric {
2731                max_scale: Some(NumericMaxScale::ZERO),
2732            },
2733            AggregateFunc::SumUInt16 => ScalarType::UInt64,
2734            AggregateFunc::SumUInt32 => ScalarType::UInt64,
2735            AggregateFunc::SumUInt64 => ScalarType::Numeric {
2736                max_scale: Some(NumericMaxScale::ZERO),
2737            },
2738            AggregateFunc::MapAgg { value_type, .. } => ScalarType::Map {
2739                value_type: Box::new(value_type.clone()),
2740                custom_id: None,
2741            },
2742            AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2743                match input_type.scalar_type {
2744                    // The input is wrapped in a Record if there's an ORDER BY, so extract it out.
2745                    ScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2746                    _ => unreachable!(),
2747                }
2748            }
2749            AggregateFunc::StringAgg { .. } => ScalarType::String,
2750            AggregateFunc::RowNumber { .. } => {
2751                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2752            }
2753            AggregateFunc::Rank { .. } => {
2754                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2755            }
2756            AggregateFunc::DenseRank { .. } => {
2757                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2758            }
2759            AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2760                // The input type for Lag is ((OriginalRow, EncodedArgs), OrderByExprs...)
2761                let fields = input_type.scalar_type.unwrap_record_element_type();
2762                let original_row_type = fields[0].unwrap_record_element_type()[0]
2763                    .clone()
2764                    .nullable(false);
2765                let output_type_inner = Self::lag_lead_output_type_inner_from_encoded_args(fields[0].unwrap_record_element_type()[1]);
2766                let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2767
2768                ScalarType::List {
2769                    element_type: Box::new(ScalarType::Record {
2770                        fields: [
2771                            (column_name, output_type_inner),
2772                            (ColumnName::from("?orig_row?"), original_row_type),
2773                        ].into(),
2774                        custom_id: None,
2775                    }),
2776                    custom_id: None,
2777                }
2778            }
2779            AggregateFunc::FirstValue { .. } => {
2780                // The input type for FirstValue is ((OriginalRow, Arg), OrderByExprs...)
2781                let fields = input_type.scalar_type.unwrap_record_element_type();
2782                let original_row_type = fields[0].unwrap_record_element_type()[0]
2783                    .clone()
2784                    .nullable(false);
2785                let value_type = fields[0].unwrap_record_element_type()[1]
2786                    .clone()
2787                    .nullable(true); // null when the partition is empty
2788
2789                ScalarType::List {
2790                    element_type: Box::new(ScalarType::Record {
2791                        fields: [
2792                            (ColumnName::from("?first_value?"), value_type),
2793                            (ColumnName::from("?orig_row?"), original_row_type),
2794                        ].into(),
2795                        custom_id: None,
2796                    }),
2797                    custom_id: None,
2798                }
2799            }
2800            AggregateFunc::LastValue { .. } => {
2801                // The input type for LastValue is ((OriginalRow, Arg), OrderByExprs...)
2802                let fields = input_type.scalar_type.unwrap_record_element_type();
2803                let original_row_type = fields[0].unwrap_record_element_type()[0]
2804                    .clone()
2805                    .nullable(false);
2806                let value_type = fields[0].unwrap_record_element_type()[1]
2807                    .clone()
2808                    .nullable(true); // null when the partition is empty
2809
2810                ScalarType::List {
2811                    element_type: Box::new(ScalarType::Record {
2812                        fields: [
2813                            (ColumnName::from("?last_value?"), value_type),
2814                            (ColumnName::from("?orig_row?"), original_row_type),
2815                        ].into(),
2816                        custom_id: None,
2817                    }),
2818                    custom_id: None,
2819                }
2820            }
2821            AggregateFunc::WindowAggregate {
2822                wrapped_aggregate, ..
2823            } => {
2824                // The input type for a window aggregate is ((OriginalRow, Arg), OrderByExprs...)
2825                let fields = input_type.scalar_type.unwrap_record_element_type();
2826                let original_row_type = fields[0].unwrap_record_element_type()[0]
2827                    .clone()
2828                    .nullable(false);
2829                let arg_type = fields[0].unwrap_record_element_type()[1]
2830                    .clone()
2831                    .nullable(true);
2832                let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);
2833
2834                ScalarType::List {
2835                    element_type: Box::new(ScalarType::Record {
2836                        fields: [
2837                            (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2838                            (ColumnName::from("?orig_row?"), original_row_type),
2839                        ].into(),
2840                        custom_id: None,
2841                    }),
2842                    custom_id: None,
2843                }
2844            }
2845            AggregateFunc::FusedWindowAggregate {
2846                wrapped_aggregates, ..
2847            } => {
2848                // The input type for a fused window aggregate is ((OriginalRow, Args), OrderByExprs...)
2849                // where `Args` is a record.
2850                let fields = input_type.scalar_type.unwrap_record_element_type();
2851                let original_row_type = fields[0].unwrap_record_element_type()[0]
2852                    .clone()
2853                    .nullable(false);
2854                let args_type = fields[0].unwrap_record_element_type()[1];
2855                let arg_types = args_type.unwrap_record_element_type();
2856                let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(|(arg_type, wrapped_agg)| {
2857                    (
2858                        ColumnName::from(wrapped_agg.name()),
2859                        wrapped_agg.output_type((**arg_type).clone().nullable(true)),
2860                    )
2861                }).collect_vec();
2862
2863                ScalarType::List {
2864                    element_type: Box::new(ScalarType::Record {
2865                        fields: [
2866                            (ColumnName::from("?fused_window_agg?"), ScalarType::Record {
2867                                fields: out_fields.into(),
2868                                custom_id: None,
2869                            }.nullable(false)),
2870                            (ColumnName::from("?orig_row?"), original_row_type),
2871                        ].into(),
2872                        custom_id: None,
2873                    }),
2874                    custom_id: None,
2875                }
2876            }
2877            AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2878                // The input type is ((OriginalRow, EncodedArgs), OrderByExprs...)
2879                // where EncodedArgs is a record, where each element is the argument to one of the
2880                // function calls that got fused. This is a record for lag/lead, and a simple type
2881                // for first_value/last_value.
2882                let fields = input_type.scalar_type.unwrap_record_element_type();
2883                let original_row_type = fields[0].unwrap_record_element_type()[0]
2884                    .clone()
2885                    .nullable(false);
2886                let encoded_args_type = fields[0].unwrap_record_element_type()[1].unwrap_record_element_type();
2887
2888                ScalarType::List {
2889                    element_type: Box::new(ScalarType::Record {
2890                        fields: [
2891                            (ColumnName::from("?fused_value_window_func?"), ScalarType::Record {
2892                                fields: encoded_args_type.into_iter().zip_eq(funcs).map(|(arg_type, func)| {
2893                                    match func {
2894                                        AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2895                                            (
2896                                                Self::lag_lead_result_column_name(lag_lead_type),
2897                                                Self::lag_lead_output_type_inner_from_encoded_args(arg_type)
2898                                            )
2899                                        },
2900                                        AggregateFunc::FirstValue { .. } => {
2901                                            (
2902                                                ColumnName::from("?first_value?"),
2903                                                arg_type.clone().nullable(true),
2904                                            )
2905                                        }
2906                                        AggregateFunc::LastValue { .. } => {
2907                                            (
2908                                                ColumnName::from("?last_value?"),
2909                                                arg_type.clone().nullable(true),
2910                                            )
2911                                        }
2912                                        _ => panic!("FusedValueWindowFunc has an unknown function"),
2913                                    }
2914                                }).collect(),
2915                                custom_id: None,
2916                            }.nullable(false)),
2917                            (ColumnName::from("?orig_row?"), original_row_type),
2918                        ].into(),
2919                        custom_id: None,
2920                    }),
2921                    custom_id: None,
2922                }
2923            }
2924            AggregateFunc::Dummy
2925            | AggregateFunc::MaxNumeric
2926            | AggregateFunc::MaxInt16
2927            | AggregateFunc::MaxInt32
2928            | AggregateFunc::MaxInt64
2929            | AggregateFunc::MaxUInt16
2930            | AggregateFunc::MaxUInt32
2931            | AggregateFunc::MaxUInt64
2932            | AggregateFunc::MaxMzTimestamp
2933            | AggregateFunc::MaxFloat32
2934            | AggregateFunc::MaxFloat64
2935            | AggregateFunc::MaxBool
2936            // Note AggregateFunc::MaxString, MinString rely on returning input
2937            // type as output type to support the proper return type for
2938            // character input.
2939            | AggregateFunc::MaxString
2940            | AggregateFunc::MaxDate
2941            | AggregateFunc::MaxTimestamp
2942            | AggregateFunc::MaxTimestampTz
2943            | AggregateFunc::MaxInterval
2944            | AggregateFunc::MaxTime
2945            | AggregateFunc::MinNumeric
2946            | AggregateFunc::MinInt16
2947            | AggregateFunc::MinInt32
2948            | AggregateFunc::MinInt64
2949            | AggregateFunc::MinUInt16
2950            | AggregateFunc::MinUInt32
2951            | AggregateFunc::MinUInt64
2952            | AggregateFunc::MinMzTimestamp
2953            | AggregateFunc::MinFloat32
2954            | AggregateFunc::MinFloat64
2955            | AggregateFunc::MinBool
2956            | AggregateFunc::MinString
2957            | AggregateFunc::MinDate
2958            | AggregateFunc::MinTimestamp
2959            | AggregateFunc::MinTimestampTz
2960            | AggregateFunc::MinInterval
2961            | AggregateFunc::MinTime
2962            | AggregateFunc::SumFloat32
2963            | AggregateFunc::SumFloat64
2964            | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2965        };
2966        // Count never produces null, and other aggregations only produce
2967        // null in the presence of null inputs.
2968        let nullable = match self {
2969            AggregateFunc::Count => false,
2970            // Use the nullability of the underlying column being aggregated, not the Records wrapping it
2971            AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2972                // The outer Record wraps the input in the first position, and any ORDER BY expressions afterwards
2973                ScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2974                    // The inner Record is a (value, separator) tuple
2975                    ScalarType::Record { fields, .. } => fields[0].1.nullable,
2976                    _ => unreachable!(),
2977                },
2978                _ => unreachable!(),
2979            },
2980            _ => input_type.nullable,
2981        };
2982        scalar_type.nullable(nullable)
2983    }
2984
2985    /// Compute output type for ROW_NUMBER, RANK, DENSE_RANK
2986    fn output_type_ranking_window_funcs(input_type: &ColumnType, col_name: &str) -> ScalarType {
2987        match input_type.scalar_type {
2988            ScalarType::Record { ref fields, .. } => ScalarType::List {
2989                element_type: Box::new(ScalarType::Record {
2990                    fields: [
2991                        (
2992                            ColumnName::from(col_name),
2993                            ScalarType::Int64.nullable(false),
2994                        ),
2995                        (ColumnName::from("?orig_row?"), {
2996                            let inner = match &fields[0].1.scalar_type {
2997                                ScalarType::List { element_type, .. } => element_type.clone(),
2998                                _ => unreachable!(),
2999                            };
3000                            inner.nullable(false)
3001                        }),
3002                    ]
3003                    .into(),
3004                    custom_id: None,
3005                }),
3006                custom_id: None,
3007            },
3008            _ => unreachable!(),
3009        }
3010    }
3011
3012    /// Given the `EncodedArgs` part of `((OriginalRow, EncodedArgs), OrderByExprs...)`,
3013    /// this computes the type of the first field of the output type. (The first field is the
3014    /// real result, the rest is the original row.)
3015    fn lag_lead_output_type_inner_from_encoded_args(encoded_args_type: &ScalarType) -> ColumnType {
3016        // lag/lead have 3 arguments, and the output type is
3017        // the same as the first of these, but always nullable. (It's null when the
3018        // lag/lead computation reaches over the bounds of the window partition.)
3019        encoded_args_type.unwrap_record_element_type()[0]
3020            .clone()
3021            .nullable(true)
3022    }
3023
3024    fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
3025        ColumnName::from(match lag_lead_type {
3026            LagLeadType::Lag => "?lag?",
3027            LagLeadType::Lead => "?lead?",
3028        })
3029    }
3030
3031    /// Returns true if the non-null constraint on the aggregation can be
3032    /// converted into a non-null constraint on its parameter expression, ie.
3033    /// whether the result of the aggregation is null if all the input values
3034    /// are null.
3035    pub fn propagates_nonnull_constraint(&self) -> bool {
3036        match self {
3037            AggregateFunc::MaxNumeric
3038            | AggregateFunc::MaxInt16
3039            | AggregateFunc::MaxInt32
3040            | AggregateFunc::MaxInt64
3041            | AggregateFunc::MaxUInt16
3042            | AggregateFunc::MaxUInt32
3043            | AggregateFunc::MaxUInt64
3044            | AggregateFunc::MaxMzTimestamp
3045            | AggregateFunc::MaxFloat32
3046            | AggregateFunc::MaxFloat64
3047            | AggregateFunc::MaxBool
3048            | AggregateFunc::MaxString
3049            | AggregateFunc::MaxDate
3050            | AggregateFunc::MaxTimestamp
3051            | AggregateFunc::MaxTimestampTz
3052            | AggregateFunc::MinNumeric
3053            | AggregateFunc::MinInt16
3054            | AggregateFunc::MinInt32
3055            | AggregateFunc::MinInt64
3056            | AggregateFunc::MinUInt16
3057            | AggregateFunc::MinUInt32
3058            | AggregateFunc::MinUInt64
3059            | AggregateFunc::MinMzTimestamp
3060            | AggregateFunc::MinFloat32
3061            | AggregateFunc::MinFloat64
3062            | AggregateFunc::MinBool
3063            | AggregateFunc::MinString
3064            | AggregateFunc::MinDate
3065            | AggregateFunc::MinTimestamp
3066            | AggregateFunc::MinTimestampTz
3067            | AggregateFunc::SumInt16
3068            | AggregateFunc::SumInt32
3069            | AggregateFunc::SumInt64
3070            | AggregateFunc::SumUInt16
3071            | AggregateFunc::SumUInt32
3072            | AggregateFunc::SumUInt64
3073            | AggregateFunc::SumFloat32
3074            | AggregateFunc::SumFloat64
3075            | AggregateFunc::SumNumeric
3076            | AggregateFunc::StringAgg { .. } => true,
3077            // Count is never null
3078            AggregateFunc::Count => false,
3079            _ => false,
3080        }
3081    }
3082}
3083
3084fn jsonb_each<'a>(
3085    a: Datum<'a>,
3086    temp_storage: &'a RowArena,
3087    stringify: bool,
3088) -> impl Iterator<Item = (Row, Diff)> + 'a {
3089    // First produce a map, so that a common iterator can be returned.
3090    let map = match a {
3091        Datum::Map(dict) => dict,
3092        _ => mz_repr::DatumMap::empty(),
3093    };
3094
3095    map.iter().map(move |(k, mut v)| {
3096        if stringify {
3097            v = jsonb_stringify(v, temp_storage);
3098        }
3099        (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
3100    })
3101}
3102
3103fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3104    let map = match a {
3105        Datum::Map(dict) => dict,
3106        _ => mz_repr::DatumMap::empty(),
3107    };
3108
3109    map.iter()
3110        .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
3111}
3112
3113fn jsonb_array_elements<'a>(
3114    a: Datum<'a>,
3115    temp_storage: &'a RowArena,
3116    stringify: bool,
3117) -> impl Iterator<Item = (Row, Diff)> + 'a {
3118    let list = match a {
3119        Datum::List(list) => list,
3120        _ => mz_repr::DatumList::empty(),
3121    };
3122    list.iter().map(move |mut e| {
3123        if stringify {
3124            e = jsonb_stringify(e, temp_storage);
3125        }
3126        (Row::pack_slice(&[e]), Diff::ONE)
3127    })
3128}
3129
3130fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
3131    let r = r.inner();
3132    let a = a.unwrap_str();
3133    let captures = r.captures(a)?;
3134    let datums = captures
3135        .iter()
3136        .skip(1)
3137        .map(|m| Datum::from(m.map(|m| m.as_str())));
3138    Some((Row::pack(datums), Diff::ONE))
3139}
3140
3141fn regexp_matches<'a, 'r: 'a>(
3142    exprs: &[Datum<'a>],
3143) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3144    // There are only two acceptable ways to call this function:
3145    // 1. regexp_matches(string, regex)
3146    // 2. regexp_matches(string, regex, flag)
3147    assert!(exprs.len() == 2 || exprs.len() == 3);
3148    let a = exprs[0].unwrap_str();
3149    let r = exprs[1].unwrap_str();
3150
3151    let (regex, opts) = if exprs.len() == 3 {
3152        let flag = exprs[2].unwrap_str();
3153        let opts = AnalyzedRegexOpts::from_str(flag)?;
3154        (AnalyzedRegex::new(r, opts)?, opts)
3155    } else {
3156        let opts = AnalyzedRegexOpts::default();
3157        (AnalyzedRegex::new(r, opts)?, opts)
3158    };
3159
3160    let regex = regex.inner().clone();
3161
3162    let iter = regex.captures_iter(a).map(move |captures| {
3163        let matches = captures
3164            .iter()
3165            // The first match is the *entire* match, we want the capture groups by themselves.
3166            .skip(1)
3167            .map(|m| Datum::from(m.map(|m| m.as_str())))
3168            .collect::<Vec<_>>();
3169
3170        let row = SharedRow::get();
3171        let mut binding = row.borrow_mut();
3172        let mut packer = binding.packer();
3173
3174        let dimension = ArrayDimension {
3175            lower_bound: 1,
3176            length: matches.len(),
3177        };
3178        packer
3179            .try_push_array(&[dimension], matches)
3180            .expect("generated dimensions above");
3181
3182        (binding.clone(), Diff::ONE)
3183    });
3184
3185    // This is slightly unfortunate, but we need to collect the captures into a
3186    // Vec before we can yield them, because we can't return a iter with a
3187    // reference to the local `regex` variable.
3188    // We attempt to minimize the cost of this by using a SmallVec.
3189    let out = iter.collect::<SmallVec<[_; 3]>>();
3190
3191    if opts.global {
3192        Ok(Either::Left(out.into_iter()))
3193    } else {
3194        Ok(Either::Right(out.into_iter().take(1)))
3195    }
3196}
3197
3198fn generate_series<N>(
3199    start: N,
3200    stop: N,
3201    step: N,
3202) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
3203where
3204    N: Integer + Signed + CheckedAdd + Clone,
3205    Datum<'static>: From<N>,
3206{
3207    if step == N::zero() {
3208        return Err(EvalError::InvalidParameterValue(
3209            "step size cannot equal zero".into(),
3210        ));
3211    }
3212    Ok(num::range_step_inclusive(start, stop, step)
3213        .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
3214}
3215
3216/// Like
3217/// [`num::range_step_inclusive`](https://github.com/rust-num/num-iter/blob/ddb14c1e796d401014c6c7a727de61d8109ad986/src/lib.rs#L279),
3218/// but for our timestamp types using [`Interval`] for `step`.xwxw
3219#[derive(Clone)]
3220pub struct TimestampRangeStepInclusive<T> {
3221    state: CheckedTimestamp<T>,
3222    stop: CheckedTimestamp<T>,
3223    step: Interval,
3224    rev: bool,
3225    done: bool,
3226}
3227
3228impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
3229    type Item = CheckedTimestamp<T>;
3230
3231    #[inline]
3232    fn next(&mut self) -> Option<CheckedTimestamp<T>> {
3233        if !self.done
3234            && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
3235        {
3236            let result = self.state.clone();
3237            match add_timestamp_months(self.state.deref(), self.step.months) {
3238                Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
3239                    Some(v) => match CheckedTimestamp::from_timestamplike(v) {
3240                        Ok(v) => self.state = v,
3241                        Err(_) => self.done = true,
3242                    },
3243                    None => self.done = true,
3244                },
3245                Err(..) => {
3246                    self.done = true;
3247                }
3248            }
3249
3250            Some(result)
3251        } else {
3252            None
3253        }
3254    }
3255}
3256
3257fn generate_series_ts<T: TimestampLike>(
3258    start: CheckedTimestamp<T>,
3259    stop: CheckedTimestamp<T>,
3260    step: Interval,
3261    conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
3262) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
3263    let normalized_step = step.as_microseconds();
3264    if normalized_step == 0 {
3265        return Err(EvalError::InvalidParameterValue(
3266            "step size cannot equal zero".into(),
3267        ));
3268    }
3269    let rev = normalized_step < 0;
3270
3271    let trsi = TimestampRangeStepInclusive {
3272        state: start,
3273        stop,
3274        step,
3275        rev,
3276        done: false,
3277    };
3278
3279    Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
3280}
3281
3282fn generate_subscripts_array(
3283    a: Datum,
3284    dim: i32,
3285) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
3286    if dim <= 0 {
3287        return Ok(Box::new(iter::empty()));
3288    }
3289
3290    match a.unwrap_array().dims().into_iter().nth(
3291        (dim - 1)
3292            .try_into()
3293            .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
3294    ) {
3295        Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
3296            requested_dim.lower_bound.try_into().map_err(|_| {
3297                EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
3298            })?,
3299            requested_dim
3300                .length
3301                .try_into()
3302                .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
3303            1,
3304        )?)),
3305        None => Ok(Box::new(iter::empty())),
3306    }
3307}
3308
3309fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3310    a.unwrap_array()
3311        .elements()
3312        .iter()
3313        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
3314}
3315
3316fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3317    a.unwrap_list()
3318        .iter()
3319        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
3320}
3321
3322fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3323    a.unwrap_map()
3324        .iter()
3325        .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
3326}
3327
3328impl AggregateFunc {
3329    /// The base function name without the `~[...]` suffix used when rendering
3330    /// variants that represent a parameterized function family.
3331    pub fn name(&self) -> &'static str {
3332        match self {
3333            Self::MaxNumeric => "max",
3334            Self::MaxInt16 => "max",
3335            Self::MaxInt32 => "max",
3336            Self::MaxInt64 => "max",
3337            Self::MaxUInt16 => "max",
3338            Self::MaxUInt32 => "max",
3339            Self::MaxUInt64 => "max",
3340            Self::MaxMzTimestamp => "max",
3341            Self::MaxFloat32 => "max",
3342            Self::MaxFloat64 => "max",
3343            Self::MaxBool => "max",
3344            Self::MaxString => "max",
3345            Self::MaxDate => "max",
3346            Self::MaxTimestamp => "max",
3347            Self::MaxTimestampTz => "max",
3348            Self::MaxInterval => "max",
3349            Self::MaxTime => "max",
3350            Self::MinNumeric => "min",
3351            Self::MinInt16 => "min",
3352            Self::MinInt32 => "min",
3353            Self::MinInt64 => "min",
3354            Self::MinUInt16 => "min",
3355            Self::MinUInt32 => "min",
3356            Self::MinUInt64 => "min",
3357            Self::MinMzTimestamp => "min",
3358            Self::MinFloat32 => "min",
3359            Self::MinFloat64 => "min",
3360            Self::MinBool => "min",
3361            Self::MinString => "min",
3362            Self::MinDate => "min",
3363            Self::MinTimestamp => "min",
3364            Self::MinTimestampTz => "min",
3365            Self::MinInterval => "min",
3366            Self::MinTime => "min",
3367            Self::SumInt16 => "sum",
3368            Self::SumInt32 => "sum",
3369            Self::SumInt64 => "sum",
3370            Self::SumUInt16 => "sum",
3371            Self::SumUInt32 => "sum",
3372            Self::SumUInt64 => "sum",
3373            Self::SumFloat32 => "sum",
3374            Self::SumFloat64 => "sum",
3375            Self::SumNumeric => "sum",
3376            Self::Count => "count",
3377            Self::Any => "any",
3378            Self::All => "all",
3379            Self::JsonbAgg { .. } => "jsonb_agg",
3380            Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3381            Self::MapAgg { .. } => "map_agg",
3382            Self::ArrayConcat { .. } => "array_agg",
3383            Self::ListConcat { .. } => "list_agg",
3384            Self::StringAgg { .. } => "string_agg",
3385            Self::RowNumber { .. } => "row_number",
3386            Self::Rank { .. } => "rank",
3387            Self::DenseRank { .. } => "dense_rank",
3388            Self::LagLead {
3389                lag_lead: LagLeadType::Lag,
3390                ..
3391            } => "lag",
3392            Self::LagLead {
3393                lag_lead: LagLeadType::Lead,
3394                ..
3395            } => "lead",
3396            Self::FirstValue { .. } => "first_value",
3397            Self::LastValue { .. } => "last_value",
3398            Self::WindowAggregate { .. } => "window_agg",
3399            Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3400            Self::FusedWindowAggregate { .. } => "fused_window_agg",
3401            Self::Dummy => "dummy",
3402        }
3403    }
3404}
3405
3406impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3407where
3408    M: HumanizerMode,
3409{
3410    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3411        use AggregateFunc::*;
3412        let name = self.expr.name();
3413        match self.expr {
3414            JsonbAgg { order_by }
3415            | JsonbObjectAgg { order_by }
3416            | MapAgg { order_by, .. }
3417            | ArrayConcat { order_by }
3418            | ListConcat { order_by }
3419            | StringAgg { order_by }
3420            | RowNumber { order_by }
3421            | Rank { order_by }
3422            | DenseRank { order_by } => {
3423                let order_by = order_by.iter().map(|col| self.child(col));
3424                write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3425            }
3426            LagLead {
3427                lag_lead: _,
3428                ignore_nulls,
3429                order_by,
3430            } => {
3431                let order_by = order_by.iter().map(|col| self.child(col));
3432                f.write_str(name)?;
3433                f.write_str("[")?;
3434                if *ignore_nulls {
3435                    f.write_str("ignore_nulls=true, ")?;
3436                }
3437                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3438                f.write_str("]")
3439            }
3440            FirstValue {
3441                order_by,
3442                window_frame,
3443            } => {
3444                let order_by = order_by.iter().map(|col| self.child(col));
3445                f.write_str(name)?;
3446                f.write_str("[")?;
3447                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3448                if *window_frame != WindowFrame::default() {
3449                    write!(f, " {}", window_frame)?;
3450                }
3451                f.write_str("]")
3452            }
3453            LastValue {
3454                order_by,
3455                window_frame,
3456            } => {
3457                let order_by = order_by.iter().map(|col| self.child(col));
3458                f.write_str(name)?;
3459                f.write_str("[")?;
3460                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3461                if *window_frame != WindowFrame::default() {
3462                    write!(f, " {}", window_frame)?;
3463                }
3464                f.write_str("]")
3465            }
3466            WindowAggregate {
3467                wrapped_aggregate,
3468                order_by,
3469                window_frame,
3470            } => {
3471                let order_by = order_by.iter().map(|col| self.child(col));
3472                let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3473                f.write_str(name)?;
3474                f.write_str("[")?;
3475                write!(f, "{} ", wrapped_aggregate)?;
3476                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3477                if *window_frame != WindowFrame::default() {
3478                    write!(f, " {}", window_frame)?;
3479                }
3480                f.write_str("]")
3481            }
3482            FusedValueWindowFunc { funcs, order_by } => {
3483                let order_by = order_by.iter().map(|col| self.child(col));
3484                let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3485                f.write_str(name)?;
3486                f.write_str("[")?;
3487                write!(f, "{} ", funcs)?;
3488                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3489                f.write_str("]")
3490            }
3491            _ => f.write_str(name),
3492        }
3493    }
3494}
3495
/// Describes one explicit capture group of an [`AnalyzedRegex`]. The implicit
/// whole-match group 0 is never described.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct CaptureGroupDesc {
    /// Position of the group among the regex's capture groups (as enumerated
    /// by `capture_names`).
    pub index: u32,
    /// The group's name for named groups; `None` otherwise.
    pub name: Option<String>,
    /// Whether the group's value may be `NULL`. `AnalyzedRegex::new` currently
    /// always sets this to `true`.
    pub nullable: bool,
}
3504
3505impl RustType<ProtoCaptureGroupDesc> for CaptureGroupDesc {
3506    fn into_proto(&self) -> ProtoCaptureGroupDesc {
3507        ProtoCaptureGroupDesc {
3508            index: self.index,
3509            name: self.name.clone(),
3510            nullable: self.nullable,
3511        }
3512    }
3513
3514    fn from_proto(proto: ProtoCaptureGroupDesc) -> Result<Self, TryFromProtoError> {
3515        Ok(Self {
3516            index: proto.index,
3517            name: proto.name,
3518            nullable: proto.nullable,
3519        })
3520    }
3521}
3522
/// Options for regex functions, parsed from a flag string by the `FromStr`
/// impl (which accepts `i` and `g`). The `Default` value has every option off.
#[derive(
    Arbitrary,
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default,
)]
pub struct AnalyzedRegexOpts {
    /// Flag `i`: passed to `ReprRegex::new` to request case-insensitive matching.
    pub case_insensitive: bool,
    /// Flag `g`: `regexp_matches` reports every match rather than only the first.
    pub global: bool,
}
3542
3543impl FromStr for AnalyzedRegexOpts {
3544    type Err = EvalError;
3545
3546    fn from_str(s: &str) -> Result<Self, Self::Err> {
3547        let mut opts = AnalyzedRegexOpts::default();
3548        for c in s.chars() {
3549            match c {
3550                'i' => opts.case_insensitive = true,
3551                'g' => opts.global = true,
3552                _ => return Err(EvalError::InvalidRegexFlag(c)),
3553            }
3554        }
3555        Ok(opts)
3556    }
3557}
3558
3559impl RustType<ProtoAnalyzedRegexOpts> for AnalyzedRegexOpts {
3560    fn into_proto(&self) -> ProtoAnalyzedRegexOpts {
3561        ProtoAnalyzedRegexOpts {
3562            case_insensitive: self.case_insensitive,
3563            global: self.global,
3564        }
3565    }
3566
3567    fn from_proto(proto: ProtoAnalyzedRegexOpts) -> Result<Self, TryFromProtoError> {
3568        Ok(Self {
3569            case_insensitive: proto.case_insensitive,
3570            global: proto.global,
3571        })
3572    }
3573}
3574
/// A compiled regular expression bundled with descriptions of its capture
/// groups and the options it was compiled with; build one with
/// [`AnalyzedRegex::new`].
///
/// Fields: `.0` is the compiled pattern, `.1` holds one [`CaptureGroupDesc`]
/// per explicit capture group (the whole-match group is excluded), and `.2`
/// holds the options passed to `new`.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct AnalyzedRegex(
    #[proptest(strategy = "mz_repr::adt::regex::any_regex()")] ReprRegex,
    Vec<CaptureGroupDesc>,
    AnalyzedRegexOpts,
);
3583
3584impl RustType<ProtoAnalyzedRegex> for AnalyzedRegex {
3585    fn into_proto(&self) -> ProtoAnalyzedRegex {
3586        ProtoAnalyzedRegex {
3587            regex: Some(self.0.into_proto()),
3588            groups: self.1.into_proto(),
3589            opts: Some(self.2.into_proto()),
3590        }
3591    }
3592
3593    fn from_proto(proto: ProtoAnalyzedRegex) -> Result<Self, TryFromProtoError> {
3594        Ok(AnalyzedRegex(
3595            proto.regex.into_rust_if_some("ProtoAnalyzedRegex::regex")?,
3596            proto.groups.into_rust()?,
3597            proto.opts.into_rust_if_some("ProtoAnalyzedRegex::opts")?,
3598        ))
3599    }
3600}
3601
3602impl AnalyzedRegex {
3603    pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, regex::Error> {
3604        let r = ReprRegex::new(s, opts.case_insensitive)?;
3605        // TODO(benesch): remove potentially dangerous usage of `as`.
3606        #[allow(clippy::as_conversions)]
3607        let descs: Vec<_> = r
3608            .capture_names()
3609            .enumerate()
3610            // The first capture is the entire matched string.
3611            // This will often not be useful, so skip it.
3612            // If people want it they can just surround their
3613            // entire regex in an explicit capture group.
3614            .skip(1)
3615            .map(|(i, name)| CaptureGroupDesc {
3616                index: i as u32,
3617                name: name.map(String::from),
3618                // TODO -- we can do better.
3619                // https://github.com/MaterializeInc/database-issues/issues/612
3620                nullable: true,
3621            })
3622            .collect();
3623        Ok(Self(r, descs, opts))
3624    }
3625    pub fn capture_groups_len(&self) -> usize {
3626        self.1.len()
3627    }
3628    pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3629        self.1.iter()
3630    }
3631    pub fn inner(&self) -> &Regex {
3632        &(self.0).regex
3633    }
3634    pub fn opts(&self) -> &AnalyzedRegexOpts {
3635        &self.2
3636    }
3637}
3638
3639pub fn csv_extract(a: Datum, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3640    let bytes = a.unwrap_str().as_bytes();
3641    let mut row = Row::default();
3642    let csv_reader = csv::ReaderBuilder::new()
3643        .has_headers(false)
3644        .from_reader(bytes);
3645    csv_reader.into_records().filter_map(move |res| match res {
3646        Ok(sr) if sr.len() == n_cols => {
3647            row.packer().extend(sr.iter().map(Datum::String));
3648            Some((row.clone(), Diff::ONE))
3649        }
3650        _ => None,
3651    })
3652}
3653
3654pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3655    let n = a.unwrap_int64();
3656    if n != 0 {
3657        Some((Row::default(), n.into()))
3658    } else {
3659        None
3660    }
3661}
3662
3663fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3664    datums
3665        .chunks(width)
3666        .map(|chunk| (Row::pack(chunk), Diff::ONE))
3667}
3668
3669fn acl_explode<'a>(
3670    acl_items: Datum<'a>,
3671    temp_storage: &'a RowArena,
3672) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3673    let acl_items = acl_items.unwrap_array();
3674    let mut res = Vec::new();
3675    for acl_item in acl_items.elements().iter() {
3676        if acl_item.is_null() {
3677            return Err(EvalError::AclArrayNullElement);
3678        }
3679        let acl_item = acl_item.unwrap_acl_item();
3680        for privilege in acl_item.acl_mode.explode() {
3681            let row = [
3682                Datum::UInt32(acl_item.grantor.0),
3683                Datum::UInt32(acl_item.grantee.0),
3684                Datum::String(temp_storage.push_string(privilege.to_string())),
3685                // GRANT OPTION is not implemented, so we hardcode false.
3686                Datum::False,
3687            ];
3688            res.push((Row::pack_slice(&row), Diff::ONE));
3689        }
3690    }
3691    Ok(res.into_iter())
3692}
3693
3694fn mz_acl_explode<'a>(
3695    mz_acl_items: Datum<'a>,
3696    temp_storage: &'a RowArena,
3697) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3698    let mz_acl_items = mz_acl_items.unwrap_array();
3699    let mut res = Vec::new();
3700    for mz_acl_item in mz_acl_items.elements().iter() {
3701        if mz_acl_item.is_null() {
3702            return Err(EvalError::MzAclArrayNullElement);
3703        }
3704        let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3705        for privilege in mz_acl_item.acl_mode.explode() {
3706            let row = [
3707                Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3708                Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3709                Datum::String(temp_storage.push_string(privilege.to_string())),
3710                // GRANT OPTION is not implemented, so we hardcode false.
3711                Datum::False,
3712            ];
3713            res.push((Row::pack_slice(&row), Diff::ONE));
3714        }
3715    }
3716    Ok(res.into_iter())
3717}
3718
/// A table function: maps one set of scalar arguments to zero or more output
/// rows, each paired with a multiplicity (`Diff`).
///
/// See [`TableFunc::eval`] for evaluation and [`TableFunc::output_type`] for
/// the produced schema. The `Display` impl gives the user-facing names.
///
/// NOTE: `Ord`/`PartialOrd` are derived, so comparisons depend on the variant
/// declaration order below. Reordering variants changes how values compare.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub enum TableFunc {
    /// `aclexplode`: expands an `aclitem[]` into
    /// `(grantor, grantee, privilege, is_grantable)` rows.
    /// The role columns are `Oid`s.
    AclExplode,
    /// `mz_aclexplode`: like `AclExplode`, but for `mz_aclitem[]`.
    /// The grantor and grantee columns are strings.
    MzAclExplode,
    /// `jsonb_each`: produces `(key, value)` rows with a non-null text key.
    /// When `stringify` is set, the value column is nullable text; otherwise it
    /// is non-null `jsonb`.
    JsonbEach {
        stringify: bool,
    },
    /// `jsonb_object_keys`: a single non-null text column.
    JsonbObjectKeys,
    /// `jsonb_array_elements`: a single column. It is nullable text when
    /// `stringify` is set, and non-null `jsonb` otherwise.
    JsonbArrayElements {
        stringify: bool,
    },
    /// `regexp_extract`: one text column per capture group of the analyzed
    /// regex. A column is nullable when its capture group is.
    RegexpExtract(AnalyzedRegex),
    /// `csv_extract`: parses CSV records from the input string. Emits one row
    /// of this many text columns per record with exactly that many fields;
    /// other records, including unparsable ones, are skipped.
    CsvExtract(usize),
    /// `generate_series` over `int4` arguments. The output column is declared a
    /// unique key.
    GenerateSeriesInt32,
    /// `generate_series` over `int8` arguments. The output column is declared a
    /// unique key.
    GenerateSeriesInt64,
    /// `generate_series` over `timestamp` bounds with an `interval` step. The
    /// output column is declared a unique key.
    GenerateSeriesTimestamp,
    /// `generate_series` over `timestamptz` bounds with an `interval` step. The
    /// output column is declared a unique key.
    GenerateSeriesTimestampTz,
    /// `repeat_row`: emits one zero-column row whose multiplicity is the
    /// `int8` argument. Emits nothing when the argument is 0.
    Repeat,
    /// `unnest_array`: a single nullable column of type `el_typ`.
    UnnestArray {
        el_typ: ScalarType,
    },
    /// `unnest_list`: a single nullable column of type `el_typ`.
    UnnestList {
        el_typ: ScalarType,
    },
    /// `unnest_map`: `(key, value)` rows. The key is non-null text and declared
    /// a unique key; the value is a nullable `value_type`.
    UnnestMap {
        value_type: ScalarType,
    },
    /// Given `n` input expressions, wraps them into `n / width` rows, each of
    /// `width` columns.
    ///
    /// This function is not intended to be called directly by end users, but
    /// is useful in the planning of e.g. VALUES clauses.
    ///
    /// NOTE(review): evaluation uses `slice::chunks(width)`, so it panics when
    /// `width` is 0. It yields a short final row if `n` is not a multiple of
    /// `width`.
    Wrap {
        types: Vec<ColumnType>,
        width: usize,
    },
    /// `generate_subscripts`: a single non-null `int4` column, declared a
    /// unique key.
    GenerateSubscriptsArray,
    /// Execute some arbitrary scalar function as a table function.
    ///
    /// `name` is used for display. `relation` is returned verbatim as the
    /// output type. Evaluation packs the input datums into a single row.
    TabletizedScalar {
        name: String,
        relation: RelationType,
    },
    /// `regexp_matches`: a single non-null `text[]` column.
    RegexpMatches,
}
3765
3766impl RustType<ProtoTableFunc> for TableFunc {
3767    fn into_proto(&self) -> ProtoTableFunc {
3768        use proto_table_func::{Kind, ProtoWrap};
3769
3770        ProtoTableFunc {
3771            kind: Some(match self {
3772                TableFunc::AclExplode => Kind::AclExplode(()),
3773                TableFunc::MzAclExplode => Kind::MzAclExplode(()),
3774                TableFunc::JsonbEach { stringify } => Kind::JsonbEach(*stringify),
3775                TableFunc::JsonbObjectKeys => Kind::JsonbObjectKeys(()),
3776                TableFunc::JsonbArrayElements { stringify } => Kind::JsonbArrayElements(*stringify),
3777                TableFunc::RegexpExtract(x) => Kind::RegexpExtract(x.into_proto()),
3778                TableFunc::CsvExtract(x) => Kind::CsvExtract(x.into_proto()),
3779                TableFunc::GenerateSeriesInt32 => Kind::GenerateSeriesInt32(()),
3780                TableFunc::GenerateSeriesInt64 => Kind::GenerateSeriesInt64(()),
3781                TableFunc::GenerateSeriesTimestamp => Kind::GenerateSeriesTimestamp(()),
3782                TableFunc::GenerateSeriesTimestampTz => Kind::GenerateSeriesTimestampTz(()),
3783                TableFunc::Repeat => Kind::Repeat(()),
3784                TableFunc::UnnestArray { el_typ } => Kind::UnnestArray(el_typ.into_proto()),
3785                TableFunc::UnnestList { el_typ } => Kind::UnnestList(el_typ.into_proto()),
3786                TableFunc::UnnestMap { value_type } => Kind::UnnestMap(value_type.into_proto()),
3787                TableFunc::Wrap { types, width } => Kind::Wrap(ProtoWrap {
3788                    types: types.into_proto(),
3789                    width: width.into_proto(),
3790                }),
3791                TableFunc::GenerateSubscriptsArray => Kind::GenerateSubscriptsArray(()),
3792                TableFunc::TabletizedScalar { name, relation } => {
3793                    Kind::TabletizedScalar(ProtoTabletizedScalar {
3794                        name: name.into_proto(),
3795                        relation: Some(relation.into_proto()),
3796                    })
3797                }
3798                TableFunc::RegexpMatches => Kind::RegexpMatches(()),
3799            }),
3800        }
3801    }
3802
3803    fn from_proto(proto: ProtoTableFunc) -> Result<Self, TryFromProtoError> {
3804        use proto_table_func::Kind;
3805
3806        let kind = proto
3807            .kind
3808            .ok_or_else(|| TryFromProtoError::missing_field("ProtoTableFunc::Kind"))?;
3809
3810        Ok(match kind {
3811            Kind::AclExplode(()) => TableFunc::AclExplode,
3812            Kind::MzAclExplode(()) => TableFunc::MzAclExplode,
3813            Kind::JsonbEach(stringify) => TableFunc::JsonbEach { stringify },
3814            Kind::JsonbObjectKeys(()) => TableFunc::JsonbObjectKeys,
3815            Kind::JsonbArrayElements(stringify) => TableFunc::JsonbArrayElements { stringify },
3816            Kind::RegexpExtract(x) => TableFunc::RegexpExtract(x.into_rust()?),
3817            Kind::CsvExtract(x) => TableFunc::CsvExtract(x.into_rust()?),
3818            Kind::GenerateSeriesInt32(()) => TableFunc::GenerateSeriesInt32,
3819            Kind::GenerateSeriesInt64(()) => TableFunc::GenerateSeriesInt64,
3820            Kind::GenerateSeriesTimestamp(()) => TableFunc::GenerateSeriesTimestamp,
3821            Kind::GenerateSeriesTimestampTz(()) => TableFunc::GenerateSeriesTimestampTz,
3822            Kind::Repeat(()) => TableFunc::Repeat,
3823            Kind::UnnestArray(x) => TableFunc::UnnestArray {
3824                el_typ: x.into_rust()?,
3825            },
3826            Kind::UnnestList(x) => TableFunc::UnnestList {
3827                el_typ: x.into_rust()?,
3828            },
3829            Kind::UnnestMap(value_type) => TableFunc::UnnestMap {
3830                value_type: value_type.into_rust()?,
3831            },
3832            Kind::Wrap(x) => TableFunc::Wrap {
3833                width: x.width.into_rust()?,
3834                types: x.types.into_rust()?,
3835            },
3836            Kind::GenerateSubscriptsArray(()) => TableFunc::GenerateSubscriptsArray,
3837            Kind::TabletizedScalar(v) => TableFunc::TabletizedScalar {
3838                name: v.name,
3839                relation: v
3840                    .relation
3841                    .into_rust_if_some("ProtoTabletizedScalar::relation")?,
3842            },
3843            Kind::RegexpMatches(_) => TableFunc::RegexpMatches,
3844        })
3845    }
3846}
3847
3848impl TableFunc {
3849    pub fn eval<'a>(
3850        &'a self,
3851        datums: &'a [Datum<'a>],
3852        temp_storage: &'a RowArena,
3853    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3854        if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3855            return Ok(Box::new(vec![].into_iter()));
3856        }
3857        match self {
3858            TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3859            TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3860            TableFunc::JsonbEach { stringify } => {
3861                Ok(Box::new(jsonb_each(datums[0], temp_storage, *stringify)))
3862            }
3863            TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3864            TableFunc::JsonbArrayElements { stringify } => Ok(Box::new(jsonb_array_elements(
3865                datums[0],
3866                temp_storage,
3867                *stringify,
3868            ))),
3869            TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3870            TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3871            TableFunc::GenerateSeriesInt32 => {
3872                let res = generate_series(
3873                    datums[0].unwrap_int32(),
3874                    datums[1].unwrap_int32(),
3875                    datums[2].unwrap_int32(),
3876                )?;
3877                Ok(Box::new(res))
3878            }
3879            TableFunc::GenerateSeriesInt64 => {
3880                let res = generate_series(
3881                    datums[0].unwrap_int64(),
3882                    datums[1].unwrap_int64(),
3883                    datums[2].unwrap_int64(),
3884                )?;
3885                Ok(Box::new(res))
3886            }
3887            TableFunc::GenerateSeriesTimestamp => {
3888                fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
3889                    Datum::from(d)
3890                }
3891                let res = generate_series_ts(
3892                    datums[0].unwrap_timestamp(),
3893                    datums[1].unwrap_timestamp(),
3894                    datums[2].unwrap_interval(),
3895                    pass_through,
3896                )?;
3897                Ok(Box::new(res))
3898            }
3899            TableFunc::GenerateSeriesTimestampTz => {
3900                fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
3901                    Datum::from(d)
3902                }
3903                let res = generate_series_ts(
3904                    datums[0].unwrap_timestamptz(),
3905                    datums[1].unwrap_timestamptz(),
3906                    datums[2].unwrap_interval(),
3907                    gen_ts_tz,
3908                )?;
3909                Ok(Box::new(res))
3910            }
3911            TableFunc::GenerateSubscriptsArray => {
3912                generate_subscripts_array(datums[0], datums[1].unwrap_int32())
3913            }
3914            TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
3915            TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
3916            TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
3917            TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
3918            TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
3919            TableFunc::TabletizedScalar { .. } => {
3920                let r = Row::pack_slice(datums);
3921                Ok(Box::new(std::iter::once((r, Diff::ONE))))
3922            }
3923            TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
3924        }
3925    }
3926
3927    pub fn output_type(&self) -> RelationType {
3928        let (column_types, keys) = match self {
3929            TableFunc::AclExplode => {
3930                let column_types = vec![
3931                    ScalarType::Oid.nullable(false),
3932                    ScalarType::Oid.nullable(false),
3933                    ScalarType::String.nullable(false),
3934                    ScalarType::Bool.nullable(false),
3935                ];
3936                let keys = vec![];
3937                (column_types, keys)
3938            }
3939            TableFunc::MzAclExplode => {
3940                let column_types = vec![
3941                    ScalarType::String.nullable(false),
3942                    ScalarType::String.nullable(false),
3943                    ScalarType::String.nullable(false),
3944                    ScalarType::Bool.nullable(false),
3945                ];
3946                let keys = vec![];
3947                (column_types, keys)
3948            }
3949            TableFunc::JsonbEach { stringify: true } => {
3950                let column_types = vec![
3951                    ScalarType::String.nullable(false),
3952                    ScalarType::String.nullable(true),
3953                ];
3954                let keys = vec![];
3955                (column_types, keys)
3956            }
3957            TableFunc::JsonbEach { stringify: false } => {
3958                let column_types = vec![
3959                    ScalarType::String.nullable(false),
3960                    ScalarType::Jsonb.nullable(false),
3961                ];
3962                let keys = vec![];
3963                (column_types, keys)
3964            }
3965            TableFunc::JsonbObjectKeys => {
3966                let column_types = vec![ScalarType::String.nullable(false)];
3967                let keys = vec![];
3968                (column_types, keys)
3969            }
3970            TableFunc::JsonbArrayElements { stringify: true } => {
3971                let column_types = vec![ScalarType::String.nullable(true)];
3972                let keys = vec![];
3973                (column_types, keys)
3974            }
3975            TableFunc::JsonbArrayElements { stringify: false } => {
3976                let column_types = vec![ScalarType::Jsonb.nullable(false)];
3977                let keys = vec![];
3978                (column_types, keys)
3979            }
3980            TableFunc::RegexpExtract(a) => {
3981                let column_types = a
3982                    .capture_groups_iter()
3983                    .map(|cg| ScalarType::String.nullable(cg.nullable))
3984                    .collect();
3985                let keys = vec![];
3986                (column_types, keys)
3987            }
3988            TableFunc::CsvExtract(n_cols) => {
3989                let column_types = iter::repeat(ScalarType::String.nullable(false))
3990                    .take(*n_cols)
3991                    .collect();
3992                let keys = vec![];
3993                (column_types, keys)
3994            }
3995            TableFunc::GenerateSeriesInt32 => {
3996                let column_types = vec![ScalarType::Int32.nullable(false)];
3997                let keys = vec![vec![0]];
3998                (column_types, keys)
3999            }
4000            TableFunc::GenerateSeriesInt64 => {
4001                let column_types = vec![ScalarType::Int64.nullable(false)];
4002                let keys = vec![vec![0]];
4003                (column_types, keys)
4004            }
4005            TableFunc::GenerateSeriesTimestamp => {
4006                let column_types = vec![ScalarType::Timestamp { precision: None }.nullable(false)];
4007                let keys = vec![vec![0]];
4008                (column_types, keys)
4009            }
4010            TableFunc::GenerateSeriesTimestampTz => {
4011                let column_types =
4012                    vec![ScalarType::TimestampTz { precision: None }.nullable(false)];
4013                let keys = vec![vec![0]];
4014                (column_types, keys)
4015            }
4016            TableFunc::GenerateSubscriptsArray => {
4017                let column_types = vec![ScalarType::Int32.nullable(false)];
4018                let keys = vec![vec![0]];
4019                (column_types, keys)
4020            }
4021            TableFunc::Repeat => {
4022                let column_types = vec![];
4023                let keys = vec![];
4024                (column_types, keys)
4025            }
4026            TableFunc::UnnestArray { el_typ } => {
4027                let column_types = vec![el_typ.clone().nullable(true)];
4028                let keys = vec![];
4029                (column_types, keys)
4030            }
4031            TableFunc::UnnestList { el_typ } => {
4032                let column_types = vec![el_typ.clone().nullable(true)];
4033                let keys = vec![];
4034                (column_types, keys)
4035            }
4036            TableFunc::UnnestMap { value_type } => {
4037                let column_types = vec![
4038                    ScalarType::String.nullable(false),
4039                    value_type.clone().nullable(true),
4040                ];
4041                let keys = vec![vec![0]];
4042                (column_types, keys)
4043            }
4044            TableFunc::Wrap { types, .. } => {
4045                let column_types = types.clone();
4046                let keys = vec![];
4047                (column_types, keys)
4048            }
4049            TableFunc::TabletizedScalar { relation, .. } => {
4050                return relation.clone();
4051            }
4052            TableFunc::RegexpMatches => {
4053                let column_types =
4054                    vec![ScalarType::Array(Box::new(ScalarType::String)).nullable(false)];
4055                let keys = vec![];
4056
4057                (column_types, keys)
4058            }
4059        };
4060
4061        if !keys.is_empty() {
4062            RelationType::new(column_types).with_keys(keys)
4063        } else {
4064            RelationType::new(column_types)
4065        }
4066    }
4067
4068    pub fn output_arity(&self) -> usize {
4069        match self {
4070            TableFunc::AclExplode => 4,
4071            TableFunc::MzAclExplode => 4,
4072            TableFunc::JsonbEach { .. } => 2,
4073            TableFunc::JsonbObjectKeys => 1,
4074            TableFunc::JsonbArrayElements { .. } => 1,
4075            TableFunc::RegexpExtract(a) => a.capture_groups_len(),
4076            TableFunc::CsvExtract(n_cols) => *n_cols,
4077            TableFunc::GenerateSeriesInt32 => 1,
4078            TableFunc::GenerateSeriesInt64 => 1,
4079            TableFunc::GenerateSeriesTimestamp => 1,
4080            TableFunc::GenerateSeriesTimestampTz => 1,
4081            TableFunc::GenerateSubscriptsArray => 1,
4082            TableFunc::Repeat => 0,
4083            TableFunc::UnnestArray { .. } => 1,
4084            TableFunc::UnnestList { .. } => 1,
4085            TableFunc::UnnestMap { .. } => 2,
4086            TableFunc::Wrap { width, .. } => *width,
4087            TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
4088            TableFunc::RegexpMatches => 1,
4089        }
4090    }
4091
4092    pub fn empty_on_null_input(&self) -> bool {
4093        match self {
4094            TableFunc::AclExplode
4095            | TableFunc::MzAclExplode
4096            | TableFunc::JsonbEach { .. }
4097            | TableFunc::JsonbObjectKeys
4098            | TableFunc::JsonbArrayElements { .. }
4099            | TableFunc::GenerateSeriesInt32
4100            | TableFunc::GenerateSeriesInt64
4101            | TableFunc::GenerateSeriesTimestamp
4102            | TableFunc::GenerateSeriesTimestampTz
4103            | TableFunc::GenerateSubscriptsArray
4104            | TableFunc::RegexpExtract(_)
4105            | TableFunc::CsvExtract(_)
4106            | TableFunc::Repeat
4107            | TableFunc::UnnestArray { .. }
4108            | TableFunc::UnnestList { .. }
4109            | TableFunc::UnnestMap { .. }
4110            | TableFunc::RegexpMatches => true,
4111            TableFunc::Wrap { .. } => false,
4112            TableFunc::TabletizedScalar { .. } => false,
4113        }
4114    }
4115
4116    /// True iff the table function preserves the append-only property of its input.
4117    pub fn preserves_monotonicity(&self) -> bool {
4118        // Most variants preserve monotonicity, but all variants are enumerated to
4119        // ensure that added variants at least check that this is the case.
4120        match self {
4121            TableFunc::AclExplode => false,
4122            TableFunc::MzAclExplode => false,
4123            TableFunc::JsonbEach { .. } => true,
4124            TableFunc::JsonbObjectKeys => true,
4125            TableFunc::JsonbArrayElements { .. } => true,
4126            TableFunc::RegexpExtract(_) => true,
4127            TableFunc::CsvExtract(_) => true,
4128            TableFunc::GenerateSeriesInt32 => true,
4129            TableFunc::GenerateSeriesInt64 => true,
4130            TableFunc::GenerateSeriesTimestamp => true,
4131            TableFunc::GenerateSeriesTimestampTz => true,
4132            TableFunc::GenerateSubscriptsArray => true,
4133            TableFunc::Repeat => false,
4134            TableFunc::UnnestArray { .. } => true,
4135            TableFunc::UnnestList { .. } => true,
4136            TableFunc::UnnestMap { .. } => true,
4137            TableFunc::Wrap { .. } => true,
4138            TableFunc::TabletizedScalar { .. } => true,
4139            TableFunc::RegexpMatches => true,
4140        }
4141    }
4142}
4143
4144impl fmt::Display for TableFunc {
4145    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4146        match self {
4147            TableFunc::AclExplode => f.write_str("aclexplode"),
4148            TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
4149            TableFunc::JsonbEach { .. } => f.write_str("jsonb_each"),
4150            TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
4151            TableFunc::JsonbArrayElements { .. } => f.write_str("jsonb_array_elements"),
4152            TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
4153            TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
4154            TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
4155            TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
4156            TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
4157            TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
4158            TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
4159            TableFunc::Repeat => f.write_str("repeat_row"),
4160            TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
4161            TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
4162            TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
4163            TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
4164            TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
4165            TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
4166        }
4167    }
4168}
4169
#[cfg(test)]
mod tests {
    use super::{AggregateFunc, ProtoAggregateFunc, ProtoTableFunc, TableFunc};
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    // Arbitrary `AggregateFunc`s and `TableFunc`s must decode back to the exact
    // value they were encoded from after a protobuf round trip.
    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn aggregate_func_protobuf_roundtrip(expect in any::<AggregateFunc>()) {
            let decoded = protobuf_roundtrip::<_, ProtoAggregateFunc>(&expect);
            assert_ok!(decoded);
            assert_eq!(decoded.unwrap(), expect);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn table_func_protobuf_roundtrip(expect in any::<TableFunc>()) {
            let decoded = protobuf_roundtrip::<_, ProtoTableFunc>(&expect);
            assert_ok!(decoded);
            assert_eq!(decoded.unwrap(), expect);
        }
    }
}