mz_expr/relation/func.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::Regex as ReprRegex;
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33    ColumnName, Datum, Diff, Row, RowArena, RowPacker, SharedRow, SqlColumnType, SqlRelationType,
34    SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44    CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49    ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53// TODO(jamii) be careful about overflow in sum/avg
54// see https://timely.zulipchat.com/#narrow/stream/186635-engineering/topic/additional.20work/near/163507435
55
56fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58    I: IntoIterator<Item = Datum<'a>>,
59{
60    match datums
61        .into_iter()
62        .filter(|d| !d.is_null())
63        .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64    {
65        Some(datum) => datum,
66        None => Datum::Null,
67    }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72    I: IntoIterator<Item = Datum<'a>>,
73    DatumType: TryFrom<Datum<'a>> + Ord,
74    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75    Datum<'a>: From<Option<DatumType>>,
76{
77    let x: Option<DatumType> = datums
78        .into_iter()
79        .filter(|d| !d.is_null())
80        .map(|d| DatumType::try_from(d).expect("unexpected type"))
81        .max();
82
83    x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88    I: IntoIterator<Item = Datum<'a>>,
89    DatumType: TryFrom<Datum<'a>> + Ord,
90    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91    Datum<'a>: From<Option<DatumType>>,
92{
93    let x: Option<DatumType> = datums
94        .into_iter()
95        .filter(|d| !d.is_null())
96        .map(|d| DatumType::try_from(d).expect("unexpected type"))
97        .min();
98
99    x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104    I: IntoIterator<Item = Datum<'a>>,
105{
106    match datums
107        .into_iter()
108        .filter(|d| !d.is_null())
109        .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110    {
111        Some(datum) => datum,
112        None => Datum::Null,
113    }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118    I: IntoIterator<Item = Datum<'a>>,
119    DatumType: TryFrom<Datum<'a>>,
120    <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121    ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123    let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124    if datums.peek().is_none() {
125        Datum::Null
126    } else {
127        let x = datums
128            .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129            .sum::<ResultType>();
130        x.into()
131    }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136    I: IntoIterator<Item = Datum<'a>>,
137{
138    let mut cx = numeric::cx_datum();
139    let mut sum = Numeric::zero();
140    let mut empty = true;
141    for d in datums {
142        if !d.is_null() {
143            empty = false;
144            cx.add(&mut sum, &d.unwrap_numeric().0);
145        }
146    }
147    match empty {
148        true => Datum::Null,
149        false => Datum::from(sum),
150    }
151}
152
153// TODO(benesch): remove potentially dangerous usage of `as`.
154#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157    I: IntoIterator<Item = Datum<'a>>,
158{
159    // TODO(jkosh44) This should error when the count can't fit inside of an `i64` instead of returning a negative result.
160    let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161    Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166    I: IntoIterator<Item = Datum<'a>>,
167{
168    datums
169        .into_iter()
170        .fold(Datum::False, |state, next| match (state, next) {
171            (Datum::True, _) | (_, Datum::True) => Datum::True,
172            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173            _ => Datum::False,
174        })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179    I: IntoIterator<Item = Datum<'a>>,
180{
181    datums
182        .into_iter()
183        .fold(Datum::True, |state, next| match (state, next) {
184            (Datum::False, _) | (_, Datum::False) => Datum::False,
185            (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186            _ => Datum::True,
187        })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192    I: IntoIterator<Item = Datum<'a>>,
193{
194    const EMPTY_SEP: &str = "";
195
196    let datums = order_aggregate_datums(datums, order_by);
197    let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198        if d.is_null() {
199            return None;
200        }
201        let mut value_sep = d.unwrap_list().iter();
202        match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203            (Datum::Null, _) => None,
204            (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205            (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206            _ => unreachable!(),
207        }
208    });
209
210    let mut s = String::default();
211    match sep_value_pairs.next() {
212        // First value not prefixed by its separator
213        Some((_, value)) => s.push_str(value),
214        // If no non-null values sent, return NULL.
215        None => return Datum::Null,
216    }
217
218    for (sep, value) in sep_value_pairs {
219        s.push_str(sep);
220        s.push_str(value);
221    }
222
223    Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228    I: IntoIterator<Item = Datum<'a>>,
229{
230    let datums = order_aggregate_datums(datums, order_by);
231    temp_storage.make_datum(|packer| {
232        packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233    })
234}
235
/// Aggregates `[key, value]` list datums into a single dict (map) datum.
///
/// Null inputs and null keys are skipped (a null key should eventually be an
/// error; see the TODO below). When the same key appears more than once, the
/// value from the *last* occurrence with respect to `order_by` wins.
fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let datums = order_aggregate_datums(datums, order_by);
    temp_storage.make_datum(|packer| {
        let mut datums: Vec<_> = datums
            .into_iter()
            .filter_map(|d| {
                if d.is_null() {
                    return None;
                }
                // Each non-null input is a 2-element list: [key, value].
                let mut list = d.unwrap_list().iter();
                let key = list.next().unwrap();
                let val = list.next().unwrap();
                if key.is_null() {
                    // TODO(benesch): this should produce an error, but
                    // aggregate functions cannot presently produce errors.
                    None
                } else {
                    Some((key.unwrap_str(), val))
                }
            })
            .collect();
        // datums are ordered by any ORDER BY clause now, and we want to preserve
        // the last entry for each key, but we also need to present unique and sorted
        // keys to push_dict. Use sort_by_key here, which is stable, and so will
        // preserve the ORDER BY order. Then reverse and dedup to retain the last of
        // each key (dedup keeps the *first* of each run, hence the reversal).
        // Reverse again so we're back in push_dict (ascending-key) order.
        datums.sort_by_key(|(k, _v)| *k);
        datums.reverse();
        datums.dedup_by_key(|(k, _v)| *k);
        datums.reverse();
        packer.push_dict(datums);
    })
}
272
/// Assuming datums is a List, sort them by the 2nd through Nth elements
/// corresponding to order_by, then return the 1st element (the payload),
/// discarding the order-by datums.
///
/// Near the usages of this function, we sometimes want to produce Datums with a shorter lifetime
/// than 'a. We have to actually perform the shortening of the lifetime here, inside this function,
/// because if we were to simply return `impl Iterator<Item = Datum<'a>>`, that wouldn't be
/// covariant in the item type, because opaque types are always invariant. (Contrast this with how
/// we perform the shortening _inside_ this function: the input of the `map` is known to
/// specifically be `std::vec::IntoIter`, which is known to be covariant.)
pub fn order_aggregate_datums<'a: 'b, 'b, I>(
    datums: I,
    order_by: &[ColumnOrder],
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // The inner helper does the decoding and the actual sort; here we only
    // project away the order-by datums.
    order_aggregate_datums_with_rank_inner(datums, order_by)
        .into_iter()
        // (`payload` is coerced here to `Datum<'b>` in the argument of the closure)
        .map(|(payload, _order_datums)| payload)
}
294
295/// Assuming datums is a List, sort them by the 2nd through Nth elements
296/// corresponding to order_by, then return the 1st element and computed order by expression.
297fn order_aggregate_datums_with_rank<'a, I>(
298    datums: I,
299    order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302    I: IntoIterator<Item = Datum<'a>>,
303{
304    order_aggregate_datums_with_rank_inner(datums, order_by)
305        .into_iter()
306        .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310    datums: I,
311    order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314    I: IntoIterator<Item = Datum<'a>>,
315{
316    let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317        .into_iter()
318        .map(|d| {
319            let list = d.unwrap_list();
320            let mut list_it = list.iter();
321            let payload = list_it.next().unwrap();
322
323            // We decode the order_by Datums here instead of the comparison function, because the
324            // comparison function is expected to be called `O(log n)` times on each input row.
325            // The only downside is that the decoded data might be bigger, but I think that's fine,
326            // because:
327            // - if we have a window partition so big that this would create a memory problem, then
328            //   the non-incrementalness of window functions will create a serious CPU problem
329            //   anyway,
330            // - and anyhow various other parts of the window function code already do decoding
331            //   upfront.
332            let mut order_by_datums = Vec::with_capacity(order_by.len());
333            for _ in 0..order_by.len() {
334                order_by_datums.push(
335                    list_it
336                        .next()
337                        .expect("must have exactly the same number of Datums as `order_by`"),
338                );
339            }
340
341            (payload, order_by_datums)
342        })
343        .collect();
344
345    let mut sort_by =
346        |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347         (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348            compare_columns(
349                order_by,
350                left_order_by_datums,
351                right_order_by_datums,
352                || payload_left.cmp(payload_right),
353            )
354        };
355    // `sort_unstable_by` can be faster and uses less memory than `sort_by`. An unstable sort is
356    // enough here, because if two elements are equal in our `compare` function, then the elements
357    // are actually binary-equal (because of the `tiebreaker` given to `compare_columns`), so it
358    // doesn't matter what order they end up in.
359    decoded.sort_unstable_by(&mut sort_by);
360    decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365    I: IntoIterator<Item = Datum<'a>>,
366{
367    let datums = order_aggregate_datums(datums, order_by);
368    let datums: Vec<_> = datums
369        .into_iter()
370        .map(|d| d.unwrap_array().elements().iter())
371        .flatten()
372        .collect();
373    let dims = ArrayDimension {
374        lower_bound: 1,
375        length: datums.len(),
376    };
377    temp_storage.make_datum(|packer| {
378        packer.try_push_array(&[dims], datums).unwrap();
379    })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384    I: IntoIterator<Item = Datum<'a>>,
385{
386    let datums = order_aggregate_datums(datums, order_by);
387    temp_storage.make_datum(|packer| {
388        packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389    })
390}
391
392/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
393/// The output is in the format of `[result_value, original_row]`.
394/// See an example at `lag_lead`, where the input-output formats are similar.
395fn row_number<'a, I>(
396    datums: I,
397    callers_temp_storage: &'a RowArena,
398    order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401    I: IntoIterator<Item = Datum<'a>>,
402{
403    // We want to use our own temp_storage here, to avoid flooding `callers_temp_storage` with a
404    // large number of new datums. This is because we don't want to make an assumption about
405    // whether the caller creates a new temp_storage between window partitions.
406    let temp_storage = RowArena::new();
407    let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409    callers_temp_storage.make_datum(|packer| {
410        packer.push_list(datums);
411    })
412}
413
414/// Like `row_number`, but doesn't perform the final wrapping in a list, returning an Iterator
415/// instead.
416fn row_number_no_list<'a: 'b, 'b, I>(
417    datums: I,
418    callers_temp_storage: &'b RowArena,
419    order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422    I: IntoIterator<Item = Datum<'a>>,
423{
424    let datums = order_aggregate_datums(datums, order_by);
425
426    callers_temp_storage.reserve(datums.size_hint().0);
427    #[allow(clippy::disallowed_methods)]
428    datums
429        .into_iter()
430        .map(|d| d.unwrap_list().iter())
431        .flatten()
432        .zip(1i64..)
433        .map(|(d, i)| {
434            callers_temp_storage.make_datum(|packer| {
435                packer.push_list_with(|packer| {
436                    packer.push(Datum::Int64(i));
437                    packer.push(d);
438                });
439            })
440        })
441}
442
443/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
444/// The output is in the format of `[result_value, original_row]`.
445/// See an example at `lag_lead`, where the input-output formats are similar.
446fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448    I: IntoIterator<Item = Datum<'a>>,
449{
450    let temp_storage = RowArena::new();
451    let datums = rank_no_list(datums, &temp_storage, order_by);
452
453    callers_temp_storage.make_datum(|packer| {
454        packer.push_list(datums);
455    })
456}
457
458/// Like `rank`, but doesn't perform the final wrapping in a list, returning an Iterator
459/// instead.
460fn rank_no_list<'a: 'b, 'b, I>(
461    datums: I,
462    callers_temp_storage: &'b RowArena,
463    order_by: &[ColumnOrder],
464) -> impl Iterator<Item = Datum<'b>>
465where
466    I: IntoIterator<Item = Datum<'a>>,
467{
468    // Keep the row used for ordering around, as it is used to determine the rank
469    let datums = order_aggregate_datums_with_rank(datums, order_by);
470
471    let mut datums = datums
472        .into_iter()
473        .map(|(d0, order_row)| {
474            d0.unwrap_list()
475                .iter()
476                .map(move |d1| (d1, order_row.clone()))
477        })
478        .flatten();
479
480    callers_temp_storage.reserve(datums.size_hint().0);
481    datums
482        .next()
483        .map_or(vec![], |(first_datum, first_order_row)| {
484            // Folding with (last order_by row, last assigned rank, row number, output vec)
485            datums.fold((first_order_row, 1, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
486                let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
487                *acc_row_num += 1;
488                // Identity is based on the order_by expression
489                if *acc_row != next_order_row {
490                    *acc_rank = *acc_row_num;
491                    *acc_row = next_order_row;
492                }
493
494                (*output).push((next_datum, *acc_rank));
495                acc
496            })
497        }.3).into_iter().map(|(d, i)| {
498        callers_temp_storage.make_datum(|packer| {
499            packer.push_list_with(|packer| {
500                packer.push(Datum::Int64(i));
501                packer.push(d);
502            });
503        })
504    })
505}
506
507/// The expected input is in the format of `[((OriginalRow, [EncodedArgs]), OrderByExprs...)]`
508/// The output is in the format of `[result_value, original_row]`.
509/// See an example at `lag_lead`, where the input-output formats are similar.
510fn dense_rank<'a, I>(
511    datums: I,
512    callers_temp_storage: &'a RowArena,
513    order_by: &[ColumnOrder],
514) -> Datum<'a>
515where
516    I: IntoIterator<Item = Datum<'a>>,
517{
518    let temp_storage = RowArena::new();
519    let datums = dense_rank_no_list(datums, &temp_storage, order_by);
520
521    callers_temp_storage.make_datum(|packer| {
522        packer.push_list(datums);
523    })
524}
525
526/// Like `dense_rank`, but doesn't perform the final wrapping in a list, returning an Iterator
527/// instead.
528fn dense_rank_no_list<'a: 'b, 'b, I>(
529    datums: I,
530    callers_temp_storage: &'b RowArena,
531    order_by: &[ColumnOrder],
532) -> impl Iterator<Item = Datum<'b>>
533where
534    I: IntoIterator<Item = Datum<'a>>,
535{
536    // Keep the row used for ordering around, as it is used to determine the rank
537    let datums = order_aggregate_datums_with_rank(datums, order_by);
538
539    let mut datums = datums
540        .into_iter()
541        .map(|(d0, order_row)| {
542            d0.unwrap_list()
543                .iter()
544                .map(move |d1| (d1, order_row.clone()))
545        })
546        .flatten();
547
548    callers_temp_storage.reserve(datums.size_hint().0);
549    datums
550        .next()
551        .map_or(vec![], |(first_datum, first_order_row)| {
552            // Folding with (last order_by row, last assigned rank, output vec)
553            datums.fold((first_order_row, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
554                let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
555                // Identity is based on the order_by expression
556                if *acc_row != next_order_row {
557                    *acc_rank += 1;
558                    *acc_row = next_order_row;
559                }
560
561                (*output).push((next_datum, *acc_rank));
562                acc
563            })
564        }.2).into_iter().map(|(d, i)| {
565        callers_temp_storage.make_datum(|packer| {
566            packer.push_list_with(|packer| {
567                packer.push(Datum::Int64(i));
568                packer.push(d);
569            });
570        })
571    })
572}
573
574/// The expected input is in the format of `[((OriginalRow, EncodedArgs), OrderByExprs...)]`
575/// For example,
576///
577/// lag(x*y, 1, null) over (partition by x+y order by x-y, x/y)
578///
579/// list of:
580/// row(
581///   row(
582///     row(#0, #1),
583///     row((#0 * #1), 1, null)
584///   ),
585///   (#0 - #1),
586///   (#0 / #1)
587/// )
588///
589/// The output is in the format of `[result_value, original_row]`, e.g.
590/// list of:
591/// row(
592///   42,
593///   row(7, 8)
594/// )
595fn lag_lead<'a, I>(
596    datums: I,
597    callers_temp_storage: &'a RowArena,
598    order_by: &[ColumnOrder],
599    lag_lead_type: &LagLeadType,
600    ignore_nulls: &bool,
601) -> Datum<'a>
602where
603    I: IntoIterator<Item = Datum<'a>>,
604{
605    let temp_storage = RowArena::new();
606    let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
607    callers_temp_storage.make_datum(|packer| {
608        packer.push_list(iter);
609    })
610}
611
612/// Like `lag_lead`, but doesn't perform the final wrapping in a list, returning an Iterator
613/// instead.
614fn lag_lead_no_list<'a: 'b, 'b, I>(
615    datums: I,
616    callers_temp_storage: &'b RowArena,
617    order_by: &[ColumnOrder],
618    lag_lead_type: &LagLeadType,
619    ignore_nulls: &bool,
620) -> impl Iterator<Item = Datum<'b>>
621where
622    I: IntoIterator<Item = Datum<'a>>,
623{
624    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, EncodedArgs) record
625    let datums = order_aggregate_datums(datums, order_by);
626
627    // Take the (OriginalRow, EncodedArgs) records and unwrap them into separate datums.
628    // EncodedArgs = (InputValue, Offset, DefaultValue) for Lag/Lead
629    // (`OriginalRow` is kept in a record form, as we don't need to look inside that.)
630    let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
631        .into_iter()
632        .map(|d| {
633            let mut iter = d.unwrap_list().iter();
634            let original_row = iter.next().unwrap();
635            let (input_value, offset, default_value) =
636                unwrap_lag_lead_encoded_args(iter.next().unwrap());
637            (original_row, (input_value, offset, default_value))
638        })
639        .unzip();
640
641    let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
642
643    callers_temp_storage.reserve(result.len());
644    result
645        .into_iter()
646        .zip_eq(orig_rows)
647        .map(|(result_value, original_row)| {
648            callers_temp_storage.make_datum(|packer| {
649                packer.push_list_with(|packer| {
650                    packer.push(result_value);
651                    packer.push(original_row);
652                });
653            })
654        })
655}
656
657/// lag/lead's arguments are in a record. This function unwraps this record.
658fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
659    let mut encoded_args_iter = encoded_args.unwrap_list().iter();
660    let (input_value, offset, default_value) = (
661        encoded_args_iter.next().unwrap(),
662        encoded_args_iter.next().unwrap(),
663        encoded_args_iter.next().unwrap(),
664    );
665    (input_value, offset, default_value)
666}
667
668/// Each element of `args` has the 3 arguments evaluated for a single input row.
669/// Returns the results for each input row.
670fn lag_lead_inner<'a>(
671    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
672    lag_lead_type: &LagLeadType,
673    ignore_nulls: &bool,
674) -> Vec<Datum<'a>> {
675    if *ignore_nulls {
676        lag_lead_inner_ignore_nulls(args, lag_lead_type)
677    } else {
678        lag_lead_inner_respect_nulls(args, lag_lead_type)
679    }
680}
681
682fn lag_lead_inner_respect_nulls<'a>(
683    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
684    lag_lead_type: &LagLeadType,
685) -> Vec<Datum<'a>> {
686    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
687    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
688        // Null offsets are acceptable, and always return null
689        if offset.is_null() {
690            result.push(Datum::Null);
691            continue;
692        }
693
694        let idx = i64::try_from(idx).expect("Array index does not fit in i64");
695        let offset = i64::from(offset.unwrap_int32());
696        let offset = match lag_lead_type {
697            LagLeadType::Lag => -offset,
698            LagLeadType::Lead => offset,
699        };
700
701        // Get a Datum from `datums`. Return None if index is out of range.
702        let datums_get = |i: i64| -> Option<Datum> {
703            match u64::try_from(i) {
704                Ok(i) => args
705                    .get(usize::cast_from(i))
706                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
707                    .unwrap_or(None), // overindexing
708                Err(_) => None, // underindexing (negative index)
709            }
710        };
711
712        let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
713
714        result.push(lagged_value);
715    }
716
717    result
718}
719
/// `lag`/`lead` with IGNORE NULLS: for each row, steps backward (lag) or
/// forward (lead) until `abs(offset)` *non-null* input values have been seen,
/// returning that value, or the row's `default_value` when the partition edge
/// is reached first. A null `offset` always yields null; a zero offset panics
/// (see the comment at the bottom).
// `i64` indexes get involved in this function because it's convenient to allow negative indexes and
// have `datums_get` fail on them, and thus handle the beginning and end of the input vector
// uniformly, rather than checking underflow separately during index manipulations.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    // We check here once that even the largest index fits in `i64`, and then do silent `as`
    // conversions from `usize` indexes to `i64` indexes throughout this function.
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // Preparation: Make sure we can jump over a run of nulls in constant time, i.e., regardless of
    // how many nulls the run has. The following skip tables will point to the next non-null index.
    //
    // Backward table: for each null index, the nearest non-null index before it
    // (-1 when there is none, i.e., past the start of the partition).
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // Forward table: for each null index, the nearest non-null index after it
    // (`args.len()` when there is none, i.e., past the end of the partition).
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    // The actual computation.
    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        // Null offsets are acceptable, and always return null
        if offset.is_null() {
            result.push(Datum::Null);
            continue;
        }

        let idx = idx as i64; // checked at the beginning of the function that len() fits
        let offset = i64::cast_from(offset.unwrap_int32());
        // Lag steps backwards, lead steps forwards.
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        let increment = offset.signum();

        // Get a Datum from `datums`. Return None if index is out of range.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0)) // succeeded in getting a Datum from the vec
                    .unwrap_or(None), // overindexing
                Err(_) => None, // underindexing (negative index)
            }
        };

        let lagged_value = if increment != 0 {
            // We start j from idx, and step j until we have seen an abs(offset) number of non-null
            // values or reach the beginning or end of the partition.
            //
            // If offset is big, then this is slow: Considering the entire function, it's
            // `O(partition_size * offset)`.
            // However, a common use case is an offset of 1, for which this doesn't matter.
            // TODO: For larger offsets, we could have a completely different implementation
            // that starts the inner loop from the index where we found the previous result:
            // https://github.com/MaterializeInc/materialize/pull/29287#discussion_r1738695174
            let mut j = idx;
            for _ in 0..num::abs(offset) {
                j += increment;
                // Jump over a run of nulls
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    let ju = j as usize; // `j >= 0` because of the above `is_some_and`
                    if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                // Stepped past either end of the partition: stop early.
                if datums_get(j).is_none() {
                    break;
                }
            }
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // I can imagine returning here either `default_value` or `null`.
                // (I'm leaning towards `default_value`.)
                // We used to run into an infinite loop in this case, so panicking is
                // better. Started a SQL Council thread:
                // https://materializeinc.slack.com/archives/C063H5S7NKE/p1724962369706729
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
841
842/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
843fn first_value<'a, I>(
844    datums: I,
845    callers_temp_storage: &'a RowArena,
846    order_by: &[ColumnOrder],
847    window_frame: &WindowFrame,
848) -> Datum<'a>
849where
850    I: IntoIterator<Item = Datum<'a>>,
851{
852    let temp_storage = RowArena::new();
853    let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
854    callers_temp_storage.make_datum(|packer| {
855        packer.push_list(iter);
856    })
857}
858
859/// Like `first_value`, but doesn't perform the final wrapping in a list, returning an Iterator
860/// instead.
861fn first_value_no_list<'a: 'b, 'b, I>(
862    datums: I,
863    callers_temp_storage: &'b RowArena,
864    order_by: &[ColumnOrder],
865    window_frame: &WindowFrame,
866) -> impl Iterator<Item = Datum<'b>>
867where
868    I: IntoIterator<Item = Datum<'a>>,
869{
870    // Sort the datums according to the ORDER BY expressions and return the (OriginalRow, InputValue) record
871    let datums = order_aggregate_datums(datums, order_by);
872
873    // Decode the input (OriginalRow, InputValue) into separate datums
874    let (orig_rows, args): (Vec<_>, Vec<_>) = datums
875        .into_iter()
876        .map(|d| {
877            let mut iter = d.unwrap_list().iter();
878            let original_row = iter.next().unwrap();
879            let arg = iter.next().unwrap();
880
881            (original_row, arg)
882        })
883        .unzip();
884
885    let results = first_value_inner(args, window_frame);
886
887    callers_temp_storage.reserve(results.len());
888    results
889        .into_iter()
890        .zip_eq(orig_rows)
891        .map(|(result_value, original_row)| {
892            callers_temp_storage.make_datum(|packer| {
893                packer.push_list_with(|packer| {
894                    packer.push(result_value);
895                    packer.push(original_row);
896                });
897            })
898        })
899}
900
901fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
902    let length = datums.len();
903    let mut result: Vec<Datum> = Vec::with_capacity(length);
904    for (idx, current_datum) in datums.iter().enumerate() {
905        let first_value = match &window_frame.start_bound {
906            // Always return the current value
907            WindowFrameBound::CurrentRow => *current_datum,
908            WindowFrameBound::UnboundedPreceding => {
909                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
910                    let end_offset = usize::cast_from(*end_offset);
911
912                    // If the frame ends before the first row, return null
913                    if idx < end_offset {
914                        Datum::Null
915                    } else {
916                        datums[0]
917                    }
918                } else {
919                    datums[0]
920                }
921            }
922            WindowFrameBound::OffsetPreceding(offset) => {
923                let start_offset = usize::cast_from(*offset);
924                let start_idx = idx.saturating_sub(start_offset);
925                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
926                    let end_offset = usize::cast_from(*end_offset);
927
928                    // If the frame is empty or ends before the first row, return null
929                    if start_offset < end_offset || idx < end_offset {
930                        Datum::Null
931                    } else {
932                        datums[start_idx]
933                    }
934                } else {
935                    datums[start_idx]
936                }
937            }
938            WindowFrameBound::OffsetFollowing(offset) => {
939                let start_offset = usize::cast_from(*offset);
940                let start_idx = idx.saturating_add(start_offset);
941                if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
942                    // If the frame is empty or starts after the last row, return null
943                    if offset > end_offset || start_idx >= length {
944                        Datum::Null
945                    } else {
946                        datums[start_idx]
947                    }
948                } else {
949                    datums
950                        .get(start_idx)
951                        .map(|d| d.clone())
952                        .unwrap_or(Datum::Null)
953                }
954            }
955            // Forbidden during planning
956            WindowFrameBound::UnboundedFollowing => unreachable!(),
957        };
958        result.push(first_value);
959    }
960    result
961}
962
963/// The expected input is in the format of [((OriginalRow, InputValue), OrderByExprs...)]
964fn last_value<'a, I>(
965    datums: I,
966    callers_temp_storage: &'a RowArena,
967    order_by: &[ColumnOrder],
968    window_frame: &WindowFrame,
969) -> Datum<'a>
970where
971    I: IntoIterator<Item = Datum<'a>>,
972{
973    let temp_storage = RowArena::new();
974    let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
975    callers_temp_storage.make_datum(|packer| {
976        packer.push_list(iter);
977    })
978}
979
980/// Like `last_value`, but doesn't perform the final wrapping in a list, returning an Iterator
981/// instead.
982fn last_value_no_list<'a: 'b, 'b, I>(
983    datums: I,
984    callers_temp_storage: &'b RowArena,
985    order_by: &[ColumnOrder],
986    window_frame: &WindowFrame,
987) -> impl Iterator<Item = Datum<'b>>
988where
989    I: IntoIterator<Item = Datum<'a>>,
990{
991    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
992    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
993    let datums = order_aggregate_datums_with_rank(datums, order_by);
994
995    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
996    let size_hint = datums.size_hint().0;
997    let mut args = Vec::with_capacity(size_hint);
998    let mut original_rows = Vec::with_capacity(size_hint);
999    let mut order_by_rows = Vec::with_capacity(size_hint);
1000    for (d, order_by_row) in datums.into_iter() {
1001        let mut iter = d.unwrap_list().iter();
1002        let original_row = iter.next().unwrap();
1003        let arg = iter.next().unwrap();
1004        order_by_rows.push(order_by_row);
1005        original_rows.push(original_row);
1006        args.push(arg);
1007    }
1008
1009    let results = last_value_inner(args, &order_by_rows, window_frame);
1010
1011    callers_temp_storage.reserve(results.len());
1012    results
1013        .into_iter()
1014        .zip_eq(original_rows)
1015        .map(|(result_value, original_row)| {
1016            callers_temp_storage.make_datum(|packer| {
1017                packer.push_list_with(|packer| {
1018                    packer.push(result_value);
1019                    packer.push(original_row);
1020                });
1021            })
1022        })
1023}
1024
1025fn last_value_inner<'a>(
1026    args: Vec<Datum<'a>>,
1027    order_by_rows: &Vec<Row>,
1028    window_frame: &WindowFrame,
1029) -> Vec<Datum<'a>> {
1030    let length = args.len();
1031    let mut results: Vec<Datum> = Vec::with_capacity(length);
1032    for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1033        let last_value = match &window_frame.end_bound {
1034            WindowFrameBound::CurrentRow => match &window_frame.units {
1035                // Always return the current value when in ROWS mode
1036                WindowFrameUnits::Rows => *current_datum,
1037                WindowFrameUnits::Range => {
1038                    // When in RANGE mode, return the last value of the peer group
1039                    // The peer group is the group of rows with the same ORDER BY value
1040                    // Note: Range is only supported for the default window frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
1041                    // which is why it does not appear in the other branches
1042                    let target_idx = order_by_rows[idx..]
1043                        .iter()
1044                        .enumerate()
1045                        .take_while(|(_, row)| *row == order_by_row)
1046                        .last()
1047                        .unwrap()
1048                        .0
1049                        + idx;
1050                    args[target_idx]
1051                }
1052                // GROUPS is not supported, and forbidden during planning
1053                WindowFrameUnits::Groups => unreachable!(),
1054            },
1055            WindowFrameBound::UnboundedFollowing => {
1056                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1057                    let start_offset = usize::cast_from(*start_offset);
1058
1059                    // If the frame starts after the last row of the window, return null
1060                    if idx + start_offset > length - 1 {
1061                        Datum::Null
1062                    } else {
1063                        args[length - 1]
1064                    }
1065                } else {
1066                    args[length - 1]
1067                }
1068            }
1069            WindowFrameBound::OffsetFollowing(offset) => {
1070                let end_offset = usize::cast_from(*offset);
1071                let end_idx = idx.saturating_add(end_offset);
1072                if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1073                    let start_offset = usize::cast_from(*start_offset);
1074                    let start_idx = idx.saturating_add(start_offset);
1075
1076                    // If the frame is empty or starts after the last row of the window, return null
1077                    if end_offset < start_offset || start_idx >= length {
1078                        Datum::Null
1079                    } else {
1080                        // Return the last valid element in the window
1081                        args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1082                    }
1083                } else {
1084                    args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1085                }
1086            }
1087            WindowFrameBound::OffsetPreceding(offset) => {
1088                let end_offset = usize::cast_from(*offset);
1089                let end_idx = idx.saturating_sub(end_offset);
1090                if idx < end_offset {
1091                    // If the frame ends before the first row, return null
1092                    Datum::Null
1093                } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1094                    &window_frame.start_bound
1095                {
1096                    // If the frame is empty, return null
1097                    if offset > start_offset {
1098                        Datum::Null
1099                    } else {
1100                        args[end_idx]
1101                    }
1102                } else {
1103                    args[end_idx]
1104                }
1105            }
1106            // Forbidden during planning
1107            WindowFrameBound::UnboundedPreceding => unreachable!(),
1108        };
1109        results.push(last_value);
1110    }
1111    results
1112}
1113
1114/// Executes `FusedValueWindowFunc` on a reduction group.
1115/// The expected input is in the format of `[((OriginalRow, (Args1, Args2, ...)), OrderByExprs...)]`
1116/// where `Args1`, `Args2`, are the arguments of each of the fused functions. For functions that
1117/// have only a single argument (first_value/last_value), these are simple values. For functions
1118/// that have multiple arguments (lag/lead), these are also records.
1119fn fused_value_window_func<'a, I>(
1120    input_datums: I,
1121    callers_temp_storage: &'a RowArena,
1122    funcs: &Vec<AggregateFunc>,
1123    order_by: &Vec<ColumnOrder>,
1124) -> Datum<'a>
1125where
1126    I: IntoIterator<Item = Datum<'a>>,
1127{
1128    let temp_storage = RowArena::new();
1129    let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1130    callers_temp_storage.make_datum(|packer| {
1131        packer.push_list(iter);
1132    })
1133}
1134
/// Like `fused_value_window_func`, but doesn't perform the final wrapping in a list, returning an
/// Iterator instead.
///
/// Evaluates every fused window function once over the shared, ORDER BY-sorted partition,
/// then emits, per input row, a record of all the functions' results paired with the
/// original row.
fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
    input_datums: I,
    callers_temp_storage: &'b RowArena,
    funcs: &Vec<AggregateFunc>,
    order_by: &Vec<ColumnOrder>,
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    // Only `last_value` needs the ORDER BY rows (to find peer groups in RANGE frame
    // mode), so collect them only when at least one fused function is a `last_value`.
    let has_last_value = funcs
        .iter()
        .any(|f| matches!(f, AggregateFunc::LastValue { .. }));

    let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);

    // Unpack each ((OriginalRow, (Args1, Args2, ...)), OrderByRow) into:
    // - `encoded_argsss[i]`: the i-th function's (still encoded) argument for every row,
    // - `original_rows`: the original row for every row,
    // - `order_by_rows`: the OrderByRow for every row (only if needed, see above).
    let size_hint = input_datums_with_ranks.size_hint().0;
    let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
    let mut original_rows = Vec::with_capacity(size_hint);
    let mut order_by_rows = Vec::with_capacity(size_hint);
    for (d, order_by_row) in input_datums_with_ranks {
        let mut iter = d.unwrap_list().iter();
        let original_row = iter.next().unwrap();
        original_rows.push(original_row);
        // The second field is the record of per-function argument values.
        let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
        for i in 0..funcs.len() {
            let encoded_args = argss_iter.next().unwrap();
            encoded_argsss[i].push(encoded_args);
        }
        if has_last_value {
            order_by_rows.push(order_by_row);
        }
    }

    // Run each fused function on its own argument column, then transpose: collect the
    // k-th result of every function into `results_per_row[k]`.
    let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
    for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
        let results = match func {
            AggregateFunc::LagLead {
                order_by: inner_order_by,
                lag_lead,
                ignore_nulls,
            } => {
                // Fusion requires that all fused functions share this ORDER BY.
                assert_eq!(order_by, inner_order_by);
                // lag/lead has multiple arguments, so each encoded arg is a record that
                // first needs unwrapping.
                let unwrapped_argss = encoded_argss
                    .into_iter()
                    .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
                    .collect();
                lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
            }
            AggregateFunc::FirstValue {
                order_by: inner_order_by,
                window_frame,
            } => {
                assert_eq!(order_by, inner_order_by);
                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
                // wrapped into a record.)
                first_value_inner(encoded_argss, window_frame)
            }
            AggregateFunc::LastValue {
                order_by: inner_order_by,
                window_frame,
            } => {
                assert_eq!(order_by, inner_order_by);
                // (No unwrapping to do on the args here, because there is only 1 arg, so it's not
                // wrapped into a record.)
                last_value_inner(encoded_argss, &order_by_rows, window_frame)
            }
            _ => panic!("unknown window function in FusedValueWindowFunc"),
        };
        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
            results.push(result);
        }
    }

    // Each output row is ((Result1, Result2, ...), OriginalRow): two datums per row,
    // hence the `2 *` in the reservation.
    callers_temp_storage.reserve(2 * original_rows.len());
    results_per_row
        .into_iter()
        .enumerate()
        .map(move |(i, results)| {
            callers_temp_storage.make_datum(|packer| {
                packer.push_list_with(|packer| {
                    packer
                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
                    packer.push(original_rows[i]);
                });
            })
        })
}
1224
1225/// `input_datums` is an entire window partition.
1226/// The expected input is in the format of `[((OriginalRow, InputValue), OrderByExprs...)]`
1227/// See also in the comment in `window_func_applied_to`.
1228///
1229/// `wrapped_aggregate`: e.g., for `sum(...) OVER (...)`, this is the `sum(...)`.
1230///
1231/// Note that this `order_by` doesn't have expressions, only `ColumnOrder`s. For an explanation,
1232/// see the comment on `WindowExprType`.
1233fn window_aggr<'a, I, A>(
1234    input_datums: I,
1235    callers_temp_storage: &'a RowArena,
1236    wrapped_aggregate: &AggregateFunc,
1237    order_by: &[ColumnOrder],
1238    window_frame: &WindowFrame,
1239) -> Datum<'a>
1240where
1241    I: IntoIterator<Item = Datum<'a>>,
1242    A: OneByOneAggr,
1243{
1244    let temp_storage = RowArena::new();
1245    let iter = window_aggr_no_list::<I, A>(
1246        input_datums,
1247        &temp_storage,
1248        wrapped_aggregate,
1249        order_by,
1250        window_frame,
1251    );
1252    callers_temp_storage.make_datum(|packer| {
1253        packer.push_list(iter);
1254    })
1255}
1256
1257/// Like `window_aggr`, but doesn't perform the final wrapping in a list, returning an Iterator
1258/// instead.
1259fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1260    input_datums: I,
1261    callers_temp_storage: &'b RowArena,
1262    wrapped_aggregate: &AggregateFunc,
1263    order_by: &[ColumnOrder],
1264    window_frame: &WindowFrame,
1265) -> impl Iterator<Item = Datum<'b>>
1266where
1267    I: IntoIterator<Item = Datum<'a>>,
1268    A: OneByOneAggr,
1269{
1270    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
1271    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
1272    let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1273
1274    // Decode the input (OriginalRow, InputValue) into separate datums, while keeping the OrderByRow
1275    let size_hint = datums.size_hint().0;
1276    let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1277    let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1278    let mut order_by_rows = Vec::with_capacity(size_hint);
1279    for (d, order_by_row) in datums.into_iter() {
1280        let mut iter = d.unwrap_list().iter();
1281        let original_row = iter.next().unwrap();
1282        let arg = iter.next().unwrap();
1283        order_by_rows.push(order_by_row);
1284        original_rows.push(original_row);
1285        args.push(arg);
1286    }
1287
1288    let results = window_aggr_inner::<A>(
1289        args,
1290        &order_by_rows,
1291        wrapped_aggregate,
1292        order_by,
1293        window_frame,
1294        callers_temp_storage,
1295    );
1296
1297    callers_temp_storage.reserve(results.len());
1298    results
1299        .into_iter()
1300        .zip_eq(original_rows)
1301        .map(|(result_value, original_row)| {
1302            callers_temp_storage.make_datum(|packer| {
1303                packer.push_list_with(|packer| {
1304                    packer.push(result_value);
1305                    packer.push(original_row);
1306                });
1307            })
1308        })
1309}
1310
1311fn window_aggr_inner<'a, A>(
1312    mut args: Vec<Datum<'a>>,
1313    order_by_rows: &Vec<Row>,
1314    wrapped_aggregate: &AggregateFunc,
1315    order_by: &[ColumnOrder],
1316    window_frame: &WindowFrame,
1317    temp_storage: &'a RowArena,
1318) -> Vec<Datum<'a>>
1319where
1320    A: OneByOneAggr,
1321{
1322    let length = args.len();
1323    let mut result: Vec<Datum> = Vec::with_capacity(length);
1324
1325    // In this degenerate case, all results would be `wrapped_aggregate.default()` (usually null).
1326    // However, this currently can't happen, because
1327    // - Groups frame mode is currently not supported;
1328    // - Range frame mode is currently supported only for the default frame, which includes the
1329    //   current row.
1330    soft_assert_or_log!(
1331        !((matches!(window_frame.units, WindowFrameUnits::Groups)
1332            || matches!(window_frame.units, WindowFrameUnits::Range))
1333            && !window_frame.includes_current_row()),
1334        "window frame without current row"
1335    );
1336
1337    if (matches!(
1338        window_frame.start_bound,
1339        WindowFrameBound::UnboundedPreceding
1340    ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1341        || (order_by.is_empty()
1342            && (matches!(window_frame.units, WindowFrameUnits::Groups)
1343                || matches!(window_frame.units, WindowFrameUnits::Range))
1344            && window_frame.includes_current_row())
1345    {
1346        // Either
1347        //  - UNBOUNDED frame in both directions, or
1348        //  - There is no ORDER BY and the frame is such that the current peer group is included.
1349        //    (The current peer group will be the whole partition if there is no ORDER BY.)
1350        // We simply need to compute the aggregate once, on the entire partition, and each input
1351        // row will get this one aggregate value as result.
1352        let result_value = wrapped_aggregate.eval(args, temp_storage);
1353        // Every row will get the above aggregate as result.
1354        for _ in 0..length {
1355            result.push(result_value);
1356        }
1357    } else {
1358        fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1359            args: Vec<Datum<'a>>,
1360            result: &mut Vec<Datum<'a>>,
1361            mut one_by_one_aggr: A,
1362            temp_storage: &'a RowArena,
1363        ) where
1364            A: OneByOneAggr,
1365        {
1366            for current_arg in args.into_iter() {
1367                one_by_one_aggr.give(&current_arg);
1368                let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1369                result.push(result_value);
1370            }
1371        }
1372
1373        fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1374            args: Vec<Datum<'a>>,
1375            order_by_rows: &Vec<Row>,
1376            result: &mut Vec<Datum<'a>>,
1377            mut one_by_one_aggr: A,
1378            temp_storage: &'a RowArena,
1379        ) where
1380            A: OneByOneAggr,
1381        {
1382            let mut peer_group_start = 0;
1383            while peer_group_start < args.len() {
1384                // Find the boundaries of the current peer group.
1385                // peer_group_start will point to the first element of the peer group,
1386                // peer_group_end will point to _just after_ the last element of the peer group.
1387                let mut peer_group_end = peer_group_start + 1;
1388                while peer_group_end < args.len()
1389                    && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1390                {
1391                    // The peer group goes on while the OrderByRows not differ.
1392                    peer_group_end += 1;
1393                }
1394                // Let's compute the aggregate (which will be the same for all records in this
1395                // peer group).
1396                for current_arg in args[peer_group_start..peer_group_end].iter() {
1397                    one_by_one_aggr.give(current_arg);
1398                }
1399                let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1400                // Put the above aggregate into each record in the peer group.
1401                for _ in args[peer_group_start..peer_group_end].iter() {
1402                    result.push(agg_for_peer_group);
1403                }
1404                // Point to the start of the next peer group.
1405                peer_group_start = peer_group_end;
1406            }
1407        }
1408
1409        fn rows_between_offset_and_offset<'a>(
1410            args: Vec<Datum<'a>>,
1411            result: &mut Vec<Datum<'a>>,
1412            wrapped_aggregate: &AggregateFunc,
1413            temp_storage: &'a RowArena,
1414            offset_start: i64,
1415            offset_end: i64,
1416        ) {
1417            let len = args
1418                .len()
1419                .to_i64()
1420                .expect("window partition's len should fit into i64");
1421            for i in 0..len {
1422                let i = i.to_i64().expect("window partition shouldn't be super big");
1423                // Trim the start of the frame to make it not reach over the start of the window
1424                // partition.
1425                let frame_start = max(i + offset_start, 0)
1426                    .to_usize()
1427                    .expect("The max made sure it's not negative");
1428                // Trim the end of the frame to make it not reach over the end of the window
1429                // partition.
1430                let frame_end = min(i + offset_end, len - 1).to_usize();
1431                match frame_end {
1432                    Some(frame_end) => {
1433                        if frame_start <= frame_end {
1434                            // Compute the aggregate on the frame.
1435                            // TODO:
1436                            // This implementation is quite slow if the frame is large: we do an
1437                            // inner loop over the entire frame, and compute the aggregate from
1438                            // scratch. We could do better:
1439                            //  - For invertible aggregations we could do a rolling aggregation.
1440                            //  - There are various tricks for min/max as well, making use of either
1441                            //    the fixed size of the window, or that we are not retracting
1442                            //    arbitrary elements but doing queue operations. E.g., see
1443                            //    http://codercareer.blogspot.com/2012/02/no-33-maximums-in-sliding-windows.html
1444                            let frame_values = args[frame_start..=frame_end].iter().cloned();
1445                            let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1446                            result.push(result_value);
1447                        } else {
1448                            // frame_start > frame_end, so this is an empty frame.
1449                            let result_value = wrapped_aggregate.default();
1450                            result.push(result_value);
1451                        }
1452                    }
1453                    None => {
1454                        // frame_end would be negative, so this is an empty frame.
1455                        let result_value = wrapped_aggregate.default();
1456                        result.push(result_value);
1457                    }
1458                }
1459            }
1460        }
1461
1462        match (
1463            &window_frame.units,
1464            &window_frame.start_bound,
1465            &window_frame.end_bound,
1466        ) {
1467            // Cases where one edge of the frame is CurrentRow.
1468            // Note that these cases could be merged into the more general cases below where one
1469            // edge is some offset (with offset = 0), but the CurrentRow cases probably cover 95%
1470            // of user queries, so let's make this simple and fast.
1471            (Rows, UnboundedPreceding, CurrentRow) => {
1472                rows_between_unbounded_preceding_and_current_row::<A>(
1473                    args,
1474                    &mut result,
1475                    A::new(wrapped_aggregate, false),
1476                    temp_storage,
1477                );
1478            }
1479            (Rows, CurrentRow, UnboundedFollowing) => {
1480                // Same as above, but reverse.
1481                args.reverse();
1482                rows_between_unbounded_preceding_and_current_row::<A>(
1483                    args,
1484                    &mut result,
1485                    A::new(wrapped_aggregate, true),
1486                    temp_storage,
1487                );
1488                result.reverse();
1489            }
1490            (Range, UnboundedPreceding, CurrentRow) => {
1491                // Note that for the default frame, the RANGE frame mode is identical to the GROUPS
1492                // frame mode.
1493                groups_between_unbounded_preceding_and_current_row::<A>(
1494                    args,
1495                    order_by_rows,
1496                    &mut result,
1497                    A::new(wrapped_aggregate, false),
1498                    temp_storage,
1499                );
1500            }
1501            // The next several cases all call `rows_between_offset_and_offset`. Note that the
1502            // offset passed to `rows_between_offset_and_offset` should be negated when it's
1503            // PRECEDING.
1504            (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1505                let start_prec = start_prec.to_i64().expect(
1506                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1507                );
1508                let end_prec = end_prec.to_i64().expect(
1509                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1510                );
1511                rows_between_offset_and_offset(
1512                    args,
1513                    &mut result,
1514                    wrapped_aggregate,
1515                    temp_storage,
1516                    -start_prec,
1517                    -end_prec,
1518                );
1519            }
1520            (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1521                let start_prec = start_prec.to_i64().expect(
1522                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1523                );
1524                let end_fol = end_fol.to_i64().expect(
1525                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1526                );
1527                rows_between_offset_and_offset(
1528                    args,
1529                    &mut result,
1530                    wrapped_aggregate,
1531                    temp_storage,
1532                    -start_prec,
1533                    end_fol,
1534                );
1535            }
1536            (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1537                let start_fol = start_fol.to_i64().expect(
1538                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1539                );
1540                let end_fol = end_fol.to_i64().expect(
1541                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1542                );
1543                rows_between_offset_and_offset(
1544                    args,
1545                    &mut result,
1546                    wrapped_aggregate,
1547                    temp_storage,
1548                    start_fol,
1549                    end_fol,
1550                );
1551            }
1552            (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1553                unreachable!() // The planning ensured that this nonsensical case can't happen
1554            }
1555            (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1556                let start_prec = start_prec.to_i64().expect(
1557                    "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1558                );
1559                let end_fol = 0;
1560                rows_between_offset_and_offset(
1561                    args,
1562                    &mut result,
1563                    wrapped_aggregate,
1564                    temp_storage,
1565                    -start_prec,
1566                    end_fol,
1567                );
1568            }
1569            (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1570                let start_fol = 0;
1571                let end_fol = end_fol.to_i64().expect(
1572                    "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1573                );
1574                rows_between_offset_and_offset(
1575                    args,
1576                    &mut result,
1577                    wrapped_aggregate,
1578                    temp_storage,
1579                    start_fol,
1580                    end_fol,
1581                );
1582            }
1583            (Rows, CurrentRow, CurrentRow) => {
1584                // We could have a more efficient implementation for this, but this is probably
1585                // super rare. (Might be more common with RANGE or GROUPS frame mode, though!)
1586                let start_fol = 0;
1587                let end_fol = 0;
1588                rows_between_offset_and_offset(
1589                    args,
1590                    &mut result,
1591                    wrapped_aggregate,
1592                    temp_storage,
1593                    start_fol,
1594                    end_fol,
1595                );
1596            }
1597            (Rows, CurrentRow, OffsetPreceding(_))
1598            | (Rows, UnboundedFollowing, _)
1599            | (Rows, _, UnboundedPreceding)
1600            | (Rows, OffsetFollowing(..), CurrentRow) => {
1601                unreachable!() // The planning ensured that these nonsensical cases can't happen
1602            }
1603            (Rows, UnboundedPreceding, UnboundedFollowing) => {
1604                // This is handled by the complicated if condition near the beginning of this
1605                // function.
1606                unreachable!()
1607            }
1608            (Rows, UnboundedPreceding, OffsetPreceding(_))
1609            | (Rows, UnboundedPreceding, OffsetFollowing(_))
1610            | (Rows, OffsetPreceding(..), UnboundedFollowing)
1611            | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1612                // Unsupported. Bail in the planner.
1613                // https://github.com/MaterializeInc/database-issues/issues/6720
1614                unreachable!()
1615            }
1616            (Range, _, _) => {
1617                // Unsupported.
1618                // The planner doesn't allow Range frame mode for now (except for the default
1619                // frame), see https://github.com/MaterializeInc/database-issues/issues/6585
1620                // Note that it would be easy to handle (Range, CurrentRow, UnboundedFollowing):
1621                // it would be similar to (Rows, CurrentRow, UnboundedFollowing), but would call
1622                // groups_between_unbounded_preceding_current_row.
1623                unreachable!()
1624            }
1625            (Groups, _, _) => {
1626                // Unsupported.
1627                // The planner doesn't allow Groups frame mode for now, see
1628                // https://github.com/MaterializeInc/database-issues/issues/6588
1629                unreachable!()
1630            }
1631        }
1632    }
1633
1634    result
1635}
1636
1637/// Computes a bundle of fused window aggregations.
1638/// The input is similar to `window_aggr`, but `InputValue` is not just a single value, but a record
1639/// where each component is the input to one of the aggregations.
1640fn fused_window_aggr<'a, I, A>(
1641    input_datums: I,
1642    callers_temp_storage: &'a RowArena,
1643    wrapped_aggregates: &Vec<AggregateFunc>,
1644    order_by: &Vec<ColumnOrder>,
1645    window_frame: &WindowFrame,
1646) -> Datum<'a>
1647where
1648    I: IntoIterator<Item = Datum<'a>>,
1649    A: OneByOneAggr,
1650{
1651    let temp_storage = RowArena::new();
1652    let iter = fused_window_aggr_no_list::<_, A>(
1653        input_datums,
1654        &temp_storage,
1655        wrapped_aggregates,
1656        order_by,
1657        window_frame,
1658    );
1659    callers_temp_storage.make_datum(|packer| {
1660        packer.push_list(iter);
1661    })
1662}
1663
/// Like `fused_window_aggr`, but doesn't perform the final wrapping in a list, returning an
/// Iterator instead.
///
/// Each yielded datum is a list of the form `[[result_1, ..., result_k], original_row]`, where
/// `result_i` is the result of `wrapped_aggregates[i]` for that row (see the packing at the end
/// of this function).
fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
    input_datums: I,
    callers_temp_storage: &'b RowArena,
    wrapped_aggregates: &Vec<AggregateFunc>,
    order_by: &Vec<ColumnOrder>,
    window_frame: &WindowFrame,
) -> impl Iterator<Item = Datum<'b>>
where
    I: IntoIterator<Item = Datum<'a>>,
    A: OneByOneAggr,
{
    // Sort the datums according to the ORDER BY expressions and return the ((OriginalRow, InputValue), OrderByRow) record
    // The OrderByRow is kept around because it is required to compute the peer groups in RANGE mode
    let datums = order_aggregate_datums_with_rank(input_datums, order_by);

    // Transpose the input: instead of one record of arguments per row, build one
    // column of arguments per wrapped aggregation (`argss[i]` will feed
    // `wrapped_aggregates[i]`).
    let size_hint = datums.size_hint().0;
    let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
    let mut original_rows = Vec::with_capacity(size_hint);
    let mut order_by_rows = Vec::with_capacity(size_hint);
    for (d, order_by_row) in datums {
        let mut iter = d.unwrap_list().iter();
        let original_row = iter.next().unwrap();
        original_rows.push(original_row);
        let args_iter = iter.next().unwrap().unwrap_list().iter();
        // Push each argument into the respective list
        // (`zip_eq` panics if a record doesn't have exactly one argument per aggregation).
        for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
            args.push(arg);
        }
        order_by_rows.push(order_by_row);
    }

    // Evaluate each wrapped aggregation over its own argument column, then transpose
    // back: `results_per_row[i]` collects one result per aggregation for input row `i`.
    let mut results_per_row =
        vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
    for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
        let results = window_aggr_inner::<A>(
            args,
            &order_by_rows,
            wrapped_aggr,
            order_by,
            window_frame,
            callers_temp_storage,
        );
        for (results, result) in results_per_row.iter_mut().zip_eq(results) {
            results.push(result);
        }
    }

    // Emit one `[[results...], original_row]` record per input row. We create two
    // datums per row below, which is what the `reserve` accounts for.
    callers_temp_storage.reserve(2 * original_rows.len());
    results_per_row
        .into_iter()
        .enumerate()
        .map(move |(i, results)| {
            callers_temp_storage.make_datum(|packer| {
                packer.push_list_with(|packer| {
                    packer
                        .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
                    packer.push(original_rows[i]);
                });
            })
        })
}
1727
/// An implementation of an aggregation where we can send in the input elements one-by-one, and
/// can also ask the current aggregate at any moment. (This just delegates to other aggregation
/// evaluation approaches.)
pub trait OneByOneAggr {
    /// Creates a fresh, empty state for evaluating `agg`.
    ///
    /// The `reverse` parameter makes the aggregations process input elements in reverse order.
    /// This has an effect only for non-commutative aggregations, e.g. `list_agg`. These are
    /// currently only some of the Basic aggregations. (Basic aggregations are handled by
    /// `NaiveOneByOneAggr`).
    fn new(agg: &AggregateFunc, reverse: bool) -> Self;
    /// Pushes one input element into the aggregation.
    fn give(&mut self, d: &Datum);
    /// Returns the value of the aggregate computed on the given values so far.
    /// Any non-scalar result is allocated from `temp_storage`.
    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
}
1742
/// Naive implementation of [OneByOneAggr], suitable for stuff like const folding, but too slow for
/// rendering. This relies only on infrastructure available in `mz-expr`. It simply saves all the
/// given input, and calls the given [AggregateFunc]'s `eval` method when asked about the current
/// aggregate. (For Accumulable and Hierarchical aggregations, the rendering has more efficient
/// implementations, but for Basic aggregations even the rendering uses this naive implementation.)
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    /// The wrapped aggregation that is evaluated on demand.
    agg: AggregateFunc,
    /// All input elements seen so far, each packed into its own single-datum `Row`
    /// so that we own the data (a bare `Datum` would borrow from elsewhere).
    input: Vec<Row>,
    /// Whether to replay `input` in reverse order; see [OneByOneAggr::new].
    reverse: bool,
}
1754
1755impl OneByOneAggr for NaiveOneByOneAggr {
1756    fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1757        NaiveOneByOneAggr {
1758            agg: agg.clone(),
1759            input: Vec::new(),
1760            reverse,
1761        }
1762    }
1763
1764    fn give(&mut self, d: &Datum) {
1765        let mut row = Row::default();
1766        row.packer().push(d);
1767        self.input.push(row);
1768    }
1769
1770    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1771        temp_storage.make_datum(|packer| {
1772            packer.push(if !self.reverse {
1773                self.agg
1774                    .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1775            } else {
1776                self.agg.eval(
1777                    self.input.iter().rev().map(|r| r.unpack_first()),
1778                    temp_storage,
1779                )
1780            });
1781        })
1782    }
1783}
1784
/// Identify whether the given aggregate function is Lag or Lead, since they share
/// implementations.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum LagLeadType {
    /// The `lag` window function.
    Lag,
    /// The `lead` window function.
    Lead,
}
1792
/// An aggregate (or window) function, evaluable via [`AggregateFunc::eval`].
///
/// Variants carrying an `order_by` field are window functions or ordered
/// aggregations: their input datums are `Datum::List`s that carry the ORDER BY
/// columns alongside the value being aggregated.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum AggregateFunc {
    // `max` over the various input types.
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    // `min` over the various input types.
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    // `sum` over the various input types. Note that the output type can differ
    // from the input type (see `output_type`) to reduce the chance of overflow.
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    /// Counts the non-null input values.
    Count,
    /// Boolean `or` over the input values.
    Any,
    /// Boolean `and` over the input values.
    All,
    /// Accumulates `Datum::List`s whose first element is a JSON-typed `Datum`s
    /// into a JSON list. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips `Datum::List`s whose first element is a JSON-typed `Datum`s into a
    /// JSON map. The other elements are columns used by `order_by`.
    ///
    /// WARNING: Unlike the `jsonb_object_agg` function that is exposed by the SQL
    /// layer, this function filters out `Datum::Null`, for consistency with
    /// the other aggregate functions.
    JsonbObjectAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Zips a `Datum::List` whose first element is a `Datum::List` guaranteed
    /// to be non-empty and whose len % 2 == 0 into a `Datum::Map`. The other
    /// elements are columns used by `order_by`.
    MapAgg {
        order_by: Vec<ColumnOrder>,
        value_type: SqlScalarType,
    },
    /// Accumulates `Datum::Array`s of `SqlScalarType::Record` whose first element is a `Datum::Array`
    /// into a single `Datum::Array` (the remaining fields are used by `order_by`).
    ArrayConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Accumulates `Datum::List`s of `SqlScalarType::Record` whose first field is a `Datum::List`
    /// into a single `Datum::List` (the remaining fields are used by `order_by`).
    ListConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// The `string_agg` aggregation, concatenating inputs in `order_by` order.
    StringAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// The `row_number` window function.
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    /// The `rank` window function.
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    /// The `dense_rank` window function.
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// The `lag`/`lead` window functions (distinguished by `lag_lead`, since they
    /// share an implementation; see [LagLeadType]).
    LagLead {
        order_by: Vec<ColumnOrder>,
        lag_lead: LagLeadType,
        ignore_nulls: bool,
    },
    /// The `first_value` window function.
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// The `last_value` window function.
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions fused into one function, to amortize overheads.
    FusedValueWindowFunc {
        funcs: Vec<AggregateFunc>,
        /// Currently, all the fused functions must have the same `order_by`. (We can later
        /// eliminate this limitation.)
        order_by: Vec<ColumnOrder>,
    },
    /// An ordinary aggregation (e.g. `sum`) evaluated as a window aggregation
    /// over `window_frame`.
    WindowAggregate {
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregations fused into one function, to amortize overheads.
    /// (All the wrapped aggregates share `order_by` and `window_frame`.)
    FusedWindowAggregate {
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Accumulates any number of `Datum::Dummy`s into `Datum::Dummy`.
    ///
    /// Useful for removing an expensive aggregation while maintaining the shape
    /// of a reduce operator.
    Dummy,
}
1924
1925impl AggregateFunc {
    /// Evaluates the aggregate over the input `datums`, allocating any non-scalar
    /// result from `temp_storage`.
    ///
    /// Window functions (`RowNumber`, `LagLead`, `WindowAggregate`, etc.) receive the
    /// whole window partition as input and return a list with one element per input
    /// row; see the corresponding `*_no_list` helpers for the unwrapped form.
    pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
    where
        I: IntoIterator<Item = Datum<'a>>,
    {
        match self {
            AggregateFunc::MaxNumeric => {
                max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
            }
            AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
            AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
            AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
            AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
            AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
            AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
            AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
            AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
            AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
            AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
            AggregateFunc::MaxString => max_string(datums),
            AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
            AggregateFunc::MaxTimestamp => {
                max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
            }
            AggregateFunc::MaxTimestampTz => {
                max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
            }
            AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
            AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
            AggregateFunc::MinNumeric => {
                min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
            }
            AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
            AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
            AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
            AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
            AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
            AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
            AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
            AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
            AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
            AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
            AggregateFunc::MinString => min_string(datums),
            AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
            AggregateFunc::MinTimestamp => {
                min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
            }
            AggregateFunc::MinTimestampTz => {
                min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
            }
            AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
            AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
            // Sums accumulate into a wider type than the input to reduce the
            // chance of overflow (second type parameter).
            AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
            AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
            AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
            AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
            AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
            AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
            AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
            AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
            AggregateFunc::SumNumeric => sum_numeric(datums),
            AggregateFunc::Count => count(datums),
            AggregateFunc::Any => any(datums),
            AggregateFunc::All => all(datums),
            AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
            // `MapAgg` and `JsonbObjectAgg` share an implementation.
            AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
                dict_agg(datums, temp_storage, order_by)
            }
            AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
            AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
            AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
            AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
            AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
            AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
            AggregateFunc::LagLead {
                order_by,
                lag_lead: lag_lead_type,
                ignore_nulls,
            } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
            AggregateFunc::FirstValue {
                order_by,
                window_frame,
            } => first_value(datums, temp_storage, order_by, window_frame),
            AggregateFunc::LastValue {
                order_by,
                window_frame,
            } => last_value(datums, temp_storage, order_by, window_frame),
            // Window aggregations here use the naive one-by-one evaluation; use
            // `eval_with_fast_window_agg` to supply a faster `OneByOneAggr`.
            AggregateFunc::WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => window_aggr::<_, NaiveOneByOneAggr>(
                datums,
                temp_storage,
                wrapped_aggregate,
                order_by,
                window_frame,
            ),
            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
                fused_value_window_func(datums, temp_storage, funcs, order_by)
            }
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates,
                order_by,
                window_frame,
            } => fused_window_aggr::<_, NaiveOneByOneAggr>(
                datums,
                temp_storage,
                wrapped_aggregates,
                order_by,
                window_frame,
            ),
            AggregateFunc::Dummy => Datum::Dummy,
        }
    }
