1#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::{Regex as ReprRegex, RegexCompilationError};
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33 ColumnName, Datum, Diff, ReprColumnType, ReprRelationType, Row, RowArena, RowPacker, SharedRow,
34 SqlColumnType, SqlRelationType, SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44 CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49 ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58 I: IntoIterator<Item = Datum<'a>>,
59{
60 match datums
61 .into_iter()
62 .filter(|d| !d.is_null())
63 .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64 {
65 Some(datum) => datum,
66 None => Datum::Null,
67 }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72 I: IntoIterator<Item = Datum<'a>>,
73 DatumType: TryFrom<Datum<'a>> + Ord,
74 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75 Datum<'a>: From<Option<DatumType>>,
76{
77 let x: Option<DatumType> = datums
78 .into_iter()
79 .filter(|d| !d.is_null())
80 .map(|d| DatumType::try_from(d).expect("unexpected type"))
81 .max();
82
83 x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88 I: IntoIterator<Item = Datum<'a>>,
89 DatumType: TryFrom<Datum<'a>> + Ord,
90 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91 Datum<'a>: From<Option<DatumType>>,
92{
93 let x: Option<DatumType> = datums
94 .into_iter()
95 .filter(|d| !d.is_null())
96 .map(|d| DatumType::try_from(d).expect("unexpected type"))
97 .min();
98
99 x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104 I: IntoIterator<Item = Datum<'a>>,
105{
106 match datums
107 .into_iter()
108 .filter(|d| !d.is_null())
109 .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110 {
111 Some(datum) => datum,
112 None => Datum::Null,
113 }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118 I: IntoIterator<Item = Datum<'a>>,
119 DatumType: TryFrom<Datum<'a>>,
120 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121 ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123 let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124 if datums.peek().is_none() {
125 Datum::Null
126 } else {
127 let x = datums
128 .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129 .sum::<ResultType>();
130 x.into()
131 }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136 I: IntoIterator<Item = Datum<'a>>,
137{
138 let mut cx = numeric::cx_datum();
139 let mut sum = Numeric::zero();
140 let mut empty = true;
141 for d in datums {
142 if !d.is_null() {
143 empty = false;
144 cx.add(&mut sum, &d.unwrap_numeric().0);
145 }
146 }
147 match empty {
148 true => Datum::Null,
149 false => Datum::from(sum),
150 }
151}
152
153#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157 I: IntoIterator<Item = Datum<'a>>,
158{
159 let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161 Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166 I: IntoIterator<Item = Datum<'a>>,
167{
168 datums
169 .into_iter()
170 .fold(Datum::False, |state, next| match (state, next) {
171 (Datum::True, _) | (_, Datum::True) => Datum::True,
172 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173 _ => Datum::False,
174 })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179 I: IntoIterator<Item = Datum<'a>>,
180{
181 datums
182 .into_iter()
183 .fold(Datum::True, |state, next| match (state, next) {
184 (Datum::False, _) | (_, Datum::False) => Datum::False,
185 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186 _ => Datum::True,
187 })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192 I: IntoIterator<Item = Datum<'a>>,
193{
194 const EMPTY_SEP: &str = "";
195
196 let datums = order_aggregate_datums(datums, order_by);
197 let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198 if d.is_null() {
199 return None;
200 }
201 let mut value_sep = d.unwrap_list().iter();
202 match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203 (Datum::Null, _) => None,
204 (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205 (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206 _ => unreachable!(),
207 }
208 });
209
210 let mut s = String::default();
211 match sep_value_pairs.next() {
212 Some((_, value)) => s.push_str(value),
214 None => return Datum::Null,
216 }
217
218 for (sep, value) in sep_value_pairs {
219 s.push_str(sep);
220 s.push_str(value);
221 }
222
223 Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228 I: IntoIterator<Item = Datum<'a>>,
229{
230 let datums = order_aggregate_datums(datums, order_by);
231 temp_storage.make_datum(|packer| {
232 packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233 })
234}
235
236fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238 I: IntoIterator<Item = Datum<'a>>,
239{
240 let datums = order_aggregate_datums(datums, order_by);
241 temp_storage.make_datum(|packer| {
242 let mut datums: Vec<_> = datums
243 .into_iter()
244 .filter_map(|d| {
245 if d.is_null() {
246 return None;
247 }
248 let mut list = d.unwrap_list().iter();
249 let key = list.next().unwrap();
250 let val = list.next().unwrap();
251 if key.is_null() {
252 None
255 } else {
256 Some((key.unwrap_str(), val))
257 }
258 })
259 .collect();
260 datums.sort_by_key(|(k, _v)| *k);
266 datums.reverse();
267 datums.dedup_by_key(|(k, _v)| *k);
268 datums.reverse();
269 packer.push_dict(datums);
270 })
271}
272
273pub fn order_aggregate_datums<'a: 'b, 'b, I>(
283 datums: I,
284 order_by: &[ColumnOrder],
285) -> impl Iterator<Item = Datum<'b>>
286where
287 I: IntoIterator<Item = Datum<'a>>,
288{
289 order_aggregate_datums_with_rank_inner(datums, order_by)
290 .into_iter()
291 .map(|(payload, _order_datums)| payload)
293}
294
295fn order_aggregate_datums_with_rank<'a, I>(
298 datums: I,
299 order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302 I: IntoIterator<Item = Datum<'a>>,
303{
304 order_aggregate_datums_with_rank_inner(datums, order_by)
305 .into_iter()
306 .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310 datums: I,
311 order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314 I: IntoIterator<Item = Datum<'a>>,
315{
316 let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317 .into_iter()
318 .map(|d| {
319 let list = d.unwrap_list();
320 let mut list_it = list.iter();
321 let payload = list_it.next().unwrap();
322
323 let mut order_by_datums = Vec::with_capacity(order_by.len());
333 for _ in 0..order_by.len() {
334 order_by_datums.push(
335 list_it
336 .next()
337 .expect("must have exactly the same number of Datums as `order_by`"),
338 );
339 }
340
341 (payload, order_by_datums)
342 })
343 .collect();
344
345 let mut sort_by =
346 |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347 (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348 compare_columns(
349 order_by,
350 left_order_by_datums,
351 right_order_by_datums,
352 || payload_left.cmp(payload_right),
353 )
354 };
355 decoded.sort_unstable_by(&mut sort_by);
360 decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365 I: IntoIterator<Item = Datum<'a>>,
366{
367 let datums = order_aggregate_datums(datums, order_by);
368 let datums: Vec<_> = datums
369 .into_iter()
370 .map(|d| d.unwrap_array().elements().iter())
371 .flatten()
372 .collect();
373 let dims = ArrayDimension {
374 lower_bound: 1,
375 length: datums.len(),
376 };
377 temp_storage.make_datum(|packer| {
378 packer.try_push_array(&[dims], datums).unwrap();
379 })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384 I: IntoIterator<Item = Datum<'a>>,
385{
386 let datums = order_aggregate_datums(datums, order_by);
387 temp_storage.make_datum(|packer| {
388 packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389 })
390}
391
392fn row_number<'a, I>(
396 datums: I,
397 callers_temp_storage: &'a RowArena,
398 order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401 I: IntoIterator<Item = Datum<'a>>,
402{
403 let temp_storage = RowArena::new();
407 let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409 callers_temp_storage.make_datum(|packer| {
410 packer.push_list(datums);
411 })
412}
413
414fn row_number_no_list<'a: 'b, 'b, I>(
417 datums: I,
418 callers_temp_storage: &'b RowArena,
419 order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422 I: IntoIterator<Item = Datum<'a>>,
423{
424 let datums = order_aggregate_datums(datums, order_by);
425
426 callers_temp_storage.reserve(datums.size_hint().0);
427 #[allow(clippy::disallowed_methods)]
428 datums
429 .into_iter()
430 .map(|d| d.unwrap_list().iter())
431 .flatten()
432 .zip(1i64..)
433 .map(|(d, i)| {
434 callers_temp_storage.make_datum(|packer| {
435 packer.push_list_with(|packer| {
436 packer.push(Datum::Int64(i));
437 packer.push(d);
438 });
439 })
440 })
441}
442
443fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448 I: IntoIterator<Item = Datum<'a>>,
449{
450 let temp_storage = RowArena::new();
451 let datums = rank_no_list(datums, &temp_storage, order_by);
452
453 callers_temp_storage.make_datum(|packer| {
454 packer.push_list(datums);
455 })
456}
457
458fn rank_no_list<'a: 'b, 'b, I>(
461 datums: I,
462 callers_temp_storage: &'b RowArena,
463 order_by: &[ColumnOrder],
464) -> impl Iterator<Item = Datum<'b>>
465where
466 I: IntoIterator<Item = Datum<'a>>,
467{
468 let datums = order_aggregate_datums_with_rank(datums, order_by);
470
471 let mut datums = datums
472 .into_iter()
473 .map(|(d0, order_row)| {
474 d0.unwrap_list()
475 .iter()
476 .map(move |d1| (d1, order_row.clone()))
477 })
478 .flatten();
479
480 callers_temp_storage.reserve(datums.size_hint().0);
481 datums
482 .next()
483 .map_or(vec![], |(first_datum, first_order_row)| {
484 datums.fold(
487 (first_order_row, 1, 1, vec![(first_datum, 1)]),
488 |mut acc, (next_datum, next_order_row)| {
489 let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
490 *acc_row_num += 1;
491 if *acc_row != next_order_row {
493 *acc_rank = *acc_row_num;
494 *acc_row = next_order_row;
495 }
496
497 (*output).push((next_datum, *acc_rank));
498 acc
499 })
500 }.3).into_iter().map(|(d, i)| {
501 callers_temp_storage.make_datum(|packer| {
502 packer.push_list_with(|packer| {
503 packer.push(Datum::Int64(i));
504 packer.push(d);
505 });
506 })
507 })
508}
509
510fn dense_rank<'a, I>(
514 datums: I,
515 callers_temp_storage: &'a RowArena,
516 order_by: &[ColumnOrder],
517) -> Datum<'a>
518where
519 I: IntoIterator<Item = Datum<'a>>,
520{
521 let temp_storage = RowArena::new();
522 let datums = dense_rank_no_list(datums, &temp_storage, order_by);
523
524 callers_temp_storage.make_datum(|packer| {
525 packer.push_list(datums);
526 })
527}
528
529fn dense_rank_no_list<'a: 'b, 'b, I>(
532 datums: I,
533 callers_temp_storage: &'b RowArena,
534 order_by: &[ColumnOrder],
535) -> impl Iterator<Item = Datum<'b>>
536where
537 I: IntoIterator<Item = Datum<'a>>,
538{
539 let datums = order_aggregate_datums_with_rank(datums, order_by);
541
542 let mut datums = datums
543 .into_iter()
544 .map(|(d0, order_row)| {
545 d0.unwrap_list()
546 .iter()
547 .map(move |d1| (d1, order_row.clone()))
548 })
549 .flatten();
550
551 callers_temp_storage.reserve(datums.size_hint().0);
552 datums
553 .next()
554 .map_or(vec![], |(first_datum, first_order_row)| {
555 datums.fold(
558 (first_order_row, 1, vec![(first_datum, 1)]),
559 |mut acc, (next_datum, next_order_row)| {
560 let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
561 if *acc_row != next_order_row {
563 *acc_rank += 1;
564 *acc_row = next_order_row;
565 }
566
567 (*output).push((next_datum, *acc_rank));
568 acc
569 })
570 }.2).into_iter().map(|(d, i)| {
571 callers_temp_storage.make_datum(|packer| {
572 packer.push_list_with(|packer| {
573 packer.push(Datum::Int64(i));
574 packer.push(d);
575 });
576 })
577 })
578}
579
580fn lag_lead<'a, I>(
602 datums: I,
603 callers_temp_storage: &'a RowArena,
604 order_by: &[ColumnOrder],
605 lag_lead_type: &LagLeadType,
606 ignore_nulls: &bool,
607) -> Datum<'a>
608where
609 I: IntoIterator<Item = Datum<'a>>,
610{
611 let temp_storage = RowArena::new();
612 let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
613 callers_temp_storage.make_datum(|packer| {
614 packer.push_list(iter);
615 })
616}
617
618fn lag_lead_no_list<'a: 'b, 'b, I>(
621 datums: I,
622 callers_temp_storage: &'b RowArena,
623 order_by: &[ColumnOrder],
624 lag_lead_type: &LagLeadType,
625 ignore_nulls: &bool,
626) -> impl Iterator<Item = Datum<'b>>
627where
628 I: IntoIterator<Item = Datum<'a>>,
629{
630 let datums = order_aggregate_datums(datums, order_by);
632
633 let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
637 .into_iter()
638 .map(|d| {
639 let mut iter = d.unwrap_list().iter();
640 let original_row = iter.next().unwrap();
641 let (input_value, offset, default_value) =
642 unwrap_lag_lead_encoded_args(iter.next().unwrap());
643 (original_row, (input_value, offset, default_value))
644 })
645 .unzip();
646
647 let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
648
649 callers_temp_storage.reserve(result.len());
650 result
651 .into_iter()
652 .zip_eq(orig_rows)
653 .map(|(result_value, original_row)| {
654 callers_temp_storage.make_datum(|packer| {
655 packer.push_list_with(|packer| {
656 packer.push(result_value);
657 packer.push(original_row);
658 });
659 })
660 })
661}
662
663fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
665 let mut encoded_args_iter = encoded_args.unwrap_list().iter();
666 let (input_value, offset, default_value) = (
667 encoded_args_iter.next().unwrap(),
668 encoded_args_iter.next().unwrap(),
669 encoded_args_iter.next().unwrap(),
670 );
671 (input_value, offset, default_value)
672}
673
674fn lag_lead_inner<'a>(
677 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
678 lag_lead_type: &LagLeadType,
679 ignore_nulls: &bool,
680) -> Vec<Datum<'a>> {
681 if *ignore_nulls {
682 lag_lead_inner_ignore_nulls(args, lag_lead_type)
683 } else {
684 lag_lead_inner_respect_nulls(args, lag_lead_type)
685 }
686}
687
688fn lag_lead_inner_respect_nulls<'a>(
689 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
690 lag_lead_type: &LagLeadType,
691) -> Vec<Datum<'a>> {
692 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
693 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
694 if offset.is_null() {
696 result.push(Datum::Null);
697 continue;
698 }
699
700 let idx = i64::try_from(idx).expect("Array index does not fit in i64");
701 let offset = i64::from(offset.unwrap_int32());
702 let offset = match lag_lead_type {
703 LagLeadType::Lag => -offset,
704 LagLeadType::Lead => offset,
705 };
706
707 let datums_get = |i: i64| -> Option<Datum> {
709 match u64::try_from(i) {
710 Ok(i) => args
711 .get(usize::cast_from(i))
712 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
716 };
717
718 let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
719
720 result.push(lagged_value);
721 }
722
723 result
724}
725
/// `lag`/`lead` with IGNORE NULLS semantics over sorted `(input_value, offset,
/// default_value)` triples.
///
/// Moving `|offset|` steps in the lag/lead direction counts only rows with a non-null input
/// value; NULL rows are skipped with precomputed jump tables.
/// * If the walk leaves the partition, the row's `default_value` is returned.
/// * A NULL offset yields NULL.
///
/// # Panics
/// * If the partition has more than `i64::MAX` rows.
/// * If the offset is 0 and the current row's input value is NULL (deliberately a panic).
// `as` casts between `usize` and `i64` are used below. They are safe because `args.len()` is
// checked to fit in `i64` at the top.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    // Guarantees that every index and `args.len()` itself can be cast to `i64` without wrapping.
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // For every NULL position i, `skip_nulls_backward[i]` is the closest earlier non-null
    // position, or -1 if there is none. Non-null positions stay `None`.
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // Mirror image: for every NULL position i, `skip_nulls_forward[i]` is the closest later
    // non-null position, or `args.len()` if there is none.
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        // A NULL offset yields NULL regardless of the default value.
        if offset.is_null() {
            result.push(Datum::Null);
            continue;
        }

        // `idx as i64` cannot wrap (length checked at the top); the offset is read as Int32.
        let idx = idx as i64; let offset = i64::cast_from(offset.unwrap_int32());
        // Normalize to a signed step: negative moves backward (lag), positive forward (lead).
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        let increment = offset.signum();

        // Input value at a possibly out-of-range position; `None` when outside the partition.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
        };

        let lagged_value = if increment != 0 {
            // Walk |offset| non-null rows away from `idx`. Whenever a step lands on a NULL,
            // jump straight to the next non-null row in the same direction, which may be just
            // outside the partition.
            let mut j = idx;
            for _ in 0..num::abs(offset) {
                j += increment;
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    // `datums_get(j)` was `Some`, so `j` is in `0..args.len()` and the cast is safe.
                    let ju = j as usize; if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                // Walked off the partition: no point taking further steps.
                if datums_get(j).is_none() {
                    break;
                }
            }
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            // Zero offset: the result is the current row's own value.
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // It is unclear whether NULL or the default is the right answer here, so this
                // case deliberately panics.
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
846
847fn first_value<'a, I>(
849 datums: I,
850 callers_temp_storage: &'a RowArena,
851 order_by: &[ColumnOrder],
852 window_frame: &WindowFrame,
853) -> Datum<'a>
854where
855 I: IntoIterator<Item = Datum<'a>>,
856{
857 let temp_storage = RowArena::new();
858 let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
859 callers_temp_storage.make_datum(|packer| {
860 packer.push_list(iter);
861 })
862}
863
864fn first_value_no_list<'a: 'b, 'b, I>(
867 datums: I,
868 callers_temp_storage: &'b RowArena,
869 order_by: &[ColumnOrder],
870 window_frame: &WindowFrame,
871) -> impl Iterator<Item = Datum<'b>>
872where
873 I: IntoIterator<Item = Datum<'a>>,
874{
875 let datums = order_aggregate_datums(datums, order_by);
877
878 let (orig_rows, args): (Vec<_>, Vec<_>) = datums
880 .into_iter()
881 .map(|d| {
882 let mut iter = d.unwrap_list().iter();
883 let original_row = iter.next().unwrap();
884 let arg = iter.next().unwrap();
885
886 (original_row, arg)
887 })
888 .unzip();
889
890 let results = first_value_inner(args, window_frame);
891
892 callers_temp_storage.reserve(results.len());
893 results
894 .into_iter()
895 .zip_eq(orig_rows)
896 .map(|(result_value, original_row)| {
897 callers_temp_storage.make_datum(|packer| {
898 packer.push_list_with(|packer| {
899 packer.push(result_value);
900 packer.push(original_row);
901 });
902 })
903 })
904}
905
906fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
907 let length = datums.len();
908 let mut result: Vec<Datum> = Vec::with_capacity(length);
909 for (idx, current_datum) in datums.iter().enumerate() {
910 let first_value = match &window_frame.start_bound {
911 WindowFrameBound::CurrentRow => *current_datum,
913 WindowFrameBound::UnboundedPreceding => {
914 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
915 let end_offset = usize::cast_from(*end_offset);
916
917 if idx < end_offset {
919 Datum::Null
920 } else {
921 datums[0]
922 }
923 } else {
924 datums[0]
925 }
926 }
927 WindowFrameBound::OffsetPreceding(offset) => {
928 let start_offset = usize::cast_from(*offset);
929 let start_idx = idx.saturating_sub(start_offset);
930 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
931 let end_offset = usize::cast_from(*end_offset);
932
933 if start_offset < end_offset || idx < end_offset {
935 Datum::Null
936 } else {
937 datums[start_idx]
938 }
939 } else {
940 datums[start_idx]
941 }
942 }
943 WindowFrameBound::OffsetFollowing(offset) => {
944 let start_offset = usize::cast_from(*offset);
945 let start_idx = idx.saturating_add(start_offset);
946 if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
947 if offset > end_offset || start_idx >= length {
949 Datum::Null
950 } else {
951 datums[start_idx]
952 }
953 } else {
954 datums
955 .get(start_idx)
956 .map(|d| d.clone())
957 .unwrap_or(Datum::Null)
958 }
959 }
960 WindowFrameBound::UnboundedFollowing => unreachable!(),
962 };
963 result.push(first_value);
964 }
965 result
966}
967
968fn last_value<'a, I>(
970 datums: I,
971 callers_temp_storage: &'a RowArena,
972 order_by: &[ColumnOrder],
973 window_frame: &WindowFrame,
974) -> Datum<'a>
975where
976 I: IntoIterator<Item = Datum<'a>>,
977{
978 let temp_storage = RowArena::new();
979 let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
980 callers_temp_storage.make_datum(|packer| {
981 packer.push_list(iter);
982 })
983}
984
985fn last_value_no_list<'a: 'b, 'b, I>(
988 datums: I,
989 callers_temp_storage: &'b RowArena,
990 order_by: &[ColumnOrder],
991 window_frame: &WindowFrame,
992) -> impl Iterator<Item = Datum<'b>>
993where
994 I: IntoIterator<Item = Datum<'a>>,
995{
996 let datums = order_aggregate_datums_with_rank(datums, order_by);
999
1000 let size_hint = datums.size_hint().0;
1002 let mut args = Vec::with_capacity(size_hint);
1003 let mut original_rows = Vec::with_capacity(size_hint);
1004 let mut order_by_rows = Vec::with_capacity(size_hint);
1005 for (d, order_by_row) in datums.into_iter() {
1006 let mut iter = d.unwrap_list().iter();
1007 let original_row = iter.next().unwrap();
1008 let arg = iter.next().unwrap();
1009 order_by_rows.push(order_by_row);
1010 original_rows.push(original_row);
1011 args.push(arg);
1012 }
1013
1014 let results = last_value_inner(args, &order_by_rows, window_frame);
1015
1016 callers_temp_storage.reserve(results.len());
1017 results
1018 .into_iter()
1019 .zip_eq(original_rows)
1020 .map(|(result_value, original_row)| {
1021 callers_temp_storage.make_datum(|packer| {
1022 packer.push_list_with(|packer| {
1023 packer.push(result_value);
1024 packer.push(original_row);
1025 });
1026 })
1027 })
1028}
1029
1030fn last_value_inner<'a>(
1031 args: Vec<Datum<'a>>,
1032 order_by_rows: &Vec<Row>,
1033 window_frame: &WindowFrame,
1034) -> Vec<Datum<'a>> {
1035 let length = args.len();
1036 let mut results: Vec<Datum> = Vec::with_capacity(length);
1037 for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1038 let last_value = match &window_frame.end_bound {
1039 WindowFrameBound::CurrentRow => match &window_frame.units {
1040 WindowFrameUnits::Rows => *current_datum,
1042 WindowFrameUnits::Range => {
1043 let target_idx = order_by_rows[idx..]
1048 .iter()
1049 .enumerate()
1050 .take_while(|(_, row)| *row == order_by_row)
1051 .last()
1052 .unwrap()
1053 .0
1054 + idx;
1055 args[target_idx]
1056 }
1057 WindowFrameUnits::Groups => unreachable!(),
1059 },
1060 WindowFrameBound::UnboundedFollowing => {
1061 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1062 let start_offset = usize::cast_from(*start_offset);
1063
1064 if idx + start_offset > length - 1 {
1066 Datum::Null
1067 } else {
1068 args[length - 1]
1069 }
1070 } else {
1071 args[length - 1]
1072 }
1073 }
1074 WindowFrameBound::OffsetFollowing(offset) => {
1075 let end_offset = usize::cast_from(*offset);
1076 let end_idx = idx.saturating_add(end_offset);
1077 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1078 let start_offset = usize::cast_from(*start_offset);
1079 let start_idx = idx.saturating_add(start_offset);
1080
1081 if end_offset < start_offset || start_idx >= length {
1083 Datum::Null
1084 } else {
1085 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1087 }
1088 } else {
1089 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1090 }
1091 }
1092 WindowFrameBound::OffsetPreceding(offset) => {
1093 let end_offset = usize::cast_from(*offset);
1094 let end_idx = idx.saturating_sub(end_offset);
1095 if idx < end_offset {
1096 Datum::Null
1098 } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1099 &window_frame.start_bound
1100 {
1101 if offset > start_offset {
1103 Datum::Null
1104 } else {
1105 args[end_idx]
1106 }
1107 } else {
1108 args[end_idx]
1109 }
1110 }
1111 WindowFrameBound::UnboundedPreceding => unreachable!(),
1113 };
1114 results.push(last_value);
1115 }
1116 results
1117}
1118
1119fn fused_value_window_func<'a, I>(
1125 input_datums: I,
1126 callers_temp_storage: &'a RowArena,
1127 funcs: &Vec<AggregateFunc>,
1128 order_by: &Vec<ColumnOrder>,
1129) -> Datum<'a>
1130where
1131 I: IntoIterator<Item = Datum<'a>>,
1132{
1133 let temp_storage = RowArena::new();
1134 let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1135 callers_temp_storage.make_datum(|packer| {
1136 packer.push_list(iter);
1137 })
1138}
1139
1140fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1143 input_datums: I,
1144 callers_temp_storage: &'b RowArena,
1145 funcs: &Vec<AggregateFunc>,
1146 order_by: &Vec<ColumnOrder>,
1147) -> impl Iterator<Item = Datum<'b>>
1148where
1149 I: IntoIterator<Item = Datum<'a>>,
1150{
1151 let has_last_value = funcs
1152 .iter()
1153 .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1154
1155 let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1156
1157 let size_hint = input_datums_with_ranks.size_hint().0;
1158 let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1159 let mut original_rows = Vec::with_capacity(size_hint);
1160 let mut order_by_rows = Vec::with_capacity(size_hint);
1161 for (d, order_by_row) in input_datums_with_ranks {
1162 let mut iter = d.unwrap_list().iter();
1163 let original_row = iter.next().unwrap();
1164 original_rows.push(original_row);
1165 let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1166 for i in 0..funcs.len() {
1167 let encoded_args = argss_iter.next().unwrap();
1168 encoded_argsss[i].push(encoded_args);
1169 }
1170 if has_last_value {
1171 order_by_rows.push(order_by_row);
1172 }
1173 }
1174
1175 let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1176 for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1177 let results = match func {
1178 AggregateFunc::LagLead {
1179 order_by: inner_order_by,
1180 lag_lead,
1181 ignore_nulls,
1182 } => {
1183 assert_eq!(order_by, inner_order_by);
1184 let unwrapped_argss = encoded_argss
1185 .into_iter()
1186 .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1187 .collect();
1188 lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1189 }
1190 AggregateFunc::FirstValue {
1191 order_by: inner_order_by,
1192 window_frame,
1193 } => {
1194 assert_eq!(order_by, inner_order_by);
1195 first_value_inner(encoded_argss, window_frame)
1198 }
1199 AggregateFunc::LastValue {
1200 order_by: inner_order_by,
1201 window_frame,
1202 } => {
1203 assert_eq!(order_by, inner_order_by);
1204 last_value_inner(encoded_argss, &order_by_rows, window_frame)
1207 }
1208 _ => panic!("unknown window function in FusedValueWindowFunc"),
1209 };
1210 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1211 results.push(result);
1212 }
1213 }
1214
1215 callers_temp_storage.reserve(2 * original_rows.len());
1216 results_per_row
1217 .into_iter()
1218 .enumerate()
1219 .map(move |(i, results)| {
1220 callers_temp_storage.make_datum(|packer| {
1221 packer.push_list_with(|packer| {
1222 packer
1223 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1224 packer.push(original_rows[i]);
1225 });
1226 })
1227 })
1228}
1229
1230fn window_aggr<'a, I, A>(
1239 input_datums: I,
1240 callers_temp_storage: &'a RowArena,
1241 wrapped_aggregate: &AggregateFunc,
1242 order_by: &[ColumnOrder],
1243 window_frame: &WindowFrame,
1244) -> Datum<'a>
1245where
1246 I: IntoIterator<Item = Datum<'a>>,
1247 A: OneByOneAggr,
1248{
1249 let temp_storage = RowArena::new();
1250 let iter = window_aggr_no_list::<I, A>(
1251 input_datums,
1252 &temp_storage,
1253 wrapped_aggregate,
1254 order_by,
1255 window_frame,
1256 );
1257 callers_temp_storage.make_datum(|packer| {
1258 packer.push_list(iter);
1259 })
1260}
1261
1262fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1265 input_datums: I,
1266 callers_temp_storage: &'b RowArena,
1267 wrapped_aggregate: &AggregateFunc,
1268 order_by: &[ColumnOrder],
1269 window_frame: &WindowFrame,
1270) -> impl Iterator<Item = Datum<'b>>
1271where
1272 I: IntoIterator<Item = Datum<'a>>,
1273 A: OneByOneAggr,
1274{
1275 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1278
1279 let size_hint = datums.size_hint().0;
1281 let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1282 let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1283 let mut order_by_rows = Vec::with_capacity(size_hint);
1284 for (d, order_by_row) in datums.into_iter() {
1285 let mut iter = d.unwrap_list().iter();
1286 let original_row = iter.next().unwrap();
1287 let arg = iter.next().unwrap();
1288 order_by_rows.push(order_by_row);
1289 original_rows.push(original_row);
1290 args.push(arg);
1291 }
1292
1293 let results = window_aggr_inner::<A>(
1294 args,
1295 &order_by_rows,
1296 wrapped_aggregate,
1297 order_by,
1298 window_frame,
1299 callers_temp_storage,
1300 );
1301
1302 callers_temp_storage.reserve(results.len());
1303 results
1304 .into_iter()
1305 .zip_eq(original_rows)
1306 .map(|(result_value, original_row)| {
1307 callers_temp_storage.make_datum(|packer| {
1308 packer.push_list_with(|packer| {
1309 packer.push(result_value);
1310 packer.push(original_row);
1311 });
1312 })
1313 })
1314}
1315
1316fn window_aggr_inner<'a, A>(
1317 mut args: Vec<Datum<'a>>,
1318 order_by_rows: &Vec<Row>,
1319 wrapped_aggregate: &AggregateFunc,
1320 order_by: &[ColumnOrder],
1321 window_frame: &WindowFrame,
1322 temp_storage: &'a RowArena,
1323) -> Vec<Datum<'a>>
1324where
1325 A: OneByOneAggr,
1326{
1327 let length = args.len();
1328 let mut result: Vec<Datum> = Vec::with_capacity(length);
1329
1330 soft_assert_or_log!(
1336 !((matches!(window_frame.units, WindowFrameUnits::Groups)
1337 || matches!(window_frame.units, WindowFrameUnits::Range))
1338 && !window_frame.includes_current_row()),
1339 "window frame without current row"
1340 );
1341
1342 if (matches!(
1343 window_frame.start_bound,
1344 WindowFrameBound::UnboundedPreceding
1345 ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1346 || (order_by.is_empty()
1347 && (matches!(window_frame.units, WindowFrameUnits::Groups)
1348 || matches!(window_frame.units, WindowFrameUnits::Range))
1349 && window_frame.includes_current_row())
1350 {
1351 let result_value = wrapped_aggregate.eval(args, temp_storage);
1358 for _ in 0..length {
1360 result.push(result_value);
1361 }
1362 } else {
1363 fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1364 args: Vec<Datum<'a>>,
1365 result: &mut Vec<Datum<'a>>,
1366 mut one_by_one_aggr: A,
1367 temp_storage: &'a RowArena,
1368 ) where
1369 A: OneByOneAggr,
1370 {
1371 for current_arg in args.into_iter() {
1372 one_by_one_aggr.give(¤t_arg);
1373 let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1374 result.push(result_value);
1375 }
1376 }
1377
1378 fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1379 args: Vec<Datum<'a>>,
1380 order_by_rows: &Vec<Row>,
1381 result: &mut Vec<Datum<'a>>,
1382 mut one_by_one_aggr: A,
1383 temp_storage: &'a RowArena,
1384 ) where
1385 A: OneByOneAggr,
1386 {
1387 let mut peer_group_start = 0;
1388 while peer_group_start < args.len() {
1389 let mut peer_group_end = peer_group_start + 1;
1393 while peer_group_end < args.len()
1394 && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1395 {
1396 peer_group_end += 1;
1398 }
1399 for current_arg in args[peer_group_start..peer_group_end].iter() {
1402 one_by_one_aggr.give(current_arg);
1403 }
1404 let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1405 for _ in args[peer_group_start..peer_group_end].iter() {
1407 result.push(agg_for_peer_group);
1408 }
1409 peer_group_start = peer_group_end;
1411 }
1412 }
1413
1414 fn rows_between_offset_and_offset<'a>(
1415 args: Vec<Datum<'a>>,
1416 result: &mut Vec<Datum<'a>>,
1417 wrapped_aggregate: &AggregateFunc,
1418 temp_storage: &'a RowArena,
1419 offset_start: i64,
1420 offset_end: i64,
1421 ) {
1422 let len = args
1423 .len()
1424 .to_i64()
1425 .expect("window partition's len should fit into i64");
1426 for i in 0..len {
1427 let i = i.to_i64().expect("window partition shouldn't be super big");
1428 let frame_start = max(i + offset_start, 0)
1431 .to_usize()
1432 .expect("The max made sure it's not negative");
1433 let frame_end = min(i + offset_end, len - 1).to_usize();
1436 match frame_end {
1437 Some(frame_end) => {
1438 if frame_start <= frame_end {
1439 let frame_values = args[frame_start..=frame_end].iter().cloned();
1450 let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1451 result.push(result_value);
1452 } else {
1453 let result_value = wrapped_aggregate.default();
1455 result.push(result_value);
1456 }
1457 }
1458 None => {
1459 let result_value = wrapped_aggregate.default();
1461 result.push(result_value);
1462 }
1463 }
1464 }
1465 }
1466
1467 match (
1468 &window_frame.units,
1469 &window_frame.start_bound,
1470 &window_frame.end_bound,
1471 ) {
1472 (Rows, UnboundedPreceding, CurrentRow) => {
1477 rows_between_unbounded_preceding_and_current_row::<A>(
1478 args,
1479 &mut result,
1480 A::new(wrapped_aggregate, false),
1481 temp_storage,
1482 );
1483 }
1484 (Rows, CurrentRow, UnboundedFollowing) => {
1485 args.reverse();
1487 rows_between_unbounded_preceding_and_current_row::<A>(
1488 args,
1489 &mut result,
1490 A::new(wrapped_aggregate, true),
1491 temp_storage,
1492 );
1493 result.reverse();
1494 }
1495 (Range, UnboundedPreceding, CurrentRow) => {
1496 groups_between_unbounded_preceding_and_current_row::<A>(
1499 args,
1500 order_by_rows,
1501 &mut result,
1502 A::new(wrapped_aggregate, false),
1503 temp_storage,
1504 );
1505 }
1506 (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1510 let start_prec = start_prec.to_i64().expect(
1511 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1512 );
1513 let end_prec = end_prec.to_i64().expect(
1514 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1515 );
1516 rows_between_offset_and_offset(
1517 args,
1518 &mut result,
1519 wrapped_aggregate,
1520 temp_storage,
1521 -start_prec,
1522 -end_prec,
1523 );
1524 }
1525 (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1526 let start_prec = start_prec.to_i64().expect(
1527 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1528 );
1529 let end_fol = end_fol.to_i64().expect(
1530 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1531 );
1532 rows_between_offset_and_offset(
1533 args,
1534 &mut result,
1535 wrapped_aggregate,
1536 temp_storage,
1537 -start_prec,
1538 end_fol,
1539 );
1540 }
1541 (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1542 let start_fol = start_fol.to_i64().expect(
1543 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1544 );
1545 let end_fol = end_fol.to_i64().expect(
1546 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1547 );
1548 rows_between_offset_and_offset(
1549 args,
1550 &mut result,
1551 wrapped_aggregate,
1552 temp_storage,
1553 start_fol,
1554 end_fol,
1555 );
1556 }
1557 (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1558 unreachable!() }
1560 (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1561 let start_prec = start_prec.to_i64().expect(
1562 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1563 );
1564 let end_fol = 0;
1565 rows_between_offset_and_offset(
1566 args,
1567 &mut result,
1568 wrapped_aggregate,
1569 temp_storage,
1570 -start_prec,
1571 end_fol,
1572 );
1573 }
1574 (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1575 let start_fol = 0;
1576 let end_fol = end_fol.to_i64().expect(
1577 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1578 );
1579 rows_between_offset_and_offset(
1580 args,
1581 &mut result,
1582 wrapped_aggregate,
1583 temp_storage,
1584 start_fol,
1585 end_fol,
1586 );
1587 }
1588 (Rows, CurrentRow, CurrentRow) => {
1589 let start_fol = 0;
1592 let end_fol = 0;
1593 rows_between_offset_and_offset(
1594 args,
1595 &mut result,
1596 wrapped_aggregate,
1597 temp_storage,
1598 start_fol,
1599 end_fol,
1600 );
1601 }
1602 (Rows, CurrentRow, OffsetPreceding(_))
1603 | (Rows, UnboundedFollowing, _)
1604 | (Rows, _, UnboundedPreceding)
1605 | (Rows, OffsetFollowing(..), CurrentRow) => {
1606 unreachable!() }
1608 (Rows, UnboundedPreceding, UnboundedFollowing) => {
1609 unreachable!()
1612 }
1613 (Rows, UnboundedPreceding, OffsetPreceding(_))
1614 | (Rows, UnboundedPreceding, OffsetFollowing(_))
1615 | (Rows, OffsetPreceding(..), UnboundedFollowing)
1616 | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1617 unreachable!()
1620 }
1621 (Range, _, _) => {
1622 unreachable!()
1629 }
1630 (Groups, _, _) => {
1631 unreachable!()
1635 }
1636 }
1637 }
1638
1639 result
1640}
1641
1642fn fused_window_aggr<'a, I, A>(
1646 input_datums: I,
1647 callers_temp_storage: &'a RowArena,
1648 wrapped_aggregates: &Vec<AggregateFunc>,
1649 order_by: &Vec<ColumnOrder>,
1650 window_frame: &WindowFrame,
1651) -> Datum<'a>
1652where
1653 I: IntoIterator<Item = Datum<'a>>,
1654 A: OneByOneAggr,
1655{
1656 let temp_storage = RowArena::new();
1657 let iter = fused_window_aggr_no_list::<_, A>(
1658 input_datums,
1659 &temp_storage,
1660 wrapped_aggregates,
1661 order_by,
1662 window_frame,
1663 );
1664 callers_temp_storage.make_datum(|packer| {
1665 packer.push_list(iter);
1666 })
1667}
1668
1669fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1672 input_datums: I,
1673 callers_temp_storage: &'b RowArena,
1674 wrapped_aggregates: &Vec<AggregateFunc>,
1675 order_by: &Vec<ColumnOrder>,
1676 window_frame: &WindowFrame,
1677) -> impl Iterator<Item = Datum<'b>>
1678where
1679 I: IntoIterator<Item = Datum<'a>>,
1680 A: OneByOneAggr,
1681{
1682 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1685
1686 let size_hint = datums.size_hint().0;
1687 let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1688 let mut original_rows = Vec::with_capacity(size_hint);
1689 let mut order_by_rows = Vec::with_capacity(size_hint);
1690 for (d, order_by_row) in datums {
1691 let mut iter = d.unwrap_list().iter();
1692 let original_row = iter.next().unwrap();
1693 original_rows.push(original_row);
1694 let args_iter = iter.next().unwrap().unwrap_list().iter();
1695 for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1697 args.push(arg);
1698 }
1699 order_by_rows.push(order_by_row);
1700 }
1701
1702 let mut results_per_row =
1703 vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1704 for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1705 let results = window_aggr_inner::<A>(
1706 args,
1707 &order_by_rows,
1708 wrapped_aggr,
1709 order_by,
1710 window_frame,
1711 callers_temp_storage,
1712 );
1713 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1714 results.push(result);
1715 }
1716 }
1717
1718 callers_temp_storage.reserve(2 * original_rows.len());
1719 results_per_row
1720 .into_iter()
1721 .enumerate()
1722 .map(move |(i, results)| {
1723 callers_temp_storage.make_datum(|packer| {
1724 packer.push_list_with(|packer| {
1725 packer
1726 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1727 packer.push(original_rows[i]);
1728 });
1729 })
1730 })
1731}
1732
1733pub trait OneByOneAggr {
1737 fn new(agg: &AggregateFunc, reverse: bool) -> Self;
1742 fn give(&mut self, d: &Datum);
1744 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
1746}
1747
/// A [`OneByOneAggr`] that stores every input it is given and re-evaluates the wrapped aggregate
/// over all stored inputs on each `get_current_aggregate` call.
///
/// Used as a running aggregate over `n` rows, this performs quadratic work in `n`.
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    // The aggregate to evaluate (cloned in `new`).
    agg: AggregateFunc,
    // Every input received so far, each packed into its own single-datum `Row`.
    input: Vec<Row>,
    // When true, `input` is walked back-to-front during evaluation.
    reverse: bool,
}
1759
1760impl OneByOneAggr for NaiveOneByOneAggr {
1761 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1762 NaiveOneByOneAggr {
1763 agg: agg.clone(),
1764 input: Vec::new(),
1765 reverse,
1766 }
1767 }
1768
1769 fn give(&mut self, d: &Datum) {
1770 let mut row = Row::default();
1771 row.packer().push(d);
1772 self.input.push(row);
1773 }
1774
1775 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1776 temp_storage.make_datum(|packer| {
1777 packer.push(if !self.reverse {
1778 self.agg
1779 .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1780 } else {
1781 self.agg.eval(
1782 self.input.iter().rev().map(|r| r.unpack_first()),
1783 temp_storage,
1784 )
1785 });
1786 })
1787 }
1788}
1789
/// Selects which of the two functions an [`AggregateFunc::LagLead`] computes.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum LagLeadType {
    /// The `lag` window function.
    Lag,
    /// The `lead` window function.
    Lead,
}
1808
/// An aggregate function or window function that reduces a collection of [`Datum`]s to a single
/// `Datum`. Evaluation is dispatched in [`AggregateFunc::eval`].
///
/// The variants from `RowNumber` through `FusedWindowAggregate` are window functions. Their
/// result is a list with one entry per input row; `identity_datum` returns an empty list for
/// them.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum AggregateFunc {
    /// Maximum of `Numeric` values.
    MaxNumeric,
    /// Maximum of `i16` values.
    MaxInt16,
    /// Maximum of `i32` values.
    MaxInt32,
    /// Maximum of `i64` values.
    MaxInt64,
    /// Maximum of `u16` values.
    MaxUInt16,
    /// Maximum of `u32` values.
    MaxUInt32,
    /// Maximum of `u64` values.
    MaxUInt64,
    /// Maximum of `mz_repr::Timestamp` values.
    MaxMzTimestamp,
    /// Maximum of `f32` values (compared as `OrderedFloat`).
    MaxFloat32,
    /// Maximum of `f64` values (compared as `OrderedFloat`).
    MaxFloat64,
    /// Maximum of `bool` values.
    MaxBool,
    /// Maximum of non-null string values.
    MaxString,
    /// Maximum of `Date` values.
    MaxDate,
    /// Maximum of timestamp (without time zone) values.
    MaxTimestamp,
    /// Maximum of timestamp-with-time-zone values.
    MaxTimestampTz,
    /// Maximum of `Interval` values.
    MaxInterval,
    /// Maximum of time-of-day values.
    MaxTime,
    /// Minimum of `Numeric` values.
    MinNumeric,
    /// Minimum of `i16` values.
    MinInt16,
    /// Minimum of `i32` values.
    MinInt32,
    /// Minimum of `i64` values.
    MinInt64,
    /// Minimum of `u16` values.
    MinUInt16,
    /// Minimum of `u32` values.
    MinUInt32,
    /// Minimum of `u64` values.
    MinUInt64,
    /// Minimum of `mz_repr::Timestamp` values.
    MinMzTimestamp,
    /// Minimum of `f32` values (compared as `OrderedFloat`).
    MinFloat32,
    /// Minimum of `f64` values (compared as `OrderedFloat`).
    MinFloat64,
    /// Minimum of `bool` values.
    MinBool,
    /// Minimum of string values.
    MinString,
    /// Minimum of `Date` values.
    MinDate,
    /// Minimum of timestamp (without time zone) values.
    MinTimestamp,
    /// Minimum of timestamp-with-time-zone values.
    MinTimestampTz,
    /// Minimum of `Interval` values.
    MinInterval,
    /// Minimum of time-of-day values.
    MinTime,
    /// Sum of `i16` values; the result type is `Int64`.
    SumInt16,
    /// Sum of `i32` values; the result type is `Int64`.
    SumInt32,
    /// Sum of `i64` values; the result type is `Numeric` with scale 0.
    SumInt64,
    /// Sum of `u16` values; the result type is `UInt64`.
    SumUInt16,
    /// Sum of `u32` values; the result type is `UInt64`.
    SumUInt32,
    /// Sum of `u64` values; the result type is `Numeric` with scale 0.
    SumUInt64,
    /// Sum of `f32` values.
    SumFloat32,
    /// Sum of `f64` values.
    SumFloat64,
    /// Sum of `Numeric` values.
    SumNumeric,
    /// Evaluated by the `count` helper; result type `Int64`, and `Int64(0)` for empty input.
    Count,
    /// Evaluated by the `any` helper; result type `Bool`, and `False` for empty input.
    Any,
    /// Evaluated by the `all` helper; result type `Bool`, and `True` for empty input.
    All,
    /// Aggregates into a `Jsonb` value (evaluated by `jsonb_agg`).
    JsonbAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Aggregates into a `Jsonb` object (evaluated by `dict_agg`).
    JsonbObjectAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Aggregates into a `Map` whose values have type `value_type` (evaluated by `dict_agg`).
    MapAgg {
        order_by: Vec<ColumnOrder>,
        value_type: SqlScalarType,
    },
    /// Concatenates arrays (evaluated by `array_concat`); the identity is an empty array.
    ArrayConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Concatenates lists (evaluated by `list_concat`); the identity is an empty list.
    ListConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Aggregates into a `String` (evaluated by `string_agg`).
    StringAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// The `row_number` window function.
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    /// The `rank` window function.
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    /// The `dense_rank` window function.
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// The `lag` or `lead` window function, as selected by `lag_lead`.
    LagLead {
        order_by: Vec<ColumnOrder>,
        lag_lead: LagLeadType,
        ignore_nulls: bool,
    },
    /// The `first_value` window function over `window_frame`.
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// The `last_value` window function over `window_frame`.
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions (`LagLead`, `FirstValue`, `LastValue`) evaluated together
    /// over one ORDER BY.
    FusedValueWindowFunc {
        funcs: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
    },
    /// A regular aggregate evaluated as a window aggregation over `window_frame`.
    WindowAggregate {
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregations sharing one ORDER BY and one window frame.
    FusedWindowAggregate {
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Always evaluates to `Datum::Dummy`.
    Dummy,
}
1951
1952impl AggregateFunc {
1953 pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
1954 where
1955 I: IntoIterator<Item = Datum<'a>>,
1956 {
1957 match self {
1958 AggregateFunc::MaxNumeric => {
1959 max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1960 }
1961 AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
1962 AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
1963 AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
1964 AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
1965 AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
1966 AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
1967 AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
1968 AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
1969 AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
1970 AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
1971 AggregateFunc::MaxString => max_string(datums),
1972 AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
1973 AggregateFunc::MaxTimestamp => {
1974 max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1975 }
1976 AggregateFunc::MaxTimestampTz => {
1977 max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
1978 }
1979 AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
1980 AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
1981 AggregateFunc::MinNumeric => {
1982 min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1983 }
1984 AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
1985 AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
1986 AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
1987 AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
1988 AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
1989 AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
1990 AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
1991 AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
1992 AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
1993 AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
1994 AggregateFunc::MinString => min_string(datums),
1995 AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
1996 AggregateFunc::MinTimestamp => {
1997 min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1998 }
1999 AggregateFunc::MinTimestampTz => {
2000 min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2001 }
2002 AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
2003 AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
2004 AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
2005 AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
2006 AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
2007 AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
2008 AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
2009 AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
2010 AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
2011 AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
2012 AggregateFunc::SumNumeric => sum_numeric(datums),
2013 AggregateFunc::Count => count(datums),
2014 AggregateFunc::Any => any(datums),
2015 AggregateFunc::All => all(datums),
2016 AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
2017 AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
2018 dict_agg(datums, temp_storage, order_by)
2019 }
2020 AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
2021 AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
2022 AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
2023 AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
2024 AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
2025 AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
2026 AggregateFunc::LagLead {
2027 order_by,
2028 lag_lead: lag_lead_type,
2029 ignore_nulls,
2030 } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2031 AggregateFunc::FirstValue {
2032 order_by,
2033 window_frame,
2034 } => first_value(datums, temp_storage, order_by, window_frame),
2035 AggregateFunc::LastValue {
2036 order_by,
2037 window_frame,
2038 } => last_value(datums, temp_storage, order_by, window_frame),
2039 AggregateFunc::WindowAggregate {
2040 wrapped_aggregate,
2041 order_by,
2042 window_frame,
2043 } => window_aggr::<_, NaiveOneByOneAggr>(
2044 datums,
2045 temp_storage,
2046 wrapped_aggregate,
2047 order_by,
2048 window_frame,
2049 ),
2050 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2051 fused_value_window_func(datums, temp_storage, funcs, order_by)
2052 }
2053 AggregateFunc::FusedWindowAggregate {
2054 wrapped_aggregates,
2055 order_by,
2056 window_frame,
2057 } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2058 datums,
2059 temp_storage,
2060 wrapped_aggregates,
2061 order_by,
2062 window_frame,
2063 ),
2064 AggregateFunc::Dummy => Datum::Dummy,
2065 }
2066 }
2067
2068 pub fn eval_with_fast_window_agg<'a, I, W>(
2072 &self,
2073 datums: I,
2074 temp_storage: &'a RowArena,
2075 ) -> Datum<'a>
2076 where
2077 I: IntoIterator<Item = Datum<'a>>,
2078 W: OneByOneAggr,
2079 {
2080 match self {
2081 AggregateFunc::WindowAggregate {
2082 wrapped_aggregate,
2083 order_by,
2084 window_frame,
2085 } => window_aggr::<_, W>(
2086 datums,
2087 temp_storage,
2088 wrapped_aggregate,
2089 order_by,
2090 window_frame,
2091 ),
2092 AggregateFunc::FusedWindowAggregate {
2093 wrapped_aggregates,
2094 order_by,
2095 window_frame,
2096 } => fused_window_aggr::<_, W>(
2097 datums,
2098 temp_storage,
2099 wrapped_aggregates,
2100 order_by,
2101 window_frame,
2102 ),
2103 _ => self.eval(datums, temp_storage),
2104 }
2105 }
2106
2107 pub fn eval_with_unnest_list<'a, I, W>(
2108 &self,
2109 datums: I,
2110 temp_storage: &'a RowArena,
2111 ) -> impl Iterator<Item = Datum<'a>>
2112 where
2113 I: IntoIterator<Item = Datum<'a>>,
2114 W: OneByOneAggr,
2115 {
2116 assert!(self.can_fuse_with_unnest_list());
2118 match self {
2119 AggregateFunc::RowNumber { order_by } => {
2120 row_number_no_list(datums, temp_storage, order_by).collect_vec()
2121 }
2122 AggregateFunc::Rank { order_by } => {
2123 rank_no_list(datums, temp_storage, order_by).collect_vec()
2124 }
2125 AggregateFunc::DenseRank { order_by } => {
2126 dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2127 }
2128 AggregateFunc::LagLead {
2129 order_by,
2130 lag_lead: lag_lead_type,
2131 ignore_nulls,
2132 } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2133 .collect_vec(),
2134 AggregateFunc::FirstValue {
2135 order_by,
2136 window_frame,
2137 } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2138 AggregateFunc::LastValue {
2139 order_by,
2140 window_frame,
2141 } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2142 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2143 fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2144 }
2145 AggregateFunc::WindowAggregate {
2146 wrapped_aggregate,
2147 order_by,
2148 window_frame,
2149 } => window_aggr_no_list::<_, W>(
2150 datums,
2151 temp_storage,
2152 wrapped_aggregate,
2153 order_by,
2154 window_frame,
2155 )
2156 .collect_vec(),
2157 AggregateFunc::FusedWindowAggregate {
2158 wrapped_aggregates,
2159 order_by,
2160 window_frame,
2161 } => fused_window_aggr_no_list::<_, W>(
2162 datums,
2163 temp_storage,
2164 wrapped_aggregates,
2165 order_by,
2166 window_frame,
2167 )
2168 .collect_vec(),
2169 _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2170 }
2171 .into_iter()
2172 }
2173
2174 pub fn default(&self) -> Datum<'static> {
2177 match self {
2178 AggregateFunc::Count => Datum::Int64(0),
2179 AggregateFunc::Any => Datum::False,
2180 AggregateFunc::All => Datum::True,
2181 AggregateFunc::Dummy => Datum::Dummy,
2182 _ => Datum::Null,
2183 }
2184 }
2185
2186 pub fn identity_datum(&self) -> Datum<'static> {
2189 match self {
2190 AggregateFunc::Any => Datum::False,
2191 AggregateFunc::All => Datum::True,
2192 AggregateFunc::Dummy => Datum::Dummy,
2193 AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2194 AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2195 AggregateFunc::RowNumber { .. }
2196 | AggregateFunc::Rank { .. }
2197 | AggregateFunc::DenseRank { .. }
2198 | AggregateFunc::LagLead { .. }
2199 | AggregateFunc::FirstValue { .. }
2200 | AggregateFunc::LastValue { .. }
2201 | AggregateFunc::WindowAggregate { .. }
2202 | AggregateFunc::FusedValueWindowFunc { .. }
2203 | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2204 AggregateFunc::MaxNumeric
2205 | AggregateFunc::MaxInt16
2206 | AggregateFunc::MaxInt32
2207 | AggregateFunc::MaxInt64
2208 | AggregateFunc::MaxUInt16
2209 | AggregateFunc::MaxUInt32
2210 | AggregateFunc::MaxUInt64
2211 | AggregateFunc::MaxMzTimestamp
2212 | AggregateFunc::MaxFloat32
2213 | AggregateFunc::MaxFloat64
2214 | AggregateFunc::MaxBool
2215 | AggregateFunc::MaxString
2216 | AggregateFunc::MaxDate
2217 | AggregateFunc::MaxTimestamp
2218 | AggregateFunc::MaxTimestampTz
2219 | AggregateFunc::MaxInterval
2220 | AggregateFunc::MaxTime
2221 | AggregateFunc::MinNumeric
2222 | AggregateFunc::MinInt16
2223 | AggregateFunc::MinInt32
2224 | AggregateFunc::MinInt64
2225 | AggregateFunc::MinUInt16
2226 | AggregateFunc::MinUInt32
2227 | AggregateFunc::MinUInt64
2228 | AggregateFunc::MinMzTimestamp
2229 | AggregateFunc::MinFloat32
2230 | AggregateFunc::MinFloat64
2231 | AggregateFunc::MinBool
2232 | AggregateFunc::MinString
2233 | AggregateFunc::MinDate
2234 | AggregateFunc::MinTimestamp
2235 | AggregateFunc::MinTimestampTz
2236 | AggregateFunc::MinInterval
2237 | AggregateFunc::MinTime
2238 | AggregateFunc::SumInt16
2239 | AggregateFunc::SumInt32
2240 | AggregateFunc::SumInt64
2241 | AggregateFunc::SumUInt16
2242 | AggregateFunc::SumUInt32
2243 | AggregateFunc::SumUInt64
2244 | AggregateFunc::SumFloat32
2245 | AggregateFunc::SumFloat64
2246 | AggregateFunc::SumNumeric
2247 | AggregateFunc::Count
2248 | AggregateFunc::JsonbAgg { .. }
2249 | AggregateFunc::JsonbObjectAgg { .. }
2250 | AggregateFunc::MapAgg { .. }
2251 | AggregateFunc::StringAgg { .. } => Datum::Null,
2252 }
2253 }
2254
2255 pub fn can_fuse_with_unnest_list(&self) -> bool {
2256 match self {
2257 AggregateFunc::RowNumber { .. }
2258 | AggregateFunc::Rank { .. }
2259 | AggregateFunc::DenseRank { .. }
2260 | AggregateFunc::LagLead { .. }
2261 | AggregateFunc::FirstValue { .. }
2262 | AggregateFunc::LastValue { .. }
2263 | AggregateFunc::WindowAggregate { .. }
2264 | AggregateFunc::FusedValueWindowFunc { .. }
2265 | AggregateFunc::FusedWindowAggregate { .. } => true,
2266 AggregateFunc::ArrayConcat { .. }
2267 | AggregateFunc::ListConcat { .. }
2268 | AggregateFunc::Any
2269 | AggregateFunc::All
2270 | AggregateFunc::Dummy
2271 | AggregateFunc::MaxNumeric
2272 | AggregateFunc::MaxInt16
2273 | AggregateFunc::MaxInt32
2274 | AggregateFunc::MaxInt64
2275 | AggregateFunc::MaxUInt16
2276 | AggregateFunc::MaxUInt32
2277 | AggregateFunc::MaxUInt64
2278 | AggregateFunc::MaxMzTimestamp
2279 | AggregateFunc::MaxFloat32
2280 | AggregateFunc::MaxFloat64
2281 | AggregateFunc::MaxBool
2282 | AggregateFunc::MaxString
2283 | AggregateFunc::MaxDate
2284 | AggregateFunc::MaxTimestamp
2285 | AggregateFunc::MaxTimestampTz
2286 | AggregateFunc::MaxInterval
2287 | AggregateFunc::MaxTime
2288 | AggregateFunc::MinNumeric
2289 | AggregateFunc::MinInt16
2290 | AggregateFunc::MinInt32
2291 | AggregateFunc::MinInt64
2292 | AggregateFunc::MinUInt16
2293 | AggregateFunc::MinUInt32
2294 | AggregateFunc::MinUInt64
2295 | AggregateFunc::MinMzTimestamp
2296 | AggregateFunc::MinFloat32
2297 | AggregateFunc::MinFloat64
2298 | AggregateFunc::MinBool
2299 | AggregateFunc::MinString
2300 | AggregateFunc::MinDate
2301 | AggregateFunc::MinTimestamp
2302 | AggregateFunc::MinTimestampTz
2303 | AggregateFunc::MinInterval
2304 | AggregateFunc::MinTime
2305 | AggregateFunc::SumInt16
2306 | AggregateFunc::SumInt32
2307 | AggregateFunc::SumInt64
2308 | AggregateFunc::SumUInt16
2309 | AggregateFunc::SumUInt32
2310 | AggregateFunc::SumUInt64
2311 | AggregateFunc::SumFloat32
2312 | AggregateFunc::SumFloat64
2313 | AggregateFunc::SumNumeric
2314 | AggregateFunc::Count
2315 | AggregateFunc::JsonbAgg { .. }
2316 | AggregateFunc::JsonbObjectAgg { .. }
2317 | AggregateFunc::MapAgg { .. }
2318 | AggregateFunc::StringAgg { .. } => false,
2319 }
2320 }
2321
2322 pub fn output_sql_type(&self, input_type: SqlColumnType) -> SqlColumnType {
2328 let scalar_type = match self {
2329 AggregateFunc::Count => SqlScalarType::Int64,
2330 AggregateFunc::Any => SqlScalarType::Bool,
2331 AggregateFunc::All => SqlScalarType::Bool,
2332 AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
2333 AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
2334 AggregateFunc::SumInt16 => SqlScalarType::Int64,
2335 AggregateFunc::SumInt32 => SqlScalarType::Int64,
2336 AggregateFunc::SumInt64 => SqlScalarType::Numeric {
2337 max_scale: Some(NumericMaxScale::ZERO),
2338 },
2339 AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
2340 AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
2341 AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
2342 max_scale: Some(NumericMaxScale::ZERO),
2343 },
2344 AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
2345 value_type: Box::new(value_type.clone()),
2346 custom_id: None,
2347 },
2348 AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2349 match input_type.scalar_type {
2350 SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2352 _ => unreachable!(),
2353 }
2354 }
2355 AggregateFunc::StringAgg { .. } => SqlScalarType::String,
2356 AggregateFunc::RowNumber { .. } => {
2357 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2358 }
2359 AggregateFunc::Rank { .. } => {
2360 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2361 }
2362 AggregateFunc::DenseRank { .. } => {
2363 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2364 }
2365 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2366 let fields = input_type.scalar_type.unwrap_record_element_type();
2368 let original_row_type = fields[0].unwrap_record_element_type()[0]
2369 .clone()
2370 .nullable(false);
2371 let encoded_args = fields[0].unwrap_record_element_type()[1];
2372 let output_type_inner =
2373 Self::lag_lead_output_type_inner_from_encoded_args(encoded_args);
2374 let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2375
2376 SqlScalarType::List {
2377 element_type: Box::new(SqlScalarType::Record {
2378 fields: [
2379 (column_name, output_type_inner),
2380 (ColumnName::from("?orig_row?"), original_row_type),
2381 ].into(),
2382 custom_id: None,
2383 }),
2384 custom_id: None,
2385 }
2386 }
2387 AggregateFunc::FirstValue { .. } => {
2388 let fields = input_type.scalar_type.unwrap_record_element_type();
2390 let original_row_type = fields[0].unwrap_record_element_type()[0]
2391 .clone()
2392 .nullable(false);
2393 let value_type = fields[0].unwrap_record_element_type()[1]
2394 .clone()
2395 .nullable(true); SqlScalarType::List {
2398 element_type: Box::new(SqlScalarType::Record {
2399 fields: [
2400 (ColumnName::from("?first_value?"), value_type),
2401 (ColumnName::from("?orig_row?"), original_row_type),
2402 ].into(),
2403 custom_id: None,
2404 }),
2405 custom_id: None,
2406 }
2407 }
2408 AggregateFunc::LastValue { .. } => {
2409 let fields = input_type.scalar_type.unwrap_record_element_type();
2411 let original_row_type = fields[0].unwrap_record_element_type()[0]
2412 .clone()
2413 .nullable(false);
2414 let value_type = fields[0].unwrap_record_element_type()[1]
2415 .clone()
2416 .nullable(true); SqlScalarType::List {
2419 element_type: Box::new(SqlScalarType::Record {
2420 fields: [
2421 (ColumnName::from("?last_value?"), value_type),
2422 (ColumnName::from("?orig_row?"), original_row_type),
2423 ].into(),
2424 custom_id: None,
2425 }),
2426 custom_id: None,
2427 }
2428 }
2429 AggregateFunc::WindowAggregate {
2430 wrapped_aggregate, ..
2431 } => {
2432 let fields = input_type.scalar_type.unwrap_record_element_type();
2434 let original_row_type = fields[0].unwrap_record_element_type()[0]
2435 .clone()
2436 .nullable(false);
2437 let arg_type = fields[0].unwrap_record_element_type()[1]
2438 .clone()
2439 .nullable(true);
2440 let wrapped_aggr_out_type = wrapped_aggregate.output_sql_type(arg_type);
2441
2442 SqlScalarType::List {
2443 element_type: Box::new(SqlScalarType::Record {
2444 fields: [
2445 (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2446 (ColumnName::from("?orig_row?"), original_row_type),
2447 ].into(),
2448 custom_id: None,
2449 }),
2450 custom_id: None,
2451 }
2452 }
2453 AggregateFunc::FusedWindowAggregate {
2454 wrapped_aggregates, ..
2455 } => {
2456 let fields = input_type.scalar_type.unwrap_record_element_type();
2459 let original_row_type = fields[0].unwrap_record_element_type()[0]
2460 .clone()
2461 .nullable(false);
2462 let args_type = fields[0].unwrap_record_element_type()[1];
2463 let arg_types = args_type.unwrap_record_element_type();
2464 let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(
2465 |(arg_type, wrapped_agg)| {
2466 (
2467 ColumnName::from(wrapped_agg.name()),
2468 wrapped_agg.output_sql_type((**arg_type).clone().nullable(true)),
2469 )
2470 }).collect_vec();
2471
2472 SqlScalarType::List {
2473 element_type: Box::new(SqlScalarType::Record {
2474 fields: [
2475 (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
2476 fields: out_fields.into(),
2477 custom_id: None,
2478 }.nullable(false)),
2479 (ColumnName::from("?orig_row?"), original_row_type),
2480 ].into(),
2481 custom_id: None,
2482 }),
2483 custom_id: None,
2484 }
2485 }
2486 AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2487 let fields = input_type.scalar_type.unwrap_record_element_type();
2492 let original_row_type = fields[0].unwrap_record_element_type()[0]
2493 .clone()
2494 .nullable(false);
2495 let encoded_args_type = fields[0]
2496 .unwrap_record_element_type()[1]
2497 .unwrap_record_element_type();
2498
2499 SqlScalarType::List {
2500 element_type: Box::new(SqlScalarType::Record {
2501 fields: [
2502 (
2503 ColumnName::from("?fused_value_window_func?"),
2504 SqlScalarType::Record {
2505 fields: encoded_args_type.into_iter().zip_eq(funcs).map(
2506 |(arg_type, func)| {
2507 match func {
2508 AggregateFunc::LagLead {
2509 lag_lead: lag_lead_type, ..
2510 } => {
2511 let name = Self::lag_lead_result_column_name(
2512 lag_lead_type,
2513 );
2514 let ty = Self
2515 ::lag_lead_output_type_inner_from_encoded_args(
2516 arg_type,
2517 );
2518 (name, ty)
2519 },
2520 AggregateFunc::FirstValue { .. } => {
2521 (
2522 ColumnName::from("?first_value?"),
2523 arg_type.clone().nullable(true),
2524 )
2525 }
2526 AggregateFunc::LastValue { .. } => {
2527 (
2528 ColumnName::from("?last_value?"),
2529 arg_type.clone().nullable(true),
2530 )
2531 }
2532 _ => panic!("FusedValueWindowFunc has an unknown function"),
2533 }
2534 }).collect(),
2535 custom_id: None,
2536 }.nullable(false)),
2537 (ColumnName::from("?orig_row?"), original_row_type),
2538 ].into(),
2539 custom_id: None,
2540 }),
2541 custom_id: None,
2542 }
2543 }
2544 AggregateFunc::Dummy
2545 | AggregateFunc::MaxNumeric
2546 | AggregateFunc::MaxInt16
2547 | AggregateFunc::MaxInt32
2548 | AggregateFunc::MaxInt64
2549 | AggregateFunc::MaxUInt16
2550 | AggregateFunc::MaxUInt32
2551 | AggregateFunc::MaxUInt64
2552 | AggregateFunc::MaxMzTimestamp
2553 | AggregateFunc::MaxFloat32
2554 | AggregateFunc::MaxFloat64
2555 | AggregateFunc::MaxBool
2556 | AggregateFunc::MaxString
2560 | AggregateFunc::MaxDate
2561 | AggregateFunc::MaxTimestamp
2562 | AggregateFunc::MaxTimestampTz
2563 | AggregateFunc::MaxInterval
2564 | AggregateFunc::MaxTime
2565 | AggregateFunc::MinNumeric
2566 | AggregateFunc::MinInt16
2567 | AggregateFunc::MinInt32
2568 | AggregateFunc::MinInt64
2569 | AggregateFunc::MinUInt16
2570 | AggregateFunc::MinUInt32
2571 | AggregateFunc::MinUInt64
2572 | AggregateFunc::MinMzTimestamp
2573 | AggregateFunc::MinFloat32
2574 | AggregateFunc::MinFloat64
2575 | AggregateFunc::MinBool
2576 | AggregateFunc::MinString
2577 | AggregateFunc::MinDate
2578 | AggregateFunc::MinTimestamp
2579 | AggregateFunc::MinTimestampTz
2580 | AggregateFunc::MinInterval
2581 | AggregateFunc::MinTime
2582 | AggregateFunc::SumFloat32
2583 | AggregateFunc::SumFloat64
2584 | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2585 };
2586 let nullable = match self {
2589 AggregateFunc::Count => false,
2590 AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2592 SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2594 SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
2596 _ => unreachable!(),
2597 },
2598 _ => unreachable!(),
2599 },
2600 _ => input_type.nullable,
2601 };
2602 scalar_type.nullable(nullable)
2603 }
2604
2605 pub fn output_type(&self, input_type: ReprColumnType) -> ReprColumnType {
2609 ReprColumnType::from(&self.output_sql_type(SqlColumnType::from_repr(&input_type)))
2610 }
2611
2612 fn output_type_ranking_window_funcs(
2614 input_type: &SqlColumnType,
2615 col_name: &str,
2616 ) -> SqlScalarType {
2617 match input_type.scalar_type {
2618 SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2619 element_type: Box::new(SqlScalarType::Record {
2620 fields: [
2621 (
2622 ColumnName::from(col_name),
2623 SqlScalarType::Int64.nullable(false),
2624 ),
2625 (ColumnName::from("?orig_row?"), {
2626 let inner = match &fields[0].1.scalar_type {
2627 SqlScalarType::List { element_type, .. } => element_type.clone(),
2628 _ => unreachable!(),
2629 };
2630 inner.nullable(false)
2631 }),
2632 ]
2633 .into(),
2634 custom_id: None,
2635 }),
2636 custom_id: None,
2637 },
2638 _ => unreachable!(),
2639 }
2640 }
2641
2642 fn lag_lead_output_type_inner_from_encoded_args(
2646 encoded_args_type: &SqlScalarType,
2647 ) -> SqlColumnType {
2648 encoded_args_type.unwrap_record_element_type()[0]
2652 .clone()
2653 .nullable(true)
2654 }
2655
2656 fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2657 ColumnName::from(match lag_lead_type {
2658 LagLeadType::Lag => "?lag?",
2659 LagLeadType::Lead => "?lead?",
2660 })
2661 }
2662
2663 pub fn propagates_nonnull_constraint(&self) -> bool {
2668 match self {
2669 AggregateFunc::MaxNumeric
2670 | AggregateFunc::MaxInt16
2671 | AggregateFunc::MaxInt32
2672 | AggregateFunc::MaxInt64
2673 | AggregateFunc::MaxUInt16
2674 | AggregateFunc::MaxUInt32
2675 | AggregateFunc::MaxUInt64
2676 | AggregateFunc::MaxMzTimestamp
2677 | AggregateFunc::MaxFloat32
2678 | AggregateFunc::MaxFloat64
2679 | AggregateFunc::MaxBool
2680 | AggregateFunc::MaxString
2681 | AggregateFunc::MaxDate
2682 | AggregateFunc::MaxTimestamp
2683 | AggregateFunc::MaxTimestampTz
2684 | AggregateFunc::MaxInterval
2685 | AggregateFunc::MaxTime
2686 | AggregateFunc::MinNumeric
2687 | AggregateFunc::MinInt16
2688 | AggregateFunc::MinInt32
2689 | AggregateFunc::MinInt64
2690 | AggregateFunc::MinUInt16
2691 | AggregateFunc::MinUInt32
2692 | AggregateFunc::MinUInt64
2693 | AggregateFunc::MinMzTimestamp
2694 | AggregateFunc::MinFloat32
2695 | AggregateFunc::MinFloat64
2696 | AggregateFunc::MinBool
2697 | AggregateFunc::MinString
2698 | AggregateFunc::MinDate
2699 | AggregateFunc::MinTimestamp
2700 | AggregateFunc::MinTimestampTz
2701 | AggregateFunc::MinInterval
2702 | AggregateFunc::MinTime
2703 | AggregateFunc::SumInt16
2704 | AggregateFunc::SumInt32
2705 | AggregateFunc::SumInt64
2706 | AggregateFunc::SumUInt16
2707 | AggregateFunc::SumUInt32
2708 | AggregateFunc::SumUInt64
2709 | AggregateFunc::SumFloat32
2710 | AggregateFunc::SumFloat64
2711 | AggregateFunc::SumNumeric
2712 | AggregateFunc::StringAgg { .. } => true,
2713 AggregateFunc::Count
2715 | AggregateFunc::Any
2716 | AggregateFunc::All
2717 | AggregateFunc::JsonbAgg { .. }
2718 | AggregateFunc::JsonbObjectAgg { .. }
2719 | AggregateFunc::MapAgg { .. }
2720 | AggregateFunc::ArrayConcat { .. }
2721 | AggregateFunc::ListConcat { .. }
2722 | AggregateFunc::RowNumber { .. }
2723 | AggregateFunc::Rank { .. }
2724 | AggregateFunc::DenseRank { .. }
2725 | AggregateFunc::LagLead { .. }
2726 | AggregateFunc::FirstValue { .. }
2727 | AggregateFunc::LastValue { .. }
2728 | AggregateFunc::FusedValueWindowFunc { .. }
2729 | AggregateFunc::WindowAggregate { .. }
2730 | AggregateFunc::FusedWindowAggregate { .. }
2731 | AggregateFunc::Dummy => false,
2732 }
2733 }
2734}
2735
2736fn jsonb_each<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2737 let map = match a {
2739 Datum::Map(dict) => dict,
2740 _ => mz_repr::DatumMap::empty(),
2741 };
2742
2743 map.iter()
2744 .map(move |(k, v)| (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE))
2745}
2746
2747fn jsonb_each_stringify<'a>(
2748 a: Datum<'a>,
2749 temp_storage: &'a RowArena,
2750) -> impl Iterator<Item = (Row, Diff)> + 'a {
2751 let map = match a {
2753 Datum::Map(dict) => dict,
2754 _ => mz_repr::DatumMap::empty(),
2755 };
2756
2757 map.iter().map(move |(k, mut v)| {
2758 v = jsonb_stringify(v, temp_storage)
2759 .map(Datum::String)
2760 .unwrap_or(Datum::Null);
2761 (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2762 })
2763}
2764
2765fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2766 let map = match a {
2767 Datum::Map(dict) => dict,
2768 _ => mz_repr::DatumMap::empty(),
2769 };
2770
2771 map.iter()
2772 .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2773}
2774
2775fn jsonb_array_elements<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2776 let list = match a {
2777 Datum::List(list) => list,
2778 _ => mz_repr::DatumList::empty(),
2779 };
2780 list.iter().map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2781}
2782
2783fn jsonb_array_elements_stringify<'a>(
2784 a: Datum<'a>,
2785 temp_storage: &'a RowArena,
2786) -> impl Iterator<Item = (Row, Diff)> + 'a {
2787 let list = match a {
2788 Datum::List(list) => list,
2789 _ => mz_repr::DatumList::empty(),
2790 };
2791 list.iter().map(move |mut e| {
2792 e = jsonb_stringify(e, temp_storage)
2793 .map(Datum::String)
2794 .unwrap_or(Datum::Null);
2795 (Row::pack_slice(&[e]), Diff::ONE)
2796 })
2797}
2798
2799fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2800 let r = r.inner();
2801 let a = a.unwrap_str();
2802 let captures = r.captures(a)?;
2803 let datums = captures
2804 .iter()
2805 .skip(1)
2806 .map(|m| Datum::from(m.map(|m| m.as_str())));
2807 Some((Row::pack(datums), Diff::ONE))
2808}
2809
2810fn regexp_matches<'a>(
2811 exprs: &[Datum<'a>],
2812) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2813 assert!(exprs.len() == 2 || exprs.len() == 3);
2817 let a = exprs[0].unwrap_str();
2818 let r = exprs[1].unwrap_str();
2819
2820 let (regex, opts) = if exprs.len() == 3 {
2821 let flag = exprs[2].unwrap_str();
2822 let opts = AnalyzedRegexOpts::from_str(flag)?;
2823 (AnalyzedRegex::new(r, opts)?, opts)
2824 } else {
2825 let opts = AnalyzedRegexOpts::default();
2826 (AnalyzedRegex::new(r, opts)?, opts)
2827 };
2828
2829 let regex = regex.inner().clone();
2830
2831 let iter = regex.captures_iter(a).map(move |captures| {
2832 let matches = captures
2833 .iter()
2834 .skip(1)
2836 .map(|m| Datum::from(m.map(|m| m.as_str())))
2837 .collect::<Vec<_>>();
2838
2839 let mut binding = SharedRow::get();
2840 let mut packer = binding.packer();
2841
2842 let dimension = ArrayDimension {
2843 lower_bound: 1,
2844 length: matches.len(),
2845 };
2846 packer
2847 .try_push_array(&[dimension], matches)
2848 .expect("generated dimensions above");
2849
2850 (binding.clone(), Diff::ONE)
2851 });
2852
2853 let out = iter.collect::<SmallVec<[_; 3]>>();
2858
2859 if opts.global {
2860 Ok(Either::Left(out.into_iter()))
2861 } else {
2862 Ok(Either::Right(out.into_iter().take(1)))
2863 }
2864}
2865
2866fn generate_series<N>(
2867 start: N,
2868 stop: N,
2869 step: N,
2870) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2871where
2872 N: Integer + Signed + CheckedAdd + Clone,
2873 Datum<'static>: From<N>,
2874{
2875 if step == N::zero() {
2876 return Err(EvalError::InvalidParameterValue(
2877 "step size cannot equal zero".into(),
2878 ));
2879 }
2880 Ok(num::range_step_inclusive(start, stop, step)
2881 .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2882}
2883
/// Iterator over timestamps from `state` toward `stop` (inclusive), advancing
/// by `step` after each yielded value. Constructed by `generate_series_ts`.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to yield.
    state: CheckedTimestamp<T>,
    /// Inclusive bound: iteration stops once `state` moves past it.
    stop: CheckedTimestamp<T>,
    /// Increment applied after each value: its `months` component is added
    /// first, then its remaining duration.
    step: Interval,
    /// `true` when stepping backwards (negative step). This flips the bound
    /// check from `state <= stop` to `state >= stop`.
    rev: bool,
    /// Set once advancing `state` fails (out of range). No further values are
    /// yielded after that.
    done: bool,
}
2895
2896impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
2897 type Item = CheckedTimestamp<T>;
2898
2899 #[inline]
2900 fn next(&mut self) -> Option<CheckedTimestamp<T>> {
2901 if !self.done
2902 && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
2903 {
2904 let result = self.state.clone();
2905 match add_timestamp_months(self.state.deref(), self.step.months) {
2906 Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
2907 Some(v) => match CheckedTimestamp::from_timestamplike(v) {
2908 Ok(v) => self.state = v,
2909 Err(_) => self.done = true,
2910 },
2911 None => self.done = true,
2912 },
2913 Err(..) => {
2914 self.done = true;
2915 }
2916 }
2917
2918 Some(result)
2919 } else {
2920 None
2921 }
2922 }
2923}
2924
2925fn generate_series_ts<T: TimestampLike>(
2926 start: CheckedTimestamp<T>,
2927 stop: CheckedTimestamp<T>,
2928 step: Interval,
2929 conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2930) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2931 let normalized_step = step.as_microseconds();
2932 if normalized_step == 0 {
2933 return Err(EvalError::InvalidParameterValue(
2934 "step size cannot equal zero".into(),
2935 ));
2936 }
2937 let rev = normalized_step < 0;
2938
2939 let trsi = TimestampRangeStepInclusive {
2940 state: start,
2941 stop,
2942 step,
2943 rev,
2944 done: false,
2945 };
2946
2947 Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2948}
2949
2950fn generate_subscripts_array(
2951 a: Datum,
2952 dim: i32,
2953) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2954 if dim <= 0 {
2955 return Ok(Box::new(iter::empty()));
2956 }
2957
2958 match a.unwrap_array().dims().into_iter().nth(
2959 (dim - 1)
2960 .try_into()
2961 .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2962 ) {
2963 Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
2964 requested_dim.lower_bound.try_into().map_err(|_| {
2965 EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2966 })?,
2967 requested_dim
2968 .length
2969 .try_into()
2970 .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
2971 1,
2972 )?)),
2973 None => Ok(Box::new(iter::empty())),
2974 }
2975}
2976
2977fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2978 a.unwrap_array()
2979 .elements()
2980 .iter()
2981 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2982}
2983
2984fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2985 a.unwrap_list()
2986 .iter()
2987 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2988}
2989
2990fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2991 a.unwrap_map()
2992 .iter()
2993 .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
2994}
2995
2996impl AggregateFunc {
2997 pub fn name(&self) -> &'static str {
3000 match self {
3001 Self::MaxNumeric => "max",
3002 Self::MaxInt16 => "max",
3003 Self::MaxInt32 => "max",
3004 Self::MaxInt64 => "max",
3005 Self::MaxUInt16 => "max",
3006 Self::MaxUInt32 => "max",
3007 Self::MaxUInt64 => "max",
3008 Self::MaxMzTimestamp => "max",
3009 Self::MaxFloat32 => "max",
3010 Self::MaxFloat64 => "max",
3011 Self::MaxBool => "max",
3012 Self::MaxString => "max",
3013 Self::MaxDate => "max",
3014 Self::MaxTimestamp => "max",
3015 Self::MaxTimestampTz => "max",
3016 Self::MaxInterval => "max",
3017 Self::MaxTime => "max",
3018 Self::MinNumeric => "min",
3019 Self::MinInt16 => "min",
3020 Self::MinInt32 => "min",
3021 Self::MinInt64 => "min",
3022 Self::MinUInt16 => "min",
3023 Self::MinUInt32 => "min",
3024 Self::MinUInt64 => "min",
3025 Self::MinMzTimestamp => "min",
3026 Self::MinFloat32 => "min",
3027 Self::MinFloat64 => "min",
3028 Self::MinBool => "min",
3029 Self::MinString => "min",
3030 Self::MinDate => "min",
3031 Self::MinTimestamp => "min",
3032 Self::MinTimestampTz => "min",
3033 Self::MinInterval => "min",
3034 Self::MinTime => "min",
3035 Self::SumInt16 => "sum",
3036 Self::SumInt32 => "sum",
3037 Self::SumInt64 => "sum",
3038 Self::SumUInt16 => "sum",
3039 Self::SumUInt32 => "sum",
3040 Self::SumUInt64 => "sum",
3041 Self::SumFloat32 => "sum",
3042 Self::SumFloat64 => "sum",
3043 Self::SumNumeric => "sum",
3044 Self::Count => "count",
3045 Self::Any => "any",
3046 Self::All => "all",
3047 Self::JsonbAgg { .. } => "jsonb_agg",
3048 Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3049 Self::MapAgg { .. } => "map_agg",
3050 Self::ArrayConcat { .. } => "array_agg",
3051 Self::ListConcat { .. } => "list_agg",
3052 Self::StringAgg { .. } => "string_agg",
3053 Self::RowNumber { .. } => "row_number",
3054 Self::Rank { .. } => "rank",
3055 Self::DenseRank { .. } => "dense_rank",
3056 Self::LagLead {
3057 lag_lead: LagLeadType::Lag,
3058 ..
3059 } => "lag",
3060 Self::LagLead {
3061 lag_lead: LagLeadType::Lead,
3062 ..
3063 } => "lead",
3064 Self::FirstValue { .. } => "first_value",
3065 Self::LastValue { .. } => "last_value",
3066 Self::WindowAggregate { .. } => "window_agg",
3067 Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3068 Self::FusedWindowAggregate { .. } => "fused_window_agg",
3069 Self::Dummy => "dummy",
3070 }
3071 }
3072}
3073
/// Human-readable rendering of an `AggregateFunc` for the `explain` machinery.
///
/// The output is `name` optionally followed by a bracketed list of
/// parameters. Column references in `order_by` are rendered through
/// `self.child`, so they follow the humanizer mode `M`.
impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
where
    M: HumanizerMode,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use AggregateFunc::*;
        let name = self.expr.name();
        match self.expr {
            // Aggregates whose only parameter is an ordering:
            // `name[order_by=[...]]`.
            JsonbAgg { order_by }
            | JsonbObjectAgg { order_by }
            | MapAgg { order_by, .. }
            | ArrayConcat { order_by }
            | ListConcat { order_by }
            | StringAgg { order_by }
            | RowNumber { order_by }
            | Rank { order_by }
            | DenseRank { order_by } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
            }
            // `lag`/`lead[ignore_nulls=true, order_by=[...]]`. The
            // `ignore_nulls` part is printed only when it is set.
            LagLead {
                lag_lead: _,
                ignore_nulls,
                order_by,
            } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                f.write_str(name)?;
                f.write_str("[")?;
                if *ignore_nulls {
                    f.write_str("ignore_nulls=true, ")?;
                }
                write!(f, "order_by=[{}]", separated(", ", order_by))?;
                f.write_str("]")
            }
            // The window frame is printed only when it differs from the default.
            FirstValue {
                order_by,
                window_frame,
            } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                f.write_str(name)?;
                f.write_str("[")?;
                write!(f, "order_by=[{}]", separated(", ", order_by))?;
                if *window_frame != WindowFrame::default() {
                    write!(f, " {}", window_frame)?;
                }
                f.write_str("]")
            }
            LastValue {
                order_by,
                window_frame,
            } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                f.write_str(name)?;
                f.write_str("[")?;
                write!(f, "order_by=[{}]", separated(", ", order_by))?;
                if *window_frame != WindowFrame::default() {
                    write!(f, " {}", window_frame)?;
                }
                f.write_str("]")
            }
            // `window_agg[<wrapped aggregate> order_by=[...] <frame>]`. The
            // wrapped aggregate is rendered recursively via `self.child`.
            WindowAggregate {
                wrapped_aggregate,
                order_by,
                window_frame,
            } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                let wrapped_aggregate = self.child(wrapped_aggregate.deref());
                f.write_str(name)?;
                f.write_str("[")?;
                write!(f, "{} ", wrapped_aggregate)?;
                write!(f, "order_by=[{}]", separated(", ", order_by))?;
                if *window_frame != WindowFrame::default() {
                    write!(f, " {}", window_frame)?;
                }
                f.write_str("]")
            }
            // `fused_value_window_func[<f1>, <f2>, ... order_by=[...]]`.
            FusedValueWindowFunc { funcs, order_by } => {
                let order_by = order_by.iter().map(|col| self.child(col));
                let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
                f.write_str(name)?;
                f.write_str("[")?;
                write!(f, "{} ", funcs)?;
                write!(f, "order_by=[{}]", separated(", ", order_by))?;
                f.write_str("]")
            }
            // Everything else prints just its name. This includes the
            // min/max/sum families, count/any/all, and FusedWindowAggregate.
            // NOTE(review): FusedWindowAggregate's wrapped aggregates and
            // ordering are therefore not shown; confirm this is intended.
            _ => f.write_str(name),
        }
    }
}
3163
/// Describes one capture group of an `AnalyzedRegex`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct CaptureGroupDesc {
    /// Position of the group among the regex's groups. Group 0 (the whole
    /// match) is never described, so this starts at 1.
    pub index: u32,
    /// The group's name if it is a named group, otherwise `None`.
    pub name: Option<String>,
    /// Whether the group's value may be null. `AnalyzedRegex::new` always sets
    /// this to `true`.
    pub nullable: bool,
}
3181
/// Flags controlling how a regex is compiled and applied. They are parsed from
/// a flag string (such as `"gi"`) by this type's `FromStr` impl.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default
)]
pub struct AnalyzedRegexOpts {
    /// Match case-insensitively (flag `i`). Passed to `ReprRegex::new`.
    pub case_insensitive: bool,
    /// Report every match instead of only the first (flag `g`). Consulted by
    /// `regexp_matches`.
    pub global: bool,
}
3200
3201impl FromStr for AnalyzedRegexOpts {
3202 type Err = EvalError;
3203
3204 fn from_str(s: &str) -> Result<Self, Self::Err> {
3205 let mut opts = AnalyzedRegexOpts::default();
3206 for c in s.chars() {
3207 match c {
3208 'i' => opts.case_insensitive = true,
3209 'g' => opts.global = true,
3210 _ => return Err(EvalError::InvalidRegexFlag(c)),
3211 }
3212 }
3213 Ok(opts)
3214 }
3215}
3216
/// A compiled regex bundled with the pieces needed to use it:
/// - `.0`: the compiled pattern;
/// - `.1`: one `CaptureGroupDesc` per capture group, excluding group 0;
/// - `.2`: the options it was compiled with.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3230
3231impl AnalyzedRegex {
3232 pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, RegexCompilationError> {
3233 let r = ReprRegex::new(s, opts.case_insensitive)?;
3234 #[allow(clippy::as_conversions)]
3236 let descs: Vec<_> = r
3237 .capture_names()
3238 .enumerate()
3239 .skip(1)
3244 .map(|(i, name)| CaptureGroupDesc {
3245 index: i as u32,
3246 name: name.map(String::from),
3247 nullable: true,
3250 })
3251 .collect();
3252 Ok(Self(r, descs, opts))
3253 }
3254 pub fn capture_groups_len(&self) -> usize {
3255 self.1.len()
3256 }
3257 pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3258 self.1.iter()
3259 }
3260 pub fn inner(&self) -> &Regex {
3261 &(self.0).regex
3262 }
3263 pub fn opts(&self) -> &AnalyzedRegexOpts {
3264 &self.2
3265 }
3266}
3267
3268pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3269 let bytes = a.unwrap_str().as_bytes();
3270 let mut row = Row::default();
3271 let csv_reader = csv::ReaderBuilder::new()
3272 .has_headers(false)
3273 .from_reader(bytes);
3274 csv_reader.into_records().filter_map(move |res| match res {
3275 Ok(sr) if sr.len() == n_cols => {
3276 row.packer().extend(sr.iter().map(Datum::String));
3277 Some((row.clone(), Diff::ONE))
3278 }
3279 _ => None,
3280 })
3281}
3282
3283pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3284 let n = a.unwrap_int64();
3285 if n != 0 {
3286 Some((Row::default(), n.into()))
3287 } else {
3288 None
3289 }
3290}
3291
3292fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3293 datums
3294 .chunks(width)
3295 .map(|chunk| (Row::pack(chunk), Diff::ONE))
3296}
3297
3298fn acl_explode<'a>(
3299 acl_items: Datum<'a>,
3300 temp_storage: &'a RowArena,
3301) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3302 let acl_items = acl_items.unwrap_array();
3303 let mut res = Vec::new();
3304 for acl_item in acl_items.elements().iter() {
3305 if acl_item.is_null() {
3306 return Err(EvalError::AclArrayNullElement);
3307 }
3308 let acl_item = acl_item.unwrap_acl_item();
3309 for privilege in acl_item.acl_mode.explode() {
3310 let row = [
3311 Datum::UInt32(acl_item.grantor.0),
3312 Datum::UInt32(acl_item.grantee.0),
3313 Datum::String(temp_storage.push_string(privilege.to_string())),
3314 Datum::False,
3316 ];
3317 res.push((Row::pack_slice(&row), Diff::ONE));
3318 }
3319 }
3320 Ok(res.into_iter())
3321}
3322
3323fn mz_acl_explode<'a>(
3324 mz_acl_items: Datum<'a>,
3325 temp_storage: &'a RowArena,
3326) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3327 let mz_acl_items = mz_acl_items.unwrap_array();
3328 let mut res = Vec::new();
3329 for mz_acl_item in mz_acl_items.elements().iter() {
3330 if mz_acl_item.is_null() {
3331 return Err(EvalError::MzAclArrayNullElement);
3332 }
3333 let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3334 for privilege in mz_acl_item.acl_mode.explode() {
3335 let row = [
3336 Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3337 Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3338 Datum::String(temp_storage.push_string(privilege.to_string())),
3339 Datum::False,
3341 ];
3342 res.push((Row::pack_slice(&row), Diff::ONE));
3343 }
3344 }
3345 Ok(res.into_iter())
3346}
3347
/// A table-valued function. Given one row of argument datums, `TableFunc::eval`
/// produces zero or more `(Row, Diff)` pairs. The shape of those rows is
/// described by `TableFunc::output_sql_type`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum TableFunc {
    /// Expands an `aclitem` array into `(grantor, grantee, privilege, bool)`
    /// rows, one per granted privilege.
    AclExplode,
    /// Expands an `mz_aclitem` array into `(grantor, grantee, privilege, bool)`
    /// rows, with grantor and grantee rendered as text.
    MzAclExplode,
    /// One `(key, value)` row per entry of a JSONB object; non-objects yield
    /// no rows.
    JsonbEach,
    /// Like `JsonbEach`, but values are stringified (nullable text).
    JsonbEachStringify,
    /// One row per key of a JSONB object.
    JsonbObjectKeys,
    /// One row per element of a JSONB array; non-arrays yield no rows.
    JsonbArrayElements,
    /// Like `JsonbArrayElements`, but elements are stringified (nullable text).
    JsonbArrayElementsStringify,
    /// At most one row holding the capture groups of the regex's first match.
    RegexpExtract(AnalyzedRegex),
    /// Parses CSV text, keeping only records with exactly this many fields.
    CsvExtract(usize),
    /// `generate_series(start, stop, step)` over `Int32` values.
    GenerateSeriesInt32,
    /// `generate_series(start, stop, step)` over `Int64` values.
    GenerateSeriesInt64,
    /// `generate_series(start, stop, step)` over timestamps with an interval step.
    GenerateSeriesTimestamp,
    /// `generate_series(start, stop, step)` over timestamptzs with an interval step.
    GenerateSeriesTimestampTz,
    /// Checks a subquery's row count (the `Int64` argument).
    ///
    /// A count of 1 yields no rows. A count above 1 fails with
    /// `MultipleRowsFromSubquery`, a negative count fails with
    /// `NegativeRowsFromSubquery`, and 0 is reported as an internal error.
    GuardSubquerySize {
        /// Scalar type of the single (never populated) output column.
        column_type: SqlScalarType,
    },
    /// Emits one empty row whose diff is the `Int64` argument (none for zero).
    Repeat,
    /// One row per element of an array argument.
    UnnestArray {
        /// Element type of the array; the output column is nullable.
        el_typ: SqlScalarType,
    },
    /// One row per element of a list argument.
    UnnestList {
        /// Element type of the list; the output column is nullable.
        el_typ: SqlScalarType,
    },
    /// One `(key, value)` row per entry of a map argument.
    UnnestMap {
        /// Type of the map's values; the value column is nullable.
        value_type: SqlScalarType,
    },
    /// Groups the argument datums into consecutive rows of `width` datums.
    Wrap {
        /// Column types of each output row.
        types: Vec<SqlColumnType>,
        /// Number of argument datums packed into each output row.
        width: usize,
    },
    /// `generate_subscripts(array, dim)`: the valid subscripts of one dimension.
    GenerateSubscriptsArray,
    /// Packs all arguments into exactly one output row.
    TabletizedScalar {
        /// Presumably the name of the wrapped scalar function; not read in
        /// `eval` or `output_sql_type` (confirm).
        name: String,
        /// Output relation type, returned verbatim by `output_sql_type`.
        relation: SqlRelationType,
    },
    /// `regexp_matches(string, pattern [, flags])`: one text-array row per match.
    RegexpMatches,
    /// Runs the wrapped table function and appends a non-null `Int64`
    /// ordinality column to its output.
    // `WithOrdinality` is a private struct exposed through this public variant.
    // Presumably the lint is silenced so that values can only be built through
    // `TableFunc::with_ordinality`.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3432
