1#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::{Regex as ReprRegex, RegexCompilationError};
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33 ColumnName, Datum, Diff, ReprColumnType, ReprRelationType, Row, RowArena, RowPacker, SharedRow,
34 SqlColumnType, SqlRelationType, SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44 CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49 ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58 I: IntoIterator<Item = Datum<'a>>,
59{
60 match datums
61 .into_iter()
62 .filter(|d| !d.is_null())
63 .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64 {
65 Some(datum) => datum,
66 None => Datum::Null,
67 }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72 I: IntoIterator<Item = Datum<'a>>,
73 DatumType: TryFrom<Datum<'a>> + Ord,
74 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75 Datum<'a>: From<Option<DatumType>>,
76{
77 let x: Option<DatumType> = datums
78 .into_iter()
79 .filter(|d| !d.is_null())
80 .map(|d| DatumType::try_from(d).expect("unexpected type"))
81 .max();
82
83 x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88 I: IntoIterator<Item = Datum<'a>>,
89 DatumType: TryFrom<Datum<'a>> + Ord,
90 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91 Datum<'a>: From<Option<DatumType>>,
92{
93 let x: Option<DatumType> = datums
94 .into_iter()
95 .filter(|d| !d.is_null())
96 .map(|d| DatumType::try_from(d).expect("unexpected type"))
97 .min();
98
99 x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104 I: IntoIterator<Item = Datum<'a>>,
105{
106 match datums
107 .into_iter()
108 .filter(|d| !d.is_null())
109 .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110 {
111 Some(datum) => datum,
112 None => Datum::Null,
113 }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118 I: IntoIterator<Item = Datum<'a>>,
119 DatumType: TryFrom<Datum<'a>>,
120 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121 ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123 let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124 if datums.peek().is_none() {
125 Datum::Null
126 } else {
127 let x = datums
128 .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129 .sum::<ResultType>();
130 x.into()
131 }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136 I: IntoIterator<Item = Datum<'a>>,
137{
138 let mut cx = numeric::cx_datum();
139 let mut sum = Numeric::zero();
140 let mut empty = true;
141 for d in datums {
142 if !d.is_null() {
143 empty = false;
144 cx.add(&mut sum, &d.unwrap_numeric().0);
145 }
146 }
147 match empty {
148 true => Datum::Null,
149 false => Datum::from(sum),
150 }
151}
152
153#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157 I: IntoIterator<Item = Datum<'a>>,
158{
159 let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161 Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166 I: IntoIterator<Item = Datum<'a>>,
167{
168 datums
169 .into_iter()
170 .fold(Datum::False, |state, next| match (state, next) {
171 (Datum::True, _) | (_, Datum::True) => Datum::True,
172 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173 _ => Datum::False,
174 })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179 I: IntoIterator<Item = Datum<'a>>,
180{
181 datums
182 .into_iter()
183 .fold(Datum::True, |state, next| match (state, next) {
184 (Datum::False, _) | (_, Datum::False) => Datum::False,
185 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186 _ => Datum::True,
187 })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192 I: IntoIterator<Item = Datum<'a>>,
193{
194 const EMPTY_SEP: &str = "";
195
196 let datums = order_aggregate_datums(datums, order_by);
197 let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198 if d.is_null() {
199 return None;
200 }
201 let mut value_sep = d.unwrap_list().iter();
202 match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203 (Datum::Null, _) => None,
204 (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205 (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206 _ => unreachable!(),
207 }
208 });
209
210 let mut s = String::default();
211 match sep_value_pairs.next() {
212 Some((_, value)) => s.push_str(value),
214 None => return Datum::Null,
216 }
217
218 for (sep, value) in sep_value_pairs {
219 s.push_str(sep);
220 s.push_str(value);
221 }
222
223 Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228 I: IntoIterator<Item = Datum<'a>>,
229{
230 let datums = order_aggregate_datums(datums, order_by);
231 temp_storage.make_datum(|packer| {
232 packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233 })
234}
235
236fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238 I: IntoIterator<Item = Datum<'a>>,
239{
240 let datums = order_aggregate_datums(datums, order_by);
241 temp_storage.make_datum(|packer| {
242 let mut datums: Vec<_> = datums
243 .into_iter()
244 .filter_map(|d| {
245 if d.is_null() {
246 return None;
247 }
248 let mut list = d.unwrap_list().iter();
249 let key = list.next().unwrap();
250 let val = list.next().unwrap();
251 if key.is_null() {
252 None
255 } else {
256 Some((key.unwrap_str(), val))
257 }
258 })
259 .collect();
260 datums.sort_by_key(|(k, _v)| *k);
266 datums.reverse();
267 datums.dedup_by_key(|(k, _v)| *k);
268 datums.reverse();
269 packer.push_dict(datums);
270 })
271}
272
273pub fn order_aggregate_datums<'a: 'b, 'b, I>(
283 datums: I,
284 order_by: &[ColumnOrder],
285) -> impl Iterator<Item = Datum<'b>>
286where
287 I: IntoIterator<Item = Datum<'a>>,
288{
289 order_aggregate_datums_with_rank_inner(datums, order_by)
290 .into_iter()
291 .map(|(payload, _order_datums)| payload)
293}
294
295fn order_aggregate_datums_with_rank<'a, I>(
298 datums: I,
299 order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302 I: IntoIterator<Item = Datum<'a>>,
303{
304 order_aggregate_datums_with_rank_inner(datums, order_by)
305 .into_iter()
306 .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310 datums: I,
311 order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314 I: IntoIterator<Item = Datum<'a>>,
315{
316 let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317 .into_iter()
318 .map(|d| {
319 let list = d.unwrap_list();
320 let mut list_it = list.iter();
321 let payload = list_it.next().unwrap();
322
323 let mut order_by_datums = Vec::with_capacity(order_by.len());
333 for _ in 0..order_by.len() {
334 order_by_datums.push(
335 list_it
336 .next()
337 .expect("must have exactly the same number of Datums as `order_by`"),
338 );
339 }
340
341 (payload, order_by_datums)
342 })
343 .collect();
344
345 let mut sort_by =
346 |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347 (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348 compare_columns(
349 order_by,
350 left_order_by_datums,
351 right_order_by_datums,
352 || payload_left.cmp(payload_right),
353 )
354 };
355 decoded.sort_unstable_by(&mut sort_by);
360 decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365 I: IntoIterator<Item = Datum<'a>>,
366{
367 let datums = order_aggregate_datums(datums, order_by);
368 let datums: Vec<_> = datums
369 .into_iter()
370 .map(|d| d.unwrap_array().elements().iter())
371 .flatten()
372 .collect();
373 let dims = ArrayDimension {
374 lower_bound: 1,
375 length: datums.len(),
376 };
377 temp_storage.make_datum(|packer| {
378 packer.try_push_array(&[dims], datums).unwrap();
379 })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384 I: IntoIterator<Item = Datum<'a>>,
385{
386 let datums = order_aggregate_datums(datums, order_by);
387 temp_storage.make_datum(|packer| {
388 packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389 })
390}
391
392fn row_number<'a, I>(
396 datums: I,
397 callers_temp_storage: &'a RowArena,
398 order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401 I: IntoIterator<Item = Datum<'a>>,
402{
403 let temp_storage = RowArena::new();
407 let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409 callers_temp_storage.make_datum(|packer| {
410 packer.push_list(datums);
411 })
412}
413
414fn row_number_no_list<'a: 'b, 'b, I>(
417 datums: I,
418 callers_temp_storage: &'b RowArena,
419 order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422 I: IntoIterator<Item = Datum<'a>>,
423{
424 let datums = order_aggregate_datums(datums, order_by);
425
426 callers_temp_storage.reserve(datums.size_hint().0);
427 #[allow(clippy::disallowed_methods)]
428 datums
429 .into_iter()
430 .map(|d| d.unwrap_list().iter())
431 .flatten()
432 .zip(1i64..)
433 .map(|(d, i)| {
434 callers_temp_storage.make_datum(|packer| {
435 packer.push_list_with(|packer| {
436 packer.push(Datum::Int64(i));
437 packer.push(d);
438 });
439 })
440 })
441}
442
443fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448 I: IntoIterator<Item = Datum<'a>>,
449{
450 let temp_storage = RowArena::new();
451 let datums = rank_no_list(datums, &temp_storage, order_by);
452
453 callers_temp_storage.make_datum(|packer| {
454 packer.push_list(datums);
455 })
456}
457
458fn rank_no_list<'a: 'b, 'b, I>(
461 datums: I,
462 callers_temp_storage: &'b RowArena,
463 order_by: &[ColumnOrder],
464) -> impl Iterator<Item = Datum<'b>>
465where
466 I: IntoIterator<Item = Datum<'a>>,
467{
468 let datums = order_aggregate_datums_with_rank(datums, order_by);
470
471 let mut datums = datums
472 .into_iter()
473 .map(|(d0, order_row)| {
474 d0.unwrap_list()
475 .iter()
476 .map(move |d1| (d1, order_row.clone()))
477 })
478 .flatten();
479
480 callers_temp_storage.reserve(datums.size_hint().0);
481 datums
482 .next()
483 .map_or(vec![], |(first_datum, first_order_row)| {
484 datums.fold(
487 (first_order_row, 1, 1, vec![(first_datum, 1)]),
488 |mut acc, (next_datum, next_order_row)| {
489 let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
490 *acc_row_num += 1;
491 if *acc_row != next_order_row {
493 *acc_rank = *acc_row_num;
494 *acc_row = next_order_row;
495 }
496
497 (*output).push((next_datum, *acc_rank));
498 acc
499 })
500 }.3).into_iter().map(|(d, i)| {
501 callers_temp_storage.make_datum(|packer| {
502 packer.push_list_with(|packer| {
503 packer.push(Datum::Int64(i));
504 packer.push(d);
505 });
506 })
507 })
508}
509
510fn dense_rank<'a, I>(
514 datums: I,
515 callers_temp_storage: &'a RowArena,
516 order_by: &[ColumnOrder],
517) -> Datum<'a>
518where
519 I: IntoIterator<Item = Datum<'a>>,
520{
521 let temp_storage = RowArena::new();
522 let datums = dense_rank_no_list(datums, &temp_storage, order_by);
523
524 callers_temp_storage.make_datum(|packer| {
525 packer.push_list(datums);
526 })
527}
528
529fn dense_rank_no_list<'a: 'b, 'b, I>(
532 datums: I,
533 callers_temp_storage: &'b RowArena,
534 order_by: &[ColumnOrder],
535) -> impl Iterator<Item = Datum<'b>>
536where
537 I: IntoIterator<Item = Datum<'a>>,
538{
539 let datums = order_aggregate_datums_with_rank(datums, order_by);
541
542 let mut datums = datums
543 .into_iter()
544 .map(|(d0, order_row)| {
545 d0.unwrap_list()
546 .iter()
547 .map(move |d1| (d1, order_row.clone()))
548 })
549 .flatten();
550
551 callers_temp_storage.reserve(datums.size_hint().0);
552 datums
553 .next()
554 .map_or(vec![], |(first_datum, first_order_row)| {
555 datums.fold(
558 (first_order_row, 1, vec![(first_datum, 1)]),
559 |mut acc, (next_datum, next_order_row)| {
560 let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
561 if *acc_row != next_order_row {
563 *acc_rank += 1;
564 *acc_row = next_order_row;
565 }
566
567 (*output).push((next_datum, *acc_rank));
568 acc
569 })
570 }.2).into_iter().map(|(d, i)| {
571 callers_temp_storage.make_datum(|packer| {
572 packer.push_list_with(|packer| {
573 packer.push(Datum::Int64(i));
574 packer.push(d);
575 });
576 })
577 })
578}
579
580fn lag_lead<'a, I>(
602 datums: I,
603 callers_temp_storage: &'a RowArena,
604 order_by: &[ColumnOrder],
605 lag_lead_type: &LagLeadType,
606 ignore_nulls: &bool,
607) -> Datum<'a>
608where
609 I: IntoIterator<Item = Datum<'a>>,
610{
611 let temp_storage = RowArena::new();
612 let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
613 callers_temp_storage.make_datum(|packer| {
614 packer.push_list(iter);
615 })
616}
617
618fn lag_lead_no_list<'a: 'b, 'b, I>(
621 datums: I,
622 callers_temp_storage: &'b RowArena,
623 order_by: &[ColumnOrder],
624 lag_lead_type: &LagLeadType,
625 ignore_nulls: &bool,
626) -> impl Iterator<Item = Datum<'b>>
627where
628 I: IntoIterator<Item = Datum<'a>>,
629{
630 let datums = order_aggregate_datums(datums, order_by);
632
633 let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
637 .into_iter()
638 .map(|d| {
639 let mut iter = d.unwrap_list().iter();
640 let original_row = iter.next().unwrap();
641 let (input_value, offset, default_value) =
642 unwrap_lag_lead_encoded_args(iter.next().unwrap());
643 (original_row, (input_value, offset, default_value))
644 })
645 .unzip();
646
647 let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
648
649 callers_temp_storage.reserve(result.len());
650 result
651 .into_iter()
652 .zip_eq(orig_rows)
653 .map(|(result_value, original_row)| {
654 callers_temp_storage.make_datum(|packer| {
655 packer.push_list_with(|packer| {
656 packer.push(result_value);
657 packer.push(original_row);
658 });
659 })
660 })
661}
662
663fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
665 let mut encoded_args_iter = encoded_args.unwrap_list().iter();
666 let (input_value, offset, default_value) = (
667 encoded_args_iter.next().unwrap(),
668 encoded_args_iter.next().unwrap(),
669 encoded_args_iter.next().unwrap(),
670 );
671 (input_value, offset, default_value)
672}
673
674fn lag_lead_inner<'a>(
677 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
678 lag_lead_type: &LagLeadType,
679 ignore_nulls: &bool,
680) -> Vec<Datum<'a>> {
681 if *ignore_nulls {
682 lag_lead_inner_ignore_nulls(args, lag_lead_type)
683 } else {
684 lag_lead_inner_respect_nulls(args, lag_lead_type)
685 }
686}
687
688fn lag_lead_inner_respect_nulls<'a>(
689 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
690 lag_lead_type: &LagLeadType,
691) -> Vec<Datum<'a>> {
692 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
693 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
694 if offset.is_null() {
696 result.push(Datum::Null);
697 continue;
698 }
699
700 let idx = i64::try_from(idx).expect("Array index does not fit in i64");
701 let offset = i64::from(offset.unwrap_int32());
702 let offset = match lag_lead_type {
703 LagLeadType::Lag => -offset,
704 LagLeadType::Lead => offset,
705 };
706
707 let datums_get = |i: i64| -> Option<Datum> {
709 match u64::try_from(i) {
710 Ok(i) => args
711 .get(usize::cast_from(i))
712 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
716 };
717
718 let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
719
720 result.push(lagged_value);
721 }
722
723 result
724}
725
/// `lag`/`lead` with IGNORE NULLS: rows whose input value is null do not
/// count as steps when walking `offset` positions from the current row.
///
/// `args` holds one `(input value, offset, default value)` triple per row, in
/// window order. A null offset yields `Datum::Null`. If the walk leaves the
/// partition, the row's default value is returned.
///
/// # Panics
///
/// Panics if `args.len()` does not fit in an `i64`, and if a row has offset 0
/// while its own input value is null.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    // Guarantees that the `as i64` index conversions below are lossless.
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // For every row with a null input value, the index of the closest earlier
    // row with a non-null input value (-1 if there is none). Entries for rows
    // with non-null input values stay `None`.
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // Mirror image of the above, filled by scanning from the end: for every
    // row with a null input value, the index of the closest later row with a
    // non-null input value (`args.len()` if there is none).
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        if offset.is_null() {
            result.push(Datum::Null);
            continue;
        }

        let idx = idx as i64;
        let offset = i64::cast_from(offset.unwrap_int32());
        // `Lag` walks backwards, `Lead` walks forwards.
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        // Direction of a single step: -1, 0 or 1.
        let increment = offset.signum();

        // Input value at signed position `i`; `None` when `i` is negative or
        // past the end of `args`.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0))
                    .unwrap_or(None),
                Err(_) => None,
            }
        };

        let lagged_value = if increment != 0 {
            let mut j = idx;
            // Take up to |offset| steps; each step lands on the next row in the
            // walking direction whose input value is non-null.
            for _ in 0..num::abs(offset) {
                j += increment;
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    // Landed on a null: jump straight to the next non-null row
                    // (or just outside the partition) using the skip tables.
                    let ju = j as usize;
                    if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                // Once outside the partition there is nothing further to visit.
                if datums_get(j).is_none() {
                    break;
                }
            }
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // NOTE(review): offset 0 on a row with a null input value
                // panics instead of returning a value; this looks deliberate,
                // confirm before changing.
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
846
847fn first_value<'a, I>(
849 datums: I,
850 callers_temp_storage: &'a RowArena,
851 order_by: &[ColumnOrder],
852 window_frame: &WindowFrame,
853) -> Datum<'a>
854where
855 I: IntoIterator<Item = Datum<'a>>,
856{
857 let temp_storage = RowArena::new();
858 let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
859 callers_temp_storage.make_datum(|packer| {
860 packer.push_list(iter);
861 })
862}
863
864fn first_value_no_list<'a: 'b, 'b, I>(
867 datums: I,
868 callers_temp_storage: &'b RowArena,
869 order_by: &[ColumnOrder],
870 window_frame: &WindowFrame,
871) -> impl Iterator<Item = Datum<'b>>
872where
873 I: IntoIterator<Item = Datum<'a>>,
874{
875 let datums = order_aggregate_datums(datums, order_by);
877
878 let (orig_rows, args): (Vec<_>, Vec<_>) = datums
880 .into_iter()
881 .map(|d| {
882 let mut iter = d.unwrap_list().iter();
883 let original_row = iter.next().unwrap();
884 let arg = iter.next().unwrap();
885
886 (original_row, arg)
887 })
888 .unzip();
889
890 let results = first_value_inner(args, window_frame);
891
892 callers_temp_storage.reserve(results.len());
893 results
894 .into_iter()
895 .zip_eq(orig_rows)
896 .map(|(result_value, original_row)| {
897 callers_temp_storage.make_datum(|packer| {
898 packer.push_list_with(|packer| {
899 packer.push(result_value);
900 packer.push(original_row);
901 });
902 })
903 })
904}
905
906fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
907 let length = datums.len();
908 let mut result: Vec<Datum> = Vec::with_capacity(length);
909 for (idx, current_datum) in datums.iter().enumerate() {
910 let first_value = match &window_frame.start_bound {
911 WindowFrameBound::CurrentRow => *current_datum,
913 WindowFrameBound::UnboundedPreceding => {
914 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
915 let end_offset = usize::cast_from(*end_offset);
916
917 if idx < end_offset {
919 Datum::Null
920 } else {
921 datums[0]
922 }
923 } else {
924 datums[0]
925 }
926 }
927 WindowFrameBound::OffsetPreceding(offset) => {
928 let start_offset = usize::cast_from(*offset);
929 let start_idx = idx.saturating_sub(start_offset);
930 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
931 let end_offset = usize::cast_from(*end_offset);
932
933 if start_offset < end_offset || idx < end_offset {
935 Datum::Null
936 } else {
937 datums[start_idx]
938 }
939 } else {
940 datums[start_idx]
941 }
942 }
943 WindowFrameBound::OffsetFollowing(offset) => {
944 let start_offset = usize::cast_from(*offset);
945 let start_idx = idx.saturating_add(start_offset);
946 if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
947 if offset > end_offset || start_idx >= length {
949 Datum::Null
950 } else {
951 datums[start_idx]
952 }
953 } else {
954 datums
955 .get(start_idx)
956 .map(|d| d.clone())
957 .unwrap_or(Datum::Null)
958 }
959 }
960 WindowFrameBound::UnboundedFollowing => unreachable!(),
962 };
963 result.push(first_value);
964 }
965 result
966}
967
968fn last_value<'a, I>(
970 datums: I,
971 callers_temp_storage: &'a RowArena,
972 order_by: &[ColumnOrder],
973 window_frame: &WindowFrame,
974) -> Datum<'a>
975where
976 I: IntoIterator<Item = Datum<'a>>,
977{
978 let temp_storage = RowArena::new();
979 let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
980 callers_temp_storage.make_datum(|packer| {
981 packer.push_list(iter);
982 })
983}
984
985fn last_value_no_list<'a: 'b, 'b, I>(
988 datums: I,
989 callers_temp_storage: &'b RowArena,
990 order_by: &[ColumnOrder],
991 window_frame: &WindowFrame,
992) -> impl Iterator<Item = Datum<'b>>
993where
994 I: IntoIterator<Item = Datum<'a>>,
995{
996 let datums = order_aggregate_datums_with_rank(datums, order_by);
999
1000 let size_hint = datums.size_hint().0;
1002 let mut args = Vec::with_capacity(size_hint);
1003 let mut original_rows = Vec::with_capacity(size_hint);
1004 let mut order_by_rows = Vec::with_capacity(size_hint);
1005 for (d, order_by_row) in datums.into_iter() {
1006 let mut iter = d.unwrap_list().iter();
1007 let original_row = iter.next().unwrap();
1008 let arg = iter.next().unwrap();
1009 order_by_rows.push(order_by_row);
1010 original_rows.push(original_row);
1011 args.push(arg);
1012 }
1013
1014 let results = last_value_inner(args, &order_by_rows, window_frame);
1015
1016 callers_temp_storage.reserve(results.len());
1017 results
1018 .into_iter()
1019 .zip_eq(original_rows)
1020 .map(|(result_value, original_row)| {
1021 callers_temp_storage.make_datum(|packer| {
1022 packer.push_list_with(|packer| {
1023 packer.push(result_value);
1024 packer.push(original_row);
1025 });
1026 })
1027 })
1028}
1029
1030fn last_value_inner<'a>(
1031 args: Vec<Datum<'a>>,
1032 order_by_rows: &Vec<Row>,
1033 window_frame: &WindowFrame,
1034) -> Vec<Datum<'a>> {
1035 let length = args.len();
1036 let mut results: Vec<Datum> = Vec::with_capacity(length);
1037 for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1038 let last_value = match &window_frame.end_bound {
1039 WindowFrameBound::CurrentRow => match &window_frame.units {
1040 WindowFrameUnits::Rows => *current_datum,
1042 WindowFrameUnits::Range => {
1043 let target_idx = order_by_rows[idx..]
1048 .iter()
1049 .enumerate()
1050 .take_while(|(_, row)| *row == order_by_row)
1051 .last()
1052 .unwrap()
1053 .0
1054 + idx;
1055 args[target_idx]
1056 }
1057 WindowFrameUnits::Groups => unreachable!(),
1059 },
1060 WindowFrameBound::UnboundedFollowing => {
1061 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1062 let start_offset = usize::cast_from(*start_offset);
1063
1064 if idx + start_offset > length - 1 {
1066 Datum::Null
1067 } else {
1068 args[length - 1]
1069 }
1070 } else {
1071 args[length - 1]
1072 }
1073 }
1074 WindowFrameBound::OffsetFollowing(offset) => {
1075 let end_offset = usize::cast_from(*offset);
1076 let end_idx = idx.saturating_add(end_offset);
1077 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1078 let start_offset = usize::cast_from(*start_offset);
1079 let start_idx = idx.saturating_add(start_offset);
1080
1081 if end_offset < start_offset || start_idx >= length {
1083 Datum::Null
1084 } else {
1085 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1087 }
1088 } else {
1089 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1090 }
1091 }
1092 WindowFrameBound::OffsetPreceding(offset) => {
1093 let end_offset = usize::cast_from(*offset);
1094 let end_idx = idx.saturating_sub(end_offset);
1095 if idx < end_offset {
1096 Datum::Null
1098 } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1099 &window_frame.start_bound
1100 {
1101 if offset > start_offset {
1103 Datum::Null
1104 } else {
1105 args[end_idx]
1106 }
1107 } else {
1108 args[end_idx]
1109 }
1110 }
1111 WindowFrameBound::UnboundedPreceding => unreachable!(),
1113 };
1114 results.push(last_value);
1115 }
1116 results
1117}
1118
1119fn fused_value_window_func<'a, I>(
1125 input_datums: I,
1126 callers_temp_storage: &'a RowArena,
1127 funcs: &Vec<AggregateFunc>,
1128 order_by: &Vec<ColumnOrder>,
1129) -> Datum<'a>
1130where
1131 I: IntoIterator<Item = Datum<'a>>,
1132{
1133 let temp_storage = RowArena::new();
1134 let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1135 callers_temp_storage.make_datum(|packer| {
1136 packer.push_list(iter);
1137 })
1138}
1139
1140fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1143 input_datums: I,
1144 callers_temp_storage: &'b RowArena,
1145 funcs: &Vec<AggregateFunc>,
1146 order_by: &Vec<ColumnOrder>,
1147) -> impl Iterator<Item = Datum<'b>>
1148where
1149 I: IntoIterator<Item = Datum<'a>>,
1150{
1151 let has_last_value = funcs
1152 .iter()
1153 .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1154
1155 let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1156
1157 let size_hint = input_datums_with_ranks.size_hint().0;
1158 let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1159 let mut original_rows = Vec::with_capacity(size_hint);
1160 let mut order_by_rows = Vec::with_capacity(size_hint);
1161 for (d, order_by_row) in input_datums_with_ranks {
1162 let mut iter = d.unwrap_list().iter();
1163 let original_row = iter.next().unwrap();
1164 original_rows.push(original_row);
1165 let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1166 for i in 0..funcs.len() {
1167 let encoded_args = argss_iter.next().unwrap();
1168 encoded_argsss[i].push(encoded_args);
1169 }
1170 if has_last_value {
1171 order_by_rows.push(order_by_row);
1172 }
1173 }
1174
1175 let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1176 for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1177 let results = match func {
1178 AggregateFunc::LagLead {
1179 order_by: inner_order_by,
1180 lag_lead,
1181 ignore_nulls,
1182 } => {
1183 assert_eq!(order_by, inner_order_by);
1184 let unwrapped_argss = encoded_argss
1185 .into_iter()
1186 .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1187 .collect();
1188 lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1189 }
1190 AggregateFunc::FirstValue {
1191 order_by: inner_order_by,
1192 window_frame,
1193 } => {
1194 assert_eq!(order_by, inner_order_by);
1195 first_value_inner(encoded_argss, window_frame)
1198 }
1199 AggregateFunc::LastValue {
1200 order_by: inner_order_by,
1201 window_frame,
1202 } => {
1203 assert_eq!(order_by, inner_order_by);
1204 last_value_inner(encoded_argss, &order_by_rows, window_frame)
1207 }
1208 _ => panic!("unknown window function in FusedValueWindowFunc"),
1209 };
1210 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1211 results.push(result);
1212 }
1213 }
1214
1215 callers_temp_storage.reserve(2 * original_rows.len());
1216 results_per_row
1217 .into_iter()
1218 .enumerate()
1219 .map(move |(i, results)| {
1220 callers_temp_storage.make_datum(|packer| {
1221 packer.push_list_with(|packer| {
1222 packer
1223 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1224 packer.push(original_rows[i]);
1225 });
1226 })
1227 })
1228}
1229
1230fn window_aggr<'a, I, A>(
1239 input_datums: I,
1240 callers_temp_storage: &'a RowArena,
1241 wrapped_aggregate: &AggregateFunc,
1242 order_by: &[ColumnOrder],
1243 window_frame: &WindowFrame,
1244) -> Datum<'a>
1245where
1246 I: IntoIterator<Item = Datum<'a>>,
1247 A: OneByOneAggr,
1248{
1249 let temp_storage = RowArena::new();
1250 let iter = window_aggr_no_list::<I, A>(
1251 input_datums,
1252 &temp_storage,
1253 wrapped_aggregate,
1254 order_by,
1255 window_frame,
1256 );
1257 callers_temp_storage.make_datum(|packer| {
1258 packer.push_list(iter);
1259 })
1260}
1261
1262fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1265 input_datums: I,
1266 callers_temp_storage: &'b RowArena,
1267 wrapped_aggregate: &AggregateFunc,
1268 order_by: &[ColumnOrder],
1269 window_frame: &WindowFrame,
1270) -> impl Iterator<Item = Datum<'b>>
1271where
1272 I: IntoIterator<Item = Datum<'a>>,
1273 A: OneByOneAggr,
1274{
1275 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1278
1279 let size_hint = datums.size_hint().0;
1281 let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1282 let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1283 let mut order_by_rows = Vec::with_capacity(size_hint);
1284 for (d, order_by_row) in datums.into_iter() {
1285 let mut iter = d.unwrap_list().iter();
1286 let original_row = iter.next().unwrap();
1287 let arg = iter.next().unwrap();
1288 order_by_rows.push(order_by_row);
1289 original_rows.push(original_row);
1290 args.push(arg);
1291 }
1292
1293 let results = window_aggr_inner::<A>(
1294 args,
1295 &order_by_rows,
1296 wrapped_aggregate,
1297 order_by,
1298 window_frame,
1299 callers_temp_storage,
1300 );
1301
1302 callers_temp_storage.reserve(results.len());
1303 results
1304 .into_iter()
1305 .zip_eq(original_rows)
1306 .map(|(result_value, original_row)| {
1307 callers_temp_storage.make_datum(|packer| {
1308 packer.push_list_with(|packer| {
1309 packer.push(result_value);
1310 packer.push(original_row);
1311 });
1312 })
1313 })
1314}
1315
1316fn window_aggr_inner<'a, A>(
1317 mut args: Vec<Datum<'a>>,
1318 order_by_rows: &Vec<Row>,
1319 wrapped_aggregate: &AggregateFunc,
1320 order_by: &[ColumnOrder],
1321 window_frame: &WindowFrame,
1322 temp_storage: &'a RowArena,
1323) -> Vec<Datum<'a>>
1324where
1325 A: OneByOneAggr,
1326{
1327 let length = args.len();
1328 let mut result: Vec<Datum> = Vec::with_capacity(length);
1329
1330 soft_assert_or_log!(
1336 !((matches!(window_frame.units, WindowFrameUnits::Groups)
1337 || matches!(window_frame.units, WindowFrameUnits::Range))
1338 && !window_frame.includes_current_row()),
1339 "window frame without current row"
1340 );
1341
1342 if (matches!(
1343 window_frame.start_bound,
1344 WindowFrameBound::UnboundedPreceding
1345 ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1346 || (order_by.is_empty()
1347 && (matches!(window_frame.units, WindowFrameUnits::Groups)
1348 || matches!(window_frame.units, WindowFrameUnits::Range))
1349 && window_frame.includes_current_row())
1350 {
1351 let result_value = wrapped_aggregate.eval(args, temp_storage);
1358 for _ in 0..length {
1360 result.push(result_value);
1361 }
1362 } else {
1363 fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1364 args: Vec<Datum<'a>>,
1365 result: &mut Vec<Datum<'a>>,
1366 mut one_by_one_aggr: A,
1367 temp_storage: &'a RowArena,
1368 ) where
1369 A: OneByOneAggr,
1370 {
1371 for current_arg in args.into_iter() {
1372 one_by_one_aggr.give(¤t_arg);
1373 let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1374 result.push(result_value);
1375 }
1376 }
1377
1378 fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1379 args: Vec<Datum<'a>>,
1380 order_by_rows: &Vec<Row>,
1381 result: &mut Vec<Datum<'a>>,
1382 mut one_by_one_aggr: A,
1383 temp_storage: &'a RowArena,
1384 ) where
1385 A: OneByOneAggr,
1386 {
1387 let mut peer_group_start = 0;
1388 while peer_group_start < args.len() {
1389 let mut peer_group_end = peer_group_start + 1;
1393 while peer_group_end < args.len()
1394 && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1395 {
1396 peer_group_end += 1;
1398 }
1399 for current_arg in args[peer_group_start..peer_group_end].iter() {
1402 one_by_one_aggr.give(current_arg);
1403 }
1404 let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1405 for _ in args[peer_group_start..peer_group_end].iter() {
1407 result.push(agg_for_peer_group);
1408 }
1409 peer_group_start = peer_group_end;
1411 }
1412 }
1413
1414 fn rows_between_offset_and_offset<'a>(
1415 args: Vec<Datum<'a>>,
1416 result: &mut Vec<Datum<'a>>,
1417 wrapped_aggregate: &AggregateFunc,
1418 temp_storage: &'a RowArena,
1419 offset_start: i64,
1420 offset_end: i64,
1421 ) {
1422 let len = args
1423 .len()
1424 .to_i64()
1425 .expect("window partition's len should fit into i64");
1426 for i in 0..len {
1427 let i = i.to_i64().expect("window partition shouldn't be super big");
1428 let frame_start = max(i + offset_start, 0)
1431 .to_usize()
1432 .expect("The max made sure it's not negative");
1433 let frame_end = min(i + offset_end, len - 1).to_usize();
1436 match frame_end {
1437 Some(frame_end) => {
1438 if frame_start <= frame_end {
1439 let frame_values = args[frame_start..=frame_end].iter().cloned();
1450 let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1451 result.push(result_value);
1452 } else {
1453 let result_value = wrapped_aggregate.default();
1455 result.push(result_value);
1456 }
1457 }
1458 None => {
1459 let result_value = wrapped_aggregate.default();
1461 result.push(result_value);
1462 }
1463 }
1464 }
1465 }
1466
1467 match (
1468 &window_frame.units,
1469 &window_frame.start_bound,
1470 &window_frame.end_bound,
1471 ) {
1472 (Rows, UnboundedPreceding, CurrentRow) => {
1477 rows_between_unbounded_preceding_and_current_row::<A>(
1478 args,
1479 &mut result,
1480 A::new(wrapped_aggregate, false),
1481 temp_storage,
1482 );
1483 }
1484 (Rows, CurrentRow, UnboundedFollowing) => {
1485 args.reverse();
1487 rows_between_unbounded_preceding_and_current_row::<A>(
1488 args,
1489 &mut result,
1490 A::new(wrapped_aggregate, true),
1491 temp_storage,
1492 );
1493 result.reverse();
1494 }
1495 (Range, UnboundedPreceding, CurrentRow) => {
1496 groups_between_unbounded_preceding_and_current_row::<A>(
1499 args,
1500 order_by_rows,
1501 &mut result,
1502 A::new(wrapped_aggregate, false),
1503 temp_storage,
1504 );
1505 }
1506 (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1510 let start_prec = start_prec.to_i64().expect(
1511 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1512 );
1513 let end_prec = end_prec.to_i64().expect(
1514 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1515 );
1516 rows_between_offset_and_offset(
1517 args,
1518 &mut result,
1519 wrapped_aggregate,
1520 temp_storage,
1521 -start_prec,
1522 -end_prec,
1523 );
1524 }
1525 (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1526 let start_prec = start_prec.to_i64().expect(
1527 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1528 );
1529 let end_fol = end_fol.to_i64().expect(
1530 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1531 );
1532 rows_between_offset_and_offset(
1533 args,
1534 &mut result,
1535 wrapped_aggregate,
1536 temp_storage,
1537 -start_prec,
1538 end_fol,
1539 );
1540 }
1541 (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1542 let start_fol = start_fol.to_i64().expect(
1543 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1544 );
1545 let end_fol = end_fol.to_i64().expect(
1546 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1547 );
1548 rows_between_offset_and_offset(
1549 args,
1550 &mut result,
1551 wrapped_aggregate,
1552 temp_storage,
1553 start_fol,
1554 end_fol,
1555 );
1556 }
1557 (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1558 unreachable!() }
1560 (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1561 let start_prec = start_prec.to_i64().expect(
1562 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1563 );
1564 let end_fol = 0;
1565 rows_between_offset_and_offset(
1566 args,
1567 &mut result,
1568 wrapped_aggregate,
1569 temp_storage,
1570 -start_prec,
1571 end_fol,
1572 );
1573 }
1574 (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1575 let start_fol = 0;
1576 let end_fol = end_fol.to_i64().expect(
1577 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1578 );
1579 rows_between_offset_and_offset(
1580 args,
1581 &mut result,
1582 wrapped_aggregate,
1583 temp_storage,
1584 start_fol,
1585 end_fol,
1586 );
1587 }
1588 (Rows, CurrentRow, CurrentRow) => {
1589 let start_fol = 0;
1592 let end_fol = 0;
1593 rows_between_offset_and_offset(
1594 args,
1595 &mut result,
1596 wrapped_aggregate,
1597 temp_storage,
1598 start_fol,
1599 end_fol,
1600 );
1601 }
1602 (Rows, CurrentRow, OffsetPreceding(_))
1603 | (Rows, UnboundedFollowing, _)
1604 | (Rows, _, UnboundedPreceding)
1605 | (Rows, OffsetFollowing(..), CurrentRow) => {
1606 unreachable!() }
1608 (Rows, UnboundedPreceding, UnboundedFollowing) => {
1609 unreachable!()
1612 }
1613 (Rows, UnboundedPreceding, OffsetPreceding(_))
1614 | (Rows, UnboundedPreceding, OffsetFollowing(_))
1615 | (Rows, OffsetPreceding(..), UnboundedFollowing)
1616 | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1617 unreachable!()
1620 }
1621 (Range, _, _) => {
1622 unreachable!()
1629 }
1630 (Groups, _, _) => {
1631 unreachable!()
1635 }
1636 }
1637 }
1638
1639 result
1640}
1641
1642fn fused_window_aggr<'a, I, A>(
1646 input_datums: I,
1647 callers_temp_storage: &'a RowArena,
1648 wrapped_aggregates: &Vec<AggregateFunc>,
1649 order_by: &Vec<ColumnOrder>,
1650 window_frame: &WindowFrame,
1651) -> Datum<'a>
1652where
1653 I: IntoIterator<Item = Datum<'a>>,
1654 A: OneByOneAggr,
1655{
1656 let temp_storage = RowArena::new();
1657 let iter = fused_window_aggr_no_list::<_, A>(
1658 input_datums,
1659 &temp_storage,
1660 wrapped_aggregates,
1661 order_by,
1662 window_frame,
1663 );
1664 callers_temp_storage.make_datum(|packer| {
1665 packer.push_list(iter);
1666 })
1667}
1668
1669fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1672 input_datums: I,
1673 callers_temp_storage: &'b RowArena,
1674 wrapped_aggregates: &Vec<AggregateFunc>,
1675 order_by: &Vec<ColumnOrder>,
1676 window_frame: &WindowFrame,
1677) -> impl Iterator<Item = Datum<'b>>
1678where
1679 I: IntoIterator<Item = Datum<'a>>,
1680 A: OneByOneAggr,
1681{
1682 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1685
1686 let size_hint = datums.size_hint().0;
1687 let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1688 let mut original_rows = Vec::with_capacity(size_hint);
1689 let mut order_by_rows = Vec::with_capacity(size_hint);
1690 for (d, order_by_row) in datums {
1691 let mut iter = d.unwrap_list().iter();
1692 let original_row = iter.next().unwrap();
1693 original_rows.push(original_row);
1694 let args_iter = iter.next().unwrap().unwrap_list().iter();
1695 for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1697 args.push(arg);
1698 }
1699 order_by_rows.push(order_by_row);
1700 }
1701
1702 let mut results_per_row =
1703 vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1704 for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1705 let results = window_aggr_inner::<A>(
1706 args,
1707 &order_by_rows,
1708 wrapped_aggr,
1709 order_by,
1710 window_frame,
1711 callers_temp_storage,
1712 );
1713 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1714 results.push(result);
1715 }
1716 }
1717
1718 callers_temp_storage.reserve(2 * original_rows.len());
1719 results_per_row
1720 .into_iter()
1721 .enumerate()
1722 .map(move |(i, results)| {
1723 callers_temp_storage.make_datum(|packer| {
1724 packer.push_list_with(|packer| {
1725 packer
1726 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1727 packer.push(original_rows[i]);
1728 });
1729 })
1730 })
1731}
1732
/// An aggregation whose inputs are supplied one datum at a time and whose current result can
/// be queried after any number of inputs.
///
/// Window aggregations use this for frames that grow by one row (or one peer group) per step.
pub trait OneByOneAggr {
    /// Creates an aggregator for `agg` with no inputs yet.
    ///
    /// Callers pass `reverse = true` when they will `give` the inputs in the reverse of their
    /// original order.
    fn new(agg: &AggregateFunc, reverse: bool) -> Self;
    /// Adds `d` to the inputs of the aggregation.
    fn give(&mut self, d: &Datum);
    /// Returns the aggregate of all inputs given so far, allocated in `temp_storage`.
    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
}
1747
/// A straightforward [`OneByOneAggr`] that stores every input and re-evaluates the wrapped
/// aggregate over all stored inputs each time the current aggregate is requested.
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    // The aggregate function to evaluate (a clone of the one passed to `new`).
    agg: AggregateFunc,
    // Every datum given so far, each packed into its own `Row`, in the order given.
    input: Vec<Row>,
    // If true, the stored inputs are fed to `agg` in reverse insertion order.
    reverse: bool,
}
1759
1760impl OneByOneAggr for NaiveOneByOneAggr {
1761 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1762 NaiveOneByOneAggr {
1763 agg: agg.clone(),
1764 input: Vec::new(),
1765 reverse,
1766 }
1767 }
1768
1769 fn give(&mut self, d: &Datum) {
1770 let mut row = Row::default();
1771 row.packer().push(d);
1772 self.input.push(row);
1773 }
1774
1775 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1776 temp_storage.make_datum(|packer| {
1777 packer.push(if !self.reverse {
1778 self.agg
1779 .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1780 } else {
1781 self.agg.eval(
1782 self.input.iter().rev().map(|r| r.unpack_first()),
1783 temp_storage,
1784 )
1785 });
1786 })
1787 }
1788}
1789
/// Distinguishes the two flavours of [`AggregateFunc::LagLead`].
///
/// `Ord`/`PartialOrd` are derived, so the declaration order of the variants is observable.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum LagLeadType {
    /// The `lag` flavour.
    Lag,
    /// The `lead` flavour.
    Lead,
}
1808
/// The aggregation and window functions that can be evaluated over a collection of datums.
///
/// `Ord`/`PartialOrd` are derived, so the declaration order of the variants is part of the
/// type's observable behavior; do not reorder them casually.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum AggregateFunc {
    // Maximum aggregates, one variant per input type (evaluated by `max_datum`/`max_string`).
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    // Minimum aggregates, one variant per input type (evaluated by `min_datum`/`min_string`).
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    // Sum aggregates, one variant per input type (evaluated by `sum_datum`/`sum_numeric`).
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    /// Evaluated by `count`; the result for an empty frame is `Int64(0)` (see `default`).
    Count,
    /// Evaluated by `any`; the result for an empty frame is `False` (see `default`).
    Any,
    /// Evaluated by `all`; the result for an empty frame is `True` (see `default`).
    All,
    /// Evaluated by `jsonb_agg`; the output type is `Jsonb`.
    JsonbAgg {
        /// Ordering handed to `jsonb_agg`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dict_agg`; the output type is `Jsonb`.
    JsonbObjectAgg {
        /// Ordering handed to `dict_agg`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dict_agg`; the output type is a map.
    MapAgg {
        /// Ordering handed to `dict_agg`.
        order_by: Vec<ColumnOrder>,
        /// Value type of the output map type.
        value_type: SqlScalarType,
    },
    /// Evaluated by `array_concat`.
    ArrayConcat {
        /// Ordering handed to `array_concat`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `list_concat`.
    ListConcat {
        /// Ordering handed to `list_concat`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `string_agg`; the output type is `String`.
    StringAgg {
        /// Ordering handed to `string_agg`.
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `row_number`.
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `rank`.
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `dense_rank`.
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `lag_lead`.
    LagLead {
        order_by: Vec<ColumnOrder>,
        /// Whether this is `lag` or `lead`.
        lag_lead: LagLeadType,
        /// Passed through to `lag_lead`.
        ignore_nulls: bool,
    },
    /// Window function evaluated by `first_value`.
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Window function evaluated by `last_value`.
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions evaluated together by `fused_value_window_func`.
    FusedValueWindowFunc {
        /// The fused functions. `output_sql_type` only accepts `LagLead`, `FirstValue` and
        /// `LastValue` here and panics on anything else.
        funcs: Vec<AggregateFunc>,
        /// Ordering shared by all fused functions.
        order_by: Vec<ColumnOrder>,
    },
    /// An aggregate function evaluated over a window frame (see `window_aggr`).
    WindowAggregate {
        /// The aggregate to evaluate over each row's frame.
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregates sharing one ordering and one frame (see `fused_window_aggr`).
    FusedWindowAggregate {
        /// The aggregates to evaluate; each gets its own argument per row.
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Placeholder whose evaluation, default and identity are all `Datum::Dummy`.
    Dummy,
}
1951
1952impl AggregateFunc {
1953 pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
1954 where
1955 I: IntoIterator<Item = Datum<'a>>,
1956 {
1957 match self {
1958 AggregateFunc::MaxNumeric => {
1959 max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1960 }
1961 AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
1962 AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
1963 AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
1964 AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
1965 AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
1966 AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
1967 AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
1968 AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
1969 AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
1970 AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
1971 AggregateFunc::MaxString => max_string(datums),
1972 AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
1973 AggregateFunc::MaxTimestamp => {
1974 max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1975 }
1976 AggregateFunc::MaxTimestampTz => {
1977 max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
1978 }
1979 AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
1980 AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
1981 AggregateFunc::MinNumeric => {
1982 min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1983 }
1984 AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
1985 AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
1986 AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
1987 AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
1988 AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
1989 AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
1990 AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
1991 AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
1992 AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
1993 AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
1994 AggregateFunc::MinString => min_string(datums),
1995 AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
1996 AggregateFunc::MinTimestamp => {
1997 min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1998 }
1999 AggregateFunc::MinTimestampTz => {
2000 min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2001 }
2002 AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
2003 AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
2004 AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
2005 AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
2006 AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
2007 AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
2008 AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
2009 AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
2010 AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
2011 AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
2012 AggregateFunc::SumNumeric => sum_numeric(datums),
2013 AggregateFunc::Count => count(datums),
2014 AggregateFunc::Any => any(datums),
2015 AggregateFunc::All => all(datums),
2016 AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
2017 AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
2018 dict_agg(datums, temp_storage, order_by)
2019 }
2020 AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
2021 AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
2022 AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
2023 AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
2024 AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
2025 AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
2026 AggregateFunc::LagLead {
2027 order_by,
2028 lag_lead: lag_lead_type,
2029 ignore_nulls,
2030 } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2031 AggregateFunc::FirstValue {
2032 order_by,
2033 window_frame,
2034 } => first_value(datums, temp_storage, order_by, window_frame),
2035 AggregateFunc::LastValue {
2036 order_by,
2037 window_frame,
2038 } => last_value(datums, temp_storage, order_by, window_frame),
2039 AggregateFunc::WindowAggregate {
2040 wrapped_aggregate,
2041 order_by,
2042 window_frame,
2043 } => window_aggr::<_, NaiveOneByOneAggr>(
2044 datums,
2045 temp_storage,
2046 wrapped_aggregate,
2047 order_by,
2048 window_frame,
2049 ),
2050 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2051 fused_value_window_func(datums, temp_storage, funcs, order_by)
2052 }
2053 AggregateFunc::FusedWindowAggregate {
2054 wrapped_aggregates,
2055 order_by,
2056 window_frame,
2057 } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2058 datums,
2059 temp_storage,
2060 wrapped_aggregates,
2061 order_by,
2062 window_frame,
2063 ),
2064 AggregateFunc::Dummy => Datum::Dummy,
2065 }
2066 }
2067
2068 pub fn eval_with_fast_window_agg<'a, I, W>(
2072 &self,
2073 datums: I,
2074 temp_storage: &'a RowArena,
2075 ) -> Datum<'a>
2076 where
2077 I: IntoIterator<Item = Datum<'a>>,
2078 W: OneByOneAggr,
2079 {
2080 match self {
2081 AggregateFunc::WindowAggregate {
2082 wrapped_aggregate,
2083 order_by,
2084 window_frame,
2085 } => window_aggr::<_, W>(
2086 datums,
2087 temp_storage,
2088 wrapped_aggregate,
2089 order_by,
2090 window_frame,
2091 ),
2092 AggregateFunc::FusedWindowAggregate {
2093 wrapped_aggregates,
2094 order_by,
2095 window_frame,
2096 } => fused_window_aggr::<_, W>(
2097 datums,
2098 temp_storage,
2099 wrapped_aggregates,
2100 order_by,
2101 window_frame,
2102 ),
2103 _ => self.eval(datums, temp_storage),
2104 }
2105 }
2106
2107 pub fn eval_with_unnest_list<'a, I, W>(
2108 &self,
2109 datums: I,
2110 temp_storage: &'a RowArena,
2111 ) -> impl Iterator<Item = Datum<'a>>
2112 where
2113 I: IntoIterator<Item = Datum<'a>>,
2114 W: OneByOneAggr,
2115 {
2116 assert!(self.can_fuse_with_unnest_list());
2118 match self {
2119 AggregateFunc::RowNumber { order_by } => {
2120 row_number_no_list(datums, temp_storage, order_by).collect_vec()
2121 }
2122 AggregateFunc::Rank { order_by } => {
2123 rank_no_list(datums, temp_storage, order_by).collect_vec()
2124 }
2125 AggregateFunc::DenseRank { order_by } => {
2126 dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2127 }
2128 AggregateFunc::LagLead {
2129 order_by,
2130 lag_lead: lag_lead_type,
2131 ignore_nulls,
2132 } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2133 .collect_vec(),
2134 AggregateFunc::FirstValue {
2135 order_by,
2136 window_frame,
2137 } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2138 AggregateFunc::LastValue {
2139 order_by,
2140 window_frame,
2141 } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2142 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2143 fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2144 }
2145 AggregateFunc::WindowAggregate {
2146 wrapped_aggregate,
2147 order_by,
2148 window_frame,
2149 } => window_aggr_no_list::<_, W>(
2150 datums,
2151 temp_storage,
2152 wrapped_aggregate,
2153 order_by,
2154 window_frame,
2155 )
2156 .collect_vec(),
2157 AggregateFunc::FusedWindowAggregate {
2158 wrapped_aggregates,
2159 order_by,
2160 window_frame,
2161 } => fused_window_aggr_no_list::<_, W>(
2162 datums,
2163 temp_storage,
2164 wrapped_aggregates,
2165 order_by,
2166 window_frame,
2167 )
2168 .collect_vec(),
2169 _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2170 }
2171 .into_iter()
2172 }
2173
2174 pub fn default(&self) -> Datum<'static> {
2177 match self {
2178 AggregateFunc::Count => Datum::Int64(0),
2179 AggregateFunc::Any => Datum::False,
2180 AggregateFunc::All => Datum::True,
2181 AggregateFunc::Dummy => Datum::Dummy,
2182 _ => Datum::Null,
2183 }
2184 }
2185
2186 pub fn identity_datum(&self) -> Datum<'static> {
2189 match self {
2190 AggregateFunc::Any => Datum::False,
2191 AggregateFunc::All => Datum::True,
2192 AggregateFunc::Dummy => Datum::Dummy,
2193 AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2194 AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2195 AggregateFunc::RowNumber { .. }
2196 | AggregateFunc::Rank { .. }
2197 | AggregateFunc::DenseRank { .. }
2198 | AggregateFunc::LagLead { .. }
2199 | AggregateFunc::FirstValue { .. }
2200 | AggregateFunc::LastValue { .. }
2201 | AggregateFunc::WindowAggregate { .. }
2202 | AggregateFunc::FusedValueWindowFunc { .. }
2203 | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2204 AggregateFunc::MaxNumeric
2205 | AggregateFunc::MaxInt16
2206 | AggregateFunc::MaxInt32
2207 | AggregateFunc::MaxInt64
2208 | AggregateFunc::MaxUInt16
2209 | AggregateFunc::MaxUInt32
2210 | AggregateFunc::MaxUInt64
2211 | AggregateFunc::MaxMzTimestamp
2212 | AggregateFunc::MaxFloat32
2213 | AggregateFunc::MaxFloat64
2214 | AggregateFunc::MaxBool
2215 | AggregateFunc::MaxString
2216 | AggregateFunc::MaxDate
2217 | AggregateFunc::MaxTimestamp
2218 | AggregateFunc::MaxTimestampTz
2219 | AggregateFunc::MaxInterval
2220 | AggregateFunc::MaxTime
2221 | AggregateFunc::MinNumeric
2222 | AggregateFunc::MinInt16
2223 | AggregateFunc::MinInt32
2224 | AggregateFunc::MinInt64
2225 | AggregateFunc::MinUInt16
2226 | AggregateFunc::MinUInt32
2227 | AggregateFunc::MinUInt64
2228 | AggregateFunc::MinMzTimestamp
2229 | AggregateFunc::MinFloat32
2230 | AggregateFunc::MinFloat64
2231 | AggregateFunc::MinBool
2232 | AggregateFunc::MinString
2233 | AggregateFunc::MinDate
2234 | AggregateFunc::MinTimestamp
2235 | AggregateFunc::MinTimestampTz
2236 | AggregateFunc::MinInterval
2237 | AggregateFunc::MinTime
2238 | AggregateFunc::SumInt16
2239 | AggregateFunc::SumInt32
2240 | AggregateFunc::SumInt64
2241 | AggregateFunc::SumUInt16
2242 | AggregateFunc::SumUInt32
2243 | AggregateFunc::SumUInt64
2244 | AggregateFunc::SumFloat32
2245 | AggregateFunc::SumFloat64
2246 | AggregateFunc::SumNumeric
2247 | AggregateFunc::Count
2248 | AggregateFunc::JsonbAgg { .. }
2249 | AggregateFunc::JsonbObjectAgg { .. }
2250 | AggregateFunc::MapAgg { .. }
2251 | AggregateFunc::StringAgg { .. } => Datum::Null,
2252 }
2253 }
2254
2255 pub fn can_fuse_with_unnest_list(&self) -> bool {
2256 match self {
2257 AggregateFunc::RowNumber { .. }
2258 | AggregateFunc::Rank { .. }
2259 | AggregateFunc::DenseRank { .. }
2260 | AggregateFunc::LagLead { .. }
2261 | AggregateFunc::FirstValue { .. }
2262 | AggregateFunc::LastValue { .. }
2263 | AggregateFunc::WindowAggregate { .. }
2264 | AggregateFunc::FusedValueWindowFunc { .. }
2265 | AggregateFunc::FusedWindowAggregate { .. } => true,
2266 AggregateFunc::ArrayConcat { .. }
2267 | AggregateFunc::ListConcat { .. }
2268 | AggregateFunc::Any
2269 | AggregateFunc::All
2270 | AggregateFunc::Dummy
2271 | AggregateFunc::MaxNumeric
2272 | AggregateFunc::MaxInt16
2273 | AggregateFunc::MaxInt32
2274 | AggregateFunc::MaxInt64
2275 | AggregateFunc::MaxUInt16
2276 | AggregateFunc::MaxUInt32
2277 | AggregateFunc::MaxUInt64
2278 | AggregateFunc::MaxMzTimestamp
2279 | AggregateFunc::MaxFloat32
2280 | AggregateFunc::MaxFloat64
2281 | AggregateFunc::MaxBool
2282 | AggregateFunc::MaxString
2283 | AggregateFunc::MaxDate
2284 | AggregateFunc::MaxTimestamp
2285 | AggregateFunc::MaxTimestampTz
2286 | AggregateFunc::MaxInterval
2287 | AggregateFunc::MaxTime
2288 | AggregateFunc::MinNumeric
2289 | AggregateFunc::MinInt16
2290 | AggregateFunc::MinInt32
2291 | AggregateFunc::MinInt64
2292 | AggregateFunc::MinUInt16
2293 | AggregateFunc::MinUInt32
2294 | AggregateFunc::MinUInt64
2295 | AggregateFunc::MinMzTimestamp
2296 | AggregateFunc::MinFloat32
2297 | AggregateFunc::MinFloat64
2298 | AggregateFunc::MinBool
2299 | AggregateFunc::MinString
2300 | AggregateFunc::MinDate
2301 | AggregateFunc::MinTimestamp
2302 | AggregateFunc::MinTimestampTz
2303 | AggregateFunc::MinInterval
2304 | AggregateFunc::MinTime
2305 | AggregateFunc::SumInt16
2306 | AggregateFunc::SumInt32
2307 | AggregateFunc::SumInt64
2308 | AggregateFunc::SumUInt16
2309 | AggregateFunc::SumUInt32
2310 | AggregateFunc::SumUInt64
2311 | AggregateFunc::SumFloat32
2312 | AggregateFunc::SumFloat64
2313 | AggregateFunc::SumNumeric
2314 | AggregateFunc::Count
2315 | AggregateFunc::JsonbAgg { .. }
2316 | AggregateFunc::JsonbObjectAgg { .. }
2317 | AggregateFunc::MapAgg { .. }
2318 | AggregateFunc::StringAgg { .. } => false,
2319 }
2320 }
2321
2322 pub fn output_sql_type(&self, input_type: SqlColumnType) -> SqlColumnType {
2328 let scalar_type = match self {
2329 AggregateFunc::Count => SqlScalarType::Int64,
2330 AggregateFunc::Any => SqlScalarType::Bool,
2331 AggregateFunc::All => SqlScalarType::Bool,
2332 AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
2333 AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
2334 AggregateFunc::SumInt16 => SqlScalarType::Int64,
2335 AggregateFunc::SumInt32 => SqlScalarType::Int64,
2336 AggregateFunc::SumInt64 => SqlScalarType::Numeric {
2337 max_scale: Some(NumericMaxScale::ZERO),
2338 },
2339 AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
2340 AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
2341 AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
2342 max_scale: Some(NumericMaxScale::ZERO),
2343 },
2344 AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
2345 value_type: Box::new(value_type.clone()),
2346 custom_id: None,
2347 },
2348 AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2349 match input_type.scalar_type {
2350 SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2352 _ => unreachable!(),
2353 }
2354 }
2355 AggregateFunc::StringAgg { .. } => SqlScalarType::String,
2356 AggregateFunc::RowNumber { .. } => {
2357 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2358 }
2359 AggregateFunc::Rank { .. } => {
2360 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2361 }
2362 AggregateFunc::DenseRank { .. } => {
2363 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2364 }
2365 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2366 let fields = input_type.scalar_type.unwrap_record_element_type();
2368 let original_row_type = fields[0].unwrap_record_element_type()[0]
2369 .clone()
2370 .nullable(false);
2371 let encoded_args = fields[0].unwrap_record_element_type()[1];
2372 let output_type_inner =
2373 Self::lag_lead_output_type_inner_from_encoded_args(encoded_args);
2374 let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2375
2376 SqlScalarType::List {
2377 element_type: Box::new(SqlScalarType::Record {
2378 fields: [
2379 (column_name, output_type_inner),
2380 (ColumnName::from("?orig_row?"), original_row_type),
2381 ].into(),
2382 custom_id: None,
2383 }),
2384 custom_id: None,
2385 }
2386 }
2387 AggregateFunc::FirstValue { .. } => {
2388 let fields = input_type.scalar_type.unwrap_record_element_type();
2390 let original_row_type = fields[0].unwrap_record_element_type()[0]
2391 .clone()
2392 .nullable(false);
2393 let value_type = fields[0].unwrap_record_element_type()[1]
2394 .clone()
2395 .nullable(true); SqlScalarType::List {
2398 element_type: Box::new(SqlScalarType::Record {
2399 fields: [
2400 (ColumnName::from("?first_value?"), value_type),
2401 (ColumnName::from("?orig_row?"), original_row_type),
2402 ].into(),
2403 custom_id: None,
2404 }),
2405 custom_id: None,
2406 }
2407 }
2408 AggregateFunc::LastValue { .. } => {
2409 let fields = input_type.scalar_type.unwrap_record_element_type();
2411 let original_row_type = fields[0].unwrap_record_element_type()[0]
2412 .clone()
2413 .nullable(false);
2414 let value_type = fields[0].unwrap_record_element_type()[1]
2415 .clone()
2416 .nullable(true); SqlScalarType::List {
2419 element_type: Box::new(SqlScalarType::Record {
2420 fields: [
2421 (ColumnName::from("?last_value?"), value_type),
2422 (ColumnName::from("?orig_row?"), original_row_type),
2423 ].into(),
2424 custom_id: None,
2425 }),
2426 custom_id: None,
2427 }
2428 }
2429 AggregateFunc::WindowAggregate {
2430 wrapped_aggregate, ..
2431 } => {
2432 let fields = input_type.scalar_type.unwrap_record_element_type();
2434 let original_row_type = fields[0].unwrap_record_element_type()[0]
2435 .clone()
2436 .nullable(false);
2437 let arg_type = fields[0].unwrap_record_element_type()[1]
2438 .clone()
2439 .nullable(true);
2440 let wrapped_aggr_out_type = wrapped_aggregate.output_sql_type(arg_type);
2441
2442 SqlScalarType::List {
2443 element_type: Box::new(SqlScalarType::Record {
2444 fields: [
2445 (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2446 (ColumnName::from("?orig_row?"), original_row_type),
2447 ].into(),
2448 custom_id: None,
2449 }),
2450 custom_id: None,
2451 }
2452 }
2453 AggregateFunc::FusedWindowAggregate {
2454 wrapped_aggregates, ..
2455 } => {
2456 let fields = input_type.scalar_type.unwrap_record_element_type();
2459 let original_row_type = fields[0].unwrap_record_element_type()[0]
2460 .clone()
2461 .nullable(false);
2462 let args_type = fields[0].unwrap_record_element_type()[1];
2463 let arg_types = args_type.unwrap_record_element_type();
2464 let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(
2465 |(arg_type, wrapped_agg)| {
2466 (
2467 ColumnName::from(wrapped_agg.name()),
2468 wrapped_agg.output_sql_type((**arg_type).clone().nullable(true)),
2469 )
2470 }).collect_vec();
2471
2472 SqlScalarType::List {
2473 element_type: Box::new(SqlScalarType::Record {
2474 fields: [
2475 (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
2476 fields: out_fields.into(),
2477 custom_id: None,
2478 }.nullable(false)),
2479 (ColumnName::from("?orig_row?"), original_row_type),
2480 ].into(),
2481 custom_id: None,
2482 }),
2483 custom_id: None,
2484 }
2485 }
2486 AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2487 let fields = input_type.scalar_type.unwrap_record_element_type();
2492 let original_row_type = fields[0].unwrap_record_element_type()[0]
2493 .clone()
2494 .nullable(false);
2495 let encoded_args_type = fields[0]
2496 .unwrap_record_element_type()[1]
2497 .unwrap_record_element_type();
2498
2499 SqlScalarType::List {
2500 element_type: Box::new(SqlScalarType::Record {
2501 fields: [
2502 (
2503 ColumnName::from("?fused_value_window_func?"),
2504 SqlScalarType::Record {
2505 fields: encoded_args_type.into_iter().zip_eq(funcs).map(
2506 |(arg_type, func)| {
2507 match func {
2508 AggregateFunc::LagLead {
2509 lag_lead: lag_lead_type, ..
2510 } => {
2511 let name = Self::lag_lead_result_column_name(
2512 lag_lead_type,
2513 );
2514 let ty = Self
2515 ::lag_lead_output_type_inner_from_encoded_args(
2516 arg_type,
2517 );
2518 (name, ty)
2519 },
2520 AggregateFunc::FirstValue { .. } => {
2521 (
2522 ColumnName::from("?first_value?"),
2523 arg_type.clone().nullable(true),
2524 )
2525 }
2526 AggregateFunc::LastValue { .. } => {
2527 (
2528 ColumnName::from("?last_value?"),
2529 arg_type.clone().nullable(true),
2530 )
2531 }
2532 _ => panic!("FusedValueWindowFunc has an unknown function"),
2533 }
2534 }).collect(),
2535 custom_id: None,
2536 }.nullable(false)),
2537 (ColumnName::from("?orig_row?"), original_row_type),
2538 ].into(),
2539 custom_id: None,
2540 }),
2541 custom_id: None,
2542 }
2543 }
2544 AggregateFunc::Dummy
2545 | AggregateFunc::MaxNumeric
2546 | AggregateFunc::MaxInt16
2547 | AggregateFunc::MaxInt32
2548 | AggregateFunc::MaxInt64
2549 | AggregateFunc::MaxUInt16
2550 | AggregateFunc::MaxUInt32
2551 | AggregateFunc::MaxUInt64
2552 | AggregateFunc::MaxMzTimestamp
2553 | AggregateFunc::MaxFloat32
2554 | AggregateFunc::MaxFloat64
2555 | AggregateFunc::MaxBool
2556 | AggregateFunc::MaxString
2560 | AggregateFunc::MaxDate
2561 | AggregateFunc::MaxTimestamp
2562 | AggregateFunc::MaxTimestampTz
2563 | AggregateFunc::MaxInterval
2564 | AggregateFunc::MaxTime
2565 | AggregateFunc::MinNumeric
2566 | AggregateFunc::MinInt16
2567 | AggregateFunc::MinInt32
2568 | AggregateFunc::MinInt64
2569 | AggregateFunc::MinUInt16
2570 | AggregateFunc::MinUInt32
2571 | AggregateFunc::MinUInt64
2572 | AggregateFunc::MinMzTimestamp
2573 | AggregateFunc::MinFloat32
2574 | AggregateFunc::MinFloat64
2575 | AggregateFunc::MinBool
2576 | AggregateFunc::MinString
2577 | AggregateFunc::MinDate
2578 | AggregateFunc::MinTimestamp
2579 | AggregateFunc::MinTimestampTz
2580 | AggregateFunc::MinInterval
2581 | AggregateFunc::MinTime
2582 | AggregateFunc::SumFloat32
2583 | AggregateFunc::SumFloat64
2584 | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2585 };
2586 let nullable = match self {
2589 AggregateFunc::Count => false,
2590 AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2592 SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2594 SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
2596 _ => unreachable!(),
2597 },
2598 _ => unreachable!(),
2599 },
2600 _ => input_type.nullable,
2601 };
2602 scalar_type.nullable(nullable)
2603 }
2604
2605 pub fn output_type(&self, input_type: ReprColumnType) -> ReprColumnType {
2609 ReprColumnType::from(&self.output_sql_type(SqlColumnType::from_repr(&input_type)))
2610 }
2611
2612 fn output_type_ranking_window_funcs(
2614 input_type: &SqlColumnType,
2615 col_name: &str,
2616 ) -> SqlScalarType {
2617 match input_type.scalar_type {
2618 SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2619 element_type: Box::new(SqlScalarType::Record {
2620 fields: [
2621 (
2622 ColumnName::from(col_name),
2623 SqlScalarType::Int64.nullable(false),
2624 ),
2625 (ColumnName::from("?orig_row?"), {
2626 let inner = match &fields[0].1.scalar_type {
2627 SqlScalarType::List { element_type, .. } => element_type.clone(),
2628 _ => unreachable!(),
2629 };
2630 inner.nullable(false)
2631 }),
2632 ]
2633 .into(),
2634 custom_id: None,
2635 }),
2636 custom_id: None,
2637 },
2638 _ => unreachable!(),
2639 }
2640 }
2641
2642 fn lag_lead_output_type_inner_from_encoded_args(
2646 encoded_args_type: &SqlScalarType,
2647 ) -> SqlColumnType {
2648 encoded_args_type.unwrap_record_element_type()[0]
2652 .clone()
2653 .nullable(true)
2654 }
2655
2656 fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2657 ColumnName::from(match lag_lead_type {
2658 LagLeadType::Lag => "?lag?",
2659 LagLeadType::Lead => "?lead?",
2660 })
2661 }
2662
2663 pub fn propagates_nonnull_constraint(&self) -> bool {
2668 match self {
2669 AggregateFunc::MaxNumeric
2670 | AggregateFunc::MaxInt16
2671 | AggregateFunc::MaxInt32
2672 | AggregateFunc::MaxInt64
2673 | AggregateFunc::MaxUInt16
2674 | AggregateFunc::MaxUInt32
2675 | AggregateFunc::MaxUInt64
2676 | AggregateFunc::MaxMzTimestamp
2677 | AggregateFunc::MaxFloat32
2678 | AggregateFunc::MaxFloat64
2679 | AggregateFunc::MaxBool
2680 | AggregateFunc::MaxString
2681 | AggregateFunc::MaxDate
2682 | AggregateFunc::MaxTimestamp
2683 | AggregateFunc::MaxTimestampTz
2684 | AggregateFunc::MinNumeric
2685 | AggregateFunc::MinInt16
2686 | AggregateFunc::MinInt32
2687 | AggregateFunc::MinInt64
2688 | AggregateFunc::MinUInt16
2689 | AggregateFunc::MinUInt32
2690 | AggregateFunc::MinUInt64
2691 | AggregateFunc::MinMzTimestamp
2692 | AggregateFunc::MinFloat32
2693 | AggregateFunc::MinFloat64
2694 | AggregateFunc::MinBool
2695 | AggregateFunc::MinString
2696 | AggregateFunc::MinDate
2697 | AggregateFunc::MinTimestamp
2698 | AggregateFunc::MinTimestampTz
2699 | AggregateFunc::SumInt16
2700 | AggregateFunc::SumInt32
2701 | AggregateFunc::SumInt64
2702 | AggregateFunc::SumUInt16
2703 | AggregateFunc::SumUInt32
2704 | AggregateFunc::SumUInt64
2705 | AggregateFunc::SumFloat32
2706 | AggregateFunc::SumFloat64
2707 | AggregateFunc::SumNumeric
2708 | AggregateFunc::StringAgg { .. } => true,
2709 AggregateFunc::Count => false,
2711 _ => false,
2712 }
2713 }
2714}
2715
2716fn jsonb_each<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2717 let map = match a {
2719 Datum::Map(dict) => dict,
2720 _ => mz_repr::DatumMap::empty(),
2721 };
2722
2723 map.iter()
2724 .map(move |(k, v)| (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE))
2725}
2726
2727fn jsonb_each_stringify<'a>(
2728 a: Datum<'a>,
2729 temp_storage: &'a RowArena,
2730) -> impl Iterator<Item = (Row, Diff)> + 'a {
2731 let map = match a {
2733 Datum::Map(dict) => dict,
2734 _ => mz_repr::DatumMap::empty(),
2735 };
2736
2737 map.iter().map(move |(k, mut v)| {
2738 v = jsonb_stringify(v, temp_storage)
2739 .map(Datum::String)
2740 .unwrap_or(Datum::Null);
2741 (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2742 })
2743}
2744
2745fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2746 let map = match a {
2747 Datum::Map(dict) => dict,
2748 _ => mz_repr::DatumMap::empty(),
2749 };
2750
2751 map.iter()
2752 .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2753}
2754
2755fn jsonb_array_elements<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2756 let list = match a {
2757 Datum::List(list) => list,
2758 _ => mz_repr::DatumList::empty(),
2759 };
2760 list.iter().map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2761}
2762
2763fn jsonb_array_elements_stringify<'a>(
2764 a: Datum<'a>,
2765 temp_storage: &'a RowArena,
2766) -> impl Iterator<Item = (Row, Diff)> + 'a {
2767 let list = match a {
2768 Datum::List(list) => list,
2769 _ => mz_repr::DatumList::empty(),
2770 };
2771 list.iter().map(move |mut e| {
2772 e = jsonb_stringify(e, temp_storage)
2773 .map(Datum::String)
2774 .unwrap_or(Datum::Null);
2775 (Row::pack_slice(&[e]), Diff::ONE)
2776 })
2777}
2778
2779fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2780 let r = r.inner();
2781 let a = a.unwrap_str();
2782 let captures = r.captures(a)?;
2783 let datums = captures
2784 .iter()
2785 .skip(1)
2786 .map(|m| Datum::from(m.map(|m| m.as_str())));
2787 Some((Row::pack(datums), Diff::ONE))
2788}
2789
2790fn regexp_matches<'a, 'r: 'a>(
2791 exprs: &[Datum<'a>],
2792) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2793 assert!(exprs.len() == 2 || exprs.len() == 3);
2797 let a = exprs[0].unwrap_str();
2798 let r = exprs[1].unwrap_str();
2799
2800 let (regex, opts) = if exprs.len() == 3 {
2801 let flag = exprs[2].unwrap_str();
2802 let opts = AnalyzedRegexOpts::from_str(flag)?;
2803 (AnalyzedRegex::new(r, opts)?, opts)
2804 } else {
2805 let opts = AnalyzedRegexOpts::default();
2806 (AnalyzedRegex::new(r, opts)?, opts)
2807 };
2808
2809 let regex = regex.inner().clone();
2810
2811 let iter = regex.captures_iter(a).map(move |captures| {
2812 let matches = captures
2813 .iter()
2814 .skip(1)
2816 .map(|m| Datum::from(m.map(|m| m.as_str())))
2817 .collect::<Vec<_>>();
2818
2819 let mut binding = SharedRow::get();
2820 let mut packer = binding.packer();
2821
2822 let dimension = ArrayDimension {
2823 lower_bound: 1,
2824 length: matches.len(),
2825 };
2826 packer
2827 .try_push_array(&[dimension], matches)
2828 .expect("generated dimensions above");
2829
2830 (binding.clone(), Diff::ONE)
2831 });
2832
2833 let out = iter.collect::<SmallVec<[_; 3]>>();
2838
2839 if opts.global {
2840 Ok(Either::Left(out.into_iter()))
2841 } else {
2842 Ok(Either::Right(out.into_iter().take(1)))
2843 }
2844}
2845
2846fn generate_series<N>(
2847 start: N,
2848 stop: N,
2849 step: N,
2850) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2851where
2852 N: Integer + Signed + CheckedAdd + Clone,
2853 Datum<'static>: From<N>,
2854{
2855 if step == N::zero() {
2856 return Err(EvalError::InvalidParameterValue(
2857 "step size cannot equal zero".into(),
2858 ));
2859 }
2860 Ok(num::range_step_inclusive(start, stop, step)
2861 .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2862}
2863
/// Iterator over timestamps from a starting point up to and including a
/// stop bound, advancing by an [`Interval`] on each step.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to be yielded.
    state: CheckedTimestamp<T>,
    /// The inclusive bound at which iteration stops.
    stop: CheckedTimestamp<T>,
    /// The amount added to `state` after each yielded value.
    step: Interval,
    /// When `true` the series runs downwards: iteration continues while
    /// `state >= stop` rather than `state <= stop`.
    rev: bool,
    /// Set once advancing `state` fails, so that no further values are
    /// yielded.
    done: bool,
}
2875
2876impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
2877 type Item = CheckedTimestamp<T>;
2878
2879 #[inline]
2880 fn next(&mut self) -> Option<CheckedTimestamp<T>> {
2881 if !self.done
2882 && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
2883 {
2884 let result = self.state.clone();
2885 match add_timestamp_months(self.state.deref(), self.step.months) {
2886 Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
2887 Some(v) => match CheckedTimestamp::from_timestamplike(v) {
2888 Ok(v) => self.state = v,
2889 Err(_) => self.done = true,
2890 },
2891 None => self.done = true,
2892 },
2893 Err(..) => {
2894 self.done = true;
2895 }
2896 }
2897
2898 Some(result)
2899 } else {
2900 None
2901 }
2902 }
2903}
2904
2905fn generate_series_ts<T: TimestampLike>(
2906 start: CheckedTimestamp<T>,
2907 stop: CheckedTimestamp<T>,
2908 step: Interval,
2909 conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2910) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2911 let normalized_step = step.as_microseconds();
2912 if normalized_step == 0 {
2913 return Err(EvalError::InvalidParameterValue(
2914 "step size cannot equal zero".into(),
2915 ));
2916 }
2917 let rev = normalized_step < 0;
2918
2919 let trsi = TimestampRangeStepInclusive {
2920 state: start,
2921 stop,
2922 step,
2923 rev,
2924 done: false,
2925 };
2926
2927 Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2928}
2929
2930fn generate_subscripts_array(
2931 a: Datum,
2932 dim: i32,
2933) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2934 if dim <= 0 {
2935 return Ok(Box::new(iter::empty()));
2936 }
2937
2938 match a.unwrap_array().dims().into_iter().nth(
2939 (dim - 1)
2940 .try_into()
2941 .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2942 ) {
2943 Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
2944 requested_dim.lower_bound.try_into().map_err(|_| {
2945 EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2946 })?,
2947 requested_dim
2948 .length
2949 .try_into()
2950 .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
2951 1,
2952 )?)),
2953 None => Ok(Box::new(iter::empty())),
2954 }
2955}
2956
2957fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2958 a.unwrap_array()
2959 .elements()
2960 .iter()
2961 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2962}
2963
2964fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2965 a.unwrap_list()
2966 .iter()
2967 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2968}
2969
2970fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2971 a.unwrap_map()
2972 .iter()
2973 .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
2974}
2975
2976impl AggregateFunc {
2977 pub fn name(&self) -> &'static str {
2980 match self {
2981 Self::MaxNumeric => "max",
2982 Self::MaxInt16 => "max",
2983 Self::MaxInt32 => "max",
2984 Self::MaxInt64 => "max",
2985 Self::MaxUInt16 => "max",
2986 Self::MaxUInt32 => "max",
2987 Self::MaxUInt64 => "max",
2988 Self::MaxMzTimestamp => "max",
2989 Self::MaxFloat32 => "max",
2990 Self::MaxFloat64 => "max",
2991 Self::MaxBool => "max",
2992 Self::MaxString => "max",
2993 Self::MaxDate => "max",
2994 Self::MaxTimestamp => "max",
2995 Self::MaxTimestampTz => "max",
2996 Self::MaxInterval => "max",
2997 Self::MaxTime => "max",
2998 Self::MinNumeric => "min",
2999 Self::MinInt16 => "min",
3000 Self::MinInt32 => "min",
3001 Self::MinInt64 => "min",
3002 Self::MinUInt16 => "min",
3003 Self::MinUInt32 => "min",
3004 Self::MinUInt64 => "min",
3005 Self::MinMzTimestamp => "min",
3006 Self::MinFloat32 => "min",
3007 Self::MinFloat64 => "min",
3008 Self::MinBool => "min",
3009 Self::MinString => "min",
3010 Self::MinDate => "min",
3011 Self::MinTimestamp => "min",
3012 Self::MinTimestampTz => "min",
3013 Self::MinInterval => "min",
3014 Self::MinTime => "min",
3015 Self::SumInt16 => "sum",
3016 Self::SumInt32 => "sum",
3017 Self::SumInt64 => "sum",
3018 Self::SumUInt16 => "sum",
3019 Self::SumUInt32 => "sum",
3020 Self::SumUInt64 => "sum",
3021 Self::SumFloat32 => "sum",
3022 Self::SumFloat64 => "sum",
3023 Self::SumNumeric => "sum",
3024 Self::Count => "count",
3025 Self::Any => "any",
3026 Self::All => "all",
3027 Self::JsonbAgg { .. } => "jsonb_agg",
3028 Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3029 Self::MapAgg { .. } => "map_agg",
3030 Self::ArrayConcat { .. } => "array_agg",
3031 Self::ListConcat { .. } => "list_agg",
3032 Self::StringAgg { .. } => "string_agg",
3033 Self::RowNumber { .. } => "row_number",
3034 Self::Rank { .. } => "rank",
3035 Self::DenseRank { .. } => "dense_rank",
3036 Self::LagLead {
3037 lag_lead: LagLeadType::Lag,
3038 ..
3039 } => "lag",
3040 Self::LagLead {
3041 lag_lead: LagLeadType::Lead,
3042 ..
3043 } => "lead",
3044 Self::FirstValue { .. } => "first_value",
3045 Self::LastValue { .. } => "last_value",
3046 Self::WindowAggregate { .. } => "window_agg",
3047 Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3048 Self::FusedWindowAggregate { .. } => "fused_window_agg",
3049 Self::Dummy => "dummy",
3050 }
3051 }
3052}
3053
3054impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3055where
3056 M: HumanizerMode,
3057{
3058 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3059 use AggregateFunc::*;
3060 let name = self.expr.name();
3061 match self.expr {
3062 JsonbAgg { order_by }
3063 | JsonbObjectAgg { order_by }
3064 | MapAgg { order_by, .. }
3065 | ArrayConcat { order_by }
3066 | ListConcat { order_by }
3067 | StringAgg { order_by }
3068 | RowNumber { order_by }
3069 | Rank { order_by }
3070 | DenseRank { order_by } => {
3071 let order_by = order_by.iter().map(|col| self.child(col));
3072 write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3073 }
3074 LagLead {
3075 lag_lead: _,
3076 ignore_nulls,
3077 order_by,
3078 } => {
3079 let order_by = order_by.iter().map(|col| self.child(col));
3080 f.write_str(name)?;
3081 f.write_str("[")?;
3082 if *ignore_nulls {
3083 f.write_str("ignore_nulls=true, ")?;
3084 }
3085 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3086 f.write_str("]")
3087 }
3088 FirstValue {
3089 order_by,
3090 window_frame,
3091 } => {
3092 let order_by = order_by.iter().map(|col| self.child(col));
3093 f.write_str(name)?;
3094 f.write_str("[")?;
3095 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3096 if *window_frame != WindowFrame::default() {
3097 write!(f, " {}", window_frame)?;
3098 }
3099 f.write_str("]")
3100 }
3101 LastValue {
3102 order_by,
3103 window_frame,
3104 } => {
3105 let order_by = order_by.iter().map(|col| self.child(col));
3106 f.write_str(name)?;
3107 f.write_str("[")?;
3108 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3109 if *window_frame != WindowFrame::default() {
3110 write!(f, " {}", window_frame)?;
3111 }
3112 f.write_str("]")
3113 }
3114 WindowAggregate {
3115 wrapped_aggregate,
3116 order_by,
3117 window_frame,
3118 } => {
3119 let order_by = order_by.iter().map(|col| self.child(col));
3120 let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3121 f.write_str(name)?;
3122 f.write_str("[")?;
3123 write!(f, "{} ", wrapped_aggregate)?;
3124 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3125 if *window_frame != WindowFrame::default() {
3126 write!(f, " {}", window_frame)?;
3127 }
3128 f.write_str("]")
3129 }
3130 FusedValueWindowFunc { funcs, order_by } => {
3131 let order_by = order_by.iter().map(|col| self.child(col));
3132 let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3133 f.write_str(name)?;
3134 f.write_str("[")?;
3135 write!(f, "{} ", funcs)?;
3136 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3137 f.write_str("]")
3138 }
3139 _ => f.write_str(name),
3140 }
3141 }
3142}
3143
/// Describes a single capture group of an [`AnalyzedRegex`].
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct CaptureGroupDesc {
    /// Position of the group within the regex. `AnalyzedRegex::new` assigns
    /// these starting at 1, because it skips the first entry of
    /// `capture_names()`.
    pub index: u32,
    /// The name of the group, if it has one.
    pub name: Option<String>,
    /// Whether the column produced for this group may be null.
    /// `AnalyzedRegex::new` currently sets this to `true` for every group.
    pub nullable: bool,
}
3161
/// Options controlling how an [`AnalyzedRegex`] is compiled and applied.
///
/// Parsed from a flag string by the `FromStr` implementation; the default
/// has both options off.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default
)]
pub struct AnalyzedRegexOpts {
    /// Set by the `i` flag; forwarded to the regex compiler by
    /// `AnalyzedRegex::new`.
    pub case_insensitive: bool,
    /// Set by the `g` flag; when `false`, `regexp_matches` returns at most
    /// the first match.
    pub global: bool,
}
3180
3181impl FromStr for AnalyzedRegexOpts {
3182 type Err = EvalError;
3183
3184 fn from_str(s: &str) -> Result<Self, Self::Err> {
3185 let mut opts = AnalyzedRegexOpts::default();
3186 for c in s.chars() {
3187 match c {
3188 'i' => opts.case_insensitive = true,
3189 'g' => opts.global = true,
3190 _ => return Err(EvalError::InvalidRegexFlag(c)),
3191 }
3192 }
3193 Ok(opts)
3194 }
3195}
3196
/// A compiled regex bundled with a description of its capture groups and the
/// options it was compiled with.
///
/// The fields are, in order: the compiled regex, one [`CaptureGroupDesc`]
/// per capture group (excluding group 0), and the [`AnalyzedRegexOpts`]
/// passed to `AnalyzedRegex::new`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3210
3211impl AnalyzedRegex {
3212 pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, RegexCompilationError> {
3213 let r = ReprRegex::new(s, opts.case_insensitive)?;
3214 #[allow(clippy::as_conversions)]
3216 let descs: Vec<_> = r
3217 .capture_names()
3218 .enumerate()
3219 .skip(1)
3224 .map(|(i, name)| CaptureGroupDesc {
3225 index: i as u32,
3226 name: name.map(String::from),
3227 nullable: true,
3230 })
3231 .collect();
3232 Ok(Self(r, descs, opts))
3233 }
3234 pub fn capture_groups_len(&self) -> usize {
3235 self.1.len()
3236 }
3237 pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3238 self.1.iter()
3239 }
3240 pub fn inner(&self) -> &Regex {
3241 &(self.0).regex
3242 }
3243 pub fn opts(&self) -> &AnalyzedRegexOpts {
3244 &self.2
3245 }
3246}
3247
3248pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3249 let bytes = a.unwrap_str().as_bytes();
3250 let mut row = Row::default();
3251 let csv_reader = csv::ReaderBuilder::new()
3252 .has_headers(false)
3253 .from_reader(bytes);
3254 csv_reader.into_records().filter_map(move |res| match res {
3255 Ok(sr) if sr.len() == n_cols => {
3256 row.packer().extend(sr.iter().map(Datum::String));
3257 Some((row.clone(), Diff::ONE))
3258 }
3259 _ => None,
3260 })
3261}
3262
3263pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3264 let n = a.unwrap_int64();
3265 if n != 0 {
3266 Some((Row::default(), n.into()))
3267 } else {
3268 None
3269 }
3270}
3271
3272fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3273 datums
3274 .chunks(width)
3275 .map(|chunk| (Row::pack(chunk), Diff::ONE))
3276}
3277
3278fn acl_explode<'a>(
3279 acl_items: Datum<'a>,
3280 temp_storage: &'a RowArena,
3281) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3282 let acl_items = acl_items.unwrap_array();
3283 let mut res = Vec::new();
3284 for acl_item in acl_items.elements().iter() {
3285 if acl_item.is_null() {
3286 return Err(EvalError::AclArrayNullElement);
3287 }
3288 let acl_item = acl_item.unwrap_acl_item();
3289 for privilege in acl_item.acl_mode.explode() {
3290 let row = [
3291 Datum::UInt32(acl_item.grantor.0),
3292 Datum::UInt32(acl_item.grantee.0),
3293 Datum::String(temp_storage.push_string(privilege.to_string())),
3294 Datum::False,
3296 ];
3297 res.push((Row::pack_slice(&row), Diff::ONE));
3298 }
3299 }
3300 Ok(res.into_iter())
3301}
3302
3303fn mz_acl_explode<'a>(
3304 mz_acl_items: Datum<'a>,
3305 temp_storage: &'a RowArena,
3306) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3307 let mz_acl_items = mz_acl_items.unwrap_array();
3308 let mut res = Vec::new();
3309 for mz_acl_item in mz_acl_items.elements().iter() {
3310 if mz_acl_item.is_null() {
3311 return Err(EvalError::MzAclArrayNullElement);
3312 }
3313 let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3314 for privilege in mz_acl_item.acl_mode.explode() {
3315 let row = [
3316 Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3317 Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3318 Datum::String(temp_storage.push_string(privilege.to_string())),
3319 Datum::False,
3321 ];
3322 res.push((Row::pack_slice(&row), Diff::ONE));
3323 }
3324 }
3325 Ok(res.into_iter())
3326}
3327
/// The table-valued functions that `TableFunc::eval` can evaluate. Each
/// variant maps a slice of input datums to a collection of `(Row, Diff)`
/// pairs.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum TableFunc {
    /// Expands an array of ACL items into one row per (item, privilege):
    /// grantor OID, grantee OID, privilege name and a boolean that is always
    /// `false`.
    AclExplode,
    /// Like `AclExplode`, but for `mz_acl_item`s, with grantor and grantee
    /// rendered as strings.
    MzAclExplode,
    /// Emits one `(key, value)` row per entry of a map datum; other datums
    /// produce no rows.
    JsonbEach,
    /// Like `JsonbEach`, with each value converted by `jsonb_stringify`.
    JsonbEachStringify,
    /// Emits one row per key of a map datum; other datums produce no rows.
    JsonbObjectKeys,
    /// Emits one row per element of a list datum; other datums produce no
    /// rows.
    JsonbArrayElements,
    /// Like `JsonbArrayElements`, with each element converted by
    /// `jsonb_stringify`.
    JsonbArrayElementsStringify,
    /// Emits at most one row holding the capture groups of the first match
    /// of the given regex against the input string.
    RegexpExtract(AnalyzedRegex),
    /// Parses the input string as header-less CSV and emits the records that
    /// have exactly this many columns.
    CsvExtract(usize),
    /// Integer series over `Int32` inputs `(start, stop, step)`; a zero step
    /// is an error.
    GenerateSeriesInt32,
    /// Integer series over `Int64` inputs `(start, stop, step)`; a zero step
    /// is an error.
    GenerateSeriesInt64,
    /// Timestamp series over `(start, stop, step interval)`; a zero step is
    /// an error.
    GenerateSeriesTimestamp,
    /// Timestamp-with-time-zone series over `(start, stop, step interval)`;
    /// a zero step is an error.
    GenerateSeriesTimestampTz,
    /// Reads a single `Int64` count; evaluation fails with
    /// `EvalError::MultipleRowsFromSubquery` unless the count is exactly 1,
    /// in which case no rows are produced.
    GuardSubquerySize {
        /// Type of the single (non-nullable) output column; not consulted
        /// during evaluation.
        column_type: SqlScalarType,
    },
    /// Emits one empty row whose multiplicity is the `Int64` input; a zero
    /// input produces nothing.
    Repeat,
    /// Emits one row per element of an array datum.
    UnnestArray {
        /// Element type, used for the (nullable) output column.
        el_typ: SqlScalarType,
    },
    /// Emits one row per element of a list datum.
    UnnestList {
        /// Element type, used for the (nullable) output column.
        el_typ: SqlScalarType,
    },
    /// Emits one `(key, value)` row per entry of a map datum.
    UnnestMap {
        /// Value type, used for the (nullable) second output column.
        value_type: SqlScalarType,
    },
    /// Splits the input datums into consecutive rows of `width` columns.
    Wrap {
        /// Column types of the output relation.
        types: Vec<SqlColumnType>,
        /// Number of datums per emitted row; also the output arity.
        width: usize,
    },
    /// Emits the subscripts of the requested dimension of an array as
    /// `Int32` rows.
    GenerateSubscriptsArray,
    /// Emits exactly one row made of the input datums.
    TabletizedScalar {
        /// NOTE(review): presumably the name of the wrapped scalar function;
        /// it is not used by the evaluation or typing code visible here.
        name: String,
        /// The output relation type, returned as-is by `output_sql_type`.
        relation: SqlRelationType,
    },
    /// `regexp_matches(text, pattern[, flags])`: one row per match, holding
    /// an array of its capture groups.
    RegexpMatches,
    /// Wraps another table function, appending a non-nullable `Int64`
    /// column to its output. Build it with `TableFunc::with_ordinality`.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3412