2040
    /// Like `eval`, but it's given a [OneByOneAggr]. If `self` is a `WindowAggregate`, then
    /// the given [OneByOneAggr] will be used to evaluate the wrapped aggregate inside the
    /// `WindowAggregate`. If `self` is not a `WindowAggregate`, then it simply calls `eval`.
    /// (`FusedWindowAggregate` is treated analogously to `WindowAggregate`.)
    pub fn eval_with_fast_window_agg<'a, I, W>(
        &self,
        datums: I,
        temp_storage: &'a RowArena,
    ) -> Datum<'a>
    where
        I: IntoIterator<Item = Datum<'a>>,
        W: OneByOneAggr,
    {
        match self {
            AggregateFunc::WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => window_aggr::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregate,
                order_by,
                window_frame,
            ),
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates,
                order_by,
                window_frame,
            } => fused_window_aggr::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregates,
                order_by,
                window_frame,
            ),
            // All other functions don't involve a wrapped aggregate, so `W` is irrelevant.
            _ => self.eval(datums, temp_storage),
        }
    }
2079
    /// Like `eval`, but for window functions it skips wrapping the per-row results in
    /// an outer `Datum::List`, returning them as an iterator instead (to be consumed
    /// by an `unnest_list` fusion). `W` plays the same role as in
    /// `eval_with_fast_window_agg`.
    ///
    /// # Panics
    /// Panics if `self.can_fuse_with_unnest_list()` doesn't hold.
    pub fn eval_with_unnest_list<'a, I, W>(
        &self,
        datums: I,
        temp_storage: &'a RowArena,
    ) -> impl Iterator<Item = Datum<'a>>
    where
        I: IntoIterator<Item = Datum<'a>>,
        W: OneByOneAggr,
    {
        // TODO: Use `enum_dispatch` to construct a unified iterator instead of `collect_vec`.
        assert!(self.can_fuse_with_unnest_list());
        // Each arm eagerly collects into a `Vec` so all arms have the same type;
        // the shared `into_iter()` below turns the result back into an iterator.
        match self {
            AggregateFunc::RowNumber { order_by } => {
                row_number_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::Rank { order_by } => {
                rank_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::DenseRank { order_by } => {
                dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
            }
            AggregateFunc::LagLead {
                order_by,
                lag_lead: lag_lead_type,
                ignore_nulls,
            } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
                .collect_vec(),
            AggregateFunc::FirstValue {
                order_by,
                window_frame,
            } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
            AggregateFunc::LastValue {
                order_by,
                window_frame,
            } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
            AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
                fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
            }
            AggregateFunc::WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => window_aggr_no_list::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregate,
                order_by,
                window_frame,
            )
            .collect_vec(),
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates,
                order_by,
                window_frame,
            } => fused_window_aggr_no_list::<_, W>(
                datums,
                temp_storage,
                wrapped_aggregates,
                order_by,
                window_frame,
            )
            .collect_vec(),
            _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
        }
        .into_iter()
    }
