1#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::{Regex as ReprRegex, RegexCompilationError};
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33 ColumnName, Datum, Diff, Row, RowArena, RowPacker, SharedRow, SqlColumnType, SqlRelationType,
34 SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44 CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49 ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58 I: IntoIterator<Item = Datum<'a>>,
59{
60 match datums
61 .into_iter()
62 .filter(|d| !d.is_null())
63 .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64 {
65 Some(datum) => datum,
66 None => Datum::Null,
67 }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72 I: IntoIterator<Item = Datum<'a>>,
73 DatumType: TryFrom<Datum<'a>> + Ord,
74 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75 Datum<'a>: From<Option<DatumType>>,
76{
77 let x: Option<DatumType> = datums
78 .into_iter()
79 .filter(|d| !d.is_null())
80 .map(|d| DatumType::try_from(d).expect("unexpected type"))
81 .max();
82
83 x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88 I: IntoIterator<Item = Datum<'a>>,
89 DatumType: TryFrom<Datum<'a>> + Ord,
90 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91 Datum<'a>: From<Option<DatumType>>,
92{
93 let x: Option<DatumType> = datums
94 .into_iter()
95 .filter(|d| !d.is_null())
96 .map(|d| DatumType::try_from(d).expect("unexpected type"))
97 .min();
98
99 x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104 I: IntoIterator<Item = Datum<'a>>,
105{
106 match datums
107 .into_iter()
108 .filter(|d| !d.is_null())
109 .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110 {
111 Some(datum) => datum,
112 None => Datum::Null,
113 }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118 I: IntoIterator<Item = Datum<'a>>,
119 DatumType: TryFrom<Datum<'a>>,
120 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121 ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123 let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124 if datums.peek().is_none() {
125 Datum::Null
126 } else {
127 let x = datums
128 .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129 .sum::<ResultType>();
130 x.into()
131 }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136 I: IntoIterator<Item = Datum<'a>>,
137{
138 let mut cx = numeric::cx_datum();
139 let mut sum = Numeric::zero();
140 let mut empty = true;
141 for d in datums {
142 if !d.is_null() {
143 empty = false;
144 cx.add(&mut sum, &d.unwrap_numeric().0);
145 }
146 }
147 match empty {
148 true => Datum::Null,
149 false => Datum::from(sum),
150 }
151}
152
153#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157 I: IntoIterator<Item = Datum<'a>>,
158{
159 let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161 Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166 I: IntoIterator<Item = Datum<'a>>,
167{
168 datums
169 .into_iter()
170 .fold(Datum::False, |state, next| match (state, next) {
171 (Datum::True, _) | (_, Datum::True) => Datum::True,
172 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173 _ => Datum::False,
174 })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179 I: IntoIterator<Item = Datum<'a>>,
180{
181 datums
182 .into_iter()
183 .fold(Datum::True, |state, next| match (state, next) {
184 (Datum::False, _) | (_, Datum::False) => Datum::False,
185 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186 _ => Datum::True,
187 })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192 I: IntoIterator<Item = Datum<'a>>,
193{
194 const EMPTY_SEP: &str = "";
195
196 let datums = order_aggregate_datums(datums, order_by);
197 let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198 if d.is_null() {
199 return None;
200 }
201 let mut value_sep = d.unwrap_list().iter();
202 match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203 (Datum::Null, _) => None,
204 (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205 (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206 _ => unreachable!(),
207 }
208 });
209
210 let mut s = String::default();
211 match sep_value_pairs.next() {
212 Some((_, value)) => s.push_str(value),
214 None => return Datum::Null,
216 }
217
218 for (sep, value) in sep_value_pairs {
219 s.push_str(sep);
220 s.push_str(value);
221 }
222
223 Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228 I: IntoIterator<Item = Datum<'a>>,
229{
230 let datums = order_aggregate_datums(datums, order_by);
231 temp_storage.make_datum(|packer| {
232 packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233 })
234}
235
236fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238 I: IntoIterator<Item = Datum<'a>>,
239{
240 let datums = order_aggregate_datums(datums, order_by);
241 temp_storage.make_datum(|packer| {
242 let mut datums: Vec<_> = datums
243 .into_iter()
244 .filter_map(|d| {
245 if d.is_null() {
246 return None;
247 }
248 let mut list = d.unwrap_list().iter();
249 let key = list.next().unwrap();
250 let val = list.next().unwrap();
251 if key.is_null() {
252 None
255 } else {
256 Some((key.unwrap_str(), val))
257 }
258 })
259 .collect();
260 datums.sort_by_key(|(k, _v)| *k);
266 datums.reverse();
267 datums.dedup_by_key(|(k, _v)| *k);
268 datums.reverse();
269 packer.push_dict(datums);
270 })
271}
272
273pub fn order_aggregate_datums<'a: 'b, 'b, I>(
283 datums: I,
284 order_by: &[ColumnOrder],
285) -> impl Iterator<Item = Datum<'b>>
286where
287 I: IntoIterator<Item = Datum<'a>>,
288{
289 order_aggregate_datums_with_rank_inner(datums, order_by)
290 .into_iter()
291 .map(|(payload, _order_datums)| payload)
293}
294
295fn order_aggregate_datums_with_rank<'a, I>(
298 datums: I,
299 order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302 I: IntoIterator<Item = Datum<'a>>,
303{
304 order_aggregate_datums_with_rank_inner(datums, order_by)
305 .into_iter()
306 .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310 datums: I,
311 order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314 I: IntoIterator<Item = Datum<'a>>,
315{
316 let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317 .into_iter()
318 .map(|d| {
319 let list = d.unwrap_list();
320 let mut list_it = list.iter();
321 let payload = list_it.next().unwrap();
322
323 let mut order_by_datums = Vec::with_capacity(order_by.len());
333 for _ in 0..order_by.len() {
334 order_by_datums.push(
335 list_it
336 .next()
337 .expect("must have exactly the same number of Datums as `order_by`"),
338 );
339 }
340
341 (payload, order_by_datums)
342 })
343 .collect();
344
345 let mut sort_by =
346 |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347 (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348 compare_columns(
349 order_by,
350 left_order_by_datums,
351 right_order_by_datums,
352 || payload_left.cmp(payload_right),
353 )
354 };
355 decoded.sort_unstable_by(&mut sort_by);
360 decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365 I: IntoIterator<Item = Datum<'a>>,
366{
367 let datums = order_aggregate_datums(datums, order_by);
368 let datums: Vec<_> = datums
369 .into_iter()
370 .map(|d| d.unwrap_array().elements().iter())
371 .flatten()
372 .collect();
373 let dims = ArrayDimension {
374 lower_bound: 1,
375 length: datums.len(),
376 };
377 temp_storage.make_datum(|packer| {
378 packer.try_push_array(&[dims], datums).unwrap();
379 })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384 I: IntoIterator<Item = Datum<'a>>,
385{
386 let datums = order_aggregate_datums(datums, order_by);
387 temp_storage.make_datum(|packer| {
388 packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389 })
390}
391
392fn row_number<'a, I>(
396 datums: I,
397 callers_temp_storage: &'a RowArena,
398 order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401 I: IntoIterator<Item = Datum<'a>>,
402{
403 let temp_storage = RowArena::new();
407 let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409 callers_temp_storage.make_datum(|packer| {
410 packer.push_list(datums);
411 })
412}
413
414fn row_number_no_list<'a: 'b, 'b, I>(
417 datums: I,
418 callers_temp_storage: &'b RowArena,
419 order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422 I: IntoIterator<Item = Datum<'a>>,
423{
424 let datums = order_aggregate_datums(datums, order_by);
425
426 callers_temp_storage.reserve(datums.size_hint().0);
427 #[allow(clippy::disallowed_methods)]
428 datums
429 .into_iter()
430 .map(|d| d.unwrap_list().iter())
431 .flatten()
432 .zip(1i64..)
433 .map(|(d, i)| {
434 callers_temp_storage.make_datum(|packer| {
435 packer.push_list_with(|packer| {
436 packer.push(Datum::Int64(i));
437 packer.push(d);
438 });
439 })
440 })
441}
442
443fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448 I: IntoIterator<Item = Datum<'a>>,
449{
450 let temp_storage = RowArena::new();
451 let datums = rank_no_list(datums, &temp_storage, order_by);
452
453 callers_temp_storage.make_datum(|packer| {
454 packer.push_list(datums);
455 })
456}
457
458fn rank_no_list<'a: 'b, 'b, I>(
461 datums: I,
462 callers_temp_storage: &'b RowArena,
463 order_by: &[ColumnOrder],
464) -> impl Iterator<Item = Datum<'b>>
465where
466 I: IntoIterator<Item = Datum<'a>>,
467{
468 let datums = order_aggregate_datums_with_rank(datums, order_by);
470
471 let mut datums = datums
472 .into_iter()
473 .map(|(d0, order_row)| {
474 d0.unwrap_list()
475 .iter()
476 .map(move |d1| (d1, order_row.clone()))
477 })
478 .flatten();
479
480 callers_temp_storage.reserve(datums.size_hint().0);
481 datums
482 .next()
483 .map_or(vec![], |(first_datum, first_order_row)| {
484 datums.fold(
487 (first_order_row, 1, 1, vec![(first_datum, 1)]),
488 |mut acc, (next_datum, next_order_row)| {
489 let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
490 *acc_row_num += 1;
491 if *acc_row != next_order_row {
493 *acc_rank = *acc_row_num;
494 *acc_row = next_order_row;
495 }
496
497 (*output).push((next_datum, *acc_rank));
498 acc
499 })
500 }.3).into_iter().map(|(d, i)| {
501 callers_temp_storage.make_datum(|packer| {
502 packer.push_list_with(|packer| {
503 packer.push(Datum::Int64(i));
504 packer.push(d);
505 });
506 })
507 })
508}
509
510fn dense_rank<'a, I>(
514 datums: I,
515 callers_temp_storage: &'a RowArena,
516 order_by: &[ColumnOrder],
517) -> Datum<'a>
518where
519 I: IntoIterator<Item = Datum<'a>>,
520{
521 let temp_storage = RowArena::new();
522 let datums = dense_rank_no_list(datums, &temp_storage, order_by);
523
524 callers_temp_storage.make_datum(|packer| {
525 packer.push_list(datums);
526 })
527}
528
529fn dense_rank_no_list<'a: 'b, 'b, I>(
532 datums: I,
533 callers_temp_storage: &'b RowArena,
534 order_by: &[ColumnOrder],
535) -> impl Iterator<Item = Datum<'b>>
536where
537 I: IntoIterator<Item = Datum<'a>>,
538{
539 let datums = order_aggregate_datums_with_rank(datums, order_by);
541
542 let mut datums = datums
543 .into_iter()
544 .map(|(d0, order_row)| {
545 d0.unwrap_list()
546 .iter()
547 .map(move |d1| (d1, order_row.clone()))
548 })
549 .flatten();
550
551 callers_temp_storage.reserve(datums.size_hint().0);
552 datums
553 .next()
554 .map_or(vec![], |(first_datum, first_order_row)| {
555 datums.fold(
558 (first_order_row, 1, vec![(first_datum, 1)]),
559 |mut acc, (next_datum, next_order_row)| {
560 let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
561 if *acc_row != next_order_row {
563 *acc_rank += 1;
564 *acc_row = next_order_row;
565 }
566
567 (*output).push((next_datum, *acc_rank));
568 acc
569 })
570 }.2).into_iter().map(|(d, i)| {
571 callers_temp_storage.make_datum(|packer| {
572 packer.push_list_with(|packer| {
573 packer.push(Datum::Int64(i));
574 packer.push(d);
575 });
576 })
577 })
578}
579
580fn lag_lead<'a, I>(
602 datums: I,
603 callers_temp_storage: &'a RowArena,
604 order_by: &[ColumnOrder],
605 lag_lead_type: &LagLeadType,
606 ignore_nulls: &bool,
607) -> Datum<'a>
608where
609 I: IntoIterator<Item = Datum<'a>>,
610{
611 let temp_storage = RowArena::new();
612 let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
613 callers_temp_storage.make_datum(|packer| {
614 packer.push_list(iter);
615 })
616}
617
618fn lag_lead_no_list<'a: 'b, 'b, I>(
621 datums: I,
622 callers_temp_storage: &'b RowArena,
623 order_by: &[ColumnOrder],
624 lag_lead_type: &LagLeadType,
625 ignore_nulls: &bool,
626) -> impl Iterator<Item = Datum<'b>>
627where
628 I: IntoIterator<Item = Datum<'a>>,
629{
630 let datums = order_aggregate_datums(datums, order_by);
632
633 let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
637 .into_iter()
638 .map(|d| {
639 let mut iter = d.unwrap_list().iter();
640 let original_row = iter.next().unwrap();
641 let (input_value, offset, default_value) =
642 unwrap_lag_lead_encoded_args(iter.next().unwrap());
643 (original_row, (input_value, offset, default_value))
644 })
645 .unzip();
646
647 let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
648
649 callers_temp_storage.reserve(result.len());
650 result
651 .into_iter()
652 .zip_eq(orig_rows)
653 .map(|(result_value, original_row)| {
654 callers_temp_storage.make_datum(|packer| {
655 packer.push_list_with(|packer| {
656 packer.push(result_value);
657 packer.push(original_row);
658 });
659 })
660 })
661}
662
663fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
665 let mut encoded_args_iter = encoded_args.unwrap_list().iter();
666 let (input_value, offset, default_value) = (
667 encoded_args_iter.next().unwrap(),
668 encoded_args_iter.next().unwrap(),
669 encoded_args_iter.next().unwrap(),
670 );
671 (input_value, offset, default_value)
672}
673
674fn lag_lead_inner<'a>(
677 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
678 lag_lead_type: &LagLeadType,
679 ignore_nulls: &bool,
680) -> Vec<Datum<'a>> {
681 if *ignore_nulls {
682 lag_lead_inner_ignore_nulls(args, lag_lead_type)
683 } else {
684 lag_lead_inner_respect_nulls(args, lag_lead_type)
685 }
686}
687
688fn lag_lead_inner_respect_nulls<'a>(
689 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
690 lag_lead_type: &LagLeadType,
691) -> Vec<Datum<'a>> {
692 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
693 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
694 if offset.is_null() {
696 result.push(Datum::Null);
697 continue;
698 }
699
700 let idx = i64::try_from(idx).expect("Array index does not fit in i64");
701 let offset = i64::from(offset.unwrap_int32());
702 let offset = match lag_lead_type {
703 LagLeadType::Lag => -offset,
704 LagLeadType::Lead => offset,
705 };
706
707 let datums_get = |i: i64| -> Option<Datum> {
709 match u64::try_from(i) {
710 Ok(i) => args
711 .get(usize::cast_from(i))
712 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
716 };
717
718 let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
719
720 result.push(lagged_value);
721 }
722
723 result
724}
725
/// Computes `lag` / `lead` with IGNORE NULLS semantics.
///
/// Each element of `args` is `(input_value, offset, default_value)`. For every
/// row the result is `Datum::Null` if the row's offset is null. Otherwise the
/// function walks `|offset|` steps from the current row (backwards for `Lag`,
/// forwards for `Lead`), skipping over rows whose input value is null, and
/// returns the input value it lands on, or the row's default value if the walk
/// leaves the bounds of `args`.
///
/// # Panics
///
/// - If `args.len()` does not fit into an `i64`.
/// - If an offset is 0 and the current row's input value is null.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    // This check is what makes the `as i64` conversions of indexes below safe.
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // For every position holding a null input value: the index of the nearest
    // non-null input value before it, or -1 if there is none. Positions with a
    // non-null input value stay `None`.
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // Mirror image of the above, scanning from the end: the index of the
    // nearest non-null input value after a null position, or `args.len()` if
    // there is none.
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        if offset.is_null() {
            // A null offset yields a null result.
            result.push(Datum::Null);
            continue;
        }

        let idx = idx as i64;
        let offset = i64::cast_from(offset.unwrap_int32());
        // `Lag` looks backwards, so its offset is negated.
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        // Direction of the walk: -1, 0, or 1.
        let increment = offset.signum();

        // Bounds-checked access to the input value at a signed index; `None`
        // when the index is negative or past the end of `args`.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0))
                    .unwrap_or(None),
                Err(_) => None,
            }
        };

        let lagged_value = if increment != 0 {
            // Take `|offset|` steps. Each step moves one position in the walk
            // direction and, if it lands on a null, jumps straight to the next
            // non-null position in that direction (which may be out of bounds).
            let mut j = idx;
            for _ in 0..num::abs(offset) {
                j += increment;
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    // `j` is in bounds here, as `datums_get(j)` returned `Some`.
                    let ju = j as usize;
                    if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                // Stop as soon as the walk has left the bounds of `args`.
                if datums_get(j).is_none() {
                    break;
                }
            }
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // A zero offset pointing at a null input value is not
                // supported by this implementation.
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
846
847fn first_value<'a, I>(
849 datums: I,
850 callers_temp_storage: &'a RowArena,
851 order_by: &[ColumnOrder],
852 window_frame: &WindowFrame,
853) -> Datum<'a>
854where
855 I: IntoIterator<Item = Datum<'a>>,
856{
857 let temp_storage = RowArena::new();
858 let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
859 callers_temp_storage.make_datum(|packer| {
860 packer.push_list(iter);
861 })
862}
863
864fn first_value_no_list<'a: 'b, 'b, I>(
867 datums: I,
868 callers_temp_storage: &'b RowArena,
869 order_by: &[ColumnOrder],
870 window_frame: &WindowFrame,
871) -> impl Iterator<Item = Datum<'b>>
872where
873 I: IntoIterator<Item = Datum<'a>>,
874{
875 let datums = order_aggregate_datums(datums, order_by);
877
878 let (orig_rows, args): (Vec<_>, Vec<_>) = datums
880 .into_iter()
881 .map(|d| {
882 let mut iter = d.unwrap_list().iter();
883 let original_row = iter.next().unwrap();
884 let arg = iter.next().unwrap();
885
886 (original_row, arg)
887 })
888 .unzip();
889
890 let results = first_value_inner(args, window_frame);
891
892 callers_temp_storage.reserve(results.len());
893 results
894 .into_iter()
895 .zip_eq(orig_rows)
896 .map(|(result_value, original_row)| {
897 callers_temp_storage.make_datum(|packer| {
898 packer.push_list_with(|packer| {
899 packer.push(result_value);
900 packer.push(original_row);
901 });
902 })
903 })
904}
905
906fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
907 let length = datums.len();
908 let mut result: Vec<Datum> = Vec::with_capacity(length);
909 for (idx, current_datum) in datums.iter().enumerate() {
910 let first_value = match &window_frame.start_bound {
911 WindowFrameBound::CurrentRow => *current_datum,
913 WindowFrameBound::UnboundedPreceding => {
914 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
915 let end_offset = usize::cast_from(*end_offset);
916
917 if idx < end_offset {
919 Datum::Null
920 } else {
921 datums[0]
922 }
923 } else {
924 datums[0]
925 }
926 }
927 WindowFrameBound::OffsetPreceding(offset) => {
928 let start_offset = usize::cast_from(*offset);
929 let start_idx = idx.saturating_sub(start_offset);
930 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
931 let end_offset = usize::cast_from(*end_offset);
932
933 if start_offset < end_offset || idx < end_offset {
935 Datum::Null
936 } else {
937 datums[start_idx]
938 }
939 } else {
940 datums[start_idx]
941 }
942 }
943 WindowFrameBound::OffsetFollowing(offset) => {
944 let start_offset = usize::cast_from(*offset);
945 let start_idx = idx.saturating_add(start_offset);
946 if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
947 if offset > end_offset || start_idx >= length {
949 Datum::Null
950 } else {
951 datums[start_idx]
952 }
953 } else {
954 datums
955 .get(start_idx)
956 .map(|d| d.clone())
957 .unwrap_or(Datum::Null)
958 }
959 }
960 WindowFrameBound::UnboundedFollowing => unreachable!(),
962 };
963 result.push(first_value);
964 }
965 result
966}
967
968fn last_value<'a, I>(
970 datums: I,
971 callers_temp_storage: &'a RowArena,
972 order_by: &[ColumnOrder],
973 window_frame: &WindowFrame,
974) -> Datum<'a>
975where
976 I: IntoIterator<Item = Datum<'a>>,
977{
978 let temp_storage = RowArena::new();
979 let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
980 callers_temp_storage.make_datum(|packer| {
981 packer.push_list(iter);
982 })
983}
984
985fn last_value_no_list<'a: 'b, 'b, I>(
988 datums: I,
989 callers_temp_storage: &'b RowArena,
990 order_by: &[ColumnOrder],
991 window_frame: &WindowFrame,
992) -> impl Iterator<Item = Datum<'b>>
993where
994 I: IntoIterator<Item = Datum<'a>>,
995{
996 let datums = order_aggregate_datums_with_rank(datums, order_by);
999
1000 let size_hint = datums.size_hint().0;
1002 let mut args = Vec::with_capacity(size_hint);
1003 let mut original_rows = Vec::with_capacity(size_hint);
1004 let mut order_by_rows = Vec::with_capacity(size_hint);
1005 for (d, order_by_row) in datums.into_iter() {
1006 let mut iter = d.unwrap_list().iter();
1007 let original_row = iter.next().unwrap();
1008 let arg = iter.next().unwrap();
1009 order_by_rows.push(order_by_row);
1010 original_rows.push(original_row);
1011 args.push(arg);
1012 }
1013
1014 let results = last_value_inner(args, &order_by_rows, window_frame);
1015
1016 callers_temp_storage.reserve(results.len());
1017 results
1018 .into_iter()
1019 .zip_eq(original_rows)
1020 .map(|(result_value, original_row)| {
1021 callers_temp_storage.make_datum(|packer| {
1022 packer.push_list_with(|packer| {
1023 packer.push(result_value);
1024 packer.push(original_row);
1025 });
1026 })
1027 })
1028}
1029
1030fn last_value_inner<'a>(
1031 args: Vec<Datum<'a>>,
1032 order_by_rows: &Vec<Row>,
1033 window_frame: &WindowFrame,
1034) -> Vec<Datum<'a>> {
1035 let length = args.len();
1036 let mut results: Vec<Datum> = Vec::with_capacity(length);
1037 for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1038 let last_value = match &window_frame.end_bound {
1039 WindowFrameBound::CurrentRow => match &window_frame.units {
1040 WindowFrameUnits::Rows => *current_datum,
1042 WindowFrameUnits::Range => {
1043 let target_idx = order_by_rows[idx..]
1048 .iter()
1049 .enumerate()
1050 .take_while(|(_, row)| *row == order_by_row)
1051 .last()
1052 .unwrap()
1053 .0
1054 + idx;
1055 args[target_idx]
1056 }
1057 WindowFrameUnits::Groups => unreachable!(),
1059 },
1060 WindowFrameBound::UnboundedFollowing => {
1061 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1062 let start_offset = usize::cast_from(*start_offset);
1063
1064 if idx + start_offset > length - 1 {
1066 Datum::Null
1067 } else {
1068 args[length - 1]
1069 }
1070 } else {
1071 args[length - 1]
1072 }
1073 }
1074 WindowFrameBound::OffsetFollowing(offset) => {
1075 let end_offset = usize::cast_from(*offset);
1076 let end_idx = idx.saturating_add(end_offset);
1077 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1078 let start_offset = usize::cast_from(*start_offset);
1079 let start_idx = idx.saturating_add(start_offset);
1080
1081 if end_offset < start_offset || start_idx >= length {
1083 Datum::Null
1084 } else {
1085 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1087 }
1088 } else {
1089 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1090 }
1091 }
1092 WindowFrameBound::OffsetPreceding(offset) => {
1093 let end_offset = usize::cast_from(*offset);
1094 let end_idx = idx.saturating_sub(end_offset);
1095 if idx < end_offset {
1096 Datum::Null
1098 } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1099 &window_frame.start_bound
1100 {
1101 if offset > start_offset {
1103 Datum::Null
1104 } else {
1105 args[end_idx]
1106 }
1107 } else {
1108 args[end_idx]
1109 }
1110 }
1111 WindowFrameBound::UnboundedPreceding => unreachable!(),
1113 };
1114 results.push(last_value);
1115 }
1116 results
1117}
1118
1119fn fused_value_window_func<'a, I>(
1125 input_datums: I,
1126 callers_temp_storage: &'a RowArena,
1127 funcs: &Vec<AggregateFunc>,
1128 order_by: &Vec<ColumnOrder>,
1129) -> Datum<'a>
1130where
1131 I: IntoIterator<Item = Datum<'a>>,
1132{
1133 let temp_storage = RowArena::new();
1134 let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1135 callers_temp_storage.make_datum(|packer| {
1136 packer.push_list(iter);
1137 })
1138}
1139
1140fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1143 input_datums: I,
1144 callers_temp_storage: &'b RowArena,
1145 funcs: &Vec<AggregateFunc>,
1146 order_by: &Vec<ColumnOrder>,
1147) -> impl Iterator<Item = Datum<'b>>
1148where
1149 I: IntoIterator<Item = Datum<'a>>,
1150{
1151 let has_last_value = funcs
1152 .iter()
1153 .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1154
1155 let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1156
1157 let size_hint = input_datums_with_ranks.size_hint().0;
1158 let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1159 let mut original_rows = Vec::with_capacity(size_hint);
1160 let mut order_by_rows = Vec::with_capacity(size_hint);
1161 for (d, order_by_row) in input_datums_with_ranks {
1162 let mut iter = d.unwrap_list().iter();
1163 let original_row = iter.next().unwrap();
1164 original_rows.push(original_row);
1165 let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1166 for i in 0..funcs.len() {
1167 let encoded_args = argss_iter.next().unwrap();
1168 encoded_argsss[i].push(encoded_args);
1169 }
1170 if has_last_value {
1171 order_by_rows.push(order_by_row);
1172 }
1173 }
1174
1175 let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1176 for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1177 let results = match func {
1178 AggregateFunc::LagLead {
1179 order_by: inner_order_by,
1180 lag_lead,
1181 ignore_nulls,
1182 } => {
1183 assert_eq!(order_by, inner_order_by);
1184 let unwrapped_argss = encoded_argss
1185 .into_iter()
1186 .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1187 .collect();
1188 lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1189 }
1190 AggregateFunc::FirstValue {
1191 order_by: inner_order_by,
1192 window_frame,
1193 } => {
1194 assert_eq!(order_by, inner_order_by);
1195 first_value_inner(encoded_argss, window_frame)
1198 }
1199 AggregateFunc::LastValue {
1200 order_by: inner_order_by,
1201 window_frame,
1202 } => {
1203 assert_eq!(order_by, inner_order_by);
1204 last_value_inner(encoded_argss, &order_by_rows, window_frame)
1207 }
1208 _ => panic!("unknown window function in FusedValueWindowFunc"),
1209 };
1210 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1211 results.push(result);
1212 }
1213 }
1214
1215 callers_temp_storage.reserve(2 * original_rows.len());
1216 results_per_row
1217 .into_iter()
1218 .enumerate()
1219 .map(move |(i, results)| {
1220 callers_temp_storage.make_datum(|packer| {
1221 packer.push_list_with(|packer| {
1222 packer
1223 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1224 packer.push(original_rows[i]);
1225 });
1226 })
1227 })
1228}
1229
1230fn window_aggr<'a, I, A>(
1239 input_datums: I,
1240 callers_temp_storage: &'a RowArena,
1241 wrapped_aggregate: &AggregateFunc,
1242 order_by: &[ColumnOrder],
1243 window_frame: &WindowFrame,
1244) -> Datum<'a>
1245where
1246 I: IntoIterator<Item = Datum<'a>>,
1247 A: OneByOneAggr,
1248{
1249 let temp_storage = RowArena::new();
1250 let iter = window_aggr_no_list::<I, A>(
1251 input_datums,
1252 &temp_storage,
1253 wrapped_aggregate,
1254 order_by,
1255 window_frame,
1256 );
1257 callers_temp_storage.make_datum(|packer| {
1258 packer.push_list(iter);
1259 })
1260}
1261
1262fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1265 input_datums: I,
1266 callers_temp_storage: &'b RowArena,
1267 wrapped_aggregate: &AggregateFunc,
1268 order_by: &[ColumnOrder],
1269 window_frame: &WindowFrame,
1270) -> impl Iterator<Item = Datum<'b>>
1271where
1272 I: IntoIterator<Item = Datum<'a>>,
1273 A: OneByOneAggr,
1274{
1275 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1278
1279 let size_hint = datums.size_hint().0;
1281 let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1282 let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1283 let mut order_by_rows = Vec::with_capacity(size_hint);
1284 for (d, order_by_row) in datums.into_iter() {
1285 let mut iter = d.unwrap_list().iter();
1286 let original_row = iter.next().unwrap();
1287 let arg = iter.next().unwrap();
1288 order_by_rows.push(order_by_row);
1289 original_rows.push(original_row);
1290 args.push(arg);
1291 }
1292
1293 let results = window_aggr_inner::<A>(
1294 args,
1295 &order_by_rows,
1296 wrapped_aggregate,
1297 order_by,
1298 window_frame,
1299 callers_temp_storage,
1300 );
1301
1302 callers_temp_storage.reserve(results.len());
1303 results
1304 .into_iter()
1305 .zip_eq(original_rows)
1306 .map(|(result_value, original_row)| {
1307 callers_temp_storage.make_datum(|packer| {
1308 packer.push_list_with(|packer| {
1309 packer.push(result_value);
1310 packer.push(original_row);
1311 });
1312 })
1313 })
1314}
1315
1316fn window_aggr_inner<'a, A>(
1317 mut args: Vec<Datum<'a>>,
1318 order_by_rows: &Vec<Row>,
1319 wrapped_aggregate: &AggregateFunc,
1320 order_by: &[ColumnOrder],
1321 window_frame: &WindowFrame,
1322 temp_storage: &'a RowArena,
1323) -> Vec<Datum<'a>>
1324where
1325 A: OneByOneAggr,
1326{
1327 let length = args.len();
1328 let mut result: Vec<Datum> = Vec::with_capacity(length);
1329
1330 soft_assert_or_log!(
1336 !((matches!(window_frame.units, WindowFrameUnits::Groups)
1337 || matches!(window_frame.units, WindowFrameUnits::Range))
1338 && !window_frame.includes_current_row()),
1339 "window frame without current row"
1340 );
1341
1342 if (matches!(
1343 window_frame.start_bound,
1344 WindowFrameBound::UnboundedPreceding
1345 ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1346 || (order_by.is_empty()
1347 && (matches!(window_frame.units, WindowFrameUnits::Groups)
1348 || matches!(window_frame.units, WindowFrameUnits::Range))
1349 && window_frame.includes_current_row())
1350 {
1351 let result_value = wrapped_aggregate.eval(args, temp_storage);
1358 for _ in 0..length {
1360 result.push(result_value);
1361 }
1362 } else {
1363 fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1364 args: Vec<Datum<'a>>,
1365 result: &mut Vec<Datum<'a>>,
1366 mut one_by_one_aggr: A,
1367 temp_storage: &'a RowArena,
1368 ) where
1369 A: OneByOneAggr,
1370 {
1371 for current_arg in args.into_iter() {
1372 one_by_one_aggr.give(¤t_arg);
1373 let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1374 result.push(result_value);
1375 }
1376 }
1377
1378 fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1379 args: Vec<Datum<'a>>,
1380 order_by_rows: &Vec<Row>,
1381 result: &mut Vec<Datum<'a>>,
1382 mut one_by_one_aggr: A,
1383 temp_storage: &'a RowArena,
1384 ) where
1385 A: OneByOneAggr,
1386 {
1387 let mut peer_group_start = 0;
1388 while peer_group_start < args.len() {
1389 let mut peer_group_end = peer_group_start + 1;
1393 while peer_group_end < args.len()
1394 && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1395 {
1396 peer_group_end += 1;
1398 }
1399 for current_arg in args[peer_group_start..peer_group_end].iter() {
1402 one_by_one_aggr.give(current_arg);
1403 }
1404 let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1405 for _ in args[peer_group_start..peer_group_end].iter() {
1407 result.push(agg_for_peer_group);
1408 }
1409 peer_group_start = peer_group_end;
1411 }
1412 }
1413
1414 fn rows_between_offset_and_offset<'a>(
1415 args: Vec<Datum<'a>>,
1416 result: &mut Vec<Datum<'a>>,
1417 wrapped_aggregate: &AggregateFunc,
1418 temp_storage: &'a RowArena,
1419 offset_start: i64,
1420 offset_end: i64,
1421 ) {
1422 let len = args
1423 .len()
1424 .to_i64()
1425 .expect("window partition's len should fit into i64");
1426 for i in 0..len {
1427 let i = i.to_i64().expect("window partition shouldn't be super big");
1428 let frame_start = max(i + offset_start, 0)
1431 .to_usize()
1432 .expect("The max made sure it's not negative");
1433 let frame_end = min(i + offset_end, len - 1).to_usize();
1436 match frame_end {
1437 Some(frame_end) => {
1438 if frame_start <= frame_end {
1439 let frame_values = args[frame_start..=frame_end].iter().cloned();
1450 let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1451 result.push(result_value);
1452 } else {
1453 let result_value = wrapped_aggregate.default();
1455 result.push(result_value);
1456 }
1457 }
1458 None => {
1459 let result_value = wrapped_aggregate.default();
1461 result.push(result_value);
1462 }
1463 }
1464 }
1465 }
1466
1467 match (
1468 &window_frame.units,
1469 &window_frame.start_bound,
1470 &window_frame.end_bound,
1471 ) {
1472 (Rows, UnboundedPreceding, CurrentRow) => {
1477 rows_between_unbounded_preceding_and_current_row::<A>(
1478 args,
1479 &mut result,
1480 A::new(wrapped_aggregate, false),
1481 temp_storage,
1482 );
1483 }
1484 (Rows, CurrentRow, UnboundedFollowing) => {
1485 args.reverse();
1487 rows_between_unbounded_preceding_and_current_row::<A>(
1488 args,
1489 &mut result,
1490 A::new(wrapped_aggregate, true),
1491 temp_storage,
1492 );
1493 result.reverse();
1494 }
1495 (Range, UnboundedPreceding, CurrentRow) => {
1496 groups_between_unbounded_preceding_and_current_row::<A>(
1499 args,
1500 order_by_rows,
1501 &mut result,
1502 A::new(wrapped_aggregate, false),
1503 temp_storage,
1504 );
1505 }
1506 (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1510 let start_prec = start_prec.to_i64().expect(
1511 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1512 );
1513 let end_prec = end_prec.to_i64().expect(
1514 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1515 );
1516 rows_between_offset_and_offset(
1517 args,
1518 &mut result,
1519 wrapped_aggregate,
1520 temp_storage,
1521 -start_prec,
1522 -end_prec,
1523 );
1524 }
1525 (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1526 let start_prec = start_prec.to_i64().expect(
1527 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1528 );
1529 let end_fol = end_fol.to_i64().expect(
1530 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1531 );
1532 rows_between_offset_and_offset(
1533 args,
1534 &mut result,
1535 wrapped_aggregate,
1536 temp_storage,
1537 -start_prec,
1538 end_fol,
1539 );
1540 }
1541 (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1542 let start_fol = start_fol.to_i64().expect(
1543 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1544 );
1545 let end_fol = end_fol.to_i64().expect(
1546 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1547 );
1548 rows_between_offset_and_offset(
1549 args,
1550 &mut result,
1551 wrapped_aggregate,
1552 temp_storage,
1553 start_fol,
1554 end_fol,
1555 );
1556 }
1557 (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1558 unreachable!() }
1560 (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1561 let start_prec = start_prec.to_i64().expect(
1562 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1563 );
1564 let end_fol = 0;
1565 rows_between_offset_and_offset(
1566 args,
1567 &mut result,
1568 wrapped_aggregate,
1569 temp_storage,
1570 -start_prec,
1571 end_fol,
1572 );
1573 }
1574 (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1575 let start_fol = 0;
1576 let end_fol = end_fol.to_i64().expect(
1577 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1578 );
1579 rows_between_offset_and_offset(
1580 args,
1581 &mut result,
1582 wrapped_aggregate,
1583 temp_storage,
1584 start_fol,
1585 end_fol,
1586 );
1587 }
1588 (Rows, CurrentRow, CurrentRow) => {
1589 let start_fol = 0;
1592 let end_fol = 0;
1593 rows_between_offset_and_offset(
1594 args,
1595 &mut result,
1596 wrapped_aggregate,
1597 temp_storage,
1598 start_fol,
1599 end_fol,
1600 );
1601 }
1602 (Rows, CurrentRow, OffsetPreceding(_))
1603 | (Rows, UnboundedFollowing, _)
1604 | (Rows, _, UnboundedPreceding)
1605 | (Rows, OffsetFollowing(..), CurrentRow) => {
1606 unreachable!() }
1608 (Rows, UnboundedPreceding, UnboundedFollowing) => {
1609 unreachable!()
1612 }
1613 (Rows, UnboundedPreceding, OffsetPreceding(_))
1614 | (Rows, UnboundedPreceding, OffsetFollowing(_))
1615 | (Rows, OffsetPreceding(..), UnboundedFollowing)
1616 | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1617 unreachable!()
1620 }
1621 (Range, _, _) => {
1622 unreachable!()
1629 }
1630 (Groups, _, _) => {
1631 unreachable!()
1635 }
1636 }
1637 }
1638
1639 result
1640}
1641
1642fn fused_window_aggr<'a, I, A>(
1646 input_datums: I,
1647 callers_temp_storage: &'a RowArena,
1648 wrapped_aggregates: &Vec<AggregateFunc>,
1649 order_by: &Vec<ColumnOrder>,
1650 window_frame: &WindowFrame,
1651) -> Datum<'a>
1652where
1653 I: IntoIterator<Item = Datum<'a>>,
1654 A: OneByOneAggr,
1655{
1656 let temp_storage = RowArena::new();
1657 let iter = fused_window_aggr_no_list::<_, A>(
1658 input_datums,
1659 &temp_storage,
1660 wrapped_aggregates,
1661 order_by,
1662 window_frame,
1663 );
1664 callers_temp_storage.make_datum(|packer| {
1665 packer.push_list(iter);
1666 })
1667}
1668
1669fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1672 input_datums: I,
1673 callers_temp_storage: &'b RowArena,
1674 wrapped_aggregates: &Vec<AggregateFunc>,
1675 order_by: &Vec<ColumnOrder>,
1676 window_frame: &WindowFrame,
1677) -> impl Iterator<Item = Datum<'b>>
1678where
1679 I: IntoIterator<Item = Datum<'a>>,
1680 A: OneByOneAggr,
1681{
1682 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1685
1686 let size_hint = datums.size_hint().0;
1687 let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1688 let mut original_rows = Vec::with_capacity(size_hint);
1689 let mut order_by_rows = Vec::with_capacity(size_hint);
1690 for (d, order_by_row) in datums {
1691 let mut iter = d.unwrap_list().iter();
1692 let original_row = iter.next().unwrap();
1693 original_rows.push(original_row);
1694 let args_iter = iter.next().unwrap().unwrap_list().iter();
1695 for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1697 args.push(arg);
1698 }
1699 order_by_rows.push(order_by_row);
1700 }
1701
1702 let mut results_per_row =
1703 vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1704 for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1705 let results = window_aggr_inner::<A>(
1706 args,
1707 &order_by_rows,
1708 wrapped_aggr,
1709 order_by,
1710 window_frame,
1711 callers_temp_storage,
1712 );
1713 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1714 results.push(result);
1715 }
1716 }
1717
1718 callers_temp_storage.reserve(2 * original_rows.len());
1719 results_per_row
1720 .into_iter()
1721 .enumerate()
1722 .map(move |(i, results)| {
1723 callers_temp_storage.make_datum(|packer| {
1724 packer.push_list_with(|packer| {
1725 packer
1726 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1727 packer.push(original_rows[i]);
1728 });
1729 })
1730 })
1731}
1732
/// An aggregation that receives its input one datum at a time and can report the aggregate of
/// everything received so far after each step.
///
/// `window_aggr_inner` drives implementations of this trait to compute window frames that grow
/// by one row (or one peer group) at a time.
pub trait OneByOneAggr {
    /// Creates an aggregator for `agg` with no input yet.
    ///
    /// `reverse` tells the aggregator that datums will be given in the reverse of their original
    /// order; `window_aggr_inner` sets it when it walks a partition backwards.
    fn new(agg: &AggregateFunc, reverse: bool) -> Self;
    /// Adds one more input datum.
    fn give(&mut self, d: &Datum);
    /// Returns the aggregate of all datums given so far; `temp_storage` is available for any
    /// allocation the returned datum needs.
    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
}
1747
/// A straightforward `OneByOneAggr` that stores every datum it is given and re-evaluates the
/// wrapped `AggregateFunc` over all of them whenever the current aggregate is requested.
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    // The aggregate function to evaluate (an owned clone of the one passed to `new`).
    agg: AggregateFunc,
    // Every datum given so far, each packed into its own `Row`, in the order received.
    input: Vec<Row>,
    // Whether the stored input is fed to `agg` in reverse order of arrival.
    reverse: bool,
}
1759
1760impl OneByOneAggr for NaiveOneByOneAggr {
1761 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1762 NaiveOneByOneAggr {
1763 agg: agg.clone(),
1764 input: Vec::new(),
1765 reverse,
1766 }
1767 }
1768
1769 fn give(&mut self, d: &Datum) {
1770 let mut row = Row::default();
1771 row.packer().push(d);
1772 self.input.push(row);
1773 }
1774
1775 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1776 temp_storage.make_datum(|packer| {
1777 packer.push(if !self.reverse {
1778 self.agg
1779 .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1780 } else {
1781 self.agg.eval(
1782 self.input.iter().rev().map(|r| r.unpack_first()),
1783 temp_storage,
1784 )
1785 });
1786 })
1787 }
1788}
1789
/// Selects which of the two variants of `AggregateFunc::LagLead` is meant.
///
/// NOTE(review): presumably `Lag` looks at preceding rows and `Lead` at following rows, as in the
/// SQL window functions of the same names; the evaluation code is outside this view.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum LagLeadType {
    Lag,
    Lead,
}
1808
/// The aggregate and window functions that can be evaluated over a collection of datums.
///
/// See `AggregateFunc::eval` for the evaluation function behind each variant.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum AggregateFunc {
    // `max`, one variant per input type.
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    // `min`, one variant per input type.
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    // `sum`, one variant per input type. The integer variants produce a wider output type than
    // their input (see `output_type`).
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    /// Evaluated by `count`; its `default` is `0`.
    Count,
    /// Evaluated by `any`; its `default` is `false`.
    Any,
    /// Evaluated by `all`; its `default` is `true`.
    All,
    /// Evaluated by `jsonb_agg`; produces a `Jsonb` value.
    JsonbAgg {
        // Ordering handed to the evaluation function.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dict_agg`; produces a `Jsonb` value.
    JsonbObjectAgg {
        // Ordering handed to the evaluation function.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dict_agg`; produces a `Map` whose values have type `value_type`.
    MapAgg {
        // Ordering handed to the evaluation function.
        order_by: Vec<ColumnOrder>,
        // Value type of the resulting map (used by `output_type`).
        value_type: SqlScalarType,
    },
    /// Evaluated by `array_concat`.
    ArrayConcat {
        // Ordering handed to the evaluation function.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `list_concat`.
    ListConcat {
        // Ordering handed to the evaluation function.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `string_agg`; produces a `String`.
    StringAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `row_number`.
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `rank`.
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `dense_rank`.
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `lag_lead`.
    LagLead {
        order_by: Vec<ColumnOrder>,
        // Whether this is `lag` or `lead`.
        lag_lead: LagLeadType,
        // Passed through to `lag_lead`.
        ignore_nulls: bool,
    },
    /// Window function evaluated by `first_value`.
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Window function evaluated by `last_value`.
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions sharing one ORDER BY, evaluated together by
    /// `fused_value_window_func`.
    FusedValueWindowFunc {
        // The fused functions. `output_type` panics unless each one is `LagLead`, `FirstValue`
        // or `LastValue`.
        funcs: Vec<AggregateFunc>,
        // The ORDER BY shared by all of `funcs`.
        order_by: Vec<ColumnOrder>,
    },
    /// An aggregate function used as a window function, evaluated by `window_aggr`.
    WindowAggregate {
        // The aggregate that is computed over each row's window frame.
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregations sharing one ORDER BY and frame, evaluated together by
    /// `fused_window_aggr`.
    FusedWindowAggregate {
        // The aggregates; each is computed over each row's window frame.
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Placeholder whose evaluation, default and identity are all `Datum::Dummy`.
    Dummy,
}
1951
1952impl AggregateFunc {
1953 pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
1954 where
1955 I: IntoIterator<Item = Datum<'a>>,
1956 {
1957 match self {
1958 AggregateFunc::MaxNumeric => {
1959 max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1960 }
1961 AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
1962 AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
1963 AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
1964 AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
1965 AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
1966 AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
1967 AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
1968 AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
1969 AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
1970 AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
1971 AggregateFunc::MaxString => max_string(datums),
1972 AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
1973 AggregateFunc::MaxTimestamp => {
1974 max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1975 }
1976 AggregateFunc::MaxTimestampTz => {
1977 max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
1978 }
1979 AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
1980 AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
1981 AggregateFunc::MinNumeric => {
1982 min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1983 }
1984 AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
1985 AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
1986 AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
1987 AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
1988 AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
1989 AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
1990 AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
1991 AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
1992 AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
1993 AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
1994 AggregateFunc::MinString => min_string(datums),
1995 AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
1996 AggregateFunc::MinTimestamp => {
1997 min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1998 }
1999 AggregateFunc::MinTimestampTz => {
2000 min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2001 }
2002 AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
2003 AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
2004 AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
2005 AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
2006 AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
2007 AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
2008 AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
2009 AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
2010 AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
2011 AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
2012 AggregateFunc::SumNumeric => sum_numeric(datums),
2013 AggregateFunc::Count => count(datums),
2014 AggregateFunc::Any => any(datums),
2015 AggregateFunc::All => all(datums),
2016 AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
2017 AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
2018 dict_agg(datums, temp_storage, order_by)
2019 }
2020 AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
2021 AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
2022 AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
2023 AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
2024 AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
2025 AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
2026 AggregateFunc::LagLead {
2027 order_by,
2028 lag_lead: lag_lead_type,
2029 ignore_nulls,
2030 } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2031 AggregateFunc::FirstValue {
2032 order_by,
2033 window_frame,
2034 } => first_value(datums, temp_storage, order_by, window_frame),
2035 AggregateFunc::LastValue {
2036 order_by,
2037 window_frame,
2038 } => last_value(datums, temp_storage, order_by, window_frame),
2039 AggregateFunc::WindowAggregate {
2040 wrapped_aggregate,
2041 order_by,
2042 window_frame,
2043 } => window_aggr::<_, NaiveOneByOneAggr>(
2044 datums,
2045 temp_storage,
2046 wrapped_aggregate,
2047 order_by,
2048 window_frame,
2049 ),
2050 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2051 fused_value_window_func(datums, temp_storage, funcs, order_by)
2052 }
2053 AggregateFunc::FusedWindowAggregate {
2054 wrapped_aggregates,
2055 order_by,
2056 window_frame,
2057 } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2058 datums,
2059 temp_storage,
2060 wrapped_aggregates,
2061 order_by,
2062 window_frame,
2063 ),
2064 AggregateFunc::Dummy => Datum::Dummy,
2065 }
2066 }
2067
2068 pub fn eval_with_fast_window_agg<'a, I, W>(
2072 &self,
2073 datums: I,
2074 temp_storage: &'a RowArena,
2075 ) -> Datum<'a>
2076 where
2077 I: IntoIterator<Item = Datum<'a>>,
2078 W: OneByOneAggr,
2079 {
2080 match self {
2081 AggregateFunc::WindowAggregate {
2082 wrapped_aggregate,
2083 order_by,
2084 window_frame,
2085 } => window_aggr::<_, W>(
2086 datums,
2087 temp_storage,
2088 wrapped_aggregate,
2089 order_by,
2090 window_frame,
2091 ),
2092 AggregateFunc::FusedWindowAggregate {
2093 wrapped_aggregates,
2094 order_by,
2095 window_frame,
2096 } => fused_window_aggr::<_, W>(
2097 datums,
2098 temp_storage,
2099 wrapped_aggregates,
2100 order_by,
2101 window_frame,
2102 ),
2103 _ => self.eval(datums, temp_storage),
2104 }
2105 }
2106
2107 pub fn eval_with_unnest_list<'a, I, W>(
2108 &self,
2109 datums: I,
2110 temp_storage: &'a RowArena,
2111 ) -> impl Iterator<Item = Datum<'a>>
2112 where
2113 I: IntoIterator<Item = Datum<'a>>,
2114 W: OneByOneAggr,
2115 {
2116 assert!(self.can_fuse_with_unnest_list());
2118 match self {
2119 AggregateFunc::RowNumber { order_by } => {
2120 row_number_no_list(datums, temp_storage, order_by).collect_vec()
2121 }
2122 AggregateFunc::Rank { order_by } => {
2123 rank_no_list(datums, temp_storage, order_by).collect_vec()
2124 }
2125 AggregateFunc::DenseRank { order_by } => {
2126 dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2127 }
2128 AggregateFunc::LagLead {
2129 order_by,
2130 lag_lead: lag_lead_type,
2131 ignore_nulls,
2132 } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2133 .collect_vec(),
2134 AggregateFunc::FirstValue {
2135 order_by,
2136 window_frame,
2137 } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2138 AggregateFunc::LastValue {
2139 order_by,
2140 window_frame,
2141 } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2142 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2143 fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2144 }
2145 AggregateFunc::WindowAggregate {
2146 wrapped_aggregate,
2147 order_by,
2148 window_frame,
2149 } => window_aggr_no_list::<_, W>(
2150 datums,
2151 temp_storage,
2152 wrapped_aggregate,
2153 order_by,
2154 window_frame,
2155 )
2156 .collect_vec(),
2157 AggregateFunc::FusedWindowAggregate {
2158 wrapped_aggregates,
2159 order_by,
2160 window_frame,
2161 } => fused_window_aggr_no_list::<_, W>(
2162 datums,
2163 temp_storage,
2164 wrapped_aggregates,
2165 order_by,
2166 window_frame,
2167 )
2168 .collect_vec(),
2169 _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2170 }
2171 .into_iter()
2172 }
2173
2174 pub fn default(&self) -> Datum<'static> {
2177 match self {
2178 AggregateFunc::Count => Datum::Int64(0),
2179 AggregateFunc::Any => Datum::False,
2180 AggregateFunc::All => Datum::True,
2181 AggregateFunc::Dummy => Datum::Dummy,
2182 _ => Datum::Null,
2183 }
2184 }
2185
2186 pub fn identity_datum(&self) -> Datum<'static> {
2189 match self {
2190 AggregateFunc::Any => Datum::False,
2191 AggregateFunc::All => Datum::True,
2192 AggregateFunc::Dummy => Datum::Dummy,
2193 AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2194 AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2195 AggregateFunc::RowNumber { .. }
2196 | AggregateFunc::Rank { .. }
2197 | AggregateFunc::DenseRank { .. }
2198 | AggregateFunc::LagLead { .. }
2199 | AggregateFunc::FirstValue { .. }
2200 | AggregateFunc::LastValue { .. }
2201 | AggregateFunc::WindowAggregate { .. }
2202 | AggregateFunc::FusedValueWindowFunc { .. }
2203 | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2204 AggregateFunc::MaxNumeric
2205 | AggregateFunc::MaxInt16
2206 | AggregateFunc::MaxInt32
2207 | AggregateFunc::MaxInt64
2208 | AggregateFunc::MaxUInt16
2209 | AggregateFunc::MaxUInt32
2210 | AggregateFunc::MaxUInt64
2211 | AggregateFunc::MaxMzTimestamp
2212 | AggregateFunc::MaxFloat32
2213 | AggregateFunc::MaxFloat64
2214 | AggregateFunc::MaxBool
2215 | AggregateFunc::MaxString
2216 | AggregateFunc::MaxDate
2217 | AggregateFunc::MaxTimestamp
2218 | AggregateFunc::MaxTimestampTz
2219 | AggregateFunc::MaxInterval
2220 | AggregateFunc::MaxTime
2221 | AggregateFunc::MinNumeric
2222 | AggregateFunc::MinInt16
2223 | AggregateFunc::MinInt32
2224 | AggregateFunc::MinInt64
2225 | AggregateFunc::MinUInt16
2226 | AggregateFunc::MinUInt32
2227 | AggregateFunc::MinUInt64
2228 | AggregateFunc::MinMzTimestamp
2229 | AggregateFunc::MinFloat32
2230 | AggregateFunc::MinFloat64
2231 | AggregateFunc::MinBool
2232 | AggregateFunc::MinString
2233 | AggregateFunc::MinDate
2234 | AggregateFunc::MinTimestamp
2235 | AggregateFunc::MinTimestampTz
2236 | AggregateFunc::MinInterval
2237 | AggregateFunc::MinTime
2238 | AggregateFunc::SumInt16
2239 | AggregateFunc::SumInt32
2240 | AggregateFunc::SumInt64
2241 | AggregateFunc::SumUInt16
2242 | AggregateFunc::SumUInt32
2243 | AggregateFunc::SumUInt64
2244 | AggregateFunc::SumFloat32
2245 | AggregateFunc::SumFloat64
2246 | AggregateFunc::SumNumeric
2247 | AggregateFunc::Count
2248 | AggregateFunc::JsonbAgg { .. }
2249 | AggregateFunc::JsonbObjectAgg { .. }
2250 | AggregateFunc::MapAgg { .. }
2251 | AggregateFunc::StringAgg { .. } => Datum::Null,
2252 }
2253 }
2254
2255 pub fn can_fuse_with_unnest_list(&self) -> bool {
2256 match self {
2257 AggregateFunc::RowNumber { .. }
2258 | AggregateFunc::Rank { .. }
2259 | AggregateFunc::DenseRank { .. }
2260 | AggregateFunc::LagLead { .. }
2261 | AggregateFunc::FirstValue { .. }
2262 | AggregateFunc::LastValue { .. }
2263 | AggregateFunc::WindowAggregate { .. }
2264 | AggregateFunc::FusedValueWindowFunc { .. }
2265 | AggregateFunc::FusedWindowAggregate { .. } => true,
2266 AggregateFunc::ArrayConcat { .. }
2267 | AggregateFunc::ListConcat { .. }
2268 | AggregateFunc::Any
2269 | AggregateFunc::All
2270 | AggregateFunc::Dummy
2271 | AggregateFunc::MaxNumeric
2272 | AggregateFunc::MaxInt16
2273 | AggregateFunc::MaxInt32
2274 | AggregateFunc::MaxInt64
2275 | AggregateFunc::MaxUInt16
2276 | AggregateFunc::MaxUInt32
2277 | AggregateFunc::MaxUInt64
2278 | AggregateFunc::MaxMzTimestamp
2279 | AggregateFunc::MaxFloat32
2280 | AggregateFunc::MaxFloat64
2281 | AggregateFunc::MaxBool
2282 | AggregateFunc::MaxString
2283 | AggregateFunc::MaxDate
2284 | AggregateFunc::MaxTimestamp
2285 | AggregateFunc::MaxTimestampTz
2286 | AggregateFunc::MaxInterval
2287 | AggregateFunc::MaxTime
2288 | AggregateFunc::MinNumeric
2289 | AggregateFunc::MinInt16
2290 | AggregateFunc::MinInt32
2291 | AggregateFunc::MinInt64
2292 | AggregateFunc::MinUInt16
2293 | AggregateFunc::MinUInt32
2294 | AggregateFunc::MinUInt64
2295 | AggregateFunc::MinMzTimestamp
2296 | AggregateFunc::MinFloat32
2297 | AggregateFunc::MinFloat64
2298 | AggregateFunc::MinBool
2299 | AggregateFunc::MinString
2300 | AggregateFunc::MinDate
2301 | AggregateFunc::MinTimestamp
2302 | AggregateFunc::MinTimestampTz
2303 | AggregateFunc::MinInterval
2304 | AggregateFunc::MinTime
2305 | AggregateFunc::SumInt16
2306 | AggregateFunc::SumInt32
2307 | AggregateFunc::SumInt64
2308 | AggregateFunc::SumUInt16
2309 | AggregateFunc::SumUInt32
2310 | AggregateFunc::SumUInt64
2311 | AggregateFunc::SumFloat32
2312 | AggregateFunc::SumFloat64
2313 | AggregateFunc::SumNumeric
2314 | AggregateFunc::Count
2315 | AggregateFunc::JsonbAgg { .. }
2316 | AggregateFunc::JsonbObjectAgg { .. }
2317 | AggregateFunc::MapAgg { .. }
2318 | AggregateFunc::StringAgg { .. } => false,
2319 }
2320 }
2321
2322 pub fn output_type(&self, input_type: SqlColumnType) -> SqlColumnType {
2328 let scalar_type = match self {
2329 AggregateFunc::Count => SqlScalarType::Int64,
2330 AggregateFunc::Any => SqlScalarType::Bool,
2331 AggregateFunc::All => SqlScalarType::Bool,
2332 AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
2333 AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
2334 AggregateFunc::SumInt16 => SqlScalarType::Int64,
2335 AggregateFunc::SumInt32 => SqlScalarType::Int64,
2336 AggregateFunc::SumInt64 => SqlScalarType::Numeric {
2337 max_scale: Some(NumericMaxScale::ZERO),
2338 },
2339 AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
2340 AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
2341 AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
2342 max_scale: Some(NumericMaxScale::ZERO),
2343 },
2344 AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
2345 value_type: Box::new(value_type.clone()),
2346 custom_id: None,
2347 },
2348 AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2349 match input_type.scalar_type {
2350 SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2352 _ => unreachable!(),
2353 }
2354 }
2355 AggregateFunc::StringAgg { .. } => SqlScalarType::String,
2356 AggregateFunc::RowNumber { .. } => {
2357 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2358 }
2359 AggregateFunc::Rank { .. } => {
2360 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2361 }
2362 AggregateFunc::DenseRank { .. } => {
2363 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2364 }
2365 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2366 let fields = input_type.scalar_type.unwrap_record_element_type();
2368 let original_row_type = fields[0].unwrap_record_element_type()[0]
2369 .clone()
2370 .nullable(false);
2371 let encoded_args = fields[0].unwrap_record_element_type()[1];
2372 let output_type_inner =
2373 Self::lag_lead_output_type_inner_from_encoded_args(encoded_args);
2374 let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2375
2376 SqlScalarType::List {
2377 element_type: Box::new(SqlScalarType::Record {
2378 fields: [
2379 (column_name, output_type_inner),
2380 (ColumnName::from("?orig_row?"), original_row_type),
2381 ].into(),
2382 custom_id: None,
2383 }),
2384 custom_id: None,
2385 }
2386 }
2387 AggregateFunc::FirstValue { .. } => {
2388 let fields = input_type.scalar_type.unwrap_record_element_type();
2390 let original_row_type = fields[0].unwrap_record_element_type()[0]
2391 .clone()
2392 .nullable(false);
2393 let value_type = fields[0].unwrap_record_element_type()[1]
2394 .clone()
2395 .nullable(true); SqlScalarType::List {
2398 element_type: Box::new(SqlScalarType::Record {
2399 fields: [
2400 (ColumnName::from("?first_value?"), value_type),
2401 (ColumnName::from("?orig_row?"), original_row_type),
2402 ].into(),
2403 custom_id: None,
2404 }),
2405 custom_id: None,
2406 }
2407 }
2408 AggregateFunc::LastValue { .. } => {
2409 let fields = input_type.scalar_type.unwrap_record_element_type();
2411 let original_row_type = fields[0].unwrap_record_element_type()[0]
2412 .clone()
2413 .nullable(false);
2414 let value_type = fields[0].unwrap_record_element_type()[1]
2415 .clone()
2416 .nullable(true); SqlScalarType::List {
2419 element_type: Box::new(SqlScalarType::Record {
2420 fields: [
2421 (ColumnName::from("?last_value?"), value_type),
2422 (ColumnName::from("?orig_row?"), original_row_type),
2423 ].into(),
2424 custom_id: None,
2425 }),
2426 custom_id: None,
2427 }
2428 }
2429 AggregateFunc::WindowAggregate {
2430 wrapped_aggregate, ..
2431 } => {
2432 let fields = input_type.scalar_type.unwrap_record_element_type();
2434 let original_row_type = fields[0].unwrap_record_element_type()[0]
2435 .clone()
2436 .nullable(false);
2437 let arg_type = fields[0].unwrap_record_element_type()[1]
2438 .clone()
2439 .nullable(true);
2440 let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);
2441
2442 SqlScalarType::List {
2443 element_type: Box::new(SqlScalarType::Record {
2444 fields: [
2445 (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2446 (ColumnName::from("?orig_row?"), original_row_type),
2447 ].into(),
2448 custom_id: None,
2449 }),
2450 custom_id: None,
2451 }
2452 }
2453 AggregateFunc::FusedWindowAggregate {
2454 wrapped_aggregates, ..
2455 } => {
2456 let fields = input_type.scalar_type.unwrap_record_element_type();
2459 let original_row_type = fields[0].unwrap_record_element_type()[0]
2460 .clone()
2461 .nullable(false);
2462 let args_type = fields[0].unwrap_record_element_type()[1];
2463 let arg_types = args_type.unwrap_record_element_type();
2464 let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(
2465 |(arg_type, wrapped_agg)| {
2466 (
2467 ColumnName::from(wrapped_agg.name()),
2468 wrapped_agg.output_type((**arg_type).clone().nullable(true)),
2469 )
2470 }).collect_vec();
2471
2472 SqlScalarType::List {
2473 element_type: Box::new(SqlScalarType::Record {
2474 fields: [
2475 (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
2476 fields: out_fields.into(),
2477 custom_id: None,
2478 }.nullable(false)),
2479 (ColumnName::from("?orig_row?"), original_row_type),
2480 ].into(),
2481 custom_id: None,
2482 }),
2483 custom_id: None,
2484 }
2485 }
2486 AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2487 let fields = input_type.scalar_type.unwrap_record_element_type();
2492 let original_row_type = fields[0].unwrap_record_element_type()[0]
2493 .clone()
2494 .nullable(false);
2495 let encoded_args_type = fields[0]
2496 .unwrap_record_element_type()[1]
2497 .unwrap_record_element_type();
2498
2499 SqlScalarType::List {
2500 element_type: Box::new(SqlScalarType::Record {
2501 fields: [
2502 (
2503 ColumnName::from("?fused_value_window_func?"),
2504 SqlScalarType::Record {
2505 fields: encoded_args_type.into_iter().zip_eq(funcs).map(
2506 |(arg_type, func)| {
2507 match func {
2508 AggregateFunc::LagLead {
2509 lag_lead: lag_lead_type, ..
2510 } => {
2511 let name = Self::lag_lead_result_column_name(
2512 lag_lead_type,
2513 );
2514 let ty = Self
2515 ::lag_lead_output_type_inner_from_encoded_args(
2516 arg_type,
2517 );
2518 (name, ty)
2519 },
2520 AggregateFunc::FirstValue { .. } => {
2521 (
2522 ColumnName::from("?first_value?"),
2523 arg_type.clone().nullable(true),
2524 )
2525 }
2526 AggregateFunc::LastValue { .. } => {
2527 (
2528 ColumnName::from("?last_value?"),
2529 arg_type.clone().nullable(true),
2530 )
2531 }
2532 _ => panic!("FusedValueWindowFunc has an unknown function"),
2533 }
2534 }).collect(),
2535 custom_id: None,
2536 }.nullable(false)),
2537 (ColumnName::from("?orig_row?"), original_row_type),
2538 ].into(),
2539 custom_id: None,
2540 }),
2541 custom_id: None,
2542 }
2543 }
2544 AggregateFunc::Dummy
2545 | AggregateFunc::MaxNumeric
2546 | AggregateFunc::MaxInt16
2547 | AggregateFunc::MaxInt32
2548 | AggregateFunc::MaxInt64
2549 | AggregateFunc::MaxUInt16
2550 | AggregateFunc::MaxUInt32
2551 | AggregateFunc::MaxUInt64
2552 | AggregateFunc::MaxMzTimestamp
2553 | AggregateFunc::MaxFloat32
2554 | AggregateFunc::MaxFloat64
2555 | AggregateFunc::MaxBool
2556 | AggregateFunc::MaxString
2560 | AggregateFunc::MaxDate
2561 | AggregateFunc::MaxTimestamp
2562 | AggregateFunc::MaxTimestampTz
2563 | AggregateFunc::MaxInterval
2564 | AggregateFunc::MaxTime
2565 | AggregateFunc::MinNumeric
2566 | AggregateFunc::MinInt16
2567 | AggregateFunc::MinInt32
2568 | AggregateFunc::MinInt64
2569 | AggregateFunc::MinUInt16
2570 | AggregateFunc::MinUInt32
2571 | AggregateFunc::MinUInt64
2572 | AggregateFunc::MinMzTimestamp
2573 | AggregateFunc::MinFloat32
2574 | AggregateFunc::MinFloat64
2575 | AggregateFunc::MinBool
2576 | AggregateFunc::MinString
2577 | AggregateFunc::MinDate
2578 | AggregateFunc::MinTimestamp
2579 | AggregateFunc::MinTimestampTz
2580 | AggregateFunc::MinInterval
2581 | AggregateFunc::MinTime
2582 | AggregateFunc::SumFloat32
2583 | AggregateFunc::SumFloat64
2584 | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2585 };
2586 let nullable = match self {
2589 AggregateFunc::Count => false,
2590 AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2592 SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2594 SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
2596 _ => unreachable!(),
2597 },
2598 _ => unreachable!(),
2599 },
2600 _ => input_type.nullable,
2601 };
2602 scalar_type.nullable(nullable)
2603 }
2604
2605 fn output_type_ranking_window_funcs(
2607 input_type: &SqlColumnType,
2608 col_name: &str,
2609 ) -> SqlScalarType {
2610 match input_type.scalar_type {
2611 SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2612 element_type: Box::new(SqlScalarType::Record {
2613 fields: [
2614 (
2615 ColumnName::from(col_name),
2616 SqlScalarType::Int64.nullable(false),
2617 ),
2618 (ColumnName::from("?orig_row?"), {
2619 let inner = match &fields[0].1.scalar_type {
2620 SqlScalarType::List { element_type, .. } => element_type.clone(),
2621 _ => unreachable!(),
2622 };
2623 inner.nullable(false)
2624 }),
2625 ]
2626 .into(),
2627 custom_id: None,
2628 }),
2629 custom_id: None,
2630 },
2631 _ => unreachable!(),
2632 }
2633 }
2634
2635 fn lag_lead_output_type_inner_from_encoded_args(
2639 encoded_args_type: &SqlScalarType,
2640 ) -> SqlColumnType {
2641 encoded_args_type.unwrap_record_element_type()[0]
2645 .clone()
2646 .nullable(true)
2647 }
2648
2649 fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2650 ColumnName::from(match lag_lead_type {
2651 LagLeadType::Lag => "?lag?",
2652 LagLeadType::Lead => "?lead?",
2653 })
2654 }
2655
2656 pub fn propagates_nonnull_constraint(&self) -> bool {
2661 match self {
2662 AggregateFunc::MaxNumeric
2663 | AggregateFunc::MaxInt16
2664 | AggregateFunc::MaxInt32
2665 | AggregateFunc::MaxInt64
2666 | AggregateFunc::MaxUInt16
2667 | AggregateFunc::MaxUInt32
2668 | AggregateFunc::MaxUInt64
2669 | AggregateFunc::MaxMzTimestamp
2670 | AggregateFunc::MaxFloat32
2671 | AggregateFunc::MaxFloat64
2672 | AggregateFunc::MaxBool
2673 | AggregateFunc::MaxString
2674 | AggregateFunc::MaxDate
2675 | AggregateFunc::MaxTimestamp
2676 | AggregateFunc::MaxTimestampTz
2677 | AggregateFunc::MinNumeric
2678 | AggregateFunc::MinInt16
2679 | AggregateFunc::MinInt32
2680 | AggregateFunc::MinInt64
2681 | AggregateFunc::MinUInt16
2682 | AggregateFunc::MinUInt32
2683 | AggregateFunc::MinUInt64
2684 | AggregateFunc::MinMzTimestamp
2685 | AggregateFunc::MinFloat32
2686 | AggregateFunc::MinFloat64
2687 | AggregateFunc::MinBool
2688 | AggregateFunc::MinString
2689 | AggregateFunc::MinDate
2690 | AggregateFunc::MinTimestamp
2691 | AggregateFunc::MinTimestampTz
2692 | AggregateFunc::SumInt16
2693 | AggregateFunc::SumInt32
2694 | AggregateFunc::SumInt64
2695 | AggregateFunc::SumUInt16
2696 | AggregateFunc::SumUInt32
2697 | AggregateFunc::SumUInt64
2698 | AggregateFunc::SumFloat32
2699 | AggregateFunc::SumFloat64
2700 | AggregateFunc::SumNumeric
2701 | AggregateFunc::StringAgg { .. } => true,
2702 AggregateFunc::Count => false,
2704 _ => false,
2705 }
2706 }
2707}
2708
2709fn jsonb_each<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2710 let map = match a {
2712 Datum::Map(dict) => dict,
2713 _ => mz_repr::DatumMap::empty(),
2714 };
2715
2716 map.iter()
2717 .map(move |(k, v)| (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE))
2718}
2719
2720fn jsonb_each_stringify<'a>(
2721 a: Datum<'a>,
2722 temp_storage: &'a RowArena,
2723) -> impl Iterator<Item = (Row, Diff)> + 'a {
2724 let map = match a {
2726 Datum::Map(dict) => dict,
2727 _ => mz_repr::DatumMap::empty(),
2728 };
2729
2730 map.iter().map(move |(k, mut v)| {
2731 v = jsonb_stringify(v, temp_storage)
2732 .map(Datum::String)
2733 .unwrap_or(Datum::Null);
2734 (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2735 })
2736}
2737
2738fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2739 let map = match a {
2740 Datum::Map(dict) => dict,
2741 _ => mz_repr::DatumMap::empty(),
2742 };
2743
2744 map.iter()
2745 .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2746}
2747
2748fn jsonb_array_elements<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2749 let list = match a {
2750 Datum::List(list) => list,
2751 _ => mz_repr::DatumList::empty(),
2752 };
2753 list.iter().map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2754}
2755
2756fn jsonb_array_elements_stringify<'a>(
2757 a: Datum<'a>,
2758 temp_storage: &'a RowArena,
2759) -> impl Iterator<Item = (Row, Diff)> + 'a {
2760 let list = match a {
2761 Datum::List(list) => list,
2762 _ => mz_repr::DatumList::empty(),
2763 };
2764 list.iter().map(move |mut e| {
2765 e = jsonb_stringify(e, temp_storage)
2766 .map(Datum::String)
2767 .unwrap_or(Datum::Null);
2768 (Row::pack_slice(&[e]), Diff::ONE)
2769 })
2770}
2771
2772fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2773 let r = r.inner();
2774 let a = a.unwrap_str();
2775 let captures = r.captures(a)?;
2776 let datums = captures
2777 .iter()
2778 .skip(1)
2779 .map(|m| Datum::from(m.map(|m| m.as_str())));
2780 Some((Row::pack(datums), Diff::ONE))
2781}
2782
2783fn regexp_matches<'a, 'r: 'a>(
2784 exprs: &[Datum<'a>],
2785) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2786 assert!(exprs.len() == 2 || exprs.len() == 3);
2790 let a = exprs[0].unwrap_str();
2791 let r = exprs[1].unwrap_str();
2792
2793 let (regex, opts) = if exprs.len() == 3 {
2794 let flag = exprs[2].unwrap_str();
2795 let opts = AnalyzedRegexOpts::from_str(flag)?;
2796 (AnalyzedRegex::new(r, opts)?, opts)
2797 } else {
2798 let opts = AnalyzedRegexOpts::default();
2799 (AnalyzedRegex::new(r, opts)?, opts)
2800 };
2801
2802 let regex = regex.inner().clone();
2803
2804 let iter = regex.captures_iter(a).map(move |captures| {
2805 let matches = captures
2806 .iter()
2807 .skip(1)
2809 .map(|m| Datum::from(m.map(|m| m.as_str())))
2810 .collect::<Vec<_>>();
2811
2812 let mut binding = SharedRow::get();
2813 let mut packer = binding.packer();
2814
2815 let dimension = ArrayDimension {
2816 lower_bound: 1,
2817 length: matches.len(),
2818 };
2819 packer
2820 .try_push_array(&[dimension], matches)
2821 .expect("generated dimensions above");
2822
2823 (binding.clone(), Diff::ONE)
2824 });
2825
2826 let out = iter.collect::<SmallVec<[_; 3]>>();
2831
2832 if opts.global {
2833 Ok(Either::Left(out.into_iter()))
2834 } else {
2835 Ok(Either::Right(out.into_iter().take(1)))
2836 }
2837}
2838
2839fn generate_series<N>(
2840 start: N,
2841 stop: N,
2842 step: N,
2843) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2844where
2845 N: Integer + Signed + CheckedAdd + Clone,
2846 Datum<'static>: From<N>,
2847{
2848 if step == N::zero() {
2849 return Err(EvalError::InvalidParameterValue(
2850 "step size cannot equal zero".into(),
2851 ));
2852 }
2853 Ok(num::range_step_inclusive(start, stop, step)
2854 .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2855}
2856
/// Iterator state for an inclusive timestamp series advanced by an
/// [`Interval`] step.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to be yielded.
    state: CheckedTimestamp<T>,
    /// The inclusive bound at which iteration ends.
    stop: CheckedTimestamp<T>,
    /// The interval added to `state` after each yielded value.
    step: Interval,
    /// Whether the series runs downwards (`state >= stop` is the continue
    /// condition) rather than upwards (`state <= stop`).
    rev: bool,
    /// Set once the next state can no longer be computed; ends iteration.
    done: bool,
}
2868
2869impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
2870 type Item = CheckedTimestamp<T>;
2871
2872 #[inline]
2873 fn next(&mut self) -> Option<CheckedTimestamp<T>> {
2874 if !self.done
2875 && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
2876 {
2877 let result = self.state.clone();
2878 match add_timestamp_months(self.state.deref(), self.step.months) {
2879 Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
2880 Some(v) => match CheckedTimestamp::from_timestamplike(v) {
2881 Ok(v) => self.state = v,
2882 Err(_) => self.done = true,
2883 },
2884 None => self.done = true,
2885 },
2886 Err(..) => {
2887 self.done = true;
2888 }
2889 }
2890
2891 Some(result)
2892 } else {
2893 None
2894 }
2895 }
2896}
2897
2898fn generate_series_ts<T: TimestampLike>(
2899 start: CheckedTimestamp<T>,
2900 stop: CheckedTimestamp<T>,
2901 step: Interval,
2902 conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2903) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2904 let normalized_step = step.as_microseconds();
2905 if normalized_step == 0 {
2906 return Err(EvalError::InvalidParameterValue(
2907 "step size cannot equal zero".into(),
2908 ));
2909 }
2910 let rev = normalized_step < 0;
2911
2912 let trsi = TimestampRangeStepInclusive {
2913 state: start,
2914 stop,
2915 step,
2916 rev,
2917 done: false,
2918 };
2919
2920 Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2921}
2922
2923fn generate_subscripts_array(
2924 a: Datum,
2925 dim: i32,
2926) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2927 if dim <= 0 {
2928 return Ok(Box::new(iter::empty()));
2929 }
2930
2931 match a.unwrap_array().dims().into_iter().nth(
2932 (dim - 1)
2933 .try_into()
2934 .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2935 ) {
2936 Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
2937 requested_dim.lower_bound.try_into().map_err(|_| {
2938 EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2939 })?,
2940 requested_dim
2941 .length
2942 .try_into()
2943 .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
2944 1,
2945 )?)),
2946 None => Ok(Box::new(iter::empty())),
2947 }
2948}
2949
2950fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2951 a.unwrap_array()
2952 .elements()
2953 .iter()
2954 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2955}
2956
2957fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2958 a.unwrap_list()
2959 .iter()
2960 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2961}
2962
2963fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2964 a.unwrap_map()
2965 .iter()
2966 .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
2967}
2968
2969impl AggregateFunc {
2970 pub fn name(&self) -> &'static str {
2973 match self {
2974 Self::MaxNumeric => "max",
2975 Self::MaxInt16 => "max",
2976 Self::MaxInt32 => "max",
2977 Self::MaxInt64 => "max",
2978 Self::MaxUInt16 => "max",
2979 Self::MaxUInt32 => "max",
2980 Self::MaxUInt64 => "max",
2981 Self::MaxMzTimestamp => "max",
2982 Self::MaxFloat32 => "max",
2983 Self::MaxFloat64 => "max",
2984 Self::MaxBool => "max",
2985 Self::MaxString => "max",
2986 Self::MaxDate => "max",
2987 Self::MaxTimestamp => "max",
2988 Self::MaxTimestampTz => "max",
2989 Self::MaxInterval => "max",
2990 Self::MaxTime => "max",
2991 Self::MinNumeric => "min",
2992 Self::MinInt16 => "min",
2993 Self::MinInt32 => "min",
2994 Self::MinInt64 => "min",
2995 Self::MinUInt16 => "min",
2996 Self::MinUInt32 => "min",
2997 Self::MinUInt64 => "min",
2998 Self::MinMzTimestamp => "min",
2999 Self::MinFloat32 => "min",
3000 Self::MinFloat64 => "min",
3001 Self::MinBool => "min",
3002 Self::MinString => "min",
3003 Self::MinDate => "min",
3004 Self::MinTimestamp => "min",
3005 Self::MinTimestampTz => "min",
3006 Self::MinInterval => "min",
3007 Self::MinTime => "min",
3008 Self::SumInt16 => "sum",
3009 Self::SumInt32 => "sum",
3010 Self::SumInt64 => "sum",
3011 Self::SumUInt16 => "sum",
3012 Self::SumUInt32 => "sum",
3013 Self::SumUInt64 => "sum",
3014 Self::SumFloat32 => "sum",
3015 Self::SumFloat64 => "sum",
3016 Self::SumNumeric => "sum",
3017 Self::Count => "count",
3018 Self::Any => "any",
3019 Self::All => "all",
3020 Self::JsonbAgg { .. } => "jsonb_agg",
3021 Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3022 Self::MapAgg { .. } => "map_agg",
3023 Self::ArrayConcat { .. } => "array_agg",
3024 Self::ListConcat { .. } => "list_agg",
3025 Self::StringAgg { .. } => "string_agg",
3026 Self::RowNumber { .. } => "row_number",
3027 Self::Rank { .. } => "rank",
3028 Self::DenseRank { .. } => "dense_rank",
3029 Self::LagLead {
3030 lag_lead: LagLeadType::Lag,
3031 ..
3032 } => "lag",
3033 Self::LagLead {
3034 lag_lead: LagLeadType::Lead,
3035 ..
3036 } => "lead",
3037 Self::FirstValue { .. } => "first_value",
3038 Self::LastValue { .. } => "last_value",
3039 Self::WindowAggregate { .. } => "window_agg",
3040 Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3041 Self::FusedWindowAggregate { .. } => "fused_window_agg",
3042 Self::Dummy => "dummy",
3043 }
3044 }
3045}
3046
3047impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3048where
3049 M: HumanizerMode,
3050{
3051 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3052 use AggregateFunc::*;
3053 let name = self.expr.name();
3054 match self.expr {
3055 JsonbAgg { order_by }
3056 | JsonbObjectAgg { order_by }
3057 | MapAgg { order_by, .. }
3058 | ArrayConcat { order_by }
3059 | ListConcat { order_by }
3060 | StringAgg { order_by }
3061 | RowNumber { order_by }
3062 | Rank { order_by }
3063 | DenseRank { order_by } => {
3064 let order_by = order_by.iter().map(|col| self.child(col));
3065 write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3066 }
3067 LagLead {
3068 lag_lead: _,
3069 ignore_nulls,
3070 order_by,
3071 } => {
3072 let order_by = order_by.iter().map(|col| self.child(col));
3073 f.write_str(name)?;
3074 f.write_str("[")?;
3075 if *ignore_nulls {
3076 f.write_str("ignore_nulls=true, ")?;
3077 }
3078 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3079 f.write_str("]")
3080 }
3081 FirstValue {
3082 order_by,
3083 window_frame,
3084 } => {
3085 let order_by = order_by.iter().map(|col| self.child(col));
3086 f.write_str(name)?;
3087 f.write_str("[")?;
3088 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3089 if *window_frame != WindowFrame::default() {
3090 write!(f, " {}", window_frame)?;
3091 }
3092 f.write_str("]")
3093 }
3094 LastValue {
3095 order_by,
3096 window_frame,
3097 } => {
3098 let order_by = order_by.iter().map(|col| self.child(col));
3099 f.write_str(name)?;
3100 f.write_str("[")?;
3101 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3102 if *window_frame != WindowFrame::default() {
3103 write!(f, " {}", window_frame)?;
3104 }
3105 f.write_str("]")
3106 }
3107 WindowAggregate {
3108 wrapped_aggregate,
3109 order_by,
3110 window_frame,
3111 } => {
3112 let order_by = order_by.iter().map(|col| self.child(col));
3113 let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3114 f.write_str(name)?;
3115 f.write_str("[")?;
3116 write!(f, "{} ", wrapped_aggregate)?;
3117 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3118 if *window_frame != WindowFrame::default() {
3119 write!(f, " {}", window_frame)?;
3120 }
3121 f.write_str("]")
3122 }
3123 FusedValueWindowFunc { funcs, order_by } => {
3124 let order_by = order_by.iter().map(|col| self.child(col));
3125 let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3126 f.write_str(name)?;
3127 f.write_str("[")?;
3128 write!(f, "{} ", funcs)?;
3129 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3130 f.write_str("]")
3131 }
3132 _ => f.write_str(name),
3133 }
3134 }
3135}
3136
/// Describes one capture group of an [`AnalyzedRegex`].
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct CaptureGroupDesc {
    /// Position of the group among the regex's captures. `AnalyzedRegex::new`
    /// skips capture 0, so groups it creates start at 1.
    pub index: u32,
    /// The group's name, if it is a named group.
    pub name: Option<String>,
    /// Whether the output column for this group is nullable.
    /// `AnalyzedRegex::new` always sets this to `true`.
    pub nullable: bool,
}
3154
/// Options parsed from a regex flag string; both default to `false`.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default
)]
pub struct AnalyzedRegexOpts {
    /// Set by the `i` flag; passed to the regex compiler in
    /// `AnalyzedRegex::new`.
    pub case_insensitive: bool,
    /// Set by the `g` flag; `regexp_matches` returns every match when set
    /// and only the first match otherwise.
    pub global: bool,
}
3173
3174impl FromStr for AnalyzedRegexOpts {
3175 type Err = EvalError;
3176
3177 fn from_str(s: &str) -> Result<Self, Self::Err> {
3178 let mut opts = AnalyzedRegexOpts::default();
3179 for c in s.chars() {
3180 match c {
3181 'i' => opts.case_insensitive = true,
3182 'g' => opts.global = true,
3183 _ => return Err(EvalError::InvalidRegexFlag(c)),
3184 }
3185 }
3186 Ok(opts)
3187 }
3188}
3189
/// A compiled regex together with descriptions of its capture groups and the
/// options it was compiled with.
///
/// Field 0 is the compiled regex, field 1 the capture group descriptors
/// (excluding capture 0), and field 2 the options passed to
/// `AnalyzedRegex::new`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3203
3204impl AnalyzedRegex {
3205 pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, RegexCompilationError> {
3206 let r = ReprRegex::new(s, opts.case_insensitive)?;
3207 #[allow(clippy::as_conversions)]
3209 let descs: Vec<_> = r
3210 .capture_names()
3211 .enumerate()
3212 .skip(1)
3217 .map(|(i, name)| CaptureGroupDesc {
3218 index: i as u32,
3219 name: name.map(String::from),
3220 nullable: true,
3223 })
3224 .collect();
3225 Ok(Self(r, descs, opts))
3226 }
3227 pub fn capture_groups_len(&self) -> usize {
3228 self.1.len()
3229 }
3230 pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3231 self.1.iter()
3232 }
3233 pub fn inner(&self) -> &Regex {
3234 &(self.0).regex
3235 }
3236 pub fn opts(&self) -> &AnalyzedRegexOpts {
3237 &self.2
3238 }
3239}
3240
3241pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3242 let bytes = a.unwrap_str().as_bytes();
3243 let mut row = Row::default();
3244 let csv_reader = csv::ReaderBuilder::new()
3245 .has_headers(false)
3246 .from_reader(bytes);
3247 csv_reader.into_records().filter_map(move |res| match res {
3248 Ok(sr) if sr.len() == n_cols => {
3249 row.packer().extend(sr.iter().map(Datum::String));
3250 Some((row.clone(), Diff::ONE))
3251 }
3252 _ => None,
3253 })
3254}
3255
3256pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3257 let n = a.unwrap_int64();
3258 if n != 0 {
3259 Some((Row::default(), n.into()))
3260 } else {
3261 None
3262 }
3263}
3264
3265fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3266 datums
3267 .chunks(width)
3268 .map(|chunk| (Row::pack(chunk), Diff::ONE))
3269}
3270
3271fn acl_explode<'a>(
3272 acl_items: Datum<'a>,
3273 temp_storage: &'a RowArena,
3274) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3275 let acl_items = acl_items.unwrap_array();
3276 let mut res = Vec::new();
3277 for acl_item in acl_items.elements().iter() {
3278 if acl_item.is_null() {
3279 return Err(EvalError::AclArrayNullElement);
3280 }
3281 let acl_item = acl_item.unwrap_acl_item();
3282 for privilege in acl_item.acl_mode.explode() {
3283 let row = [
3284 Datum::UInt32(acl_item.grantor.0),
3285 Datum::UInt32(acl_item.grantee.0),
3286 Datum::String(temp_storage.push_string(privilege.to_string())),
3287 Datum::False,
3289 ];
3290 res.push((Row::pack_slice(&row), Diff::ONE));
3291 }
3292 }
3293 Ok(res.into_iter())
3294}
3295
3296fn mz_acl_explode<'a>(
3297 mz_acl_items: Datum<'a>,
3298 temp_storage: &'a RowArena,
3299) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3300 let mz_acl_items = mz_acl_items.unwrap_array();
3301 let mut res = Vec::new();
3302 for mz_acl_item in mz_acl_items.elements().iter() {
3303 if mz_acl_item.is_null() {
3304 return Err(EvalError::MzAclArrayNullElement);
3305 }
3306 let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3307 for privilege in mz_acl_item.acl_mode.explode() {
3308 let row = [
3309 Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3310 Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3311 Datum::String(temp_storage.push_string(privilege.to_string())),
3312 Datum::False,
3314 ];
3315 res.push((Row::pack_slice(&row), Diff::ONE));
3316 }
3317 }
3318 Ok(res.into_iter())
3319}
3320
/// The table-valued functions: each maps one input row of datums to zero or
/// more output rows, each with a multiplicity.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum TableFunc {
    /// Expands an array of ACL items into `(Oid, Oid, String, Bool)` rows.
    AclExplode,
    /// Expands an array of mz ACL items into `(String, String, String, Bool)`
    /// rows.
    MzAclExplode,
    /// One `(key, value)` row per entry of a JSON object.
    JsonbEach,
    /// Like `JsonbEach`, with values stringified.
    JsonbEachStringify,
    /// One row per key of a JSON object.
    JsonbObjectKeys,
    /// One row per element of a JSON array.
    JsonbArrayElements,
    /// Like `JsonbArrayElements`, with elements stringified.
    JsonbArrayElementsStringify,
    /// At most one row holding the capture groups of the first match of the
    /// given regex.
    RegexpExtract(AnalyzedRegex),
    /// Parses the input as CSV, keeping records with exactly this many
    /// columns.
    CsvExtract(usize),
    /// Inclusive `Int32` series from `(start, stop, step)`.
    GenerateSeriesInt32,
    /// Inclusive `Int64` series from `(start, stop, step)`.
    GenerateSeriesInt64,
    /// Inclusive timestamp series from `(start, stop, interval step)`.
    GenerateSeriesTimestamp,
    /// Inclusive timestamp-with-time-zone series from
    /// `(start, stop, interval step)`.
    GenerateSeriesTimestampTz,
    /// Reads an `Int64` count from its input and errors with
    /// `MultipleRowsFromSubquery` unless the count is 1, in which case it
    /// emits no rows.
    GuardSubquerySize {
        /// The scalar type of the single (non-nullable) output column.
        column_type: SqlScalarType,
    },
    /// Emits an empty row whose multiplicity is the input `Int64`.
    Repeat,
    /// One row per element of an array.
    UnnestArray {
        /// The element type; the output column is nullable.
        el_typ: SqlScalarType,
    },
    /// One row per element of a list.
    UnnestList {
        /// The element type; the output column is nullable.
        el_typ: SqlScalarType,
    },
    /// One `(key, value)` row per entry of a map.
    UnnestMap {
        /// The value type; the value column is nullable.
        value_type: SqlScalarType,
    },
    /// Splits the input datums into rows of `width` columns each.
    Wrap {
        /// The column types of the output rows.
        types: Vec<SqlColumnType>,
        /// The number of datums per output row.
        width: usize,
    },
    /// The subscripts of one dimension of an array, as `Int32` rows.
    GenerateSubscriptsArray,
    /// Emits the input datums as exactly one row.
    TabletizedScalar {
        // NOTE(review): presumably the name of the wrapped scalar function —
        // it is not read by the code visible here; confirm.
        name: String,
        /// The relation type returned as this function's output type.
        relation: SqlRelationType,
    },
    /// One row per regex match, each holding an array of the match's capture
    /// groups.
    RegexpMatches,
    /// Wraps another table function, appending a non-nullable `Int64` column
    /// to its output type. Construct it via `TableFunc::with_ordinality`.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3405