/// Payload of `TableFunc::WithOrdinality`: a table function whose output is
/// extended with one extra non-nullable `Int64` column.
///
/// The struct is private, so outside this module instances can only be
/// created through `TableFunc::with_ordinality`, which refuses to nest one
/// `WithOrdinality` inside another.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
struct WithOrdinality {
    /// The wrapped table function.
    inner: Box<TableFunc>,
}
3435
3436impl TableFunc {
3437 pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3439 match inner {
3440 TableFunc::AclExplode
3441 | TableFunc::MzAclExplode
3442 | TableFunc::JsonbEach
3443 | TableFunc::JsonbEachStringify
3444 | TableFunc::JsonbObjectKeys
3445 | TableFunc::JsonbArrayElements
3446 | TableFunc::JsonbArrayElementsStringify
3447 | TableFunc::RegexpExtract(_)
3448 | TableFunc::CsvExtract(_)
3449 | TableFunc::GenerateSeriesInt32
3450 | TableFunc::GenerateSeriesInt64
3451 | TableFunc::GenerateSeriesTimestamp
3452 | TableFunc::GenerateSeriesTimestampTz
3453 | TableFunc::GuardSubquerySize { .. }
3454 | TableFunc::Repeat
3455 | TableFunc::UnnestArray { .. }
3456 | TableFunc::UnnestList { .. }
3457 | TableFunc::UnnestMap { .. }
3458 | TableFunc::Wrap { .. }
3459 | TableFunc::GenerateSubscriptsArray
3460 | TableFunc::TabletizedScalar { .. }
3461 | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3462 inner: Box::new(inner),
3463 })),
3464 TableFunc::WithOrdinality(_) => None,
3467 }
3468 }
3469}
3470
3471impl TableFunc {
3472 pub fn eval<'a>(
3474 &'a self,
3475 datums: &'a [Datum<'a>],
3476 temp_storage: &'a RowArena,
3477 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3478 if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3479 return Ok(Box::new(vec![].into_iter()));
3480 }
3481 match self {
3482 TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3483 TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3484 TableFunc::JsonbEach => Ok(Box::new(jsonb_each(datums[0]))),
3485 TableFunc::JsonbEachStringify => {
3486 Ok(Box::new(jsonb_each_stringify(datums[0], temp_storage)))
3487 }
3488 TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3489 TableFunc::JsonbArrayElements => Ok(Box::new(jsonb_array_elements(datums[0]))),
3490 TableFunc::JsonbArrayElementsStringify => Ok(Box::new(jsonb_array_elements_stringify(
3491 datums[0],
3492 temp_storage,
3493 ))),
3494 TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3495 TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3496 TableFunc::GenerateSeriesInt32 => {
3497 let res = generate_series(
3498 datums[0].unwrap_int32(),
3499 datums[1].unwrap_int32(),
3500 datums[2].unwrap_int32(),
3501 )?;
3502 Ok(Box::new(res))
3503 }
3504 TableFunc::GenerateSeriesInt64 => {
3505 let res = generate_series(
3506 datums[0].unwrap_int64(),
3507 datums[1].unwrap_int64(),
3508 datums[2].unwrap_int64(),
3509 )?;
3510 Ok(Box::new(res))
3511 }
3512 TableFunc::GenerateSeriesTimestamp => {
3513 fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
3514 Datum::from(d)
3515 }
3516 let res = generate_series_ts(
3517 datums[0].unwrap_timestamp(),
3518 datums[1].unwrap_timestamp(),
3519 datums[2].unwrap_interval(),
3520 pass_through,
3521 )?;
3522 Ok(Box::new(res))
3523 }
3524 TableFunc::GenerateSeriesTimestampTz => {
3525 fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
3526 Datum::from(d)
3527 }
3528 let res = generate_series_ts(
3529 datums[0].unwrap_timestamptz(),
3530 datums[1].unwrap_timestamptz(),
3531 datums[2].unwrap_interval(),
3532 gen_ts_tz,
3533 )?;
3534 Ok(Box::new(res))
3535 }
3536 TableFunc::GenerateSubscriptsArray => {
3537 generate_subscripts_array(datums[0], datums[1].unwrap_int32())
3538 }
3539 TableFunc::GuardSubquerySize { column_type: _ } => {
3540 let count = datums[0].unwrap_int64();
3543 if count != 1 {
3544 Err(EvalError::MultipleRowsFromSubquery)
3545 } else {
3546 Ok(Box::new([].into_iter()))
3547 }
3548 }
3549 TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
3550 TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
3551 TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
3552 TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
3553 TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
3554 TableFunc::TabletizedScalar { .. } => {
3555 let r = Row::pack_slice(datums);
3556 Ok(Box::new(std::iter::once((r, Diff::ONE))))
3557 }
3558 TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
3559 TableFunc::WithOrdinality(func_with_ordinality) => {
3560 func_with_ordinality.eval(datums, temp_storage)
3561 }
3562 }
3563 }
3564
3565 pub fn output_sql_type(&self) -> SqlRelationType {
3566 let (column_types, keys) = match self {
3567 TableFunc::AclExplode => {
3568 let column_types = vec![
3569 SqlScalarType::Oid.nullable(false),
3570 SqlScalarType::Oid.nullable(false),
3571 SqlScalarType::String.nullable(false),
3572 SqlScalarType::Bool.nullable(false),
3573 ];
3574 let keys = vec![];
3575 (column_types, keys)
3576 }
3577 TableFunc::MzAclExplode => {
3578 let column_types = vec![
3579 SqlScalarType::String.nullable(false),
3580 SqlScalarType::String.nullable(false),
3581 SqlScalarType::String.nullable(false),
3582 SqlScalarType::Bool.nullable(false),
3583 ];
3584 let keys = vec![];
3585 (column_types, keys)
3586 }
3587 TableFunc::JsonbEach => {
3588 let column_types = vec![
3589 SqlScalarType::String.nullable(false),
3590 SqlScalarType::Jsonb.nullable(false),
3591 ];
3592 let keys = vec![];
3593 (column_types, keys)
3594 }
3595 TableFunc::JsonbEachStringify => {
3596 let column_types = vec![
3597 SqlScalarType::String.nullable(false),
3598 SqlScalarType::String.nullable(true),
3599 ];
3600 let keys = vec![];
3601 (column_types, keys)
3602 }
3603 TableFunc::JsonbObjectKeys => {
3604 let column_types = vec![SqlScalarType::String.nullable(false)];
3605 let keys = vec![];
3606 (column_types, keys)
3607 }
3608 TableFunc::JsonbArrayElements => {
3609 let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
3610 let keys = vec![];
3611 (column_types, keys)
3612 }
3613 TableFunc::JsonbArrayElementsStringify => {
3614 let column_types = vec![SqlScalarType::String.nullable(true)];
3615 let keys = vec![];
3616 (column_types, keys)
3617 }
3618 TableFunc::RegexpExtract(a) => {
3619 let column_types = a
3620 .capture_groups_iter()
3621 .map(|cg| SqlScalarType::String.nullable(cg.nullable))
3622 .collect();
3623 let keys = vec![];
3624 (column_types, keys)
3625 }
3626 TableFunc::CsvExtract(n_cols) => {
3627 let column_types = iter::repeat(SqlScalarType::String.nullable(false))
3628 .take(*n_cols)
3629 .collect();
3630 let keys = vec![];
3631 (column_types, keys)
3632 }
3633 TableFunc::GenerateSeriesInt32 => {
3634 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3635 let keys = vec![vec![0]];
3636 (column_types, keys)
3637 }
3638 TableFunc::GenerateSeriesInt64 => {
3639 let column_types = vec![SqlScalarType::Int64.nullable(false)];
3640 let keys = vec![vec![0]];
3641 (column_types, keys)
3642 }
3643 TableFunc::GenerateSeriesTimestamp => {
3644 let column_types =
3645 vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
3646 let keys = vec![vec![0]];
3647 (column_types, keys)
3648 }
3649 TableFunc::GenerateSeriesTimestampTz => {
3650 let column_types =
3651 vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
3652 let keys = vec![vec![0]];
3653 (column_types, keys)
3654 }
3655 TableFunc::GenerateSubscriptsArray => {
3656 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3657 let keys = vec![vec![0]];
3658 (column_types, keys)
3659 }
3660 TableFunc::GuardSubquerySize { column_type } => {
3661 let column_types = vec![column_type.clone().nullable(false)];
3662 let keys = vec![];
3663 (column_types, keys)
3664 }
3665 TableFunc::Repeat => {
3666 let column_types = vec![];
3667 let keys = vec![];
3668 (column_types, keys)
3669 }
3670 TableFunc::UnnestArray { el_typ } => {
3671 let column_types = vec![el_typ.clone().nullable(true)];
3672 let keys = vec![];
3673 (column_types, keys)
3674 }
3675 TableFunc::UnnestList { el_typ } => {
3676 let column_types = vec![el_typ.clone().nullable(true)];
3677 let keys = vec![];
3678 (column_types, keys)
3679 }
3680 TableFunc::UnnestMap { value_type } => {
3681 let column_types = vec![
3682 SqlScalarType::String.nullable(false),
3683 value_type.clone().nullable(true),
3684 ];
3685 let keys = vec![vec![0]];
3686 (column_types, keys)
3687 }
3688 TableFunc::Wrap { types, .. } => {
3689 let column_types = types.clone();
3690 let keys = vec![];
3691 (column_types, keys)
3692 }
3693 TableFunc::TabletizedScalar { relation, .. } => {
3694 return relation.clone();
3695 }
3696 TableFunc::RegexpMatches => {
3697 let column_types =
3698 vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
3699 let keys = vec![];
3700
3701 (column_types, keys)
3702 }
3703 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3704 let mut typ = inner.output_sql_type();
3705 typ.column_types.push(SqlScalarType::Int64.nullable(false));
3707 typ.keys.push(vec![typ.column_types.len() - 1]);
3709 (typ.column_types, typ.keys)
3710 }
3711 };
3712
3713 soft_assert_eq_no_log!(column_types.len(), self.output_arity());
3714
3715 if !keys.is_empty() {
3716 SqlRelationType::new(column_types).with_keys(keys)
3717 } else {
3718 SqlRelationType::new(column_types)
3719 }
3720 }
3721
3722 pub fn output_type(&self) -> ReprRelationType {
3726 ReprRelationType::from(&self.output_sql_type())
3727 }
3728
3729 pub fn output_arity(&self) -> usize {
3730 match self {
3731 TableFunc::AclExplode => 4,
3732 TableFunc::MzAclExplode => 4,
3733 TableFunc::JsonbEach => 2,
3734 TableFunc::JsonbEachStringify => 2,
3735 TableFunc::JsonbObjectKeys => 1,
3736 TableFunc::JsonbArrayElements => 1,
3737 TableFunc::JsonbArrayElementsStringify => 1,
3738 TableFunc::RegexpExtract(a) => a.capture_groups_len(),
3739 TableFunc::CsvExtract(n_cols) => *n_cols,
3740 TableFunc::GenerateSeriesInt32 => 1,
3741 TableFunc::GenerateSeriesInt64 => 1,
3742 TableFunc::GenerateSeriesTimestamp => 1,
3743 TableFunc::GenerateSeriesTimestampTz => 1,
3744 TableFunc::GenerateSubscriptsArray => 1,
3745 TableFunc::GuardSubquerySize { .. } => 1,
3746 TableFunc::Repeat => 0,
3747 TableFunc::UnnestArray { .. } => 1,
3748 TableFunc::UnnestList { .. } => 1,
3749 TableFunc::UnnestMap { .. } => 2,
3750 TableFunc::Wrap { width, .. } => *width,
3751 TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
3752 TableFunc::RegexpMatches => 1,
3753 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
3754 }
3755 }
3756
3757 pub fn empty_on_null_input(&self) -> bool {
3758 match self {
3759 TableFunc::AclExplode
3760 | TableFunc::MzAclExplode
3761 | TableFunc::JsonbEach
3762 | TableFunc::JsonbEachStringify
3763 | TableFunc::JsonbObjectKeys
3764 | TableFunc::JsonbArrayElements
3765 | TableFunc::JsonbArrayElementsStringify
3766 | TableFunc::GenerateSeriesInt32
3767 | TableFunc::GenerateSeriesInt64
3768 | TableFunc::GenerateSeriesTimestamp
3769 | TableFunc::GenerateSeriesTimestampTz
3770 | TableFunc::GenerateSubscriptsArray
3771 | TableFunc::RegexpExtract(_)
3772 | TableFunc::CsvExtract(_)
3773 | TableFunc::Repeat
3774 | TableFunc::UnnestArray { .. }
3775 | TableFunc::UnnestList { .. }
3776 | TableFunc::UnnestMap { .. }
3777 | TableFunc::RegexpMatches => true,
3778 TableFunc::GuardSubquerySize { .. } => false,
3779 TableFunc::Wrap { .. } => false,
3780 TableFunc::TabletizedScalar { .. } => false,
3781 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
3782 }
3783 }
3784
3785 pub fn preserves_monotonicity(&self) -> bool {
3787 match self {
3790 TableFunc::AclExplode => false,
3791 TableFunc::MzAclExplode => false,
3792 TableFunc::JsonbEach => true,
3793 TableFunc::JsonbEachStringify => true,
3794 TableFunc::JsonbObjectKeys => true,
3795 TableFunc::JsonbArrayElements => true,
3796 TableFunc::JsonbArrayElementsStringify => true,
3797 TableFunc::RegexpExtract(_) => true,
3798 TableFunc::CsvExtract(_) => true,
3799 TableFunc::GenerateSeriesInt32 => true,
3800 TableFunc::GenerateSeriesInt64 => true,
3801 TableFunc::GenerateSeriesTimestamp => true,
3802 TableFunc::GenerateSeriesTimestampTz => true,
3803 TableFunc::GenerateSubscriptsArray => true,
3804 TableFunc::Repeat => false,
3805 TableFunc::UnnestArray { .. } => true,
3806 TableFunc::UnnestList { .. } => true,
3807 TableFunc::UnnestMap { .. } => true,
3808 TableFunc::Wrap { .. } => true,
3809 TableFunc::TabletizedScalar { .. } => true,
3810 TableFunc::RegexpMatches => true,
3811 TableFunc::GuardSubquerySize { .. } => false,
3812 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
3813 }
3814 }
3815}
3816
3817impl fmt::Display for TableFunc {
3818 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3819 match self {
3820 TableFunc::AclExplode => f.write_str("aclexplode"),
3821 TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3822 TableFunc::JsonbEach => f.write_str("jsonb_each"),
3823 TableFunc::JsonbEachStringify => f.write_str("jsonb_each_text"),
3824 TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3825 TableFunc::JsonbArrayElements => f.write_str("jsonb_array_elements"),
3826 TableFunc::JsonbArrayElementsStringify => f.write_str("jsonb_array_elements_text"),
3827 TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3828 TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3829 TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3830 TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3831 TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3832 TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3833 TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3834 TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3835 TableFunc::Repeat => f.write_str("repeat_row"),
3836 TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3837 TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3838 TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3839 TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3840 TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3841 TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3842 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3843 write!(f, "{}[with_ordinality]", inner)
3844 }
3845 }
3846 }
3847}
3848
3849impl WithOrdinality {
3850 fn eval<'a>(
3859 &'a self,
3860 datums: &'a [Datum<'a>],
3861 temp_storage: &'a RowArena,
3862 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3863 let mut next_ordinal: i64 = 1;
3864 let it = self
3865 .inner
3866 .eval(datums, temp_storage)?
3867 .flat_map(move |(mut row, diff)| {
3868 let diff = diff.into_inner();
3869 assert!(diff >= 0);
3876 let mut ordinals = next_ordinal..(next_ordinal + diff);
3878 next_ordinal += diff;
3879 let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
3881 iter::from_fn(move || {
3882 let ordinal = ordinals.next()?;
3883 let mut row = if ordinals.is_empty() {
3884 std::mem::take(&mut row)
3887 } else {
3888 let mut new_row = Row::with_capacity(cap);
3889 new_row.clone_from(&row);
3890 new_row
3891 };
3892 RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
3893 Some((row, Diff::ONE))
3894 })
3895 });
3896 Ok(Box::new(it))
3897 }
3898}