2146
2147    /// Returns the output of the aggregation function when applied on an empty
2148    /// input relation.
2149    pub fn default(&self) -> Datum<'static> {
2150        match self {
2151            AggregateFunc::Count => Datum::Int64(0),
2152            AggregateFunc::Any => Datum::False,
2153            AggregateFunc::All => Datum::True,
2154            AggregateFunc::Dummy => Datum::Dummy,
2155            _ => Datum::Null,
2156        }
2157    }
2158
    /// Returns a datum whose inclusion in the aggregation will not change its
    /// result.
    ///
    /// NOTE: The match is deliberately exhaustive (no `_` arm), so that adding a
    /// new `AggregateFunc` variant forces a decision here.
    pub fn identity_datum(&self) -> Datum<'static> {
        match self {
            AggregateFunc::Any => Datum::False,
            AggregateFunc::All => Datum::True,
            AggregateFunc::Dummy => Datum::Dummy,
            AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
            AggregateFunc::ListConcat { .. } => Datum::empty_list(),
            // All window functions use the empty list as their identity.
            AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
            // For everything else, NULL is ignored by the aggregation.
            AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::StringAgg { .. } => Datum::Null,
        }
    }
2227
    /// Reports whether this function is supported by `eval_with_unnest_list`, i.e.,
    /// whether its list-of-results can be unnested without materializing the outer
    /// list. True exactly for the window functions.
    ///
    /// NOTE: The match is deliberately exhaustive (no `_` arm), so that adding a
    /// new `AggregateFunc` variant forces a decision here.
    pub fn can_fuse_with_unnest_list(&self) -> bool {
        match self {
            // Window functions: each has a `*_no_list` evaluation path.
            AggregateFunc::RowNumber { .. }
            | AggregateFunc::Rank { .. }
            | AggregateFunc::DenseRank { .. }
            | AggregateFunc::LagLead { .. }
            | AggregateFunc::FirstValue { .. }
            | AggregateFunc::LastValue { .. }
            | AggregateFunc::WindowAggregate { .. }
            | AggregateFunc::FusedValueWindowFunc { .. }
            | AggregateFunc::FusedWindowAggregate { .. } => true,
            // Ordinary aggregations produce a single value, not a list of results.
            AggregateFunc::ArrayConcat { .. }
            | AggregateFunc::ListConcat { .. }
            | AggregateFunc::Any
            | AggregateFunc::All
            | AggregateFunc::Dummy
            | AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumInt16
            | AggregateFunc::SumInt32
            | AggregateFunc::SumInt64
            | AggregateFunc::SumUInt16
            | AggregateFunc::SumUInt32
            | AggregateFunc::SumUInt64
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric
            | AggregateFunc::Count
            | AggregateFunc::JsonbAgg { .. }
            | AggregateFunc::JsonbObjectAgg { .. }
            | AggregateFunc::MapAgg { .. }
            | AggregateFunc::StringAgg { .. } => false,
        }
    }
