1#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log, soft_panic_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::{Regex as ReprRegex, RegexCompilationError};
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33 ColumnName, Datum, Diff, ReprColumnType, ReprRelationType, Row, RowArena, RowPacker, SharedRow,
34 SqlColumnType, SqlRelationType, SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44 CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49 ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58 I: IntoIterator<Item = Datum<'a>>,
59{
60 match datums
61 .into_iter()
62 .filter(|d| !d.is_null())
63 .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64 {
65 Some(datum) => datum,
66 None => Datum::Null,
67 }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72 I: IntoIterator<Item = Datum<'a>>,
73 DatumType: TryFrom<Datum<'a>> + Ord,
74 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75 Datum<'a>: From<Option<DatumType>>,
76{
77 let x: Option<DatumType> = datums
78 .into_iter()
79 .filter(|d| !d.is_null())
80 .map(|d| DatumType::try_from(d).expect("unexpected type"))
81 .max();
82
83 x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88 I: IntoIterator<Item = Datum<'a>>,
89 DatumType: TryFrom<Datum<'a>> + Ord,
90 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91 Datum<'a>: From<Option<DatumType>>,
92{
93 let x: Option<DatumType> = datums
94 .into_iter()
95 .filter(|d| !d.is_null())
96 .map(|d| DatumType::try_from(d).expect("unexpected type"))
97 .min();
98
99 x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104 I: IntoIterator<Item = Datum<'a>>,
105{
106 match datums
107 .into_iter()
108 .filter(|d| !d.is_null())
109 .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110 {
111 Some(datum) => datum,
112 None => Datum::Null,
113 }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118 I: IntoIterator<Item = Datum<'a>>,
119 DatumType: TryFrom<Datum<'a>>,
120 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121 ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123 let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124 if datums.peek().is_none() {
125 Datum::Null
126 } else {
127 let x = datums
128 .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129 .sum::<ResultType>();
130 x.into()
131 }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136 I: IntoIterator<Item = Datum<'a>>,
137{
138 let mut cx = numeric::cx_datum();
139 let mut sum = Numeric::zero();
140 let mut empty = true;
141 for d in datums {
142 if !d.is_null() {
143 empty = false;
144 cx.add(&mut sum, &d.unwrap_numeric().0);
145 }
146 }
147 match empty {
148 true => Datum::Null,
149 false => Datum::from(sum),
150 }
151}
152
153#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157 I: IntoIterator<Item = Datum<'a>>,
158{
159 let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161 Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166 I: IntoIterator<Item = Datum<'a>>,
167{
168 datums
169 .into_iter()
170 .fold(Datum::False, |state, next| match (state, next) {
171 (Datum::True, _) | (_, Datum::True) => Datum::True,
172 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173 _ => Datum::False,
174 })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179 I: IntoIterator<Item = Datum<'a>>,
180{
181 datums
182 .into_iter()
183 .fold(Datum::True, |state, next| match (state, next) {
184 (Datum::False, _) | (_, Datum::False) => Datum::False,
185 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186 _ => Datum::True,
187 })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192 I: IntoIterator<Item = Datum<'a>>,
193{
194 const EMPTY_SEP: &str = "";
195
196 let datums = order_aggregate_datums(datums, order_by);
197 let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198 if d.is_null() {
199 return None;
200 }
201 let mut value_sep = d.unwrap_list().iter();
202 match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203 (Datum::Null, _) => None,
204 (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205 (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206 _ => unreachable!(),
207 }
208 });
209
210 let mut s = String::default();
211 match sep_value_pairs.next() {
212 Some((_, value)) => s.push_str(value),
214 None => return Datum::Null,
216 }
217
218 for (sep, value) in sep_value_pairs {
219 s.push_str(sep);
220 s.push_str(value);
221 }
222
223 Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228 I: IntoIterator<Item = Datum<'a>>,
229{
230 let datums = order_aggregate_datums(datums, order_by);
231 temp_storage.make_datum(|packer| {
232 packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233 })
234}
235
236fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238 I: IntoIterator<Item = Datum<'a>>,
239{
240 let datums = order_aggregate_datums(datums, order_by);
241 temp_storage.make_datum(|packer| {
242 let mut datums: Vec<_> = datums
243 .into_iter()
244 .filter_map(|d| {
245 if d.is_null() {
246 return None;
247 }
248 let mut list = d.unwrap_list().iter();
249 let key = list.next().unwrap();
250 let val = list.next().unwrap();
251 if key.is_null() {
252 None
255 } else {
256 Some((key.unwrap_str(), val))
257 }
258 })
259 .collect();
260 datums.sort_by_key(|(k, _v)| *k);
266 datums.reverse();
267 datums.dedup_by_key(|(k, _v)| *k);
268 datums.reverse();
269 packer.push_dict(datums);
270 })
271}
272
273pub fn order_aggregate_datums<'a: 'b, 'b, I>(
283 datums: I,
284 order_by: &[ColumnOrder],
285) -> impl Iterator<Item = Datum<'b>>
286where
287 I: IntoIterator<Item = Datum<'a>>,
288{
289 order_aggregate_datums_with_rank_inner(datums, order_by)
290 .into_iter()
291 .map(|(payload, _order_datums)| payload)
293}
294
295fn order_aggregate_datums_with_rank<'a, I>(
298 datums: I,
299 order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302 I: IntoIterator<Item = Datum<'a>>,
303{
304 order_aggregate_datums_with_rank_inner(datums, order_by)
305 .into_iter()
306 .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310 datums: I,
311 order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314 I: IntoIterator<Item = Datum<'a>>,
315{
316 let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317 .into_iter()
318 .map(|d| {
319 let list = d.unwrap_list();
320 let mut list_it = list.iter();
321 let payload = list_it.next().unwrap();
322
323 let mut order_by_datums = Vec::with_capacity(order_by.len());
333 for _ in 0..order_by.len() {
334 order_by_datums.push(
335 list_it
336 .next()
337 .expect("must have exactly the same number of Datums as `order_by`"),
338 );
339 }
340
341 (payload, order_by_datums)
342 })
343 .collect();
344
345 let mut sort_by =
346 |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347 (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348 compare_columns(
349 order_by,
350 left_order_by_datums,
351 right_order_by_datums,
352 || payload_left.cmp(payload_right),
353 )
354 };
355 decoded.sort_unstable_by(&mut sort_by);
360 decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365 I: IntoIterator<Item = Datum<'a>>,
366{
367 let datums = order_aggregate_datums(datums, order_by);
368 let datums: Vec<_> = datums
369 .into_iter()
370 .map(|d| d.unwrap_array().elements().iter())
371 .flatten()
372 .collect();
373 let dims = ArrayDimension {
374 lower_bound: 1,
375 length: datums.len(),
376 };
377 temp_storage.make_datum(|packer| {
378 packer.try_push_array(&[dims], datums).unwrap();
379 })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384 I: IntoIterator<Item = Datum<'a>>,
385{
386 let datums = order_aggregate_datums(datums, order_by);
387 temp_storage.make_datum(|packer| {
388 packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389 })
390}
391
392fn row_number<'a, I>(
396 datums: I,
397 callers_temp_storage: &'a RowArena,
398 order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401 I: IntoIterator<Item = Datum<'a>>,
402{
403 let temp_storage = RowArena::new();
407 let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409 callers_temp_storage.make_datum(|packer| {
410 packer.push_list(datums);
411 })
412}
413
414fn row_number_no_list<'a: 'b, 'b, I>(
417 datums: I,
418 callers_temp_storage: &'b RowArena,
419 order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422 I: IntoIterator<Item = Datum<'a>>,
423{
424 let datums = order_aggregate_datums(datums, order_by);
425
426 callers_temp_storage.reserve(datums.size_hint().0);
427 #[allow(clippy::disallowed_methods)]
428 datums
429 .into_iter()
430 .map(|d| d.unwrap_list().iter())
431 .flatten()
432 .zip(1i64..)
433 .map(|(d, i)| {
434 callers_temp_storage.make_datum(|packer| {
435 packer.push_list_with(|packer| {
436 packer.push(Datum::Int64(i));
437 packer.push(d);
438 });
439 })
440 })
441}
442
443fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448 I: IntoIterator<Item = Datum<'a>>,
449{
450 let temp_storage = RowArena::new();
451 let datums = rank_no_list(datums, &temp_storage, order_by);
452
453 callers_temp_storage.make_datum(|packer| {
454 packer.push_list(datums);
455 })
456}
457
458fn rank_no_list<'a: 'b, 'b, I>(
461 datums: I,
462 callers_temp_storage: &'b RowArena,
463 order_by: &[ColumnOrder],
464) -> impl Iterator<Item = Datum<'b>>
465where
466 I: IntoIterator<Item = Datum<'a>>,
467{
468 let datums = order_aggregate_datums_with_rank(datums, order_by);
470
471 let mut datums = datums
472 .into_iter()
473 .map(|(d0, order_row)| {
474 d0.unwrap_list()
475 .iter()
476 .map(move |d1| (d1, order_row.clone()))
477 })
478 .flatten();
479
480 callers_temp_storage.reserve(datums.size_hint().0);
481 datums
482 .next()
483 .map_or(vec![], |(first_datum, first_order_row)| {
484 datums.fold(
487 (first_order_row, 1, 1, vec![(first_datum, 1)]),
488 |mut acc, (next_datum, next_order_row)| {
489 let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
490 *acc_row_num += 1;
491 if *acc_row != next_order_row {
493 *acc_rank = *acc_row_num;
494 *acc_row = next_order_row;
495 }
496
497 (*output).push((next_datum, *acc_rank));
498 acc
499 })
500 }.3).into_iter().map(|(d, i)| {
501 callers_temp_storage.make_datum(|packer| {
502 packer.push_list_with(|packer| {
503 packer.push(Datum::Int64(i));
504 packer.push(d);
505 });
506 })
507 })
508}
509
510fn dense_rank<'a, I>(
514 datums: I,
515 callers_temp_storage: &'a RowArena,
516 order_by: &[ColumnOrder],
517) -> Datum<'a>
518where
519 I: IntoIterator<Item = Datum<'a>>,
520{
521 let temp_storage = RowArena::new();
522 let datums = dense_rank_no_list(datums, &temp_storage, order_by);
523
524 callers_temp_storage.make_datum(|packer| {
525 packer.push_list(datums);
526 })
527}
528
529fn dense_rank_no_list<'a: 'b, 'b, I>(
532 datums: I,
533 callers_temp_storage: &'b RowArena,
534 order_by: &[ColumnOrder],
535) -> impl Iterator<Item = Datum<'b>>
536where
537 I: IntoIterator<Item = Datum<'a>>,
538{
539 let datums = order_aggregate_datums_with_rank(datums, order_by);
541
542 let mut datums = datums
543 .into_iter()
544 .map(|(d0, order_row)| {
545 d0.unwrap_list()
546 .iter()
547 .map(move |d1| (d1, order_row.clone()))
548 })
549 .flatten();
550
551 callers_temp_storage.reserve(datums.size_hint().0);
552 datums
553 .next()
554 .map_or(vec![], |(first_datum, first_order_row)| {
555 datums.fold(
558 (first_order_row, 1, vec![(first_datum, 1)]),
559 |mut acc, (next_datum, next_order_row)| {
560 let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
561 if *acc_row != next_order_row {
563 *acc_rank += 1;
564 *acc_row = next_order_row;
565 }
566
567 (*output).push((next_datum, *acc_rank));
568 acc
569 })
570 }.2).into_iter().map(|(d, i)| {
571 callers_temp_storage.make_datum(|packer| {
572 packer.push_list_with(|packer| {
573 packer.push(Datum::Int64(i));
574 packer.push(d);
575 });
576 })
577 })
578}
579
580fn lag_lead<'a, I>(
602 datums: I,
603 callers_temp_storage: &'a RowArena,
604 order_by: &[ColumnOrder],
605 lag_lead_type: &LagLeadType,
606 ignore_nulls: &bool,
607) -> Datum<'a>
608where
609 I: IntoIterator<Item = Datum<'a>>,
610{
611 let temp_storage = RowArena::new();
612 let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
613 callers_temp_storage.make_datum(|packer| {
614 packer.push_list(iter);
615 })
616}
617
618fn lag_lead_no_list<'a: 'b, 'b, I>(
621 datums: I,
622 callers_temp_storage: &'b RowArena,
623 order_by: &[ColumnOrder],
624 lag_lead_type: &LagLeadType,
625 ignore_nulls: &bool,
626) -> impl Iterator<Item = Datum<'b>>
627where
628 I: IntoIterator<Item = Datum<'a>>,
629{
630 let datums = order_aggregate_datums(datums, order_by);
632
633 let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
637 .into_iter()
638 .map(|d| {
639 let mut iter = d.unwrap_list().iter();
640 let original_row = iter.next().unwrap();
641 let (input_value, offset, default_value) =
642 unwrap_lag_lead_encoded_args(iter.next().unwrap());
643 (original_row, (input_value, offset, default_value))
644 })
645 .unzip();
646
647 let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
648
649 callers_temp_storage.reserve(result.len());
650 result
651 .into_iter()
652 .zip_eq(orig_rows)
653 .map(|(result_value, original_row)| {
654 callers_temp_storage.make_datum(|packer| {
655 packer.push_list_with(|packer| {
656 packer.push(result_value);
657 packer.push(original_row);
658 });
659 })
660 })
661}
662
663fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
665 let mut encoded_args_iter = encoded_args.unwrap_list().iter();
666 let (input_value, offset, default_value) = (
667 encoded_args_iter.next().unwrap(),
668 encoded_args_iter.next().unwrap(),
669 encoded_args_iter.next().unwrap(),
670 );
671 (input_value, offset, default_value)
672}
673
674fn lag_lead_inner<'a>(
677 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
678 lag_lead_type: &LagLeadType,
679 ignore_nulls: &bool,
680) -> Vec<Datum<'a>> {
681 if *ignore_nulls {
682 lag_lead_inner_ignore_nulls(args, lag_lead_type)
683 } else {
684 lag_lead_inner_respect_nulls(args, lag_lead_type)
685 }
686}
687
688fn lag_lead_inner_respect_nulls<'a>(
689 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
690 lag_lead_type: &LagLeadType,
691) -> Vec<Datum<'a>> {
692 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
693 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
694 if offset.is_null() {
696 result.push(Datum::Null);
697 continue;
698 }
699
700 let idx = i64::try_from(idx).expect("Array index does not fit in i64");
701 let offset = i64::from(offset.unwrap_int32());
702 let offset = match lag_lead_type {
703 LagLeadType::Lag => -offset,
704 LagLeadType::Lead => offset,
705 };
706
707 let datums_get = |i: i64| -> Option<Datum> {
709 match u64::try_from(i) {
710 Ok(i) => args
711 .get(usize::cast_from(i))
712 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
716 };
717
718 let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
719
720 result.push(lagged_value);
721 }
722
723 result
724}
725
726#[allow(clippy::as_conversions)]
730fn lag_lead_inner_ignore_nulls<'a>(
731 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
732 lag_lead_type: &LagLeadType,
733) -> Vec<Datum<'a>> {
734 if i64::try_from(args.len()).is_err() {
737 panic!("window partition way too big")
738 }
739 let mut skip_nulls_backward = vec![None; args.len()];
742 let mut last_non_null: i64 = -1;
743 let pairs = args
744 .iter()
745 .enumerate()
746 .zip_eq(skip_nulls_backward.iter_mut());
747 for ((i, (d, _, _)), slot) in pairs {
748 if d.is_null() {
749 *slot = Some(last_non_null);
750 } else {
751 last_non_null = i as i64;
752 }
753 }
754 let mut skip_nulls_forward = vec![None; args.len()];
755 let mut last_non_null: i64 = args.len() as i64;
756 let pairs = args
757 .iter()
758 .enumerate()
759 .rev()
760 .zip_eq(skip_nulls_forward.iter_mut().rev());
761 for ((i, (d, _, _)), slot) in pairs {
762 if d.is_null() {
763 *slot = Some(last_non_null);
764 } else {
765 last_non_null = i as i64;
766 }
767 }
768
769 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
771 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
772 if offset.is_null() {
774 result.push(Datum::Null);
775 continue;
776 }
777
778 let idx = idx as i64; let offset = i64::cast_from(offset.unwrap_int32());
780 let offset = match lag_lead_type {
781 LagLeadType::Lag => -offset,
782 LagLeadType::Lead => offset,
783 };
784 let increment = offset.signum();
785
786 let datums_get = |i: i64| -> Option<Datum> {
788 match u64::try_from(i) {
789 Ok(i) => args
790 .get(usize::cast_from(i))
791 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
795 };
796
797 let lagged_value = if increment != 0 {
798 let mut j = idx;
808 for _ in 0..num::abs(offset) {
809 j += increment;
810 if datums_get(j).is_some_and(|d| d.is_null()) {
812 let ju = j as usize; if increment > 0 {
814 j = skip_nulls_forward[ju].expect("checked above that it's null");
815 } else {
816 j = skip_nulls_backward[ju].expect("checked above that it's null");
817 }
818 }
819 if datums_get(j).is_none() {
820 break;
821 }
822 }
823 match datums_get(j) {
824 Some(datum) => datum,
825 None => *default_value,
826 }
827 } else {
828 assert_eq!(offset, 0);
829 let datum = datums_get(idx).expect("known to exist");
830 if !datum.is_null() {
831 datum
832 } else {
833 panic!("0 offset in lag/lead IGNORE NULLS");
838 }
839 };
840
841 result.push(lagged_value);
842 }
843
844 result
845}
846
847fn first_value<'a, I>(
849 datums: I,
850 callers_temp_storage: &'a RowArena,
851 order_by: &[ColumnOrder],
852 window_frame: &WindowFrame,
853) -> Datum<'a>
854where
855 I: IntoIterator<Item = Datum<'a>>,
856{
857 let temp_storage = RowArena::new();
858 let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
859 callers_temp_storage.make_datum(|packer| {
860 packer.push_list(iter);
861 })
862}
863
864fn first_value_no_list<'a: 'b, 'b, I>(
867 datums: I,
868 callers_temp_storage: &'b RowArena,
869 order_by: &[ColumnOrder],
870 window_frame: &WindowFrame,
871) -> impl Iterator<Item = Datum<'b>>
872where
873 I: IntoIterator<Item = Datum<'a>>,
874{
875 let datums = order_aggregate_datums(datums, order_by);
877
878 let (orig_rows, args): (Vec<_>, Vec<_>) = datums
880 .into_iter()
881 .map(|d| {
882 let mut iter = d.unwrap_list().iter();
883 let original_row = iter.next().unwrap();
884 let arg = iter.next().unwrap();
885
886 (original_row, arg)
887 })
888 .unzip();
889
890 let results = first_value_inner(args, window_frame);
891
892 callers_temp_storage.reserve(results.len());
893 results
894 .into_iter()
895 .zip_eq(orig_rows)
896 .map(|(result_value, original_row)| {
897 callers_temp_storage.make_datum(|packer| {
898 packer.push_list_with(|packer| {
899 packer.push(result_value);
900 packer.push(original_row);
901 });
902 })
903 })
904}
905
906fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
907 let length = datums.len();
908 let mut result: Vec<Datum> = Vec::with_capacity(length);
909 for (idx, current_datum) in datums.iter().enumerate() {
910 let first_value = match &window_frame.start_bound {
911 WindowFrameBound::CurrentRow => *current_datum,
913 WindowFrameBound::UnboundedPreceding => {
914 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
915 let end_offset = usize::cast_from(*end_offset);
916
917 if idx < end_offset {
919 Datum::Null
920 } else {
921 datums[0]
922 }
923 } else {
924 datums[0]
925 }
926 }
927 WindowFrameBound::OffsetPreceding(offset) => {
928 let start_offset = usize::cast_from(*offset);
929 let start_idx = idx.saturating_sub(start_offset);
930 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
931 let end_offset = usize::cast_from(*end_offset);
932
933 if start_offset < end_offset || idx < end_offset {
935 Datum::Null
936 } else {
937 datums[start_idx]
938 }
939 } else {
940 datums[start_idx]
941 }
942 }
943 WindowFrameBound::OffsetFollowing(offset) => {
944 let start_offset = usize::cast_from(*offset);
945 let start_idx = idx.saturating_add(start_offset);
946 if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
947 if offset > end_offset || start_idx >= length {
949 Datum::Null
950 } else {
951 datums[start_idx]
952 }
953 } else {
954 datums
955 .get(start_idx)
956 .map(|d| d.clone())
957 .unwrap_or(Datum::Null)
958 }
959 }
960 WindowFrameBound::UnboundedFollowing => unreachable!(),
962 };
963 result.push(first_value);
964 }
965 result
966}
967
968fn last_value<'a, I>(
970 datums: I,
971 callers_temp_storage: &'a RowArena,
972 order_by: &[ColumnOrder],
973 window_frame: &WindowFrame,
974) -> Datum<'a>
975where
976 I: IntoIterator<Item = Datum<'a>>,
977{
978 let temp_storage = RowArena::new();
979 let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
980 callers_temp_storage.make_datum(|packer| {
981 packer.push_list(iter);
982 })
983}
984
985fn last_value_no_list<'a: 'b, 'b, I>(
988 datums: I,
989 callers_temp_storage: &'b RowArena,
990 order_by: &[ColumnOrder],
991 window_frame: &WindowFrame,
992) -> impl Iterator<Item = Datum<'b>>
993where
994 I: IntoIterator<Item = Datum<'a>>,
995{
996 let datums = order_aggregate_datums_with_rank(datums, order_by);
999
1000 let size_hint = datums.size_hint().0;
1002 let mut args = Vec::with_capacity(size_hint);
1003 let mut original_rows = Vec::with_capacity(size_hint);
1004 let mut order_by_rows = Vec::with_capacity(size_hint);
1005 for (d, order_by_row) in datums.into_iter() {
1006 let mut iter = d.unwrap_list().iter();
1007 let original_row = iter.next().unwrap();
1008 let arg = iter.next().unwrap();
1009 order_by_rows.push(order_by_row);
1010 original_rows.push(original_row);
1011 args.push(arg);
1012 }
1013
1014 let results = last_value_inner(args, &order_by_rows, window_frame);
1015
1016 callers_temp_storage.reserve(results.len());
1017 results
1018 .into_iter()
1019 .zip_eq(original_rows)
1020 .map(|(result_value, original_row)| {
1021 callers_temp_storage.make_datum(|packer| {
1022 packer.push_list_with(|packer| {
1023 packer.push(result_value);
1024 packer.push(original_row);
1025 });
1026 })
1027 })
1028}
1029
1030fn last_value_inner<'a>(
1031 args: Vec<Datum<'a>>,
1032 order_by_rows: &Vec<Row>,
1033 window_frame: &WindowFrame,
1034) -> Vec<Datum<'a>> {
1035 let length = args.len();
1036 let mut results: Vec<Datum> = Vec::with_capacity(length);
1037 for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1038 let last_value = match &window_frame.end_bound {
1039 WindowFrameBound::CurrentRow => match &window_frame.units {
1040 WindowFrameUnits::Rows => *current_datum,
1042 WindowFrameUnits::Range => {
1043 let target_idx = order_by_rows[idx..]
1048 .iter()
1049 .enumerate()
1050 .take_while(|(_, row)| *row == order_by_row)
1051 .last()
1052 .unwrap()
1053 .0
1054 + idx;
1055 args[target_idx]
1056 }
1057 WindowFrameUnits::Groups => unreachable!(),
1059 },
1060 WindowFrameBound::UnboundedFollowing => {
1061 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1062 let start_offset = usize::cast_from(*start_offset);
1063
1064 if idx + start_offset > length - 1 {
1066 Datum::Null
1067 } else {
1068 args[length - 1]
1069 }
1070 } else {
1071 args[length - 1]
1072 }
1073 }
1074 WindowFrameBound::OffsetFollowing(offset) => {
1075 let end_offset = usize::cast_from(*offset);
1076 let end_idx = idx.saturating_add(end_offset);
1077 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1078 let start_offset = usize::cast_from(*start_offset);
1079 let start_idx = idx.saturating_add(start_offset);
1080
1081 if end_offset < start_offset || start_idx >= length {
1083 Datum::Null
1084 } else {
1085 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1087 }
1088 } else {
1089 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1090 }
1091 }
1092 WindowFrameBound::OffsetPreceding(offset) => {
1093 let end_offset = usize::cast_from(*offset);
1094 let end_idx = idx.saturating_sub(end_offset);
1095 if idx < end_offset {
1096 Datum::Null
1098 } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1099 &window_frame.start_bound
1100 {
1101 if offset > start_offset {
1103 Datum::Null
1104 } else {
1105 args[end_idx]
1106 }
1107 } else {
1108 args[end_idx]
1109 }
1110 }
1111 WindowFrameBound::UnboundedPreceding => unreachable!(),
1113 };
1114 results.push(last_value);
1115 }
1116 results
1117}
1118
1119fn fused_value_window_func<'a, I>(
1125 input_datums: I,
1126 callers_temp_storage: &'a RowArena,
1127 funcs: &Vec<AggregateFunc>,
1128 order_by: &Vec<ColumnOrder>,
1129) -> Datum<'a>
1130where
1131 I: IntoIterator<Item = Datum<'a>>,
1132{
1133 let temp_storage = RowArena::new();
1134 let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1135 callers_temp_storage.make_datum(|packer| {
1136 packer.push_list(iter);
1137 })
1138}
1139
1140fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1143 input_datums: I,
1144 callers_temp_storage: &'b RowArena,
1145 funcs: &Vec<AggregateFunc>,
1146 order_by: &Vec<ColumnOrder>,
1147) -> impl Iterator<Item = Datum<'b>>
1148where
1149 I: IntoIterator<Item = Datum<'a>>,
1150{
1151 let has_last_value = funcs
1152 .iter()
1153 .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1154
1155 let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1156
1157 let size_hint = input_datums_with_ranks.size_hint().0;
1158 let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1159 let mut original_rows = Vec::with_capacity(size_hint);
1160 let mut order_by_rows = Vec::with_capacity(size_hint);
1161 for (d, order_by_row) in input_datums_with_ranks {
1162 let mut iter = d.unwrap_list().iter();
1163 let original_row = iter.next().unwrap();
1164 original_rows.push(original_row);
1165 let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1166 for i in 0..funcs.len() {
1167 let encoded_args = argss_iter.next().unwrap();
1168 encoded_argsss[i].push(encoded_args);
1169 }
1170 if has_last_value {
1171 order_by_rows.push(order_by_row);
1172 }
1173 }
1174
1175 let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1176 for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1177 let results = match func {
1178 AggregateFunc::LagLead {
1179 order_by: inner_order_by,
1180 lag_lead,
1181 ignore_nulls,
1182 } => {
1183 assert_eq!(order_by, inner_order_by);
1184 let unwrapped_argss = encoded_argss
1185 .into_iter()
1186 .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1187 .collect();
1188 lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1189 }
1190 AggregateFunc::FirstValue {
1191 order_by: inner_order_by,
1192 window_frame,
1193 } => {
1194 assert_eq!(order_by, inner_order_by);
1195 first_value_inner(encoded_argss, window_frame)
1198 }
1199 AggregateFunc::LastValue {
1200 order_by: inner_order_by,
1201 window_frame,
1202 } => {
1203 assert_eq!(order_by, inner_order_by);
1204 last_value_inner(encoded_argss, &order_by_rows, window_frame)
1207 }
1208 _ => panic!("unknown window function in FusedValueWindowFunc"),
1209 };
1210 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1211 results.push(result);
1212 }
1213 }
1214
1215 callers_temp_storage.reserve(2 * original_rows.len());
1216 results_per_row
1217 .into_iter()
1218 .enumerate()
1219 .map(move |(i, results)| {
1220 callers_temp_storage.make_datum(|packer| {
1221 packer.push_list_with(|packer| {
1222 packer
1223 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1224 packer.push(original_rows[i]);
1225 });
1226 })
1227 })
1228}
1229
1230fn window_aggr<'a, I, A>(
1239 input_datums: I,
1240 callers_temp_storage: &'a RowArena,
1241 wrapped_aggregate: &AggregateFunc,
1242 order_by: &[ColumnOrder],
1243 window_frame: &WindowFrame,
1244) -> Datum<'a>
1245where
1246 I: IntoIterator<Item = Datum<'a>>,
1247 A: OneByOneAggr,
1248{
1249 let temp_storage = RowArena::new();
1250 let iter = window_aggr_no_list::<I, A>(
1251 input_datums,
1252 &temp_storage,
1253 wrapped_aggregate,
1254 order_by,
1255 window_frame,
1256 );
1257 callers_temp_storage.make_datum(|packer| {
1258 packer.push_list(iter);
1259 })
1260}
1261
1262fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1265 input_datums: I,
1266 callers_temp_storage: &'b RowArena,
1267 wrapped_aggregate: &AggregateFunc,
1268 order_by: &[ColumnOrder],
1269 window_frame: &WindowFrame,
1270) -> impl Iterator<Item = Datum<'b>>
1271where
1272 I: IntoIterator<Item = Datum<'a>>,
1273 A: OneByOneAggr,
1274{
1275 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1278
1279 let size_hint = datums.size_hint().0;
1281 let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1282 let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1283 let mut order_by_rows = Vec::with_capacity(size_hint);
1284 for (d, order_by_row) in datums.into_iter() {
1285 let mut iter = d.unwrap_list().iter();
1286 let original_row = iter.next().unwrap();
1287 let arg = iter.next().unwrap();
1288 order_by_rows.push(order_by_row);
1289 original_rows.push(original_row);
1290 args.push(arg);
1291 }
1292
1293 let results = window_aggr_inner::<A>(
1294 args,
1295 &order_by_rows,
1296 wrapped_aggregate,
1297 order_by,
1298 window_frame,
1299 callers_temp_storage,
1300 );
1301
1302 callers_temp_storage.reserve(results.len());
1303 results
1304 .into_iter()
1305 .zip_eq(original_rows)
1306 .map(|(result_value, original_row)| {
1307 callers_temp_storage.make_datum(|packer| {
1308 packer.push_list_with(|packer| {
1309 packer.push(result_value);
1310 packer.push(original_row);
1311 });
1312 })
1313 })
1314}
1315
1316fn window_aggr_inner<'a, A>(
1317 mut args: Vec<Datum<'a>>,
1318 order_by_rows: &Vec<Row>,
1319 wrapped_aggregate: &AggregateFunc,
1320 order_by: &[ColumnOrder],
1321 window_frame: &WindowFrame,
1322 temp_storage: &'a RowArena,
1323) -> Vec<Datum<'a>>
1324where
1325 A: OneByOneAggr,
1326{
1327 let length = args.len();
1328 let mut result: Vec<Datum> = Vec::with_capacity(length);
1329
1330 soft_assert_or_log!(
1336 !((matches!(window_frame.units, WindowFrameUnits::Groups)
1337 || matches!(window_frame.units, WindowFrameUnits::Range))
1338 && !window_frame.includes_current_row()),
1339 "window frame without current row"
1340 );
1341
1342 if (matches!(
1343 window_frame.start_bound,
1344 WindowFrameBound::UnboundedPreceding
1345 ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1346 || (order_by.is_empty()
1347 && (matches!(window_frame.units, WindowFrameUnits::Groups)
1348 || matches!(window_frame.units, WindowFrameUnits::Range))
1349 && window_frame.includes_current_row())
1350 {
1351 let result_value = wrapped_aggregate.eval(args, temp_storage);
1358 for _ in 0..length {
1360 result.push(result_value);
1361 }
1362 } else {
1363 fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1364 args: Vec<Datum<'a>>,
1365 result: &mut Vec<Datum<'a>>,
1366 mut one_by_one_aggr: A,
1367 temp_storage: &'a RowArena,
1368 ) where
1369 A: OneByOneAggr,
1370 {
1371 for current_arg in args.into_iter() {
1372 one_by_one_aggr.give(¤t_arg);
1373 let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1374 result.push(result_value);
1375 }
1376 }
1377
1378 fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1379 args: Vec<Datum<'a>>,
1380 order_by_rows: &Vec<Row>,
1381 result: &mut Vec<Datum<'a>>,
1382 mut one_by_one_aggr: A,
1383 temp_storage: &'a RowArena,
1384 ) where
1385 A: OneByOneAggr,
1386 {
1387 let mut peer_group_start = 0;
1388 while peer_group_start < args.len() {
1389 let mut peer_group_end = peer_group_start + 1;
1393 while peer_group_end < args.len()
1394 && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1395 {
1396 peer_group_end += 1;
1398 }
1399 for current_arg in args[peer_group_start..peer_group_end].iter() {
1402 one_by_one_aggr.give(current_arg);
1403 }
1404 let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1405 for _ in args[peer_group_start..peer_group_end].iter() {
1407 result.push(agg_for_peer_group);
1408 }
1409 peer_group_start = peer_group_end;
1411 }
1412 }
1413
1414 fn rows_between_offset_and_offset<'a>(
1415 args: Vec<Datum<'a>>,
1416 result: &mut Vec<Datum<'a>>,
1417 wrapped_aggregate: &AggregateFunc,
1418 temp_storage: &'a RowArena,
1419 offset_start: i64,
1420 offset_end: i64,
1421 ) {
1422 let len = args
1423 .len()
1424 .to_i64()
1425 .expect("window partition's len should fit into i64");
1426 for i in 0..len {
1427 let i = i.to_i64().expect("window partition shouldn't be super big");
1428 let frame_start = max(i + offset_start, 0)
1431 .to_usize()
1432 .expect("The max made sure it's not negative");
1433 let frame_end = min(i + offset_end, len - 1).to_usize();
1436 match frame_end {
1437 Some(frame_end) => {
1438 if frame_start <= frame_end {
1439 let frame_values = args[frame_start..=frame_end].iter().cloned();
1450 let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1451 result.push(result_value);
1452 } else {
1453 let result_value = wrapped_aggregate.default();
1455 result.push(result_value);
1456 }
1457 }
1458 None => {
1459 let result_value = wrapped_aggregate.default();
1461 result.push(result_value);
1462 }
1463 }
1464 }
1465 }
1466
1467 match (
1468 &window_frame.units,
1469 &window_frame.start_bound,
1470 &window_frame.end_bound,
1471 ) {
1472 (Rows, UnboundedPreceding, CurrentRow) => {
1477 rows_between_unbounded_preceding_and_current_row::<A>(
1478 args,
1479 &mut result,
1480 A::new(wrapped_aggregate, false),
1481 temp_storage,
1482 );
1483 }
1484 (Rows, CurrentRow, UnboundedFollowing) => {
1485 args.reverse();
1487 rows_between_unbounded_preceding_and_current_row::<A>(
1488 args,
1489 &mut result,
1490 A::new(wrapped_aggregate, true),
1491 temp_storage,
1492 );
1493 result.reverse();
1494 }
1495 (Range, UnboundedPreceding, CurrentRow) => {
1496 groups_between_unbounded_preceding_and_current_row::<A>(
1499 args,
1500 order_by_rows,
1501 &mut result,
1502 A::new(wrapped_aggregate, false),
1503 temp_storage,
1504 );
1505 }
1506 (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1510 let start_prec = start_prec.to_i64().expect(
1511 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1512 );
1513 let end_prec = end_prec.to_i64().expect(
1514 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1515 );
1516 rows_between_offset_and_offset(
1517 args,
1518 &mut result,
1519 wrapped_aggregate,
1520 temp_storage,
1521 -start_prec,
1522 -end_prec,
1523 );
1524 }
1525 (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1526 let start_prec = start_prec.to_i64().expect(
1527 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1528 );
1529 let end_fol = end_fol.to_i64().expect(
1530 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1531 );
1532 rows_between_offset_and_offset(
1533 args,
1534 &mut result,
1535 wrapped_aggregate,
1536 temp_storage,
1537 -start_prec,
1538 end_fol,
1539 );
1540 }
1541 (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1542 let start_fol = start_fol.to_i64().expect(
1543 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1544 );
1545 let end_fol = end_fol.to_i64().expect(
1546 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1547 );
1548 rows_between_offset_and_offset(
1549 args,
1550 &mut result,
1551 wrapped_aggregate,
1552 temp_storage,
1553 start_fol,
1554 end_fol,
1555 );
1556 }
1557 (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1558 unreachable!() }
1560 (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1561 let start_prec = start_prec.to_i64().expect(
1562 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1563 );
1564 let end_fol = 0;
1565 rows_between_offset_and_offset(
1566 args,
1567 &mut result,
1568 wrapped_aggregate,
1569 temp_storage,
1570 -start_prec,
1571 end_fol,
1572 );
1573 }
1574 (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1575 let start_fol = 0;
1576 let end_fol = end_fol.to_i64().expect(
1577 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1578 );
1579 rows_between_offset_and_offset(
1580 args,
1581 &mut result,
1582 wrapped_aggregate,
1583 temp_storage,
1584 start_fol,
1585 end_fol,
1586 );
1587 }
1588 (Rows, CurrentRow, CurrentRow) => {
1589 let start_fol = 0;
1592 let end_fol = 0;
1593 rows_between_offset_and_offset(
1594 args,
1595 &mut result,
1596 wrapped_aggregate,
1597 temp_storage,
1598 start_fol,
1599 end_fol,
1600 );
1601 }
1602 (Rows, CurrentRow, OffsetPreceding(_))
1603 | (Rows, UnboundedFollowing, _)
1604 | (Rows, _, UnboundedPreceding)
1605 | (Rows, OffsetFollowing(..), CurrentRow) => {
1606 unreachable!() }
1608 (Rows, UnboundedPreceding, UnboundedFollowing) => {
1609 unreachable!()
1612 }
1613 (Rows, UnboundedPreceding, OffsetPreceding(_))
1614 | (Rows, UnboundedPreceding, OffsetFollowing(_))
1615 | (Rows, OffsetPreceding(..), UnboundedFollowing)
1616 | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1617 unreachable!()
1620 }
1621 (Range, _, _) => {
1622 unreachable!()
1629 }
1630 (Groups, _, _) => {
1631 unreachable!()
1635 }
1636 }
1637 }
1638
1639 result
1640}
1641
1642fn fused_window_aggr<'a, I, A>(
1646 input_datums: I,
1647 callers_temp_storage: &'a RowArena,
1648 wrapped_aggregates: &Vec<AggregateFunc>,
1649 order_by: &Vec<ColumnOrder>,
1650 window_frame: &WindowFrame,
1651) -> Datum<'a>
1652where
1653 I: IntoIterator<Item = Datum<'a>>,
1654 A: OneByOneAggr,
1655{
1656 let temp_storage = RowArena::new();
1657 let iter = fused_window_aggr_no_list::<_, A>(
1658 input_datums,
1659 &temp_storage,
1660 wrapped_aggregates,
1661 order_by,
1662 window_frame,
1663 );
1664 callers_temp_storage.make_datum(|packer| {
1665 packer.push_list(iter);
1666 })
1667}
1668
1669fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1672 input_datums: I,
1673 callers_temp_storage: &'b RowArena,
1674 wrapped_aggregates: &Vec<AggregateFunc>,
1675 order_by: &Vec<ColumnOrder>,
1676 window_frame: &WindowFrame,
1677) -> impl Iterator<Item = Datum<'b>>
1678where
1679 I: IntoIterator<Item = Datum<'a>>,
1680 A: OneByOneAggr,
1681{
1682 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1685
1686 let size_hint = datums.size_hint().0;
1687 let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1688 let mut original_rows = Vec::with_capacity(size_hint);
1689 let mut order_by_rows = Vec::with_capacity(size_hint);
1690 for (d, order_by_row) in datums {
1691 let mut iter = d.unwrap_list().iter();
1692 let original_row = iter.next().unwrap();
1693 original_rows.push(original_row);
1694 let args_iter = iter.next().unwrap().unwrap_list().iter();
1695 for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1697 args.push(arg);
1698 }
1699 order_by_rows.push(order_by_row);
1700 }
1701
1702 let mut results_per_row =
1703 vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1704 for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1705 let results = window_aggr_inner::<A>(
1706 args,
1707 &order_by_rows,
1708 wrapped_aggr,
1709 order_by,
1710 window_frame,
1711 callers_temp_storage,
1712 );
1713 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1714 results.push(result);
1715 }
1716 }
1717
1718 callers_temp_storage.reserve(2 * original_rows.len());
1719 results_per_row
1720 .into_iter()
1721 .enumerate()
1722 .map(move |(i, results)| {
1723 callers_temp_storage.make_datum(|packer| {
1724 packer.push_list_with(|packer| {
1725 packer
1726 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1727 packer.push(original_rows[i]);
1728 });
1729 })
1730 })
1731}
1732
1733pub trait OneByOneAggr {
1737 fn new(agg: &AggregateFunc, reverse: bool) -> Self;
1742 fn give(&mut self, d: &Datum);
1744 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
1746}
1747
1748#[derive(Debug)]
1754pub struct NaiveOneByOneAggr {
1755 agg: AggregateFunc,
1756 input: Vec<Row>,
1757 reverse: bool,
1758}
1759
1760impl OneByOneAggr for NaiveOneByOneAggr {
1761 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1762 NaiveOneByOneAggr {
1763 agg: agg.clone(),
1764 input: Vec::new(),
1765 reverse,
1766 }
1767 }
1768
1769 fn give(&mut self, d: &Datum) {
1770 let mut row = Row::default();
1771 row.packer().push(d);
1772 self.input.push(row);
1773 }
1774
1775 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1776 temp_storage.make_datum(|packer| {
1777 packer.push(if !self.reverse {
1778 self.agg
1779 .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1780 } else {
1781 self.agg.eval(
1782 self.input.iter().rev().map(|r| r.unpack_first()),
1783 temp_storage,
1784 )
1785 });
1786 })
1787 }
1788}
1789
1790#[derive(
1793 Clone,
1794 Debug,
1795 Eq,
1796 PartialEq,
1797 Ord,
1798 PartialOrd,
1799 Serialize,
1800 Deserialize,
1801 Hash,
1802 MzReflect
1803)]
1804pub enum LagLeadType {
1805 Lag,
1806 Lead,
1807}
1808
1809#[derive(
1810 Clone,
1811 Debug,
1812 Eq,
1813 PartialEq,
1814 Ord,
1815 PartialOrd,
1816 Serialize,
1817 Deserialize,
1818 Hash,
1819 MzReflect
1820)]
1821pub enum AggregateFunc {
1822 MaxNumeric,
1823 MaxInt16,
1824 MaxInt32,
1825 MaxInt64,
1826 MaxUInt16,
1827 MaxUInt32,
1828 MaxUInt64,
1829 MaxMzTimestamp,
1830 MaxFloat32,
1831 MaxFloat64,
1832 MaxBool,
1833 MaxString,
1834 MaxDate,
1835 MaxTimestamp,
1836 MaxTimestampTz,
1837 MaxInterval,
1838 MaxTime,
1839 MinNumeric,
1840 MinInt16,
1841 MinInt32,
1842 MinInt64,
1843 MinUInt16,
1844 MinUInt32,
1845 MinUInt64,
1846 MinMzTimestamp,
1847 MinFloat32,
1848 MinFloat64,
1849 MinBool,
1850 MinString,
1851 MinDate,
1852 MinTimestamp,
1853 MinTimestampTz,
1854 MinInterval,
1855 MinTime,
1856 SumInt16,
1857 SumInt32,
1858 SumInt64,
1859 SumUInt16,
1860 SumUInt32,
1861 SumUInt64,
1862 SumFloat32,
1863 SumFloat64,
1864 SumNumeric,
1865 Count,
1866 Any,
1867 All,
1868 JsonbAgg {
1875 order_by: Vec<ColumnOrder>,
1876 },
1877 JsonbObjectAgg {
1884 order_by: Vec<ColumnOrder>,
1885 },
1886 MapAgg {
1890 order_by: Vec<ColumnOrder>,
1891 value_type: SqlScalarType,
1892 },
1893 ArrayConcat {
1896 order_by: Vec<ColumnOrder>,
1897 },
1898 ListConcat {
1901 order_by: Vec<ColumnOrder>,
1902 },
1903 StringAgg {
1904 order_by: Vec<ColumnOrder>,
1905 },
1906 RowNumber {
1907 order_by: Vec<ColumnOrder>,
1908 },
1909 Rank {
1910 order_by: Vec<ColumnOrder>,
1911 },
1912 DenseRank {
1913 order_by: Vec<ColumnOrder>,
1914 },
1915 LagLead {
1916 order_by: Vec<ColumnOrder>,
1917 lag_lead: LagLeadType,
1918 ignore_nulls: bool,
1919 },
1920 FirstValue {
1921 order_by: Vec<ColumnOrder>,
1922 window_frame: WindowFrame,
1923 },
1924 LastValue {
1925 order_by: Vec<ColumnOrder>,
1926 window_frame: WindowFrame,
1927 },
1928 FusedValueWindowFunc {
1930 funcs: Vec<AggregateFunc>,
1931 order_by: Vec<ColumnOrder>,
1934 },
1935 WindowAggregate {
1936 wrapped_aggregate: Box<AggregateFunc>,
1937 order_by: Vec<ColumnOrder>,
1938 window_frame: WindowFrame,
1939 },
1940 FusedWindowAggregate {
1941 wrapped_aggregates: Vec<AggregateFunc>,
1942 order_by: Vec<ColumnOrder>,
1943 window_frame: WindowFrame,
1944 },
1945 Dummy,
1950}
1951
1952impl AggregateFunc {
1953 pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
1954 where
1955 I: IntoIterator<Item = Datum<'a>>,
1956 {
1957 match self {
1958 AggregateFunc::MaxNumeric => {
1959 max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1960 }
1961 AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
1962 AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
1963 AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
1964 AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
1965 AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
1966 AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
1967 AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
1968 AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
1969 AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
1970 AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
1971 AggregateFunc::MaxString => max_string(datums),
1972 AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
1973 AggregateFunc::MaxTimestamp => {
1974 max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1975 }
1976 AggregateFunc::MaxTimestampTz => {
1977 max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
1978 }
1979 AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
1980 AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
1981 AggregateFunc::MinNumeric => {
1982 min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1983 }
1984 AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
1985 AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
1986 AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
1987 AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
1988 AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
1989 AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
1990 AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
1991 AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
1992 AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
1993 AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
1994 AggregateFunc::MinString => min_string(datums),
1995 AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
1996 AggregateFunc::MinTimestamp => {
1997 min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1998 }
1999 AggregateFunc::MinTimestampTz => {
2000 min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2001 }
2002 AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
2003 AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
2004 AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
2005 AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
2006 AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
2007 AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
2008 AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
2009 AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
2010 AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
2011 AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
2012 AggregateFunc::SumNumeric => sum_numeric(datums),
2013 AggregateFunc::Count => count(datums),
2014 AggregateFunc::Any => any(datums),
2015 AggregateFunc::All => all(datums),
2016 AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
2017 AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
2018 dict_agg(datums, temp_storage, order_by)
2019 }
2020 AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
2021 AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
2022 AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
2023 AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
2024 AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
2025 AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
2026 AggregateFunc::LagLead {
2027 order_by,
2028 lag_lead: lag_lead_type,
2029 ignore_nulls,
2030 } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2031 AggregateFunc::FirstValue {
2032 order_by,
2033 window_frame,
2034 } => first_value(datums, temp_storage, order_by, window_frame),
2035 AggregateFunc::LastValue {
2036 order_by,
2037 window_frame,
2038 } => last_value(datums, temp_storage, order_by, window_frame),
2039 AggregateFunc::WindowAggregate {
2040 wrapped_aggregate,
2041 order_by,
2042 window_frame,
2043 } => window_aggr::<_, NaiveOneByOneAggr>(
2044 datums,
2045 temp_storage,
2046 wrapped_aggregate,
2047 order_by,
2048 window_frame,
2049 ),
2050 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2051 fused_value_window_func(datums, temp_storage, funcs, order_by)
2052 }
2053 AggregateFunc::FusedWindowAggregate {
2054 wrapped_aggregates,
2055 order_by,
2056 window_frame,
2057 } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2058 datums,
2059 temp_storage,
2060 wrapped_aggregates,
2061 order_by,
2062 window_frame,
2063 ),
2064 AggregateFunc::Dummy => Datum::Dummy,
2065 }
2066 }
2067
2068 pub fn eval_with_fast_window_agg<'a, I, W>(
2072 &self,
2073 datums: I,
2074 temp_storage: &'a RowArena,
2075 ) -> Datum<'a>
2076 where
2077 I: IntoIterator<Item = Datum<'a>>,
2078 W: OneByOneAggr,
2079 {
2080 match self {
2081 AggregateFunc::WindowAggregate {
2082 wrapped_aggregate,
2083 order_by,
2084 window_frame,
2085 } => window_aggr::<_, W>(
2086 datums,
2087 temp_storage,
2088 wrapped_aggregate,
2089 order_by,
2090 window_frame,
2091 ),
2092 AggregateFunc::FusedWindowAggregate {
2093 wrapped_aggregates,
2094 order_by,
2095 window_frame,
2096 } => fused_window_aggr::<_, W>(
2097 datums,
2098 temp_storage,
2099 wrapped_aggregates,
2100 order_by,
2101 window_frame,
2102 ),
2103 _ => self.eval(datums, temp_storage),
2104 }
2105 }
2106
2107 pub fn eval_with_unnest_list<'a, I, W>(
2108 &self,
2109 datums: I,
2110 temp_storage: &'a RowArena,
2111 ) -> impl Iterator<Item = Datum<'a>>
2112 where
2113 I: IntoIterator<Item = Datum<'a>>,
2114 W: OneByOneAggr,
2115 {
2116 assert!(self.can_fuse_with_unnest_list());
2118 match self {
2119 AggregateFunc::RowNumber { order_by } => {
2120 row_number_no_list(datums, temp_storage, order_by).collect_vec()
2121 }
2122 AggregateFunc::Rank { order_by } => {
2123 rank_no_list(datums, temp_storage, order_by).collect_vec()
2124 }
2125 AggregateFunc::DenseRank { order_by } => {
2126 dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2127 }
2128 AggregateFunc::LagLead {
2129 order_by,
2130 lag_lead: lag_lead_type,
2131 ignore_nulls,
2132 } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2133 .collect_vec(),
2134 AggregateFunc::FirstValue {
2135 order_by,
2136 window_frame,
2137 } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2138 AggregateFunc::LastValue {
2139 order_by,
2140 window_frame,
2141 } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2142 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2143 fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2144 }
2145 AggregateFunc::WindowAggregate {
2146 wrapped_aggregate,
2147 order_by,
2148 window_frame,
2149 } => window_aggr_no_list::<_, W>(
2150 datums,
2151 temp_storage,
2152 wrapped_aggregate,
2153 order_by,
2154 window_frame,
2155 )
2156 .collect_vec(),
2157 AggregateFunc::FusedWindowAggregate {
2158 wrapped_aggregates,
2159 order_by,
2160 window_frame,
2161 } => fused_window_aggr_no_list::<_, W>(
2162 datums,
2163 temp_storage,
2164 wrapped_aggregates,
2165 order_by,
2166 window_frame,
2167 )
2168 .collect_vec(),
2169 _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2170 }
2171 .into_iter()
2172 }
2173
2174 pub fn default(&self) -> Datum<'static> {
2177 match self {
2178 AggregateFunc::Count => Datum::Int64(0),
2179 AggregateFunc::Any => Datum::False,
2180 AggregateFunc::All => Datum::True,
2181 AggregateFunc::Dummy => Datum::Dummy,
2182 _ => Datum::Null,
2183 }
2184 }
2185
2186 pub fn identity_datum(&self) -> Datum<'static> {
2189 match self {
2190 AggregateFunc::Any => Datum::False,
2191 AggregateFunc::All => Datum::True,
2192 AggregateFunc::Dummy => Datum::Dummy,
2193 AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2194 AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2195 AggregateFunc::RowNumber { .. }
2196 | AggregateFunc::Rank { .. }
2197 | AggregateFunc::DenseRank { .. }
2198 | AggregateFunc::LagLead { .. }
2199 | AggregateFunc::FirstValue { .. }
2200 | AggregateFunc::LastValue { .. }
2201 | AggregateFunc::WindowAggregate { .. }
2202 | AggregateFunc::FusedValueWindowFunc { .. }
2203 | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2204 AggregateFunc::MaxNumeric
2205 | AggregateFunc::MaxInt16
2206 | AggregateFunc::MaxInt32
2207 | AggregateFunc::MaxInt64
2208 | AggregateFunc::MaxUInt16
2209 | AggregateFunc::MaxUInt32
2210 | AggregateFunc::MaxUInt64
2211 | AggregateFunc::MaxMzTimestamp
2212 | AggregateFunc::MaxFloat32
2213 | AggregateFunc::MaxFloat64
2214 | AggregateFunc::MaxBool
2215 | AggregateFunc::MaxString
2216 | AggregateFunc::MaxDate
2217 | AggregateFunc::MaxTimestamp
2218 | AggregateFunc::MaxTimestampTz
2219 | AggregateFunc::MaxInterval
2220 | AggregateFunc::MaxTime
2221 | AggregateFunc::MinNumeric
2222 | AggregateFunc::MinInt16
2223 | AggregateFunc::MinInt32
2224 | AggregateFunc::MinInt64
2225 | AggregateFunc::MinUInt16
2226 | AggregateFunc::MinUInt32
2227 | AggregateFunc::MinUInt64
2228 | AggregateFunc::MinMzTimestamp
2229 | AggregateFunc::MinFloat32
2230 | AggregateFunc::MinFloat64
2231 | AggregateFunc::MinBool
2232 | AggregateFunc::MinString
2233 | AggregateFunc::MinDate
2234 | AggregateFunc::MinTimestamp
2235 | AggregateFunc::MinTimestampTz
2236 | AggregateFunc::MinInterval
2237 | AggregateFunc::MinTime
2238 | AggregateFunc::SumInt16
2239 | AggregateFunc::SumInt32
2240 | AggregateFunc::SumInt64
2241 | AggregateFunc::SumUInt16
2242 | AggregateFunc::SumUInt32
2243 | AggregateFunc::SumUInt64
2244 | AggregateFunc::SumFloat32
2245 | AggregateFunc::SumFloat64
2246 | AggregateFunc::SumNumeric
2247 | AggregateFunc::Count
2248 | AggregateFunc::JsonbAgg { .. }
2249 | AggregateFunc::JsonbObjectAgg { .. }
2250 | AggregateFunc::MapAgg { .. }
2251 | AggregateFunc::StringAgg { .. } => Datum::Null,
2252 }
2253 }
2254
2255 pub fn can_fuse_with_unnest_list(&self) -> bool {
2256 match self {
2257 AggregateFunc::RowNumber { .. }
2258 | AggregateFunc::Rank { .. }
2259 | AggregateFunc::DenseRank { .. }
2260 | AggregateFunc::LagLead { .. }
2261 | AggregateFunc::FirstValue { .. }
2262 | AggregateFunc::LastValue { .. }
2263 | AggregateFunc::WindowAggregate { .. }
2264 | AggregateFunc::FusedValueWindowFunc { .. }
2265 | AggregateFunc::FusedWindowAggregate { .. } => true,
2266 AggregateFunc::ArrayConcat { .. }
2267 | AggregateFunc::ListConcat { .. }
2268 | AggregateFunc::Any
2269 | AggregateFunc::All
2270 | AggregateFunc::Dummy
2271 | AggregateFunc::MaxNumeric
2272 | AggregateFunc::MaxInt16
2273 | AggregateFunc::MaxInt32
2274 | AggregateFunc::MaxInt64
2275 | AggregateFunc::MaxUInt16
2276 | AggregateFunc::MaxUInt32
2277 | AggregateFunc::MaxUInt64
2278 | AggregateFunc::MaxMzTimestamp
2279 | AggregateFunc::MaxFloat32
2280 | AggregateFunc::MaxFloat64
2281 | AggregateFunc::MaxBool
2282 | AggregateFunc::MaxString
2283 | AggregateFunc::MaxDate
2284 | AggregateFunc::MaxTimestamp
2285 | AggregateFunc::MaxTimestampTz
2286 | AggregateFunc::MaxInterval
2287 | AggregateFunc::MaxTime
2288 | AggregateFunc::MinNumeric
2289 | AggregateFunc::MinInt16
2290 | AggregateFunc::MinInt32
2291 | AggregateFunc::MinInt64
2292 | AggregateFunc::MinUInt16
2293 | AggregateFunc::MinUInt32
2294 | AggregateFunc::MinUInt64
2295 | AggregateFunc::MinMzTimestamp
2296 | AggregateFunc::MinFloat32
2297 | AggregateFunc::MinFloat64
2298 | AggregateFunc::MinBool
2299 | AggregateFunc::MinString
2300 | AggregateFunc::MinDate
2301 | AggregateFunc::MinTimestamp
2302 | AggregateFunc::MinTimestampTz
2303 | AggregateFunc::MinInterval
2304 | AggregateFunc::MinTime
2305 | AggregateFunc::SumInt16
2306 | AggregateFunc::SumInt32
2307 | AggregateFunc::SumInt64
2308 | AggregateFunc::SumUInt16
2309 | AggregateFunc::SumUInt32
2310 | AggregateFunc::SumUInt64
2311 | AggregateFunc::SumFloat32
2312 | AggregateFunc::SumFloat64
2313 | AggregateFunc::SumNumeric
2314 | AggregateFunc::Count
2315 | AggregateFunc::JsonbAgg { .. }
2316 | AggregateFunc::JsonbObjectAgg { .. }
2317 | AggregateFunc::MapAgg { .. }
2318 | AggregateFunc::StringAgg { .. } => false,
2319 }
2320 }
2321
2322 pub fn output_sql_type(&self, input_type: SqlColumnType) -> SqlColumnType {
2328 let scalar_type = match self {
2329 AggregateFunc::Count => SqlScalarType::Int64,
2330 AggregateFunc::Any => SqlScalarType::Bool,
2331 AggregateFunc::All => SqlScalarType::Bool,
2332 AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
2333 AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
2334 AggregateFunc::SumInt16 => SqlScalarType::Int64,
2335 AggregateFunc::SumInt32 => SqlScalarType::Int64,
2336 AggregateFunc::SumInt64 => SqlScalarType::Numeric {
2337 max_scale: Some(NumericMaxScale::ZERO),
2338 },
2339 AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
2340 AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
2341 AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
2342 max_scale: Some(NumericMaxScale::ZERO),
2343 },
2344 AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
2345 value_type: Box::new(value_type.clone()),
2346 custom_id: None,
2347 },
2348 AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2349 match input_type.scalar_type {
2350 SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2352 _ => unreachable!(),
2353 }
2354 }
2355 AggregateFunc::StringAgg { .. } => SqlScalarType::String,
2356 AggregateFunc::RowNumber { .. } => {
2357 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2358 }
2359 AggregateFunc::Rank { .. } => {
2360 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2361 }
2362 AggregateFunc::DenseRank { .. } => {
2363 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2364 }
2365 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2366 let fields = input_type.scalar_type.unwrap_record_element_type();
2368 let original_row_type = fields[0].unwrap_record_element_type()[0]
2369 .clone()
2370 .nullable(false);
2371 let encoded_args = fields[0].unwrap_record_element_type()[1];
2372 let output_type_inner =
2373 Self::lag_lead_output_type_inner_from_encoded_args(encoded_args);
2374 let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2375
2376 SqlScalarType::List {
2377 element_type: Box::new(SqlScalarType::Record {
2378 fields: [
2379 (column_name, output_type_inner),
2380 (ColumnName::from("?orig_row?"), original_row_type),
2381 ].into(),
2382 custom_id: None,
2383 }),
2384 custom_id: None,
2385 }
2386 }
2387 AggregateFunc::FirstValue { .. } => {
2388 let fields = input_type.scalar_type.unwrap_record_element_type();
2390 let original_row_type = fields[0].unwrap_record_element_type()[0]
2391 .clone()
2392 .nullable(false);
2393 let value_type = fields[0].unwrap_record_element_type()[1]
2394 .clone()
2395 .nullable(true); SqlScalarType::List {
2398 element_type: Box::new(SqlScalarType::Record {
2399 fields: [
2400 (ColumnName::from("?first_value?"), value_type),
2401 (ColumnName::from("?orig_row?"), original_row_type),
2402 ].into(),
2403 custom_id: None,
2404 }),
2405 custom_id: None,
2406 }
2407 }
2408 AggregateFunc::LastValue { .. } => {
2409 let fields = input_type.scalar_type.unwrap_record_element_type();
2411 let original_row_type = fields[0].unwrap_record_element_type()[0]
2412 .clone()
2413 .nullable(false);
2414 let value_type = fields[0].unwrap_record_element_type()[1]
2415 .clone()
2416 .nullable(true); SqlScalarType::List {
2419 element_type: Box::new(SqlScalarType::Record {
2420 fields: [
2421 (ColumnName::from("?last_value?"), value_type),
2422 (ColumnName::from("?orig_row?"), original_row_type),
2423 ].into(),
2424 custom_id: None,
2425 }),
2426 custom_id: None,
2427 }
2428 }
2429 AggregateFunc::WindowAggregate {
2430 wrapped_aggregate, ..
2431 } => {
2432 let fields = input_type.scalar_type.unwrap_record_element_type();
2434 let original_row_type = fields[0].unwrap_record_element_type()[0]
2435 .clone()
2436 .nullable(false);
2437 let arg_type = fields[0].unwrap_record_element_type()[1]
2438 .clone()
2439 .nullable(true);
2440 let wrapped_aggr_out_type = wrapped_aggregate.output_sql_type(arg_type);
2441
2442 SqlScalarType::List {
2443 element_type: Box::new(SqlScalarType::Record {
2444 fields: [
2445 (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2446 (ColumnName::from("?orig_row?"), original_row_type),
2447 ].into(),
2448 custom_id: None,
2449 }),
2450 custom_id: None,
2451 }
2452 }
2453 AggregateFunc::FusedWindowAggregate {
2454 wrapped_aggregates, ..
2455 } => {
2456 let fields = input_type.scalar_type.unwrap_record_element_type();
2459 let original_row_type = fields[0].unwrap_record_element_type()[0]
2460 .clone()
2461 .nullable(false);
2462 let args_type = fields[0].unwrap_record_element_type()[1];
2463 let arg_types = args_type.unwrap_record_element_type();
2464 let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(
2465 |(arg_type, wrapped_agg)| {
2466 (
2467 ColumnName::from(wrapped_agg.name()),
2468 wrapped_agg.output_sql_type((**arg_type).clone().nullable(true)),
2469 )
2470 }).collect_vec();
2471
2472 SqlScalarType::List {
2473 element_type: Box::new(SqlScalarType::Record {
2474 fields: [
2475 (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
2476 fields: out_fields.into(),
2477 custom_id: None,
2478 }.nullable(false)),
2479 (ColumnName::from("?orig_row?"), original_row_type),
2480 ].into(),
2481 custom_id: None,
2482 }),
2483 custom_id: None,
2484 }
2485 }
2486 AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2487 let fields = input_type.scalar_type.unwrap_record_element_type();
2492 let original_row_type = fields[0].unwrap_record_element_type()[0]
2493 .clone()
2494 .nullable(false);
2495 let encoded_args_type = fields[0]
2496 .unwrap_record_element_type()[1]
2497 .unwrap_record_element_type();
2498
2499 SqlScalarType::List {
2500 element_type: Box::new(SqlScalarType::Record {
2501 fields: [
2502 (
2503 ColumnName::from("?fused_value_window_func?"),
2504 SqlScalarType::Record {
2505 fields: encoded_args_type.into_iter().zip_eq(funcs).map(
2506 |(arg_type, func)| {
2507 match func {
2508 AggregateFunc::LagLead {
2509 lag_lead: lag_lead_type, ..
2510 } => {
2511 let name = Self::lag_lead_result_column_name(
2512 lag_lead_type,
2513 );
2514 let ty = Self
2515 ::lag_lead_output_type_inner_from_encoded_args(
2516 arg_type,
2517 );
2518 (name, ty)
2519 },
2520 AggregateFunc::FirstValue { .. } => {
2521 (
2522 ColumnName::from("?first_value?"),
2523 arg_type.clone().nullable(true),
2524 )
2525 }
2526 AggregateFunc::LastValue { .. } => {
2527 (
2528 ColumnName::from("?last_value?"),
2529 arg_type.clone().nullable(true),
2530 )
2531 }
2532 _ => panic!("FusedValueWindowFunc has an unknown function"),
2533 }
2534 }).collect(),
2535 custom_id: None,
2536 }.nullable(false)),
2537 (ColumnName::from("?orig_row?"), original_row_type),
2538 ].into(),
2539 custom_id: None,
2540 }),
2541 custom_id: None,
2542 }
2543 }
2544 AggregateFunc::Dummy
2545 | AggregateFunc::MaxNumeric
2546 | AggregateFunc::MaxInt16
2547 | AggregateFunc::MaxInt32
2548 | AggregateFunc::MaxInt64
2549 | AggregateFunc::MaxUInt16
2550 | AggregateFunc::MaxUInt32
2551 | AggregateFunc::MaxUInt64
2552 | AggregateFunc::MaxMzTimestamp
2553 | AggregateFunc::MaxFloat32
2554 | AggregateFunc::MaxFloat64
2555 | AggregateFunc::MaxBool
2556 | AggregateFunc::MaxString
2560 | AggregateFunc::MaxDate
2561 | AggregateFunc::MaxTimestamp
2562 | AggregateFunc::MaxTimestampTz
2563 | AggregateFunc::MaxInterval
2564 | AggregateFunc::MaxTime
2565 | AggregateFunc::MinNumeric
2566 | AggregateFunc::MinInt16
2567 | AggregateFunc::MinInt32
2568 | AggregateFunc::MinInt64
2569 | AggregateFunc::MinUInt16
2570 | AggregateFunc::MinUInt32
2571 | AggregateFunc::MinUInt64
2572 | AggregateFunc::MinMzTimestamp
2573 | AggregateFunc::MinFloat32
2574 | AggregateFunc::MinFloat64
2575 | AggregateFunc::MinBool
2576 | AggregateFunc::MinString
2577 | AggregateFunc::MinDate
2578 | AggregateFunc::MinTimestamp
2579 | AggregateFunc::MinTimestampTz
2580 | AggregateFunc::MinInterval
2581 | AggregateFunc::MinTime
2582 | AggregateFunc::SumFloat32
2583 | AggregateFunc::SumFloat64
2584 | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2585 };
2586 let nullable = match self {
2589 AggregateFunc::Count => false,
2590 AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2592 SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2594 SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
2596 _ => unreachable!(),
2597 },
2598 _ => unreachable!(),
2599 },
2600 _ => input_type.nullable,
2601 };
2602 scalar_type.nullable(nullable)
2603 }
2604
2605 pub fn output_type(&self, input_type: ReprColumnType) -> ReprColumnType {
2609 ReprColumnType::from(&self.output_sql_type(SqlColumnType::from_repr(&input_type)))
2610 }
2611
2612 fn output_type_ranking_window_funcs(
2614 input_type: &SqlColumnType,
2615 col_name: &str,
2616 ) -> SqlScalarType {
2617 match input_type.scalar_type {
2618 SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2619 element_type: Box::new(SqlScalarType::Record {
2620 fields: [
2621 (
2622 ColumnName::from(col_name),
2623 SqlScalarType::Int64.nullable(false),
2624 ),
2625 (ColumnName::from("?orig_row?"), {
2626 let inner = match &fields[0].1.scalar_type {
2627 SqlScalarType::List { element_type, .. } => element_type.clone(),
2628 _ => unreachable!(),
2629 };
2630 inner.nullable(false)
2631 }),
2632 ]
2633 .into(),
2634 custom_id: None,
2635 }),
2636 custom_id: None,
2637 },
2638 _ => unreachable!(),
2639 }
2640 }
2641
2642 fn lag_lead_output_type_inner_from_encoded_args(
2646 encoded_args_type: &SqlScalarType,
2647 ) -> SqlColumnType {
2648 encoded_args_type.unwrap_record_element_type()[0]
2652 .clone()
2653 .nullable(true)
2654 }
2655
2656 fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2657 ColumnName::from(match lag_lead_type {
2658 LagLeadType::Lag => "?lag?",
2659 LagLeadType::Lead => "?lead?",
2660 })
2661 }
2662
2663 pub fn propagates_nonnull_constraint(&self) -> bool {
2668 match self {
2669 AggregateFunc::MaxNumeric
2670 | AggregateFunc::MaxInt16
2671 | AggregateFunc::MaxInt32
2672 | AggregateFunc::MaxInt64
2673 | AggregateFunc::MaxUInt16
2674 | AggregateFunc::MaxUInt32
2675 | AggregateFunc::MaxUInt64
2676 | AggregateFunc::MaxMzTimestamp
2677 | AggregateFunc::MaxFloat32
2678 | AggregateFunc::MaxFloat64
2679 | AggregateFunc::MaxBool
2680 | AggregateFunc::MaxString
2681 | AggregateFunc::MaxDate
2682 | AggregateFunc::MaxTimestamp
2683 | AggregateFunc::MaxTimestampTz
2684 | AggregateFunc::MaxInterval
2685 | AggregateFunc::MaxTime
2686 | AggregateFunc::MinNumeric
2687 | AggregateFunc::MinInt16
2688 | AggregateFunc::MinInt32
2689 | AggregateFunc::MinInt64
2690 | AggregateFunc::MinUInt16
2691 | AggregateFunc::MinUInt32
2692 | AggregateFunc::MinUInt64
2693 | AggregateFunc::MinMzTimestamp
2694 | AggregateFunc::MinFloat32
2695 | AggregateFunc::MinFloat64
2696 | AggregateFunc::MinBool
2697 | AggregateFunc::MinString
2698 | AggregateFunc::MinDate
2699 | AggregateFunc::MinTimestamp
2700 | AggregateFunc::MinTimestampTz
2701 | AggregateFunc::MinInterval
2702 | AggregateFunc::MinTime
2703 | AggregateFunc::SumInt16
2704 | AggregateFunc::SumInt32
2705 | AggregateFunc::SumInt64
2706 | AggregateFunc::SumUInt16
2707 | AggregateFunc::SumUInt32
2708 | AggregateFunc::SumUInt64
2709 | AggregateFunc::SumFloat32
2710 | AggregateFunc::SumFloat64
2711 | AggregateFunc::SumNumeric
2712 | AggregateFunc::StringAgg { .. } => true,
2713 AggregateFunc::Count
2715 | AggregateFunc::Any
2716 | AggregateFunc::All
2717 | AggregateFunc::JsonbAgg { .. }
2718 | AggregateFunc::JsonbObjectAgg { .. }
2719 | AggregateFunc::MapAgg { .. }
2720 | AggregateFunc::ArrayConcat { .. }
2721 | AggregateFunc::ListConcat { .. }
2722 | AggregateFunc::RowNumber { .. }
2723 | AggregateFunc::Rank { .. }
2724 | AggregateFunc::DenseRank { .. }
2725 | AggregateFunc::LagLead { .. }
2726 | AggregateFunc::FirstValue { .. }
2727 | AggregateFunc::LastValue { .. }
2728 | AggregateFunc::FusedValueWindowFunc { .. }
2729 | AggregateFunc::WindowAggregate { .. }
2730 | AggregateFunc::FusedWindowAggregate { .. }
2731 | AggregateFunc::Dummy => false,
2732 }
2733 }
2734}
2735
2736fn jsonb_each<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2737 let map = match a {
2739 Datum::Map(dict) => dict,
2740 _ => mz_repr::DatumMap::empty(),
2741 };
2742
2743 map.iter()
2744 .map(move |(k, v)| (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE))
2745}
2746
2747fn jsonb_each_stringify<'a>(
2748 a: Datum<'a>,
2749 temp_storage: &'a RowArena,
2750) -> impl Iterator<Item = (Row, Diff)> + 'a {
2751 let map = match a {
2753 Datum::Map(dict) => dict,
2754 _ => mz_repr::DatumMap::empty(),
2755 };
2756
2757 map.iter().map(move |(k, mut v)| {
2758 v = jsonb_stringify(v, temp_storage)
2759 .map(Datum::String)
2760 .unwrap_or(Datum::Null);
2761 (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2762 })
2763}
2764
2765fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2766 let map = match a {
2767 Datum::Map(dict) => dict,
2768 _ => mz_repr::DatumMap::empty(),
2769 };
2770
2771 map.iter()
2772 .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2773}
2774
2775fn jsonb_array_elements<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2776 let list = match a {
2777 Datum::List(list) => list,
2778 _ => mz_repr::DatumList::empty(),
2779 };
2780 list.iter().map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2781}
2782
2783fn jsonb_array_elements_stringify<'a>(
2784 a: Datum<'a>,
2785 temp_storage: &'a RowArena,
2786) -> impl Iterator<Item = (Row, Diff)> + 'a {
2787 let list = match a {
2788 Datum::List(list) => list,
2789 _ => mz_repr::DatumList::empty(),
2790 };
2791 list.iter().map(move |mut e| {
2792 e = jsonb_stringify(e, temp_storage)
2793 .map(Datum::String)
2794 .unwrap_or(Datum::Null);
2795 (Row::pack_slice(&[e]), Diff::ONE)
2796 })
2797}
2798
2799fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2800 let r = r.inner();
2801 let a = a.unwrap_str();
2802 let captures = r.captures(a)?;
2803 let datums = captures
2804 .iter()
2805 .skip(1)
2806 .map(|m| Datum::from(m.map(|m| m.as_str())));
2807 Some((Row::pack(datums), Diff::ONE))
2808}
2809
2810fn regexp_matches<'a>(
2811 exprs: &[Datum<'a>],
2812) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2813 assert!(exprs.len() == 2 || exprs.len() == 3);
2817 let a = exprs[0].unwrap_str();
2818 let r = exprs[1].unwrap_str();
2819
2820 let (regex, opts) = if exprs.len() == 3 {
2821 let flag = exprs[2].unwrap_str();
2822 let opts = AnalyzedRegexOpts::from_str(flag)?;
2823 (AnalyzedRegex::new(r, opts)?, opts)
2824 } else {
2825 let opts = AnalyzedRegexOpts::default();
2826 (AnalyzedRegex::new(r, opts)?, opts)
2827 };
2828
2829 let regex = regex.inner().clone();
2830
2831 let iter = regex.captures_iter(a).map(move |captures| {
2832 let matches = captures
2833 .iter()
2834 .skip(1)
2836 .map(|m| Datum::from(m.map(|m| m.as_str())))
2837 .collect::<Vec<_>>();
2838
2839 let mut binding = SharedRow::get();
2840 let mut packer = binding.packer();
2841
2842 let dimension = ArrayDimension {
2843 lower_bound: 1,
2844 length: matches.len(),
2845 };
2846 packer
2847 .try_push_array(&[dimension], matches)
2848 .expect("generated dimensions above");
2849
2850 (binding.clone(), Diff::ONE)
2851 });
2852
2853 let out = iter.collect::<SmallVec<[_; 3]>>();
2858
2859 if opts.global {
2860 Ok(Either::Left(out.into_iter()))
2861 } else {
2862 Ok(Either::Right(out.into_iter().take(1)))
2863 }
2864}
2865
2866fn generate_series<N>(
2867 start: N,
2868 stop: N,
2869 step: N,
2870) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2871where
2872 N: Integer + Signed + CheckedAdd + Clone,
2873 Datum<'static>: From<N>,
2874{
2875 if step == N::zero() {
2876 return Err(EvalError::InvalidParameterValue(
2877 "step size cannot equal zero".into(),
2878 ));
2879 }
2880 Ok(num::range_step_inclusive(start, stop, step)
2881 .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2882}
2883
/// Iterator state for a range of timestamps that includes its end point and
/// advances by an `Interval` step.
///
/// The `Iterator` impl yields `state` while it has not passed `stop` in the
/// direction given by `rev`, then advances `state` by `step`.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to yield.
    state: CheckedTimestamp<T>,
    /// The inclusive end of the range.
    stop: CheckedTimestamp<T>,
    /// The interval added to `state` after each yielded timestamp.
    step: Interval,
    /// When `true` the range runs downward: iteration continues while
    /// `state >= stop` instead of `state <= stop`.
    rev: bool,
    /// Set once advancing `state` fails; no further timestamps are yielded.
    done: bool,
}
2895
2896impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
2897 type Item = CheckedTimestamp<T>;
2898
2899 #[inline]
2900 fn next(&mut self) -> Option<CheckedTimestamp<T>> {
2901 if !self.done
2902 && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
2903 {
2904 let result = self.state.clone();
2905 match add_timestamp_months(self.state.deref(), self.step.months) {
2906 Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
2907 Some(v) => match CheckedTimestamp::from_timestamplike(v) {
2908 Ok(v) => self.state = v,
2909 Err(_) => self.done = true,
2910 },
2911 None => self.done = true,
2912 },
2913 Err(..) => {
2914 self.done = true;
2915 }
2916 }
2917
2918 Some(result)
2919 } else {
2920 None
2921 }
2922 }
2923}
2924
2925fn generate_series_ts<T: TimestampLike>(
2926 start: CheckedTimestamp<T>,
2927 stop: CheckedTimestamp<T>,
2928 step: Interval,
2929 conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2930) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2931 let normalized_step = step.as_microseconds();
2932 if normalized_step == 0 {
2933 return Err(EvalError::InvalidParameterValue(
2934 "step size cannot equal zero".into(),
2935 ));
2936 }
2937 let rev = normalized_step < 0;
2938
2939 let trsi = TimestampRangeStepInclusive {
2940 state: start,
2941 stop,
2942 step,
2943 rev,
2944 done: false,
2945 };
2946
2947 Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2948}
2949
2950fn generate_subscripts_array(
2951 a: Datum,
2952 dim: i32,
2953) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2954 if dim <= 0 {
2955 return Ok(Box::new(iter::empty()));
2956 }
2957
2958 match a.unwrap_array().dims().into_iter().nth(
2959 (dim - 1)
2960 .try_into()
2961 .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2962 ) {
2963 Some(requested_dim) => {
2964 let lower_bound: i32 = requested_dim.lower_bound.try_into().map_err(|_| {
2965 EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2966 })?;
2967 let length: i32 = requested_dim
2970 .length
2971 .try_into()
2972 .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?;
2973 let upper_bound = lower_bound.checked_add(length - 1).ok_or_else(|| {
2974 EvalError::Int32OutOfRange(requested_dim.length.to_string().into())
2975 })?;
2976 Ok(Box::new(generate_series::<i32>(
2977 lower_bound,
2978 upper_bound,
2979 1,
2980 )?))
2981 }
2982 None => Ok(Box::new(iter::empty())),
2983 }
2984}
2985
2986fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2987 a.unwrap_array()
2988 .elements()
2989 .iter()
2990 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2991}
2992
2993fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2994 a.unwrap_list()
2995 .iter()
2996 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2997}
2998
2999fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3000 a.unwrap_map()
3001 .iter()
3002 .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
3003}
3004
3005impl AggregateFunc {
3006 pub fn name(&self) -> &'static str {
3009 match self {
3010 Self::MaxNumeric => "max",
3011 Self::MaxInt16 => "max",
3012 Self::MaxInt32 => "max",
3013 Self::MaxInt64 => "max",
3014 Self::MaxUInt16 => "max",
3015 Self::MaxUInt32 => "max",
3016 Self::MaxUInt64 => "max",
3017 Self::MaxMzTimestamp => "max",
3018 Self::MaxFloat32 => "max",
3019 Self::MaxFloat64 => "max",
3020 Self::MaxBool => "max",
3021 Self::MaxString => "max",
3022 Self::MaxDate => "max",
3023 Self::MaxTimestamp => "max",
3024 Self::MaxTimestampTz => "max",
3025 Self::MaxInterval => "max",
3026 Self::MaxTime => "max",
3027 Self::MinNumeric => "min",
3028 Self::MinInt16 => "min",
3029 Self::MinInt32 => "min",
3030 Self::MinInt64 => "min",
3031 Self::MinUInt16 => "min",
3032 Self::MinUInt32 => "min",
3033 Self::MinUInt64 => "min",
3034 Self::MinMzTimestamp => "min",
3035 Self::MinFloat32 => "min",
3036 Self::MinFloat64 => "min",
3037 Self::MinBool => "min",
3038 Self::MinString => "min",
3039 Self::MinDate => "min",
3040 Self::MinTimestamp => "min",
3041 Self::MinTimestampTz => "min",
3042 Self::MinInterval => "min",
3043 Self::MinTime => "min",
3044 Self::SumInt16 => "sum",
3045 Self::SumInt32 => "sum",
3046 Self::SumInt64 => "sum",
3047 Self::SumUInt16 => "sum",
3048 Self::SumUInt32 => "sum",
3049 Self::SumUInt64 => "sum",
3050 Self::SumFloat32 => "sum",
3051 Self::SumFloat64 => "sum",
3052 Self::SumNumeric => "sum",
3053 Self::Count => "count",
3054 Self::Any => "any",
3055 Self::All => "all",
3056 Self::JsonbAgg { .. } => "jsonb_agg",
3057 Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3058 Self::MapAgg { .. } => "map_agg",
3059 Self::ArrayConcat { .. } => "array_agg",
3060 Self::ListConcat { .. } => "list_agg",
3061 Self::StringAgg { .. } => "string_agg",
3062 Self::RowNumber { .. } => "row_number",
3063 Self::Rank { .. } => "rank",
3064 Self::DenseRank { .. } => "dense_rank",
3065 Self::LagLead {
3066 lag_lead: LagLeadType::Lag,
3067 ..
3068 } => "lag",
3069 Self::LagLead {
3070 lag_lead: LagLeadType::Lead,
3071 ..
3072 } => "lead",
3073 Self::FirstValue { .. } => "first_value",
3074 Self::LastValue { .. } => "last_value",
3075 Self::WindowAggregate { .. } => "window_agg",
3076 Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3077 Self::FusedWindowAggregate { .. } => "fused_window_agg",
3078 Self::Dummy => "dummy",
3079 }
3080 }
3081}
3082
3083impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3084where
3085 M: HumanizerMode,
3086{
3087 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3088 use AggregateFunc::*;
3089 let name = self.expr.name();
3090 match self.expr {
3091 JsonbAgg { order_by }
3092 | JsonbObjectAgg { order_by }
3093 | MapAgg { order_by, .. }
3094 | ArrayConcat { order_by }
3095 | ListConcat { order_by }
3096 | StringAgg { order_by }
3097 | RowNumber { order_by }
3098 | Rank { order_by }
3099 | DenseRank { order_by } => {
3100 let order_by = order_by.iter().map(|col| self.child(col));
3101 write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3102 }
3103 LagLead {
3104 lag_lead: _,
3105 ignore_nulls,
3106 order_by,
3107 } => {
3108 let order_by = order_by.iter().map(|col| self.child(col));
3109 f.write_str(name)?;
3110 f.write_str("[")?;
3111 if *ignore_nulls {
3112 f.write_str("ignore_nulls=true, ")?;
3113 }
3114 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3115 f.write_str("]")
3116 }
3117 FirstValue {
3118 order_by,
3119 window_frame,
3120 } => {
3121 let order_by = order_by.iter().map(|col| self.child(col));
3122 f.write_str(name)?;
3123 f.write_str("[")?;
3124 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3125 if *window_frame != WindowFrame::default() {
3126 write!(f, " {}", window_frame)?;
3127 }
3128 f.write_str("]")
3129 }
3130 LastValue {
3131 order_by,
3132 window_frame,
3133 } => {
3134 let order_by = order_by.iter().map(|col| self.child(col));
3135 f.write_str(name)?;
3136 f.write_str("[")?;
3137 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3138 if *window_frame != WindowFrame::default() {
3139 write!(f, " {}", window_frame)?;
3140 }
3141 f.write_str("]")
3142 }
3143 WindowAggregate {
3144 wrapped_aggregate,
3145 order_by,
3146 window_frame,
3147 } => {
3148 let order_by = order_by.iter().map(|col| self.child(col));
3149 let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3150 f.write_str(name)?;
3151 f.write_str("[")?;
3152 write!(f, "{} ", wrapped_aggregate)?;
3153 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3154 if *window_frame != WindowFrame::default() {
3155 write!(f, " {}", window_frame)?;
3156 }
3157 f.write_str("]")
3158 }
3159 FusedValueWindowFunc { funcs, order_by } => {
3160 let order_by = order_by.iter().map(|col| self.child(col));
3161 let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3162 f.write_str(name)?;
3163 f.write_str("[")?;
3164 write!(f, "{} ", funcs)?;
3165 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3166 f.write_str("]")
3167 }
3168 _ => f.write_str(name),
3169 }
3170 }
3171}
3172
/// Describes one capture group of an `AnalyzedRegex`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct CaptureGroupDesc {
    /// Index of the group within the regex. `AnalyzedRegex::new` skips group
    /// 0, so the descriptors it builds start at 1.
    pub index: u32,
    /// The group's name, if it is a named capture group.
    pub name: Option<String>,
    /// Whether the column produced for this group may be null.
    /// `AnalyzedRegex::new` currently always sets this to `true`.
    pub nullable: bool,
}
3190
/// Options controlling how an `AnalyzedRegex` is compiled and applied.
///
/// The `FromStr` impl parses these from a flags string; the default has both
/// options off.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default
)]
pub struct AnalyzedRegexOpts {
    /// Compile the regex case-insensitively (flag `i`).
    pub case_insensitive: bool,
    /// Return every match rather than only the first (flag `g`).
    pub global: bool,
}
3209
3210impl FromStr for AnalyzedRegexOpts {
3211 type Err = EvalError;
3212
3213 fn from_str(s: &str) -> Result<Self, Self::Err> {
3214 let mut opts = AnalyzedRegexOpts::default();
3215 for c in s.chars() {
3216 match c {
3217 'i' => opts.case_insensitive = true,
3218 'g' => opts.global = true,
3219 _ => return Err(EvalError::InvalidRegexFlag(c)),
3220 }
3221 }
3222 Ok(opts)
3223 }
3224}
3225
/// A compiled regex bundled with a description of its capture groups and the
/// options it was compiled with.
///
/// The tuple fields are, in order: the compiled regex, one
/// `CaptureGroupDesc` per capture group (excluding group 0), and the options
/// passed to `AnalyzedRegex::new`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3239
3240impl AnalyzedRegex {
3241 pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, RegexCompilationError> {
3242 let r = ReprRegex::new(s, opts.case_insensitive)?;
3243 #[allow(clippy::as_conversions)]
3245 let descs: Vec<_> = r
3246 .capture_names()
3247 .enumerate()
3248 .skip(1)
3253 .map(|(i, name)| CaptureGroupDesc {
3254 index: i as u32,
3255 name: name.map(String::from),
3256 nullable: true,
3259 })
3260 .collect();
3261 Ok(Self(r, descs, opts))
3262 }
3263 pub fn capture_groups_len(&self) -> usize {
3264 self.1.len()
3265 }
3266 pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3267 self.1.iter()
3268 }
3269 pub fn inner(&self) -> &Regex {
3270 &(self.0).regex
3271 }
3272 pub fn opts(&self) -> &AnalyzedRegexOpts {
3273 &self.2
3274 }
3275}
3276
3277pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3278 let bytes = a.unwrap_str().as_bytes();
3279 let mut row = Row::default();
3280 let csv_reader = csv::ReaderBuilder::new()
3281 .has_headers(false)
3282 .from_reader(bytes);
3283 csv_reader.into_records().filter_map(move |res| match res {
3284 Ok(sr) if sr.len() == n_cols => {
3285 row.packer().extend(sr.iter().map(Datum::String));
3286 Some((row.clone(), Diff::ONE))
3287 }
3288 _ => None,
3289 })
3290}
3291
3292pub fn repeat_row(a: Datum) -> Option<(Row, Diff)> {
3293 let n = a.unwrap_int64();
3294 if n != 0 {
3295 Some((Row::default(), n.into()))
3296 } else {
3297 None
3298 }
3299}
3300
3301pub fn repeat_row_non_negative<'a>(
3302 a: Datum,
3303) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3304 let n = a.unwrap_int64();
3305 if n < 0 {
3306 Err(EvalError::InvalidParameterValue(
3307 format!("repeat_row_non_negative got {}", n).into(),
3308 ))
3309 } else if n == 0 {
3310 Ok(Box::new(iter::empty()))
3311 } else {
3312 Ok(Box::new(iter::once((Row::default(), n.into()))))
3314 }
3315}
3316
3317fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3318 datums
3319 .chunks(width)
3320 .map(|chunk| (Row::pack(chunk), Diff::ONE))
3321}
3322
3323fn acl_explode<'a>(
3324 acl_items: Datum<'a>,
3325 temp_storage: &'a RowArena,
3326) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3327 let acl_items = acl_items.unwrap_array();
3328 let mut res = Vec::new();
3329 for acl_item in acl_items.elements().iter() {
3330 if acl_item.is_null() {
3331 return Err(EvalError::AclArrayNullElement);
3332 }
3333 let acl_item = acl_item.unwrap_acl_item();
3334 for privilege in acl_item.acl_mode.explode() {
3335 let row = [
3336 Datum::UInt32(acl_item.grantor.0),
3337 Datum::UInt32(acl_item.grantee.0),
3338 Datum::String(temp_storage.push_string(privilege.to_string())),
3339 Datum::False,
3341 ];
3342 res.push((Row::pack_slice(&row), Diff::ONE));
3343 }
3344 }
3345 Ok(res.into_iter())
3346}
3347
3348fn mz_acl_explode<'a>(
3349 mz_acl_items: Datum<'a>,
3350 temp_storage: &'a RowArena,
3351) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3352 let mz_acl_items = mz_acl_items.unwrap_array();
3353 let mut res = Vec::new();
3354 for mz_acl_item in mz_acl_items.elements().iter() {
3355 if mz_acl_item.is_null() {
3356 return Err(EvalError::MzAclArrayNullElement);
3357 }
3358 let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3359 for privilege in mz_acl_item.acl_mode.explode() {
3360 let row = [
3361 Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3362 Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3363 Datum::String(temp_storage.push_string(privilege.to_string())),
3364 Datum::False,
3366 ];
3367 res.push((Row::pack_slice(&row), Diff::ONE));
3368 }
3369 }
3370 Ok(res.into_iter())
3371}
3372
/// The table functions: each variant maps a slice of input datums to zero or
/// more output rows with diffs (see `TableFunc::eval`).
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
pub enum TableFunc {
    /// One row per (ACL item, privilege) pair of an ACL item array.
    AclExplode,
    /// One row per (`mz_acl_item`, privilege) pair of an `mz_acl_item` array.
    MzAclExplode,
    /// One `(key, value)` row per entry of a map datum.
    JsonbEach,
    /// Like `JsonbEach`, with values converted to strings.
    JsonbEachStringify,
    /// One row per key of a map datum.
    JsonbObjectKeys,
    /// One row per element of a list datum.
    JsonbArrayElements,
    /// Like `JsonbArrayElements`, with elements converted to strings.
    JsonbArrayElementsStringify,
    /// The capture groups of the first match of the given regex, as one row.
    RegexpExtract(AnalyzedRegex),
    /// Rows parsed from CSV text; the payload is the expected column count.
    CsvExtract(usize),
    /// An `i32` series from start to stop (inclusive) by a step.
    GenerateSeriesInt32,
    /// An `i64` series from start to stop (inclusive) by a step.
    GenerateSeriesInt64,
    /// A timestamp series from start to stop (inclusive) by an interval.
    GenerateSeriesTimestamp,
    /// A timestamp-with-time-zone series from start to stop (inclusive) by an
    /// interval.
    GenerateSeriesTimestampTz,
    /// Checks a row count: evaluation emits no rows for a count of 1 and
    /// returns an error for any other count.
    GuardSubquerySize {
        /// Scalar type of the single (non-nullable) output column.
        column_type: SqlScalarType,
    },
    /// One empty row whose diff is the input count (no rows for zero).
    RepeatRow,
    /// Like `RepeatRow`, but a negative count is an error.
    RepeatRowNonNegative,
    /// One row per element of an array.
    UnnestArray {
        /// Element type of the array; the output column is nullable.
        el_typ: SqlScalarType,
    },
    /// One row per element of a list.
    UnnestList {
        /// Element type of the list; the output column is nullable.
        el_typ: SqlScalarType,
    },
    /// One `(key, value)` row per entry of a map.
    UnnestMap {
        /// Value type of the map; the value column is nullable.
        value_type: SqlScalarType,
    },
    /// Splits the input datums into rows of `width` columns each.
    Wrap {
        /// Column types of the output rows.
        types: Vec<SqlColumnType>,
        /// Number of datums per output row.
        width: usize,
    },
    /// The subscripts of one dimension of an array.
    GenerateSubscriptsArray,
    /// Packs all input datums into a single output row.
    TabletizedScalar {
        // NOTE(review): presumably the name of the wrapped scalar function; confirm.
        name: String,
        /// The relation type reported as this function's output type.
        relation: SqlRelationType,
    },
    /// One row per regex match, holding an array of its capture groups.
    RegexpMatches,
    /// Another table function wrapped by `TableFunc::with_ordinality`.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3466
/// Payload of `TableFunc::WithOrdinality`: the table function being wrapped.
///
/// The struct and its field are private; values are built through
/// `TableFunc::with_ordinality`.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect
)]
struct WithOrdinality {
    /// The wrapped table function.
    inner: Box<TableFunc>,
}
3489
3490impl TableFunc {
3491 pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3493 match inner {
3494 TableFunc::AclExplode
3495 | TableFunc::MzAclExplode
3496 | TableFunc::JsonbEach
3497 | TableFunc::JsonbEachStringify
3498 | TableFunc::JsonbObjectKeys
3499 | TableFunc::JsonbArrayElements
3500 | TableFunc::JsonbArrayElementsStringify
3501 | TableFunc::RegexpExtract(_)
3502 | TableFunc::CsvExtract(_)
3503 | TableFunc::GenerateSeriesInt32
3504 | TableFunc::GenerateSeriesInt64
3505 | TableFunc::GenerateSeriesTimestamp
3506 | TableFunc::GenerateSeriesTimestampTz
3507 | TableFunc::GuardSubquerySize { .. }
3508 | TableFunc::RepeatRowNonNegative
3509 | TableFunc::UnnestArray { .. }
3510 | TableFunc::UnnestList { .. }
3511 | TableFunc::UnnestMap { .. }
3512 | TableFunc::Wrap { .. }
3513 | TableFunc::GenerateSubscriptsArray
3514 | TableFunc::TabletizedScalar { .. }
3515 | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3516 inner: Box::new(inner),
3517 })),
3518 TableFunc::RepeatRow | TableFunc::WithOrdinality(_) => None, }
3526 }
3527}
3528
3529impl TableFunc {
3530 pub fn eval<'a>(
3532 &'a self,
3533 datums: &'a [Datum<'a>],
3534 temp_storage: &'a RowArena,
3535 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3536 if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3537 return Ok(Box::new(vec![].into_iter()));
3538 }
3539 match self {
3540 TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3541 TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3542 TableFunc::JsonbEach => Ok(Box::new(jsonb_each(datums[0]))),
3543 TableFunc::JsonbEachStringify => {
3544 Ok(Box::new(jsonb_each_stringify(datums[0], temp_storage)))
3545 }
3546 TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3547 TableFunc::JsonbArrayElements => Ok(Box::new(jsonb_array_elements(datums[0]))),
3548 TableFunc::JsonbArrayElementsStringify => Ok(Box::new(jsonb_array_elements_stringify(
3549 datums[0],
3550 temp_storage,
3551 ))),
3552 TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3553 TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3554 TableFunc::GenerateSeriesInt32 => {
3555 let res = generate_series(
3556 datums[0].unwrap_int32(),
3557 datums[1].unwrap_int32(),
3558 datums[2].unwrap_int32(),
3559 )?;
3560 Ok(Box::new(res))
3561 }
3562 TableFunc::GenerateSeriesInt64 => {
3563 let res = generate_series(
3564 datums[0].unwrap_int64(),
3565 datums[1].unwrap_int64(),
3566 datums[2].unwrap_int64(),
3567 )?;
3568 Ok(Box::new(res))
3569 }
3570 TableFunc::GenerateSeriesTimestamp => {
3571 fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
3572 Datum::from(d)
3573 }
3574 let res = generate_series_ts(
3575 datums[0].unwrap_timestamp(),
3576 datums[1].unwrap_timestamp(),
3577 datums[2].unwrap_interval(),
3578 pass_through,
3579 )?;
3580 Ok(Box::new(res))
3581 }
3582 TableFunc::GenerateSeriesTimestampTz => {
3583 fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
3584 Datum::from(d)
3585 }
3586 let res = generate_series_ts(
3587 datums[0].unwrap_timestamptz(),
3588 datums[1].unwrap_timestamptz(),
3589 datums[2].unwrap_interval(),
3590 gen_ts_tz,
3591 )?;
3592 Ok(Box::new(res))
3593 }
3594 TableFunc::GenerateSubscriptsArray => {
3595 generate_subscripts_array(datums[0], datums[1].unwrap_int32())
3596 }
3597 TableFunc::GuardSubquerySize { column_type: _ } => {
3598 let count = datums[0].unwrap_int64();
3601 if count == 1 {
3602 Ok(Box::new([].into_iter()))
3603 } else if count > 1 {
3604 Err(EvalError::MultipleRowsFromSubquery)
3605 } else if count < 0 {
3606 Err(EvalError::NegativeRowsFromSubquery)
3607 } else {
3608 soft_panic_or_log!("subquery counting unexpectedly produced 0");
3611 Err(EvalError::Internal(
3612 "subquery counting unexpectedly produced 0".into(),
3613 ))
3614 }
3615 }
3616 TableFunc::RepeatRow => Ok(Box::new(repeat_row(datums[0]).into_iter())),
3617 TableFunc::RepeatRowNonNegative => repeat_row_non_negative(datums[0]),
3618 TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
3619 TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
3620 TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
3621 TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
3622 TableFunc::TabletizedScalar { .. } => {
3623 let r = Row::pack_slice(datums);
3624 Ok(Box::new(std::iter::once((r, Diff::ONE))))
3625 }
3626 TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
3627 TableFunc::WithOrdinality(func_with_ordinality) => {
3628 func_with_ordinality.eval(datums, temp_storage)
3629 }
3630 }
3631 }
3632
3633 pub fn output_sql_type(&self) -> SqlRelationType {
3634 let (column_types, keys) = match self {
3635 TableFunc::AclExplode => {
3636 let column_types = vec![
3637 SqlScalarType::Oid.nullable(false),
3638 SqlScalarType::Oid.nullable(false),
3639 SqlScalarType::String.nullable(false),
3640 SqlScalarType::Bool.nullable(false),
3641 ];
3642 let keys = vec![];
3643 (column_types, keys)
3644 }
3645 TableFunc::MzAclExplode => {
3646 let column_types = vec![
3647 SqlScalarType::String.nullable(false),
3648 SqlScalarType::String.nullable(false),
3649 SqlScalarType::String.nullable(false),
3650 SqlScalarType::Bool.nullable(false),
3651 ];
3652 let keys = vec![];
3653 (column_types, keys)
3654 }
3655 TableFunc::JsonbEach => {
3656 let column_types = vec![
3657 SqlScalarType::String.nullable(false),
3658 SqlScalarType::Jsonb.nullable(false),
3659 ];
3660 let keys = vec![];
3661 (column_types, keys)
3662 }
3663 TableFunc::JsonbEachStringify => {
3664 let column_types = vec![
3665 SqlScalarType::String.nullable(false),
3666 SqlScalarType::String.nullable(true),
3667 ];
3668 let keys = vec![];
3669 (column_types, keys)
3670 }
3671 TableFunc::JsonbObjectKeys => {
3672 let column_types = vec![SqlScalarType::String.nullable(false)];
3673 let keys = vec![];
3674 (column_types, keys)
3675 }
3676 TableFunc::JsonbArrayElements => {
3677 let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
3678 let keys = vec![];
3679 (column_types, keys)
3680 }
3681 TableFunc::JsonbArrayElementsStringify => {
3682 let column_types = vec![SqlScalarType::String.nullable(true)];
3683 let keys = vec![];
3684 (column_types, keys)
3685 }
3686 TableFunc::RegexpExtract(a) => {
3687 let column_types = a
3688 .capture_groups_iter()
3689 .map(|cg| SqlScalarType::String.nullable(cg.nullable))
3690 .collect();
3691 let keys = vec![];
3692 (column_types, keys)
3693 }
3694 TableFunc::CsvExtract(n_cols) => {
3695 let column_types = iter::repeat(SqlScalarType::String.nullable(false))
3696 .take(*n_cols)
3697 .collect();
3698 let keys = vec![];
3699 (column_types, keys)
3700 }
3701 TableFunc::GenerateSeriesInt32 => {
3702 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3703 let keys = vec![vec![0]];
3704 (column_types, keys)
3705 }
3706 TableFunc::GenerateSeriesInt64 => {
3707 let column_types = vec![SqlScalarType::Int64.nullable(false)];
3708 let keys = vec![vec![0]];
3709 (column_types, keys)
3710 }
3711 TableFunc::GenerateSeriesTimestamp => {
3712 let column_types =
3713 vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
3714 let keys = vec![vec![0]];
3715 (column_types, keys)
3716 }
3717 TableFunc::GenerateSeriesTimestampTz => {
3718 let column_types =
3719 vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
3720 let keys = vec![vec![0]];
3721 (column_types, keys)
3722 }
3723 TableFunc::GenerateSubscriptsArray => {
3724 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3725 let keys = vec![vec![0]];
3726 (column_types, keys)
3727 }
3728 TableFunc::GuardSubquerySize { column_type } => {
3729 let column_types = vec![column_type.clone().nullable(false)];
3730 let keys = vec![];
3731 (column_types, keys)
3732 }
3733 TableFunc::RepeatRow | TableFunc::RepeatRowNonNegative => {
3734 let column_types = vec![];
3735 let keys = vec![];
3736 (column_types, keys)
3737 }
3738 TableFunc::UnnestArray { el_typ } => {
3739 let column_types = vec![el_typ.clone().nullable(true)];
3740 let keys = vec![];
3741 (column_types, keys)
3742 }
3743 TableFunc::UnnestList { el_typ } => {
3744 let column_types = vec![el_typ.clone().nullable(true)];
3745 let keys = vec![];
3746 (column_types, keys)
3747 }
3748 TableFunc::UnnestMap { value_type } => {
3749 let column_types = vec![
3750 SqlScalarType::String.nullable(false),
3751 value_type.clone().nullable(true),
3752 ];
3753 let keys = vec![vec![0]];
3754 (column_types, keys)
3755 }
3756 TableFunc::Wrap { types, .. } => {
3757 let column_types = types.clone();
3758 let keys = vec![];
3759 (column_types, keys)
3760 }
3761 TableFunc::TabletizedScalar { relation, .. } => {
3762 return relation.clone();
3763 }
3764 TableFunc::RegexpMatches => {
3765 let column_types =
3766 vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
3767 let keys = vec![];
3768
3769 (column_types, keys)
3770 }
3771 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3772 let mut typ = inner.output_sql_type();
3773 typ.column_types.push(SqlScalarType::Int64.nullable(false));
3775 typ.keys.push(vec![typ.column_types.len() - 1]);
3777 (typ.column_types, typ.keys)
3778 }
3779 };
3780
3781 soft_assert_eq_no_log!(column_types.len(), self.output_arity());
3782
3783 if !keys.is_empty() {
3784 SqlRelationType::new(column_types).with_keys(keys)
3785 } else {
3786 SqlRelationType::new(column_types)
3787 }
3788 }
3789
3790 pub fn output_type(&self) -> ReprRelationType {
3794 ReprRelationType::from(&self.output_sql_type())
3795 }
3796
3797 pub fn output_arity(&self) -> usize {
3798 match self {
3799 TableFunc::AclExplode => 4,
3800 TableFunc::MzAclExplode => 4,
3801 TableFunc::JsonbEach => 2,
3802 TableFunc::JsonbEachStringify => 2,
3803 TableFunc::JsonbObjectKeys => 1,
3804 TableFunc::JsonbArrayElements => 1,
3805 TableFunc::JsonbArrayElementsStringify => 1,
3806 TableFunc::RegexpExtract(a) => a.capture_groups_len(),
3807 TableFunc::CsvExtract(n_cols) => *n_cols,
3808 TableFunc::GenerateSeriesInt32 => 1,
3809 TableFunc::GenerateSeriesInt64 => 1,
3810 TableFunc::GenerateSeriesTimestamp => 1,
3811 TableFunc::GenerateSeriesTimestampTz => 1,
3812 TableFunc::GenerateSubscriptsArray => 1,
3813 TableFunc::GuardSubquerySize { .. } => 1,
3814 TableFunc::RepeatRow => 0,
3815 TableFunc::RepeatRowNonNegative => 0,
3816 TableFunc::UnnestArray { .. } => 1,
3817 TableFunc::UnnestList { .. } => 1,
3818 TableFunc::UnnestMap { .. } => 2,
3819 TableFunc::Wrap { width, .. } => *width,
3820 TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
3821 TableFunc::RegexpMatches => 1,
3822 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
3823 }
3824 }
3825
3826 pub fn empty_on_null_input(&self) -> bool {
3827 match self {
3828 TableFunc::AclExplode
3829 | TableFunc::MzAclExplode
3830 | TableFunc::JsonbEach
3831 | TableFunc::JsonbEachStringify
3832 | TableFunc::JsonbObjectKeys
3833 | TableFunc::JsonbArrayElements
3834 | TableFunc::JsonbArrayElementsStringify
3835 | TableFunc::GenerateSeriesInt32
3836 | TableFunc::GenerateSeriesInt64
3837 | TableFunc::GenerateSeriesTimestamp
3838 | TableFunc::GenerateSeriesTimestampTz
3839 | TableFunc::GenerateSubscriptsArray
3840 | TableFunc::RegexpExtract(_)
3841 | TableFunc::CsvExtract(_)
3842 | TableFunc::RepeatRow
3843 | TableFunc::RepeatRowNonNegative
3844 | TableFunc::UnnestArray { .. }
3845 | TableFunc::UnnestList { .. }
3846 | TableFunc::UnnestMap { .. }
3847 | TableFunc::RegexpMatches => true,
3848 TableFunc::GuardSubquerySize { .. } => false,
3849 TableFunc::Wrap { .. } => false,
3850 TableFunc::TabletizedScalar { .. } => false,
3851 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
3852 }
3853 }
3854
3855 pub fn preserves_monotonicity(&self) -> bool {
3857 match self {
3860 TableFunc::AclExplode => false,
3861 TableFunc::MzAclExplode => false,
3862 TableFunc::JsonbEach => true,
3863 TableFunc::JsonbEachStringify => true,
3864 TableFunc::JsonbObjectKeys => true,
3865 TableFunc::JsonbArrayElements => true,
3866 TableFunc::JsonbArrayElementsStringify => true,
3867 TableFunc::RegexpExtract(_) => true,
3868 TableFunc::CsvExtract(_) => true,
3869 TableFunc::GenerateSeriesInt32 => true,
3870 TableFunc::GenerateSeriesInt64 => true,
3871 TableFunc::GenerateSeriesTimestamp => true,
3872 TableFunc::GenerateSeriesTimestampTz => true,
3873 TableFunc::GenerateSubscriptsArray => true,
3874 TableFunc::RepeatRow => false,
3875 TableFunc::RepeatRowNonNegative => true,
3876 TableFunc::UnnestArray { .. } => true,
3877 TableFunc::UnnestList { .. } => true,
3878 TableFunc::UnnestMap { .. } => true,
3879 TableFunc::Wrap { .. } => true,
3880 TableFunc::TabletizedScalar { .. } => true,
3881 TableFunc::RegexpMatches => true,
3882 TableFunc::GuardSubquerySize { .. } => false,
3883 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
3884 }
3885 }
3886}
3887
3888impl fmt::Display for TableFunc {
3889 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3890 match self {
3891 TableFunc::AclExplode => f.write_str("aclexplode"),
3892 TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3893 TableFunc::JsonbEach => f.write_str("jsonb_each"),
3894 TableFunc::JsonbEachStringify => f.write_str("jsonb_each_text"),
3895 TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3896 TableFunc::JsonbArrayElements => f.write_str("jsonb_array_elements"),
3897 TableFunc::JsonbArrayElementsStringify => f.write_str("jsonb_array_elements_text"),
3898 TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3899 TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3900 TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3901 TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3902 TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3903 TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3904 TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3905 TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3906 TableFunc::RepeatRow => f.write_str(REPEAT_ROW_NAME),
3907 TableFunc::RepeatRowNonNegative => f.write_str("repeat_row_non_negative"),
3908 TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3909 TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3910 TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3911 TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3912 TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3913 TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3914 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3915 write!(f, "{}[with_ordinality]", inner)
3916 }
3917 }
3918 }
3919}
3920
impl WithOrdinality {
    /// Evaluates the wrapped `inner` table function on `datums` and packs an `Int64`
    /// ordinal onto every row it yields.
    ///
    /// Ordinals start at 1 and increase by one per emitted row, continuing across all
    /// rows produced by `inner`. Each `(row, diff)` pair from `inner` is expanded into
    /// `diff` separate rows, each emitted with `Diff::ONE`, so that every copy carries
    /// its own ordinal. A pair whose diff is zero emits nothing.
    ///
    /// # Errors
    ///
    /// Returns the `EvalError` reported by the call to `inner`'s `eval`, if any.
    ///
    /// # Panics
    ///
    /// Panics while the returned iterator is being consumed if `inner` yields a
    /// negative diff.
    fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // The next ordinal to hand out. It is moved into the closure below and so
        // persists across all rows produced by `inner`.
        let mut next_ordinal: i64 = 1;
        let it = self
            .inner
            .eval(datums, temp_storage)?
            .flat_map(move |(mut row, diff)| {
                // Unwrap the diff into a plain integer for the ordinal arithmetic below.
                let diff = diff.into_inner();
                // A negative multiplicity cannot be expanded into a run of ordinals.
                assert!(diff >= 0);
                // The half-open range of ordinals assigned to the copies of this row.
                let mut ordinals = next_ordinal..(next_ordinal + diff);
                // Advance the counter first: `cap` below is computed from the updated value.
                next_ordinal += diff;
                // Byte capacity for a copy of the row plus its ordinal datum.
                // NOTE(review): sized using `next_ordinal` (one past the last ordinal of
                // this range) — presumably an upper bound for every ordinal in the range;
                // confirm against `datum_size`.
                let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
                iter::from_fn(move || {
                    // Ends this row's sub-iterator once all of its ordinals are used up.
                    let ordinal = ordinals.next()?;
                    let mut row = if ordinals.is_empty() {
                        // No ordinals remain after this one, so this is the final copy:
                        // move the original row out instead of cloning it.
                        std::mem::take(&mut row)
                    } else {
                        // More copies follow: clone into a buffer created with room for
                        // the ordinal. NOTE(review): assumes `clone_from` keeps the
                        // reserved capacity — confirm.
                        let mut new_row = Row::with_capacity(cap);
                        new_row.clone_from(&row);
                        new_row
                    };
                    // Pack the ordinal onto the row as an additional `Int64` datum.
                    RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
                    Some((row, Diff::ONE))
                })
            });
        Ok(Box::new(it))
    }
}
3972
/// Name written by the `fmt::Display` impl of `TableFunc` for the
/// `TableFunc::RepeatRow` variant.
pub const REPEAT_ROW_NAME: &str = "repeat_row";