/// Payload of `TableFunc::WithOrdinality`: the table function being wrapped.
///
/// The type is private, so values are built through
/// `TableFunc::with_ordinality`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
struct WithOrdinality {
    /// The wrapped table function.
    inner: Box<TableFunc>,
}
3428
3429impl TableFunc {
3430 pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3432 match inner {
3433 TableFunc::AclExplode
3434 | TableFunc::MzAclExplode
3435 | TableFunc::JsonbEach
3436 | TableFunc::JsonbEachStringify
3437 | TableFunc::JsonbObjectKeys
3438 | TableFunc::JsonbArrayElements
3439 | TableFunc::JsonbArrayElementsStringify
3440 | TableFunc::RegexpExtract(_)
3441 | TableFunc::CsvExtract(_)
3442 | TableFunc::GenerateSeriesInt32
3443 | TableFunc::GenerateSeriesInt64
3444 | TableFunc::GenerateSeriesTimestamp
3445 | TableFunc::GenerateSeriesTimestampTz
3446 | TableFunc::GuardSubquerySize { .. }
3447 | TableFunc::Repeat
3448 | TableFunc::UnnestArray { .. }
3449 | TableFunc::UnnestList { .. }
3450 | TableFunc::UnnestMap { .. }
3451 | TableFunc::Wrap { .. }
3452 | TableFunc::GenerateSubscriptsArray
3453 | TableFunc::TabletizedScalar { .. }
3454 | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3455 inner: Box::new(inner),
3456 })),
3457 TableFunc::WithOrdinality(_) => None,
3460 }
3461 }
3462}
3463
3464impl TableFunc {
    /// Evaluates the table function on one input row.
    ///
    /// `datums` are the already-evaluated arguments and `temp_storage` is an
    /// arena for values allocated during evaluation. The result is an
    /// iterator of output rows paired with their multiplicities.
    ///
    /// If `empty_on_null_input()` is true and any argument is null, no rows
    /// are produced and the function itself is not run.
    ///
    /// # Errors
    ///
    /// Propagates any `EvalError` raised by the underlying function.
    ///
    /// # Panics
    ///
    /// The arms index `datums` directly and unwrap each datum to the expected
    /// type, so a wrong argument count or type panics.
    pub fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Short-circuit: null inputs produce no output for most functions.
        if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
            return Ok(Box::new(vec![].into_iter()));
        }
        match self {
            TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
            TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
            TableFunc::JsonbEach => Ok(Box::new(jsonb_each(datums[0]))),
            TableFunc::JsonbEachStringify => {
                Ok(Box::new(jsonb_each_stringify(datums[0], temp_storage)))
            }
            TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
            TableFunc::JsonbArrayElements => Ok(Box::new(jsonb_array_elements(datums[0]))),
            TableFunc::JsonbArrayElementsStringify => Ok(Box::new(jsonb_array_elements_stringify(
                datums[0],
                temp_storage,
            ))),
            TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
            TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
            TableFunc::GenerateSeriesInt32 => {
                // Arguments are (start, stop, step).
                let res = generate_series(
                    datums[0].unwrap_int32(),
                    datums[1].unwrap_int32(),
                    datums[2].unwrap_int32(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesInt64 => {
                // Arguments are (start, stop, step).
                let res = generate_series(
                    datums[0].unwrap_int64(),
                    datums[1].unwrap_int64(),
                    datums[2].unwrap_int64(),
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestamp => {
                // A named fn is needed because `generate_series_ts` takes a
                // plain function pointer for the conversion.
                fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamp(),
                    datums[1].unwrap_timestamp(),
                    datums[2].unwrap_interval(),
                    pass_through,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSeriesTimestampTz => {
                // Same as above, for the timestamp-with-time-zone type.
                fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
                    Datum::from(d)
                }
                let res = generate_series_ts(
                    datums[0].unwrap_timestamptz(),
                    datums[1].unwrap_timestamptz(),
                    datums[2].unwrap_interval(),
                    gen_ts_tz,
                )?;
                Ok(Box::new(res))
            }
            TableFunc::GenerateSubscriptsArray => {
                generate_subscripts_array(datums[0], datums[1].unwrap_int32())
            }
            TableFunc::GuardSubquerySize { column_type: _ } => {
                // The input is a row count: anything other than exactly one
                // is an error, and a count of one emits nothing.
                let count = datums[0].unwrap_int64();
                if count != 1 {
                    Err(EvalError::MultipleRowsFromSubquery)
                } else {
                    Ok(Box::new([].into_iter()))
                }
            }
            TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
            TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
            TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
            TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
            TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
            TableFunc::TabletizedScalar { .. } => {
                // The already-evaluated scalar results become a single row.
                let r = Row::pack_slice(datums);
                Ok(Box::new(std::iter::once((r, Diff::ONE))))
            }
            TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
            TableFunc::WithOrdinality(func_with_ordinality) => {
                func_with_ordinality.eval(datums, temp_storage)
            }
        }
    }
3557
3558 pub fn output_type(&self) -> SqlRelationType {
3559 let (column_types, keys) = match self {
3560 TableFunc::AclExplode => {
3561 let column_types = vec![
3562 SqlScalarType::Oid.nullable(false),
3563 SqlScalarType::Oid.nullable(false),
3564 SqlScalarType::String.nullable(false),
3565 SqlScalarType::Bool.nullable(false),
3566 ];
3567 let keys = vec![];
3568 (column_types, keys)
3569 }
3570 TableFunc::MzAclExplode => {
3571 let column_types = vec![
3572 SqlScalarType::String.nullable(false),
3573 SqlScalarType::String.nullable(false),
3574 SqlScalarType::String.nullable(false),
3575 SqlScalarType::Bool.nullable(false),
3576 ];
3577 let keys = vec![];
3578 (column_types, keys)
3579 }
3580 TableFunc::JsonbEach => {
3581 let column_types = vec![
3582 SqlScalarType::String.nullable(false),
3583 SqlScalarType::Jsonb.nullable(false),
3584 ];
3585 let keys = vec![];
3586 (column_types, keys)
3587 }
3588 TableFunc::JsonbEachStringify => {
3589 let column_types = vec![
3590 SqlScalarType::String.nullable(false),
3591 SqlScalarType::String.nullable(true),
3592 ];
3593 let keys = vec![];
3594 (column_types, keys)
3595 }
3596 TableFunc::JsonbObjectKeys => {
3597 let column_types = vec![SqlScalarType::String.nullable(false)];
3598 let keys = vec![];
3599 (column_types, keys)
3600 }
3601 TableFunc::JsonbArrayElements => {
3602 let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
3603 let keys = vec![];
3604 (column_types, keys)
3605 }
3606 TableFunc::JsonbArrayElementsStringify => {
3607 let column_types = vec![SqlScalarType::String.nullable(true)];
3608 let keys = vec![];
3609 (column_types, keys)
3610 }
3611 TableFunc::RegexpExtract(a) => {
3612 let column_types = a
3613 .capture_groups_iter()
3614 .map(|cg| SqlScalarType::String.nullable(cg.nullable))
3615 .collect();
3616 let keys = vec![];
3617 (column_types, keys)
3618 }
3619 TableFunc::CsvExtract(n_cols) => {
3620 let column_types = iter::repeat(SqlScalarType::String.nullable(false))
3621 .take(*n_cols)
3622 .collect();
3623 let keys = vec![];
3624 (column_types, keys)
3625 }
3626 TableFunc::GenerateSeriesInt32 => {
3627 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3628 let keys = vec![vec![0]];
3629 (column_types, keys)
3630 }
3631 TableFunc::GenerateSeriesInt64 => {
3632 let column_types = vec![SqlScalarType::Int64.nullable(false)];
3633 let keys = vec![vec![0]];
3634 (column_types, keys)
3635 }
3636 TableFunc::GenerateSeriesTimestamp => {
3637 let column_types =
3638 vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
3639 let keys = vec![vec![0]];
3640 (column_types, keys)
3641 }
3642 TableFunc::GenerateSeriesTimestampTz => {
3643 let column_types =
3644 vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
3645 let keys = vec![vec![0]];
3646 (column_types, keys)
3647 }
3648 TableFunc::GenerateSubscriptsArray => {
3649 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3650 let keys = vec![vec![0]];
3651 (column_types, keys)
3652 }
3653 TableFunc::GuardSubquerySize { column_type } => {
3654 let column_types = vec![column_type.clone().nullable(false)];
3655 let keys = vec![];
3656 (column_types, keys)
3657 }
3658 TableFunc::Repeat => {
3659 let column_types = vec![];
3660 let keys = vec![];
3661 (column_types, keys)
3662 }
3663 TableFunc::UnnestArray { el_typ } => {
3664 let column_types = vec![el_typ.clone().nullable(true)];
3665 let keys = vec![];
3666 (column_types, keys)
3667 }
3668 TableFunc::UnnestList { el_typ } => {
3669 let column_types = vec![el_typ.clone().nullable(true)];
3670 let keys = vec![];
3671 (column_types, keys)
3672 }
3673 TableFunc::UnnestMap { value_type } => {
3674 let column_types = vec![
3675 SqlScalarType::String.nullable(false),
3676 value_type.clone().nullable(true),
3677 ];
3678 let keys = vec![vec![0]];
3679 (column_types, keys)
3680 }
3681 TableFunc::Wrap { types, .. } => {
3682 let column_types = types.clone();
3683 let keys = vec![];
3684 (column_types, keys)
3685 }
3686 TableFunc::TabletizedScalar { relation, .. } => {
3687 return relation.clone();
3688 }
3689 TableFunc::RegexpMatches => {
3690 let column_types =
3691 vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
3692 let keys = vec![];
3693
3694 (column_types, keys)
3695 }
3696 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3697 let mut typ = inner.output_type();
3698 typ.column_types.push(SqlScalarType::Int64.nullable(false));
3700 typ.keys.push(vec![typ.column_types.len() - 1]);
3702 (typ.column_types, typ.keys)
3703 }
3704 };
3705
3706 soft_assert_eq_no_log!(column_types.len(), self.output_arity());
3707
3708 if !keys.is_empty() {
3709 SqlRelationType::new(column_types).with_keys(keys)
3710 } else {
3711 SqlRelationType::new(column_types)
3712 }
3713 }
3714
3715 pub fn output_arity(&self) -> usize {
3716 match self {
3717 TableFunc::AclExplode => 4,
3718 TableFunc::MzAclExplode => 4,
3719 TableFunc::JsonbEach => 2,
3720 TableFunc::JsonbEachStringify => 2,
3721 TableFunc::JsonbObjectKeys => 1,
3722 TableFunc::JsonbArrayElements => 1,
3723 TableFunc::JsonbArrayElementsStringify => 1,
3724 TableFunc::RegexpExtract(a) => a.capture_groups_len(),
3725 TableFunc::CsvExtract(n_cols) => *n_cols,
3726 TableFunc::GenerateSeriesInt32 => 1,
3727 TableFunc::GenerateSeriesInt64 => 1,
3728 TableFunc::GenerateSeriesTimestamp => 1,
3729 TableFunc::GenerateSeriesTimestampTz => 1,
3730 TableFunc::GenerateSubscriptsArray => 1,
3731 TableFunc::GuardSubquerySize { .. } => 1,
3732 TableFunc::Repeat => 0,
3733 TableFunc::UnnestArray { .. } => 1,
3734 TableFunc::UnnestList { .. } => 1,
3735 TableFunc::UnnestMap { .. } => 2,
3736 TableFunc::Wrap { width, .. } => *width,
3737 TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
3738 TableFunc::RegexpMatches => 1,
3739 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
3740 }
3741 }
3742
3743 pub fn empty_on_null_input(&self) -> bool {
3744 match self {
3745 TableFunc::AclExplode
3746 | TableFunc::MzAclExplode
3747 | TableFunc::JsonbEach
3748 | TableFunc::JsonbEachStringify
3749 | TableFunc::JsonbObjectKeys
3750 | TableFunc::JsonbArrayElements
3751 | TableFunc::JsonbArrayElementsStringify
3752 | TableFunc::GenerateSeriesInt32
3753 | TableFunc::GenerateSeriesInt64
3754 | TableFunc::GenerateSeriesTimestamp
3755 | TableFunc::GenerateSeriesTimestampTz
3756 | TableFunc::GenerateSubscriptsArray
3757 | TableFunc::RegexpExtract(_)
3758 | TableFunc::CsvExtract(_)
3759 | TableFunc::Repeat
3760 | TableFunc::UnnestArray { .. }
3761 | TableFunc::UnnestList { .. }
3762 | TableFunc::UnnestMap { .. }
3763 | TableFunc::RegexpMatches => true,
3764 TableFunc::GuardSubquerySize { .. } => false,
3765 TableFunc::Wrap { .. } => false,
3766 TableFunc::TabletizedScalar { .. } => false,
3767 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
3768 }
3769 }
3770
3771 pub fn preserves_monotonicity(&self) -> bool {
3773 match self {
3776 TableFunc::AclExplode => false,
3777 TableFunc::MzAclExplode => false,
3778 TableFunc::JsonbEach => true,
3779 TableFunc::JsonbEachStringify => true,
3780 TableFunc::JsonbObjectKeys => true,
3781 TableFunc::JsonbArrayElements => true,
3782 TableFunc::JsonbArrayElementsStringify => true,
3783 TableFunc::RegexpExtract(_) => true,
3784 TableFunc::CsvExtract(_) => true,
3785 TableFunc::GenerateSeriesInt32 => true,
3786 TableFunc::GenerateSeriesInt64 => true,
3787 TableFunc::GenerateSeriesTimestamp => true,
3788 TableFunc::GenerateSeriesTimestampTz => true,
3789 TableFunc::GenerateSubscriptsArray => true,
3790 TableFunc::Repeat => false,
3791 TableFunc::UnnestArray { .. } => true,
3792 TableFunc::UnnestList { .. } => true,
3793 TableFunc::UnnestMap { .. } => true,
3794 TableFunc::Wrap { .. } => true,
3795 TableFunc::TabletizedScalar { .. } => true,
3796 TableFunc::RegexpMatches => true,
3797 TableFunc::GuardSubquerySize { .. } => false,
3798 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
3799 }
3800 }
3801}
3802
3803impl fmt::Display for TableFunc {
3804 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3805 match self {
3806 TableFunc::AclExplode => f.write_str("aclexplode"),
3807 TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3808 TableFunc::JsonbEach => f.write_str("jsonb_each"),
3809 TableFunc::JsonbEachStringify => f.write_str("jsonb_each_text"),
3810 TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3811 TableFunc::JsonbArrayElements => f.write_str("jsonb_array_elements"),
3812 TableFunc::JsonbArrayElementsStringify => f.write_str("jsonb_array_elements_text"),
3813 TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3814 TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3815 TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3816 TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3817 TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3818 TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3819 TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3820 TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3821 TableFunc::Repeat => f.write_str("repeat_row"),
3822 TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3823 TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3824 TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3825 TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3826 TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3827 TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3828 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3829 write!(f, "{}[with_ordinality]", inner)
3830 }
3831 }
3832 }
3833}
3834
3835impl WithOrdinality {
3836 fn eval<'a>(
3845 &'a self,
3846 datums: &'a [Datum<'a>],
3847 temp_storage: &'a RowArena,
3848 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3849 let mut next_ordinal: i64 = 1;
3850 let it = self
3851 .inner
3852 .eval(datums, temp_storage)?
3853 .flat_map(move |(mut row, diff)| {
3854 let diff = diff.into_inner();
3855 assert!(diff >= 0);
3862 let mut ordinals = next_ordinal..(next_ordinal + diff);
3864 next_ordinal += diff;
3865 let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
3867 iter::from_fn(move || {
3868 let ordinal = ordinals.next()?;
3869 let mut row = if ordinals.is_empty() {
3870 std::mem::take(&mut row)
3873 } else {
3874 let mut new_row = Row::with_capacity(cap);
3875 new_row.clone_from(&row);
3876 new_row
3877 };
3878 RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
3879 Some((row, Diff::ONE))
3880 })
3881 });
3882 Ok(Box::new(it))
3883 }
3884}