2294
    /// The output column type for the result of an aggregation.
    ///
    /// The output column type also contains nullability information, which
    /// is (without further information) true for aggregations that are not
    /// counts.
    pub fn output_type(&self, input_type: SqlColumnType) -> SqlColumnType {
        let scalar_type = match self {
            AggregateFunc::Count => SqlScalarType::Int64,
            AggregateFunc::Any => SqlScalarType::Bool,
            AggregateFunc::All => SqlScalarType::Bool,
            AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
            AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
            // Sums widen: 16/32-bit ints sum to the 64-bit variant, and
            // 64-bit ints sum to an unscaled numeric.
            AggregateFunc::SumInt16 => SqlScalarType::Int64,
            AggregateFunc::SumInt32 => SqlScalarType::Int64,
            AggregateFunc::SumInt64 => SqlScalarType::Numeric {
                max_scale: Some(NumericMaxScale::ZERO),
            },
            AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
            AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
            AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
                max_scale: Some(NumericMaxScale::ZERO),
            },
            AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
                value_type: Box::new(value_type.clone()),
                custom_id: None,
            },
            AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
                match input_type.scalar_type {
                    // The input is wrapped in a Record if there's an ORDER BY, so extract it out.
                    SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
                    _ => unreachable!(),
                }
            }
            AggregateFunc::StringAgg { .. } => SqlScalarType::String,
            // The ranking window functions share one output shape, differing
            // only in the name of the result column.
            AggregateFunc::RowNumber { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
            }
            AggregateFunc::Rank { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
            }
            AggregateFunc::DenseRank { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
            }
            AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
                // The input type for Lag is ((OriginalRow, EncodedArgs), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let output_type_inner = Self::lag_lead_output_type_inner_from_encoded_args(fields[0].unwrap_record_element_type()[1]);
                let column_name = Self::lag_lead_result_column_name(lag_lead_type);

                // Output is a list of (result, original row) records.
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (column_name, output_type_inner),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FirstValue { .. } => {
                // The input type for FirstValue is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let value_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true); // null when the partition is empty

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?first_value?"), value_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::LastValue { .. } => {
                // The input type for LastValue is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let value_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true); // null when the partition is empty

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?last_value?"), value_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::WindowAggregate {
                wrapped_aggregate, ..
            } => {
                // The input type for a window aggregate is ((OriginalRow, Arg), OrderByExprs...)
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let arg_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true);
                // Delegate the result type to the aggregation being windowed.
                let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates, ..
            } => {
                // The input type for a fused window aggregate is ((OriginalRow, Args), OrderByExprs...)
                // where `Args` is a record.
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let args_type = fields[0].unwrap_record_element_type()[1];
                let arg_types = args_type.unwrap_record_element_type();
                // Pair each arg with its aggregation positionally; `zip_eq`
                // panics if the two lists have different lengths.
                let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(|(arg_type, wrapped_agg)| {
                    (
                        ColumnName::from(wrapped_agg.name()),
                        wrapped_agg.output_type((**arg_type).clone().nullable(true)),
                    )
                }).collect_vec();

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
                                fields: out_fields.into(),
                                custom_id: None,
                            }.nullable(false)),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
                // The input type is ((OriginalRow, EncodedArgs), OrderByExprs...)
                // where EncodedArgs is a record, where each element is the argument to one of the
                // function calls that got fused. This is a record for lag/lead, and a simple type
                // for first_value/last_value.
                let fields = input_type.scalar_type.unwrap_record_element_type();
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let encoded_args_type = fields[0].unwrap_record_element_type()[1].unwrap_record_element_type();

                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::Record {
                        fields: [
                            (ColumnName::from("?fused_value_window_func?"), SqlScalarType::Record {
                                // One output field per fused function, named
                                // and typed as the standalone function would be.
                                fields: encoded_args_type.into_iter().zip_eq(funcs).map(|(arg_type, func)| {
                                    match func {
                                        AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
                                            (
                                                Self::lag_lead_result_column_name(lag_lead_type),
                                                Self::lag_lead_output_type_inner_from_encoded_args(arg_type)
                                            )
                                        },
                                        AggregateFunc::FirstValue { .. } => {
                                            (
                                                ColumnName::from("?first_value?"),
                                                arg_type.clone().nullable(true),
                                            )
                                        }
                                        AggregateFunc::LastValue { .. } => {
                                            (
                                                ColumnName::from("?last_value?"),
                                                arg_type.clone().nullable(true),
                                            )
                                        }
                                        _ => panic!("FusedValueWindowFunc has an unknown function"),
                                    }
                                }).collect(),
                                custom_id: None,
                            }.nullable(false)),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            // All remaining variants return the same type they consume.
            AggregateFunc::Dummy
            | AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            // Note AggregateFunc::MaxString, MinString rely on returning input
            // type as output type to support the proper return type for
            // character input.
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
        };
        // Count never produces null, and other aggregations only produce
        // null in the presence of null inputs.
        let nullable = match self {
            AggregateFunc::Count => false,
            // Use the nullability of the underlying column being aggregated, not the Records wrapping it
            AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
                // The outer Record wraps the input in the first position, and any ORDER BY expressions afterwards
                SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
                    // The inner Record is a (value, separator) tuple
                    SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
                    _ => unreachable!(),
                },
                _ => unreachable!(),
            },
            _ => input_type.nullable,
        };
        scalar_type.nullable(nullable)
    }