/// Payload of `TableFunc::WithOrdinality`.
///
/// It wraps another table function whose output gains a trailing non-null
/// `Int64` ordinality column. That column is also declared as a key in
/// `TableFunc::output_sql_type`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
struct WithOrdinality {
    /// The wrapped table function. When built via `TableFunc::with_ordinality`,
    /// this is never itself a `WithOrdinality`.
    inner: Box<TableFunc>,
}
3455
3456impl TableFunc {
3457 pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3459 match inner {
3460 TableFunc::AclExplode
3461 | TableFunc::MzAclExplode
3462 | TableFunc::JsonbEach
3463 | TableFunc::JsonbEachStringify
3464 | TableFunc::JsonbObjectKeys
3465 | TableFunc::JsonbArrayElements
3466 | TableFunc::JsonbArrayElementsStringify
3467 | TableFunc::RegexpExtract(_)
3468 | TableFunc::CsvExtract(_)
3469 | TableFunc::GenerateSeriesInt32
3470 | TableFunc::GenerateSeriesInt64
3471 | TableFunc::GenerateSeriesTimestamp
3472 | TableFunc::GenerateSeriesTimestampTz
3473 | TableFunc::GuardSubquerySize { .. }
3474 | TableFunc::Repeat
3475 | TableFunc::UnnestArray { .. }
3476 | TableFunc::UnnestList { .. }
3477 | TableFunc::UnnestMap { .. }
3478 | TableFunc::Wrap { .. }
3479 | TableFunc::GenerateSubscriptsArray
3480 | TableFunc::TabletizedScalar { .. }
3481 | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3482 inner: Box::new(inner),
3483 })),
3484 TableFunc::WithOrdinality(_) => None,
3487 }
3488 }
3489}
3490
3491impl TableFunc {
3492 pub fn eval<'a>(
3494 &'a self,
3495 datums: &'a [Datum<'a>],
3496 temp_storage: &'a RowArena,
3497 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3498 if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3499 return Ok(Box::new(vec![].into_iter()));
3500 }
3501 match self {
3502 TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3503 TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3504 TableFunc::JsonbEach => Ok(Box::new(jsonb_each(datums[0]))),
3505 TableFunc::JsonbEachStringify => {
3506 Ok(Box::new(jsonb_each_stringify(datums[0], temp_storage)))
3507 }
3508 TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3509 TableFunc::JsonbArrayElements => Ok(Box::new(jsonb_array_elements(datums[0]))),
3510 TableFunc::JsonbArrayElementsStringify => Ok(Box::new(jsonb_array_elements_stringify(
3511 datums[0],
3512 temp_storage,
3513 ))),
3514 TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3515 TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3516 TableFunc::GenerateSeriesInt32 => {
3517 let res = generate_series(
3518 datums[0].unwrap_int32(),
3519 datums[1].unwrap_int32(),
3520 datums[2].unwrap_int32(),
3521 )?;
3522 Ok(Box::new(res))
3523 }
3524 TableFunc::GenerateSeriesInt64 => {
3525 let res = generate_series(
3526 datums[0].unwrap_int64(),
3527 datums[1].unwrap_int64(),
3528 datums[2].unwrap_int64(),
3529 )?;
3530 Ok(Box::new(res))
3531 }
3532 TableFunc::GenerateSeriesTimestamp => {
3533 fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
3534 Datum::from(d)
3535 }
3536 let res = generate_series_ts(
3537 datums[0].unwrap_timestamp(),
3538 datums[1].unwrap_timestamp(),
3539 datums[2].unwrap_interval(),
3540 pass_through,
3541 )?;
3542 Ok(Box::new(res))
3543 }
3544 TableFunc::GenerateSeriesTimestampTz => {
3545 fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
3546 Datum::from(d)
3547 }
3548 let res = generate_series_ts(
3549 datums[0].unwrap_timestamptz(),
3550 datums[1].unwrap_timestamptz(),
3551 datums[2].unwrap_interval(),
3552 gen_ts_tz,
3553 )?;
3554 Ok(Box::new(res))
3555 }
3556 TableFunc::GenerateSubscriptsArray => {
3557 generate_subscripts_array(datums[0], datums[1].unwrap_int32())
3558 }
3559 TableFunc::GuardSubquerySize { column_type: _ } => {
3560 let count = datums[0].unwrap_int64();
3563 if count == 1 {
3564 Ok(Box::new([].into_iter()))
3565 } else if count > 1 {
3566 Err(EvalError::MultipleRowsFromSubquery)
3567 } else if count < 0 {
3568 Err(EvalError::NegativeRowsFromSubquery)
3569 } else {
3570 soft_panic_or_log!("subquery counting unexpectedly produced 0");
3573 Err(EvalError::Internal(
3574 "subquery counting unexpectedly produced 0".into(),
3575 ))
3576 }
3577 }
3578 TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
3579 TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
3580 TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
3581 TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
3582 TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
3583 TableFunc::TabletizedScalar { .. } => {
3584 let r = Row::pack_slice(datums);
3585 Ok(Box::new(std::iter::once((r, Diff::ONE))))
3586 }
3587 TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
3588 TableFunc::WithOrdinality(func_with_ordinality) => {
3589 func_with_ordinality.eval(datums, temp_storage)
3590 }
3591 }
3592 }
3593
3594 pub fn output_sql_type(&self) -> SqlRelationType {
3595 let (column_types, keys) = match self {
3596 TableFunc::AclExplode => {
3597 let column_types = vec![
3598 SqlScalarType::Oid.nullable(false),
3599 SqlScalarType::Oid.nullable(false),
3600 SqlScalarType::String.nullable(false),
3601 SqlScalarType::Bool.nullable(false),
3602 ];
3603 let keys = vec![];
3604 (column_types, keys)
3605 }
3606 TableFunc::MzAclExplode => {
3607 let column_types = vec![
3608 SqlScalarType::String.nullable(false),
3609 SqlScalarType::String.nullable(false),
3610 SqlScalarType::String.nullable(false),
3611 SqlScalarType::Bool.nullable(false),
3612 ];
3613 let keys = vec![];
3614 (column_types, keys)
3615 }
3616 TableFunc::JsonbEach => {
3617 let column_types = vec![
3618 SqlScalarType::String.nullable(false),
3619 SqlScalarType::Jsonb.nullable(false),
3620 ];
3621 let keys = vec![];
3622 (column_types, keys)
3623 }
3624 TableFunc::JsonbEachStringify => {
3625 let column_types = vec![
3626 SqlScalarType::String.nullable(false),
3627 SqlScalarType::String.nullable(true),
3628 ];
3629 let keys = vec![];
3630 (column_types, keys)
3631 }
3632 TableFunc::JsonbObjectKeys => {
3633 let column_types = vec![SqlScalarType::String.nullable(false)];
3634 let keys = vec![];
3635 (column_types, keys)
3636 }
3637 TableFunc::JsonbArrayElements => {
3638 let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
3639 let keys = vec![];
3640 (column_types, keys)
3641 }
3642 TableFunc::JsonbArrayElementsStringify => {
3643 let column_types = vec![SqlScalarType::String.nullable(true)];
3644 let keys = vec![];
3645 (column_types, keys)
3646 }
3647 TableFunc::RegexpExtract(a) => {
3648 let column_types = a
3649 .capture_groups_iter()
3650 .map(|cg| SqlScalarType::String.nullable(cg.nullable))
3651 .collect();
3652 let keys = vec![];
3653 (column_types, keys)
3654 }
3655 TableFunc::CsvExtract(n_cols) => {
3656 let column_types = iter::repeat(SqlScalarType::String.nullable(false))
3657 .take(*n_cols)
3658 .collect();
3659 let keys = vec![];
3660 (column_types, keys)
3661 }
3662 TableFunc::GenerateSeriesInt32 => {
3663 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3664 let keys = vec![vec![0]];
3665 (column_types, keys)
3666 }
3667 TableFunc::GenerateSeriesInt64 => {
3668 let column_types = vec![SqlScalarType::Int64.nullable(false)];
3669 let keys = vec![vec![0]];
3670 (column_types, keys)
3671 }
3672 TableFunc::GenerateSeriesTimestamp => {
3673 let column_types =
3674 vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
3675 let keys = vec![vec![0]];
3676 (column_types, keys)
3677 }
3678 TableFunc::GenerateSeriesTimestampTz => {
3679 let column_types =
3680 vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
3681 let keys = vec![vec![0]];
3682 (column_types, keys)
3683 }
3684 TableFunc::GenerateSubscriptsArray => {
3685 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3686 let keys = vec![vec![0]];
3687 (column_types, keys)
3688 }
3689 TableFunc::GuardSubquerySize { column_type } => {
3690 let column_types = vec![column_type.clone().nullable(false)];
3691 let keys = vec![];
3692 (column_types, keys)
3693 }
3694 TableFunc::Repeat => {
3695 let column_types = vec![];
3696 let keys = vec![];
3697 (column_types, keys)
3698 }
3699 TableFunc::UnnestArray { el_typ } => {
3700 let column_types = vec![el_typ.clone().nullable(true)];
3701 let keys = vec![];
3702 (column_types, keys)
3703 }
3704 TableFunc::UnnestList { el_typ } => {
3705 let column_types = vec![el_typ.clone().nullable(true)];
3706 let keys = vec![];
3707 (column_types, keys)
3708 }
3709 TableFunc::UnnestMap { value_type } => {
3710 let column_types = vec![
3711 SqlScalarType::String.nullable(false),
3712 value_type.clone().nullable(true),
3713 ];
3714 let keys = vec![vec![0]];
3715 (column_types, keys)
3716 }
3717 TableFunc::Wrap { types, .. } => {
3718 let column_types = types.clone();
3719 let keys = vec![];
3720 (column_types, keys)
3721 }
3722 TableFunc::TabletizedScalar { relation, .. } => {
3723 return relation.clone();
3724 }
3725 TableFunc::RegexpMatches => {
3726 let column_types =
3727 vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
3728 let keys = vec![];
3729
3730 (column_types, keys)
3731 }
3732 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3733 let mut typ = inner.output_sql_type();
3734 typ.column_types.push(SqlScalarType::Int64.nullable(false));
3736 typ.keys.push(vec![typ.column_types.len() - 1]);
3738 (typ.column_types, typ.keys)
3739 }
3740 };
3741
3742 soft_assert_eq_no_log!(column_types.len(), self.output_arity());
3743
3744 if !keys.is_empty() {
3745 SqlRelationType::new(column_types).with_keys(keys)
3746 } else {
3747 SqlRelationType::new(column_types)
3748 }
3749 }
3750
3751 pub fn output_type(&self) -> ReprRelationType {
3755 ReprRelationType::from(&self.output_sql_type())
3756 }
3757
3758 pub fn output_arity(&self) -> usize {
3759 match self {
3760 TableFunc::AclExplode => 4,
3761 TableFunc::MzAclExplode => 4,
3762 TableFunc::JsonbEach => 2,
3763 TableFunc::JsonbEachStringify => 2,
3764 TableFunc::JsonbObjectKeys => 1,
3765 TableFunc::JsonbArrayElements => 1,
3766 TableFunc::JsonbArrayElementsStringify => 1,
3767 TableFunc::RegexpExtract(a) => a.capture_groups_len(),
3768 TableFunc::CsvExtract(n_cols) => *n_cols,
3769 TableFunc::GenerateSeriesInt32 => 1,
3770 TableFunc::GenerateSeriesInt64 => 1,
3771 TableFunc::GenerateSeriesTimestamp => 1,
3772 TableFunc::GenerateSeriesTimestampTz => 1,
3773 TableFunc::GenerateSubscriptsArray => 1,
3774 TableFunc::GuardSubquerySize { .. } => 1,
3775 TableFunc::Repeat => 0,
3776 TableFunc::UnnestArray { .. } => 1,
3777 TableFunc::UnnestList { .. } => 1,
3778 TableFunc::UnnestMap { .. } => 2,
3779 TableFunc::Wrap { width, .. } => *width,
3780 TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
3781 TableFunc::RegexpMatches => 1,
3782 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
3783 }
3784 }
3785
3786 pub fn empty_on_null_input(&self) -> bool {
3787 match self {
3788 TableFunc::AclExplode
3789 | TableFunc::MzAclExplode
3790 | TableFunc::JsonbEach
3791 | TableFunc::JsonbEachStringify
3792 | TableFunc::JsonbObjectKeys
3793 | TableFunc::JsonbArrayElements
3794 | TableFunc::JsonbArrayElementsStringify
3795 | TableFunc::GenerateSeriesInt32
3796 | TableFunc::GenerateSeriesInt64
3797 | TableFunc::GenerateSeriesTimestamp
3798 | TableFunc::GenerateSeriesTimestampTz
3799 | TableFunc::GenerateSubscriptsArray
3800 | TableFunc::RegexpExtract(_)
3801 | TableFunc::CsvExtract(_)
3802 | TableFunc::Repeat
3803 | TableFunc::UnnestArray { .. }
3804 | TableFunc::UnnestList { .. }
3805 | TableFunc::UnnestMap { .. }
3806 | TableFunc::RegexpMatches => true,
3807 TableFunc::GuardSubquerySize { .. } => false,
3808 TableFunc::Wrap { .. } => false,
3809 TableFunc::TabletizedScalar { .. } => false,
3810 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
3811 }
3812 }
3813
3814 pub fn preserves_monotonicity(&self) -> bool {
3816 match self {
3819 TableFunc::AclExplode => false,
3820 TableFunc::MzAclExplode => false,
3821 TableFunc::JsonbEach => true,
3822 TableFunc::JsonbEachStringify => true,
3823 TableFunc::JsonbObjectKeys => true,
3824 TableFunc::JsonbArrayElements => true,
3825 TableFunc::JsonbArrayElementsStringify => true,
3826 TableFunc::RegexpExtract(_) => true,
3827 TableFunc::CsvExtract(_) => true,
3828 TableFunc::GenerateSeriesInt32 => true,
3829 TableFunc::GenerateSeriesInt64 => true,
3830 TableFunc::GenerateSeriesTimestamp => true,
3831 TableFunc::GenerateSeriesTimestampTz => true,
3832 TableFunc::GenerateSubscriptsArray => true,
3833 TableFunc::Repeat => false,
3834 TableFunc::UnnestArray { .. } => true,
3835 TableFunc::UnnestList { .. } => true,
3836 TableFunc::UnnestMap { .. } => true,
3837 TableFunc::Wrap { .. } => true,
3838 TableFunc::TabletizedScalar { .. } => true,
3839 TableFunc::RegexpMatches => true,
3840 TableFunc::GuardSubquerySize { .. } => false,
3841 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
3842 }
3843 }
3844}
3845
3846impl fmt::Display for TableFunc {
3847 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3848 match self {
3849 TableFunc::AclExplode => f.write_str("aclexplode"),
3850 TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3851 TableFunc::JsonbEach => f.write_str("jsonb_each"),
3852 TableFunc::JsonbEachStringify => f.write_str("jsonb_each_text"),
3853 TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3854 TableFunc::JsonbArrayElements => f.write_str("jsonb_array_elements"),
3855 TableFunc::JsonbArrayElementsStringify => f.write_str("jsonb_array_elements_text"),
3856 TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3857 TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3858 TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3859 TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3860 TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3861 TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3862 TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3863 TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3864 TableFunc::Repeat => f.write_str("repeat_row"),
3865 TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3866 TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3867 TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3868 TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3869 TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3870 TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3871 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3872 write!(f, "{}[with_ordinality]", inner)
3873 }
3874 }
3875 }
3876}
3877
3878impl WithOrdinality {
3879 fn eval<'a>(
3888 &'a self,
3889 datums: &'a [Datum<'a>],
3890 temp_storage: &'a RowArena,
3891 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3892 let mut next_ordinal: i64 = 1;
3893 let it = self
3894 .inner
3895 .eval(datums, temp_storage)?
3896 .flat_map(move |(mut row, diff)| {
3897 let diff = diff.into_inner();
3898 assert!(diff >= 0);
3905 let mut ordinals = next_ordinal..(next_ordinal + diff);
3907 next_ordinal += diff;
3908 let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
3910 iter::from_fn(move || {
3911 let ordinal = ordinals.next()?;
3912 let mut row = if ordinals.is_empty() {
3913 std::mem::take(&mut row)
3916 } else {
3917 let mut new_row = Row::with_capacity(cap);
3918 new_row.clone_from(&row);
3919 new_row
3920 };
3921 RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
3922 Some((row, Diff::ONE))
3923 })
3924 });
3925 Ok(Box::new(it))
3926 }
3927}