2563
2564    /// Compute output type for ROW_NUMBER, RANK, DENSE_RANK
2565    fn output_type_ranking_window_funcs(
2566        input_type: &SqlColumnType,
2567        col_name: &str,
2568    ) -> SqlScalarType {
2569        match input_type.scalar_type {
2570            SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2571                element_type: Box::new(SqlScalarType::Record {
2572                    fields: [
2573                        (
2574                            ColumnName::from(col_name),
2575                            SqlScalarType::Int64.nullable(false),
2576                        ),
2577                        (ColumnName::from("?orig_row?"), {
2578                            let inner = match &fields[0].1.scalar_type {
2579                                SqlScalarType::List { element_type, .. } => element_type.clone(),
2580                                _ => unreachable!(),
2581                            };
2582                            inner.nullable(false)
2583                        }),
2584                    ]
2585                    .into(),
2586                    custom_id: None,
2587                }),
2588                custom_id: None,
2589            },
2590            _ => unreachable!(),
2591        }
2592    }
2593
2594    /// Given the `EncodedArgs` part of `((OriginalRow, EncodedArgs), OrderByExprs...)`,
2595    /// this computes the type of the first field of the output type. (The first field is the
2596    /// real result, the rest is the original row.)
2597    fn lag_lead_output_type_inner_from_encoded_args(
2598        encoded_args_type: &SqlScalarType,
2599    ) -> SqlColumnType {
2600        // lag/lead have 3 arguments, and the output type is
2601        // the same as the first of these, but always nullable. (It's null when the
2602        // lag/lead computation reaches over the bounds of the window partition.)
2603        encoded_args_type.unwrap_record_element_type()[0]
2604            .clone()
2605            .nullable(true)
2606    }
2607
2608    fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2609        ColumnName::from(match lag_lead_type {
2610            LagLeadType::Lag => "?lag?",
2611            LagLeadType::Lead => "?lead?",
2612        })
2613    }
2614
2615    /// Returns true if the non-null constraint on the aggregation can be
2616    /// converted into a non-null constraint on its parameter expression, ie.
2617    /// whether the result of the aggregation is null if all the input values
2618    /// are null.
2619    pub fn propagates_nonnull_constraint(&self) -> bool {
2620        match self {
2621            AggregateFunc::MaxNumeric
2622            | AggregateFunc::MaxInt16
2623            | AggregateFunc::MaxInt32
2624            | AggregateFunc::MaxInt64
2625            | AggregateFunc::MaxUInt16
2626            | AggregateFunc::MaxUInt32
2627            | AggregateFunc::MaxUInt64
2628            | AggregateFunc::MaxMzTimestamp
2629            | AggregateFunc::MaxFloat32
2630            | AggregateFunc::MaxFloat64
2631            | AggregateFunc::MaxBool
2632            | AggregateFunc::MaxString
2633            | AggregateFunc::MaxDate
2634            | AggregateFunc::MaxTimestamp
2635            | AggregateFunc::MaxTimestampTz
2636            | AggregateFunc::MinNumeric
2637            | AggregateFunc::MinInt16
2638            | AggregateFunc::MinInt32
2639            | AggregateFunc::MinInt64
2640            | AggregateFunc::MinUInt16
2641            | AggregateFunc::MinUInt32
2642            | AggregateFunc::MinUInt64
2643            | AggregateFunc::MinMzTimestamp
2644            | AggregateFunc::MinFloat32
2645            | AggregateFunc::MinFloat64
2646            | AggregateFunc::MinBool
2647            | AggregateFunc::MinString
2648            | AggregateFunc::MinDate
2649            | AggregateFunc::MinTimestamp
2650            | AggregateFunc::MinTimestampTz
2651            | AggregateFunc::SumInt16
2652            | AggregateFunc::SumInt32
2653            | AggregateFunc::SumInt64
2654            | AggregateFunc::SumUInt16
2655            | AggregateFunc::SumUInt32
2656            | AggregateFunc::SumUInt64
2657            | AggregateFunc::SumFloat32
2658            | AggregateFunc::SumFloat64
2659            | AggregateFunc::SumNumeric
2660            | AggregateFunc::StringAgg { .. } => true,
2661            // Count is never null
2662            AggregateFunc::Count => false,
2663            _ => false,
2664        }
2665    }
2666}
2667
2668fn jsonb_each<'a>(
2669    a: Datum<'a>,
2670    temp_storage: &'a RowArena,
2671    stringify: bool,
2672) -> impl Iterator<Item = (Row, Diff)> + 'a {
2673    // First produce a map, so that a common iterator can be returned.
2674    let map = match a {
2675        Datum::Map(dict) => dict,
2676        _ => mz_repr::DatumMap::empty(),
2677    };
2678
2679    map.iter().map(move |(k, mut v)| {
2680        if stringify {
2681            v = jsonb_stringify(v, temp_storage);
2682        }
2683        (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2684    })
2685}
2686
2687fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2688    let map = match a {
2689        Datum::Map(dict) => dict,
2690        _ => mz_repr::DatumMap::empty(),
2691    };
2692
2693    map.iter()
2694        .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2695}
2696
2697fn jsonb_array_elements<'a>(
2698    a: Datum<'a>,
2699    temp_storage: &'a RowArena,
2700    stringify: bool,
2701) -> impl Iterator<Item = (Row, Diff)> + 'a {
2702    let list = match a {
2703        Datum::List(list) => list,
2704        _ => mz_repr::DatumList::empty(),
2705    };
2706    list.iter().map(move |mut e| {
2707        if stringify {
2708            e = jsonb_stringify(e, temp_storage);
2709        }
2710        (Row::pack_slice(&[e]), Diff::ONE)
2711    })
2712}
2713
2714fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2715    let r = r.inner();
2716    let a = a.unwrap_str();
2717    let captures = r.captures(a)?;
2718    let datums = captures
2719        .iter()
2720        .skip(1)
2721        .map(|m| Datum::from(m.map(|m| m.as_str())));
2722    Some((Row::pack(datums), Diff::ONE))
2723}
2724
2725fn regexp_matches<'a, 'r: 'a>(
2726    exprs: &[Datum<'a>],
2727) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2728    // There are only two acceptable ways to call this function:
2729    // 1. regexp_matches(string, regex)
2730    // 2. regexp_matches(string, regex, flag)
2731    assert!(exprs.len() == 2 || exprs.len() == 3);
2732    let a = exprs[0].unwrap_str();
2733    let r = exprs[1].unwrap_str();
2734
2735    let (regex, opts) = if exprs.len() == 3 {
2736        let flag = exprs[2].unwrap_str();
2737        let opts = AnalyzedRegexOpts::from_str(flag)?;
2738        (AnalyzedRegex::new(r, opts)?, opts)
2739    } else {
2740        let opts = AnalyzedRegexOpts::default();
2741        (AnalyzedRegex::new(r, opts)?, opts)
2742    };
2743
2744    let regex = regex.inner().clone();
2745
2746    let iter = regex.captures_iter(a).map(move |captures| {
2747        let matches = captures
2748            .iter()
2749            // The first match is the *entire* match, we want the capture groups by themselves.
2750            .skip(1)
2751            .map(|m| Datum::from(m.map(|m| m.as_str())))
2752            .collect::<Vec<_>>();
2753
2754        let mut binding = SharedRow::get();
2755        let mut packer = binding.packer();
2756
2757        let dimension = ArrayDimension {
2758            lower_bound: 1,
2759            length: matches.len(),
2760        };
2761        packer
2762            .try_push_array(&[dimension], matches)
2763            .expect("generated dimensions above");
2764
2765        (binding.clone(), Diff::ONE)
2766    });
2767
2768    // This is slightly unfortunate, but we need to collect the captures into a
2769    // Vec before we can yield them, because we can't return a iter with a
2770    // reference to the local `regex` variable.
2771    // We attempt to minimize the cost of this by using a SmallVec.
2772    let out = iter.collect::<SmallVec<[_; 3]>>();
2773
2774    if opts.global {
2775        Ok(Either::Left(out.into_iter()))
2776    } else {
2777        Ok(Either::Right(out.into_iter().take(1)))
2778    }
2779}
2780
2781fn generate_series<N>(
2782    start: N,
2783    stop: N,
2784    step: N,
2785) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2786where
2787    N: Integer + Signed + CheckedAdd + Clone,
2788    Datum<'static>: From<N>,
2789{
2790    if step == N::zero() {
2791        return Err(EvalError::InvalidParameterValue(
2792            "step size cannot equal zero".into(),
2793        ));
2794    }
2795    Ok(num::range_step_inclusive(start, stop, step)
2796        .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2797}
2798
/// Like
/// [`num::range_step_inclusive`](https://github.com/rust-num/num-iter/blob/ddb14c1e796d401014c6c7a727de61d8109ad986/src/lib.rs#L279),
/// but for our timestamp types using [`Interval`] for `step`.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    // The next timestamp to yield (if it hasn't passed `stop`).
    state: CheckedTimestamp<T>,
    // Inclusive end of the range.
    stop: CheckedTimestamp<T>,
    // Amount by which `state` advances after each yielded item.
    step: Interval,
    // Whether the series descends (set for a negative step), which flips
    // the direction of the stop comparison.
    rev: bool,
    // Set once iteration is finished (range exhausted, or advancing `state`
    // overflowed the timestamp range).
    done: bool,
}
2810
impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
    type Item = CheckedTimestamp<T>;

    #[inline]
    fn next(&mut self) -> Option<CheckedTimestamp<T>> {
        // Yield while the current state has not passed `stop` (the direction
        // of the comparison depends on `rev`) and no prior advance failed.
        if !self.done
            && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
        {
            let result = self.state.clone();
            // Advance by the interval: add the months component first, then
            // the sub-month duration. Any overflow along the way silently
            // terminates the iteration after the current item instead of
            // surfacing an error.
            match add_timestamp_months(self.state.deref(), self.step.months) {
                Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
                    Some(v) => match CheckedTimestamp::from_timestamplike(v) {
                        Ok(v) => self.state = v,
                        Err(_) => self.done = true,
                    },
                    None => self.done = true,
                },
                Err(..) => {
                    self.done = true;
                }
            }

            Some(result)
        } else {
            None
        }
    }
}
2839
2840fn generate_series_ts<T: TimestampLike>(
2841    start: CheckedTimestamp<T>,
2842    stop: CheckedTimestamp<T>,
2843    step: Interval,
2844    conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2845) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2846    let normalized_step = step.as_microseconds();
2847    if normalized_step == 0 {
2848        return Err(EvalError::InvalidParameterValue(
2849            "step size cannot equal zero".into(),
2850        ));
2851    }
2852    let rev = normalized_step < 0;
2853
2854    let trsi = TimestampRangeStepInclusive {
2855        state: start,
2856        stop,
2857        step,
2858        rev,
2859        done: false,
2860    };
2861
2862    Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2863}
2864
2865fn generate_subscripts_array(
2866    a: Datum,
2867    dim: i32,
2868) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2869    if dim <= 0 {
2870        return Ok(Box::new(iter::empty()));
2871    }
2872
2873    match a.unwrap_array().dims().into_iter().nth(
2874        (dim - 1)
2875            .try_into()
2876            .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2877    ) {
2878        Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
2879            requested_dim.lower_bound.try_into().map_err(|_| {
2880                EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2881            })?,
2882            requested_dim
2883                .length
2884                .try_into()
2885                .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
2886            1,
2887        )?)),
2888        None => Ok(Box::new(iter::empty())),
2889    }
2890}
2891
2892fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2893    a.unwrap_array()
2894        .elements()
2895        .iter()
2896        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2897}
2898
2899fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2900    a.unwrap_list()
2901        .iter()
2902        .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2903}
2904
2905fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2906    a.unwrap_map()
2907        .iter()
2908        .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
2909}
2910
2911impl AggregateFunc {
2912    /// The base function name without the `~[...]` suffix used when rendering
2913    /// variants that represent a parameterized function family.
2914    pub fn name(&self) -> &'static str {
2915        match self {
2916            Self::MaxNumeric => "max",
2917            Self::MaxInt16 => "max",
2918            Self::MaxInt32 => "max",
2919            Self::MaxInt64 => "max",
2920            Self::MaxUInt16 => "max",
2921            Self::MaxUInt32 => "max",
2922            Self::MaxUInt64 => "max",
2923            Self::MaxMzTimestamp => "max",
2924            Self::MaxFloat32 => "max",
2925            Self::MaxFloat64 => "max",
2926            Self::MaxBool => "max",
2927            Self::MaxString => "max",
2928            Self::MaxDate => "max",
2929            Self::MaxTimestamp => "max",
2930            Self::MaxTimestampTz => "max",
2931            Self::MaxInterval => "max",
2932            Self::MaxTime => "max",
2933            Self::MinNumeric => "min",
2934            Self::MinInt16 => "min",
2935            Self::MinInt32 => "min",
2936            Self::MinInt64 => "min",
2937            Self::MinUInt16 => "min",
2938            Self::MinUInt32 => "min",
2939            Self::MinUInt64 => "min",
2940            Self::MinMzTimestamp => "min",
2941            Self::MinFloat32 => "min",
2942            Self::MinFloat64 => "min",
2943            Self::MinBool => "min",
2944            Self::MinString => "min",
2945            Self::MinDate => "min",
2946            Self::MinTimestamp => "min",
2947            Self::MinTimestampTz => "min",
2948            Self::MinInterval => "min",
2949            Self::MinTime => "min",
2950            Self::SumInt16 => "sum",
2951            Self::SumInt32 => "sum",
2952            Self::SumInt64 => "sum",
2953            Self::SumUInt16 => "sum",
2954            Self::SumUInt32 => "sum",
2955            Self::SumUInt64 => "sum",
2956            Self::SumFloat32 => "sum",
2957            Self::SumFloat64 => "sum",
2958            Self::SumNumeric => "sum",
2959            Self::Count => "count",
2960            Self::Any => "any",
2961            Self::All => "all",
2962            Self::JsonbAgg { .. } => "jsonb_agg",
2963            Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
2964            Self::MapAgg { .. } => "map_agg",
2965            Self::ArrayConcat { .. } => "array_agg",
2966            Self::ListConcat { .. } => "list_agg",
2967            Self::StringAgg { .. } => "string_agg",
2968            Self::RowNumber { .. } => "row_number",
2969            Self::Rank { .. } => "rank",
2970            Self::DenseRank { .. } => "dense_rank",
2971            Self::LagLead {
2972                lag_lead: LagLeadType::Lag,
2973                ..
2974            } => "lag",
2975            Self::LagLead {
2976                lag_lead: LagLeadType::Lead,
2977                ..
2978            } => "lead",
2979            Self::FirstValue { .. } => "first_value",
2980            Self::LastValue { .. } => "last_value",
2981            Self::WindowAggregate { .. } => "window_agg",
2982            Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
2983            Self::FusedWindowAggregate { .. } => "fused_window_agg",
2984            Self::Dummy => "dummy",
2985        }
2986    }
2987}
2988
2989impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
2990where
2991    M: HumanizerMode,
2992{
2993    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2994        use AggregateFunc::*;
2995        let name = self.expr.name();
2996        match self.expr {
2997            JsonbAgg { order_by }
2998            | JsonbObjectAgg { order_by }
2999            | MapAgg { order_by, .. }
3000            | ArrayConcat { order_by }
3001            | ListConcat { order_by }
3002            | StringAgg { order_by }
3003            | RowNumber { order_by }
3004            | Rank { order_by }
3005            | DenseRank { order_by } => {
3006                let order_by = order_by.iter().map(|col| self.child(col));
3007                write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3008            }
3009            LagLead {
3010                lag_lead: _,
3011                ignore_nulls,
3012                order_by,
3013            } => {
3014                let order_by = order_by.iter().map(|col| self.child(col));
3015                f.write_str(name)?;
3016                f.write_str("[")?;
3017                if *ignore_nulls {
3018                    f.write_str("ignore_nulls=true, ")?;
3019                }
3020                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3021                f.write_str("]")
3022            }
3023            FirstValue {
3024                order_by,
3025                window_frame,
3026            } => {
3027                let order_by = order_by.iter().map(|col| self.child(col));
3028                f.write_str(name)?;
3029                f.write_str("[")?;
3030                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3031                if *window_frame != WindowFrame::default() {
3032                    write!(f, " {}", window_frame)?;
3033                }
3034                f.write_str("]")
3035            }
3036            LastValue {
3037                order_by,
3038                window_frame,
3039            } => {
3040                let order_by = order_by.iter().map(|col| self.child(col));
3041                f.write_str(name)?;
3042                f.write_str("[")?;
3043                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3044                if *window_frame != WindowFrame::default() {
3045                    write!(f, " {}", window_frame)?;
3046                }
3047                f.write_str("]")
3048            }
3049            WindowAggregate {
3050                wrapped_aggregate,
3051                order_by,
3052                window_frame,
3053            } => {
3054                let order_by = order_by.iter().map(|col| self.child(col));
3055                let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3056                f.write_str(name)?;
3057                f.write_str("[")?;
3058                write!(f, "{} ", wrapped_aggregate)?;
3059                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3060                if *window_frame != WindowFrame::default() {
3061                    write!(f, " {}", window_frame)?;
3062                }
3063                f.write_str("]")
3064            }
3065            FusedValueWindowFunc { funcs, order_by } => {
3066                let order_by = order_by.iter().map(|col| self.child(col));
3067                let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3068                f.write_str(name)?;
3069                f.write_str("[")?;
3070                write!(f, "{} ", funcs)?;
3071                write!(f, "order_by=[{}]", separated(", ", order_by))?;
3072                f.write_str("]")
3073            }
3074            _ => f.write_str(name),
3075        }
3076    }
3077}
3078
/// Metadata about a single explicit capture group of an [`AnalyzedRegex`].
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct CaptureGroupDesc {
    /// The index of the group within the pattern. Starts at 1, because
    /// `AnalyzedRegex::new` skips the implicit group 0 (the whole match).
    pub index: u32,
    /// The group's name, if the pattern gave it one.
    pub name: Option<String>,
    /// Whether the captured value may be null. Currently always `true`; see
    /// the TODO in `AnalyzedRegex::new`.
    pub nullable: bool,
}
3085
/// Flags modifying regex matching behavior, parsed from a flag string by the
/// `FromStr` impl below.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default,
)]
pub struct AnalyzedRegexOpts {
    /// Case-insensitive matching (the `i` flag).
    pub case_insensitive: bool,
    /// The `g` ("global") flag.
    pub global: bool,
}
3104
3105impl FromStr for AnalyzedRegexOpts {
3106    type Err = EvalError;
3107
3108    fn from_str(s: &str) -> Result<Self, Self::Err> {
3109        let mut opts = AnalyzedRegexOpts::default();
3110        for c in s.chars() {
3111            match c {
3112                'i' => opts.case_insensitive = true,
3113                'g' => opts.global = true,
3114                _ => return Err(EvalError::InvalidRegexFlag(c)),
3115            }
3116        }
3117        Ok(opts)
3118    }
3119}
3120
/// A compiled regex bundled with per-capture-group metadata and the flags it
/// was compiled with. Construct via [`AnalyzedRegex::new`].
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3123
3124impl AnalyzedRegex {
3125    pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, regex::Error> {
3126        let r = ReprRegex::new(s, opts.case_insensitive)?;
3127        // TODO(benesch): remove potentially dangerous usage of `as`.
3128        #[allow(clippy::as_conversions)]
3129        let descs: Vec<_> = r
3130            .capture_names()
3131            .enumerate()
3132            // The first capture is the entire matched string.
3133            // This will often not be useful, so skip it.
3134            // If people want it they can just surround their
3135            // entire regex in an explicit capture group.
3136            .skip(1)
3137            .map(|(i, name)| CaptureGroupDesc {
3138                index: i as u32,
3139                name: name.map(String::from),
3140                // TODO -- we can do better.
3141                // https://github.com/MaterializeInc/database-issues/issues/612
3142                nullable: true,
3143            })
3144            .collect();
3145        Ok(Self(r, descs, opts))
3146    }
3147    pub fn capture_groups_len(&self) -> usize {
3148        self.1.len()
3149    }
3150    pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3151        self.1.iter()
3152    }
3153    pub fn inner(&self) -> &Regex {
3154        &(self.0).regex
3155    }
3156    pub fn opts(&self) -> &AnalyzedRegexOpts {
3157        &self.2
3158    }
3159}
3160
3161pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3162    let bytes = a.unwrap_str().as_bytes();
3163    let mut row = Row::default();
3164    let csv_reader = csv::ReaderBuilder::new()
3165        .has_headers(false)
3166        .from_reader(bytes);
3167    csv_reader.into_records().filter_map(move |res| match res {
3168        Ok(sr) if sr.len() == n_cols => {
3169            row.packer().extend(sr.iter().map(Datum::String));
3170            Some((row.clone(), Diff::ONE))
3171        }
3172        _ => None,
3173    })
3174}
3175
3176pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3177    let n = a.unwrap_int64();
3178    if n != 0 {
3179        Some((Row::default(), n.into()))
3180    } else {
3181        None
3182    }
3183}
3184
3185fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3186    datums
3187        .chunks(width)
3188        .map(|chunk| (Row::pack(chunk), Diff::ONE))
3189}
3190
3191fn acl_explode<'a>(
3192    acl_items: Datum<'a>,
3193    temp_storage: &'a RowArena,
3194) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3195    let acl_items = acl_items.unwrap_array();
3196    let mut res = Vec::new();
3197    for acl_item in acl_items.elements().iter() {
3198        if acl_item.is_null() {
3199            return Err(EvalError::AclArrayNullElement);
3200        }
3201        let acl_item = acl_item.unwrap_acl_item();
3202        for privilege in acl_item.acl_mode.explode() {
3203            let row = [
3204                Datum::UInt32(acl_item.grantor.0),
3205                Datum::UInt32(acl_item.grantee.0),
3206                Datum::String(temp_storage.push_string(privilege.to_string())),
3207                // GRANT OPTION is not implemented, so we hardcode false.
3208                Datum::False,
3209            ];
3210            res.push((Row::pack_slice(&row), Diff::ONE));
3211        }
3212    }
3213    Ok(res.into_iter())
3214}
3215
3216fn mz_acl_explode<'a>(
3217    mz_acl_items: Datum<'a>,
3218    temp_storage: &'a RowArena,
3219) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3220    let mz_acl_items = mz_acl_items.unwrap_array();
3221    let mut res = Vec::new();
3222    for mz_acl_item in mz_acl_items.elements().iter() {
3223        if mz_acl_item.is_null() {
3224            return Err(EvalError::MzAclArrayNullElement);
3225        }
3226        let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3227        for privilege in mz_acl_item.acl_mode.explode() {
3228            let row = [
3229                Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3230                Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3231                Datum::String(temp_storage.push_string(privilege.to_string())),
3232                // GRANT OPTION is not implemented, so we hardcode false.
3233                Datum::False,
3234            ];
3235            res.push((Row::pack_slice(&row), Diff::ONE));
3236        }
3237    }
3238    Ok(res.into_iter())
3239}
3240
/// When adding a new `TableFunc` variant, please consider adding it to
/// `TableFunc::with_ordinality`!
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum TableFunc {
    /// Expands an array of `AclItem`s into one row per contained privilege.
    AclExplode,
    /// Expands an array of `MzAclItem`s into one row per contained privilege.
    MzAclExplode,
    /// Expands a jsonb object into one (key, value) row per entry. With
    /// `stringify`, values are rendered as (nullable) strings rather than jsonb.
    JsonbEach {
        stringify: bool,
    },
    /// Produces one row per key of a jsonb object.
    JsonbObjectKeys,
    /// Produces one row per element of a jsonb array. With `stringify`,
    /// elements are rendered as (nullable) strings rather than jsonb.
    JsonbArrayElements {
        stringify: bool,
    },
    /// Produces one string column per explicit capture group of the regex.
    RegexpExtract(AnalyzedRegex),
    /// Parses a string as headerless CSV into the given number of string
    /// columns; records with a different field count are dropped.
    CsvExtract(usize),
    /// `generate_series` over 32-bit integers.
    GenerateSeriesInt32,
    /// `generate_series` over 64-bit integers.
    GenerateSeriesInt64,
    /// `generate_series` over timestamps with an interval step.
    GenerateSeriesTimestamp,
    /// `generate_series` over timestamps with time zone with an interval step.
    GenerateSeriesTimestampTz,
    /// Supplied with an input count,
    ///   1. Adds a column as if a typed subquery result,
    ///   2. Filters the row away if the count is only one,
    ///   3. Errors if the count is not exactly one.
    /// The intent is that this presents as if a subquery result with too many
    /// records contributing. The error column has the same type as the result
    /// should have, but we only produce it if the count exceeds one.
    ///
    /// This logic could nearly be achieved with map, filter, project logic,
    /// but has been challenging to do in a way that respects the vagaries of
    /// SQL and our semantics. If we reveal a constant value in the column we
    /// risk the optimizer pruning the branch; if we reveal that this will not
    /// produce rows we risk the optimizer pruning the branch; if we reveal that
    /// the only possible value is an error we risk the optimizer propagating that
    /// error without guards.
    ///
    /// Before replacing this by an `MirScalarExpr`, quadruple check that it
    /// would not result in misoptimizations due to expression evaluation order
    /// being utterly undefined, and predicate pushdown trimming any fragments
    /// that might produce columns that will not be needed.
    GuardSubquerySize {
        column_type: SqlScalarType,
    },
    /// Emits the empty row with multiplicity given by its `int8` input (the
    /// only table function that can emit negative diffs; see
    /// `TableFunc::with_ordinality`).
    Repeat,
    /// Produces one row per element of an array.
    UnnestArray {
        el_typ: SqlScalarType,
    },
    /// Produces one row per element of a list.
    UnnestList {
        el_typ: SqlScalarType,
    },
    /// Produces one (key, value) row per entry of a map.
    UnnestMap {
        value_type: SqlScalarType,
    },
    /// Given `n` input expressions, wraps them into `n / width` rows, each of
    /// `width` columns.
    ///
    /// This function is not intended to be called directly by end users, but
    /// is useful in the planning of e.g. VALUES clauses.
    Wrap {
        types: Vec<SqlColumnType>,
        width: usize,
    },
    /// Implements `generate_subscripts` for arrays.
    GenerateSubscriptsArray,
    /// Execute some arbitrary scalar function as a table function.
    TabletizedScalar {
        name: String,
        relation: SqlRelationType,
    },
    /// Produces the matches of a regex as a single array-of-string column.
    RegexpMatches,
    /// Implements the WITH ORDINALITY clause.
    ///
    /// Don't construct `TableFunc::WithOrdinality` manually! Use the `with_ordinality` constructor
    /// function instead, which checks whether the given table function supports `WithOrdinality`.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3316
/// Evaluates the inner table function, expands its results into unary (repeating each row as
/// many times as the diff indicates), and appends an integer corresponding to the ordinal
/// position (starting from 1). For example, it numbers the elements of a list when calling
/// `unnest_list`.
///
/// Private enum variant of `TableFunc`. Don't construct this directly, but use
/// `TableFunc::with_ordinality` instead.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
struct WithOrdinality {
    /// The table function whose output rows get numbered.
    inner: Box<TableFunc>,
}
3328
3329impl TableFunc {
3330    /// Adds `WITH ORDINALITY` to a table function if it's allowed on the given table function.
3331    pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3332        match inner {
3333            TableFunc::AclExplode
3334            | TableFunc::MzAclExplode
3335            | TableFunc::JsonbEach { .. }
3336            | TableFunc::JsonbObjectKeys
3337            | TableFunc::JsonbArrayElements { .. }
3338            | TableFunc::RegexpExtract(_)
3339            | TableFunc::CsvExtract(_)
3340            | TableFunc::GenerateSeriesInt32
3341            | TableFunc::GenerateSeriesInt64
3342            | TableFunc::GenerateSeriesTimestamp
3343            | TableFunc::GenerateSeriesTimestampTz
3344            | TableFunc::GuardSubquerySize { .. }
3345            | TableFunc::Repeat
3346            | TableFunc::UnnestArray { .. }
3347            | TableFunc::UnnestList { .. }
3348            | TableFunc::UnnestMap { .. }
3349            | TableFunc::Wrap { .. }
3350            | TableFunc::GenerateSubscriptsArray
3351            | TableFunc::TabletizedScalar { .. }
3352            | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3353                inner: Box::new(inner),
3354            })),
3355            // IMPORTANT: Before adding a new table function here, consider negative diffs:
3356            // `WithOrdinality::eval` will panic if the inner table function emits a negative diff.
3357            _ => None,
3358        }
3359    }
3360}
3361
impl TableFunc {
    /// Executes `self` on the given input row (`datums`).
    pub fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Variants for which `empty_on_null_input` returns true produce no
        // rows when any argument is null, so short-circuit before dispatching.
        if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
            return Ok(Box::new(vec![].into_iter()));
        }
        match self {
            TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
            TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
            TableFunc::JsonbEach { stringify } => {
                Ok(Box::new(jsonb_each(datums[0], temp_storage, *stringify)))
            }
            TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
            TableFunc::JsonbArrayElements { stringify } => Ok(Box::new(jsonb_array_elements(
                datums[0],
                temp_storage,
                *stringify,
            ))),
            TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
            TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
            TableFunc::GenerateSeriesInt32 => {
                let res = generate_series(
                    datums[0].unwrap_int32(),
                    datums[1].unwrap_int32(),
                    datums[2].unwrap_int32(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesInt64 => {
                let res = generate_series(
                    datums[0].unwrap_int64(),
                    datums[1].unwrap_int64(),
                    datums[2].unwrap_int64(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestamp => {
                // Identity conversion that fixes the timestamp flavor for
                // `generate_series_ts`'s datum-conversion parameter.
                fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamp(),
                    datums[1].unwrap_timestamp(),
                    datums[2].unwrap_interval(),
                    pass_through,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestampTz => {
                fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamptz(),
                    datums[1].unwrap_timestamptz(),
                    datums[2].unwrap_interval(),
                    gen_ts_tz,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSubscriptsArray => {
                generate_subscripts_array(datums[0], datums[1].unwrap_int32())
            }
            TableFunc::GuardSubquerySize { column_type: _ } => {
                // We error if the count is not one,
                // and produce no rows if equal to one.
                let count = datums[0].unwrap_int64();
                if count != 1 {
                    Err(EvalError::MultipleRowsFromSubquery)
                } else {
                    Ok(Box::new([].into_iter()))
                }
            }
            TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
            TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
            TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
            TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
            TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
            TableFunc::TabletizedScalar { .. } => {
                // The scalar's outputs are the input datums; emit them as a
                // single row.
                let r = Row::pack_slice(datums);
                Ok(Box::new(std::iter::once((r, Diff::ONE))))
            }
            TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
            TableFunc::WithOrdinality(func_with_ordinality) => {
                func_with_ordinality.eval(datums, temp_storage)
            }
        }
    }

    /// Returns the relation type (column types plus unique keys) of the rows
    /// that `self` produces.
    pub fn output_type(&self) -> SqlRelationType {
        let (column_types, keys) = match self {
            TableFunc::AclExplode => {
                let column_types = vec![
                    SqlScalarType::Oid.nullable(false),
                    SqlScalarType::Oid.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Bool.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::MzAclExplode => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Bool.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbEach { stringify: true } => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::String.nullable(true),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbEach { stringify: false } => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    SqlScalarType::Jsonb.nullable(false),
                ];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbObjectKeys => {
                let column_types = vec![SqlScalarType::String.nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbArrayElements { stringify: true } => {
                let column_types = vec![SqlScalarType::String.nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::JsonbArrayElements { stringify: false } => {
                let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::RegexpExtract(a) => {
                let column_types = a
                    .capture_groups_iter()
                    .map(|cg| SqlScalarType::String.nullable(cg.nullable))
                    .collect();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::CsvExtract(n_cols) => {
                let column_types = iter::repeat(SqlScalarType::String.nullable(false))
                    .take(*n_cols)
                    .collect();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesInt32 => {
                let column_types = vec![SqlScalarType::Int32.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesInt64 => {
                let column_types = vec![SqlScalarType::Int64.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesTimestamp => {
                let column_types =
                    vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSeriesTimestampTz => {
                let column_types =
                    vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GenerateSubscriptsArray => {
                let column_types = vec![SqlScalarType::Int32.nullable(false)];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::GuardSubquerySize { column_type } => {
                let column_types = vec![column_type.clone().nullable(false)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::Repeat => {
                let column_types = vec![];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestArray { el_typ } => {
                let column_types = vec![el_typ.clone().nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestList { el_typ } => {
                let column_types = vec![el_typ.clone().nullable(true)];
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::UnnestMap { value_type } => {
                let column_types = vec![
                    SqlScalarType::String.nullable(false),
                    value_type.clone().nullable(true),
                ];
                let keys = vec![vec![0]];
                (column_types, keys)
            }
            TableFunc::Wrap { types, .. } => {
                let column_types = types.clone();
                let keys = vec![];
                (column_types, keys)
            }
            TableFunc::TabletizedScalar { relation, .. } => {
                return relation.clone();
            }
            TableFunc::RegexpMatches => {
                let column_types =
                    vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
                let keys = vec![];

                (column_types, keys)
            }
            TableFunc::WithOrdinality(WithOrdinality { inner }) => {
                let mut typ = inner.output_type();
                // Add the ordinality column.
                typ.column_types.push(SqlScalarType::Int64.nullable(false));
                // The ordinality column is always a key.
                typ.keys.push(vec![typ.column_types.len() - 1]);
                (typ.column_types, typ.keys)
            }
        };

        // Cross-check against `output_arity`, which duplicates this knowledge.
        soft_assert_eq_no_log!(column_types.len(), self.output_arity());

        if !keys.is_empty() {
            SqlRelationType::new(column_types).with_keys(keys)
        } else {
            SqlRelationType::new(column_types)
        }
    }

    /// Returns the number of columns that `self` produces. Must be kept in
    /// sync with `output_type`.
    pub fn output_arity(&self) -> usize {
        match self {
            TableFunc::AclExplode => 4,
            TableFunc::MzAclExplode => 4,
            TableFunc::JsonbEach { .. } => 2,
            TableFunc::JsonbObjectKeys => 1,
            TableFunc::JsonbArrayElements { .. } => 1,
            TableFunc::RegexpExtract(a) => a.capture_groups_len(),
            TableFunc::CsvExtract(n_cols) => *n_cols,
            TableFunc::GenerateSeriesInt32 => 1,
            TableFunc::GenerateSeriesInt64 => 1,
            TableFunc::GenerateSeriesTimestamp => 1,
            TableFunc::GenerateSeriesTimestampTz => 1,
            TableFunc::GenerateSubscriptsArray => 1,
            TableFunc::GuardSubquerySize { .. } => 1,
            TableFunc::Repeat => 0,
            TableFunc::UnnestArray { .. } => 1,
            TableFunc::UnnestList { .. } => 1,
            TableFunc::UnnestMap { .. } => 2,
            TableFunc::Wrap { width, .. } => *width,
            TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
            TableFunc::RegexpMatches => 1,
            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
        }
    }

    /// Reports whether `self` produces no rows when any of its arguments is
    /// null. When true, `eval` short-circuits to an empty iterator.
    pub fn empty_on_null_input(&self) -> bool {
        match self {
            TableFunc::AclExplode
            | TableFunc::MzAclExplode
            | TableFunc::JsonbEach { .. }
            | TableFunc::JsonbObjectKeys
            | TableFunc::JsonbArrayElements { .. }
            | TableFunc::GenerateSeriesInt32
            | TableFunc::GenerateSeriesInt64
            | TableFunc::GenerateSeriesTimestamp
            | TableFunc::GenerateSeriesTimestampTz
            | TableFunc::GenerateSubscriptsArray
            | TableFunc::RegexpExtract(_)
            | TableFunc::CsvExtract(_)
            | TableFunc::Repeat
            | TableFunc::UnnestArray { .. }
            | TableFunc::UnnestList { .. }
            | TableFunc::UnnestMap { .. }
            | TableFunc::RegexpMatches => true,
            TableFunc::GuardSubquerySize { .. } => false,
            TableFunc::Wrap { .. } => false,
            TableFunc::TabletizedScalar { .. } => false,
            // Defer to the wrapped function.
            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
        }
    }

    /// True iff the table function preserves the append-only property of its input.
    pub fn preserves_monotonicity(&self) -> bool {
        // Most variants preserve monotonicity, but all variants are enumerated to
        // ensure that added variants at least check that this is the case.
        match self {
            TableFunc::AclExplode => false,
            TableFunc::MzAclExplode => false,
            TableFunc::JsonbEach { .. } => true,
            TableFunc::JsonbObjectKeys => true,
            TableFunc::JsonbArrayElements { .. } => true,
            TableFunc::RegexpExtract(_) => true,
            TableFunc::CsvExtract(_) => true,
            TableFunc::GenerateSeriesInt32 => true,
            TableFunc::GenerateSeriesInt64 => true,
            TableFunc::GenerateSeriesTimestamp => true,
            TableFunc::GenerateSeriesTimestampTz => true,
            TableFunc::GenerateSubscriptsArray => true,
            TableFunc::Repeat => false,
            TableFunc::UnnestArray { .. } => true,
            TableFunc::UnnestList { .. } => true,
            TableFunc::UnnestMap { .. } => true,
            TableFunc::Wrap { .. } => true,
            TableFunc::TabletizedScalar { .. } => true,
            TableFunc::RegexpMatches => true,
            TableFunc::GuardSubquerySize { .. } => false,
            TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
        }
    }
}
3693
3694impl fmt::Display for TableFunc {
3695    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3696        match self {
3697            TableFunc::AclExplode => f.write_str("aclexplode"),
3698            TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3699            TableFunc::JsonbEach { .. } => f.write_str("jsonb_each"),
3700            TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3701            TableFunc::JsonbArrayElements { .. } => f.write_str("jsonb_array_elements"),
3702            TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3703            TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3704            TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3705            TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3706            TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3707            TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3708            TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3709            TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3710            TableFunc::Repeat => f.write_str("repeat_row"),
3711            TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3712            TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3713            TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3714            TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3715            TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3716            TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3717            TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3718                write!(f, "{}[with_ordinality]", inner)
3719            }
3720        }
3721    }
3722}
3723
3724impl WithOrdinality {
3725    /// Executes the `self.inner` table function on the given input row (`datums`), and zips
3726    /// 1, 2, 3, ... to the result as a new column. We need to expand rows with non-1 diffs into the
3727    /// corresponding number of rows with unit diffs, because the ordinality column will have
3728    /// different values for each copy.
3729    ///
3730    /// # Panics
3731    ///
3732    /// Panics if the `inner` table function emits a negative diff.
3733    fn eval<'a>(
3734        &'a self,
3735        datums: &'a [Datum<'a>],
3736        temp_storage: &'a RowArena,
3737    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3738        let mut next_ordinal: i64 = 1;
3739        let it = self
3740            .inner
3741            .eval(datums, temp_storage)?
3742            .flat_map(move |(mut row, diff)| {
3743                let diff = diff.into_inner();
3744                // WITH ORDINALITY is not well-defined for negative diffs. This is ok, since the
3745                // only table function that can emit negative diffs is `repeat_row`, which is in
3746                // `mz_unsafe`, so users can never call it.
3747                //
3748                // (We also don't need to worry about negative diffs in FlatMap's input, because
3749                // the diff of the input of the FlatMap is factored in after we return from here.)
3750                assert!(diff >= 0);
3751                // The ordinals that will be associated with this row.
3752                let mut ordinals = next_ordinal..(next_ordinal + diff);
3753                next_ordinal += diff;
3754                // The maximum byte capacity we need for the original row and its ordinal.
3755                let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
3756                iter::from_fn(move || {
3757                    let ordinal = ordinals.next()?;
3758                    let mut row = if ordinals.is_empty() {
3759                        // This is the last row, so no need to clone. (Most table functions emit
3760                        // only 1 diffs, so this completely avoids cloning in most cases.)
3761                        std::mem::take(&mut row)
3762                    } else {
3763                        let mut new_row = Row::with_capacity(cap);
3764                        new_row.clone_from(&row);
3765                        new_row
3766                    };
3767                    RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
3768                    Some((row, Diff::ONE))
3769                })
3770            });
3771        Ok(Box::new(it))
3772    }
3773}