1#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::{Regex as ReprRegex, RegexCompilationError};
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33 ColumnName, Datum, Diff, Row, RowArena, RowPacker, SharedRow, SqlColumnType, SqlRelationType,
34 SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44 CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49 ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58 I: IntoIterator<Item = Datum<'a>>,
59{
60 match datums
61 .into_iter()
62 .filter(|d| !d.is_null())
63 .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64 {
65 Some(datum) => datum,
66 None => Datum::Null,
67 }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72 I: IntoIterator<Item = Datum<'a>>,
73 DatumType: TryFrom<Datum<'a>> + Ord,
74 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75 Datum<'a>: From<Option<DatumType>>,
76{
77 let x: Option<DatumType> = datums
78 .into_iter()
79 .filter(|d| !d.is_null())
80 .map(|d| DatumType::try_from(d).expect("unexpected type"))
81 .max();
82
83 x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88 I: IntoIterator<Item = Datum<'a>>,
89 DatumType: TryFrom<Datum<'a>> + Ord,
90 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91 Datum<'a>: From<Option<DatumType>>,
92{
93 let x: Option<DatumType> = datums
94 .into_iter()
95 .filter(|d| !d.is_null())
96 .map(|d| DatumType::try_from(d).expect("unexpected type"))
97 .min();
98
99 x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104 I: IntoIterator<Item = Datum<'a>>,
105{
106 match datums
107 .into_iter()
108 .filter(|d| !d.is_null())
109 .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110 {
111 Some(datum) => datum,
112 None => Datum::Null,
113 }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118 I: IntoIterator<Item = Datum<'a>>,
119 DatumType: TryFrom<Datum<'a>>,
120 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121 ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123 let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124 if datums.peek().is_none() {
125 Datum::Null
126 } else {
127 let x = datums
128 .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129 .sum::<ResultType>();
130 x.into()
131 }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136 I: IntoIterator<Item = Datum<'a>>,
137{
138 let mut cx = numeric::cx_datum();
139 let mut sum = Numeric::zero();
140 let mut empty = true;
141 for d in datums {
142 if !d.is_null() {
143 empty = false;
144 cx.add(&mut sum, &d.unwrap_numeric().0);
145 }
146 }
147 match empty {
148 true => Datum::Null,
149 false => Datum::from(sum),
150 }
151}
152
153#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157 I: IntoIterator<Item = Datum<'a>>,
158{
159 let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161 Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166 I: IntoIterator<Item = Datum<'a>>,
167{
168 datums
169 .into_iter()
170 .fold(Datum::False, |state, next| match (state, next) {
171 (Datum::True, _) | (_, Datum::True) => Datum::True,
172 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173 _ => Datum::False,
174 })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179 I: IntoIterator<Item = Datum<'a>>,
180{
181 datums
182 .into_iter()
183 .fold(Datum::True, |state, next| match (state, next) {
184 (Datum::False, _) | (_, Datum::False) => Datum::False,
185 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186 _ => Datum::True,
187 })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192 I: IntoIterator<Item = Datum<'a>>,
193{
194 const EMPTY_SEP: &str = "";
195
196 let datums = order_aggregate_datums(datums, order_by);
197 let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198 if d.is_null() {
199 return None;
200 }
201 let mut value_sep = d.unwrap_list().iter();
202 match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203 (Datum::Null, _) => None,
204 (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205 (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206 _ => unreachable!(),
207 }
208 });
209
210 let mut s = String::default();
211 match sep_value_pairs.next() {
212 Some((_, value)) => s.push_str(value),
214 None => return Datum::Null,
216 }
217
218 for (sep, value) in sep_value_pairs {
219 s.push_str(sep);
220 s.push_str(value);
221 }
222
223 Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228 I: IntoIterator<Item = Datum<'a>>,
229{
230 let datums = order_aggregate_datums(datums, order_by);
231 temp_storage.make_datum(|packer| {
232 packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233 })
234}
235
236fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238 I: IntoIterator<Item = Datum<'a>>,
239{
240 let datums = order_aggregate_datums(datums, order_by);
241 temp_storage.make_datum(|packer| {
242 let mut datums: Vec<_> = datums
243 .into_iter()
244 .filter_map(|d| {
245 if d.is_null() {
246 return None;
247 }
248 let mut list = d.unwrap_list().iter();
249 let key = list.next().unwrap();
250 let val = list.next().unwrap();
251 if key.is_null() {
252 None
255 } else {
256 Some((key.unwrap_str(), val))
257 }
258 })
259 .collect();
260 datums.sort_by_key(|(k, _v)| *k);
266 datums.reverse();
267 datums.dedup_by_key(|(k, _v)| *k);
268 datums.reverse();
269 packer.push_dict(datums);
270 })
271}
272
273pub fn order_aggregate_datums<'a: 'b, 'b, I>(
283 datums: I,
284 order_by: &[ColumnOrder],
285) -> impl Iterator<Item = Datum<'b>>
286where
287 I: IntoIterator<Item = Datum<'a>>,
288{
289 order_aggregate_datums_with_rank_inner(datums, order_by)
290 .into_iter()
291 .map(|(payload, _order_datums)| payload)
293}
294
295fn order_aggregate_datums_with_rank<'a, I>(
298 datums: I,
299 order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302 I: IntoIterator<Item = Datum<'a>>,
303{
304 order_aggregate_datums_with_rank_inner(datums, order_by)
305 .into_iter()
306 .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310 datums: I,
311 order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314 I: IntoIterator<Item = Datum<'a>>,
315{
316 let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317 .into_iter()
318 .map(|d| {
319 let list = d.unwrap_list();
320 let mut list_it = list.iter();
321 let payload = list_it.next().unwrap();
322
323 let mut order_by_datums = Vec::with_capacity(order_by.len());
333 for _ in 0..order_by.len() {
334 order_by_datums.push(
335 list_it
336 .next()
337 .expect("must have exactly the same number of Datums as `order_by`"),
338 );
339 }
340
341 (payload, order_by_datums)
342 })
343 .collect();
344
345 let mut sort_by =
346 |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347 (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348 compare_columns(
349 order_by,
350 left_order_by_datums,
351 right_order_by_datums,
352 || payload_left.cmp(payload_right),
353 )
354 };
355 decoded.sort_unstable_by(&mut sort_by);
360 decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365 I: IntoIterator<Item = Datum<'a>>,
366{
367 let datums = order_aggregate_datums(datums, order_by);
368 let datums: Vec<_> = datums
369 .into_iter()
370 .map(|d| d.unwrap_array().elements().iter())
371 .flatten()
372 .collect();
373 let dims = ArrayDimension {
374 lower_bound: 1,
375 length: datums.len(),
376 };
377 temp_storage.make_datum(|packer| {
378 packer.try_push_array(&[dims], datums).unwrap();
379 })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384 I: IntoIterator<Item = Datum<'a>>,
385{
386 let datums = order_aggregate_datums(datums, order_by);
387 temp_storage.make_datum(|packer| {
388 packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389 })
390}
391
392fn row_number<'a, I>(
396 datums: I,
397 callers_temp_storage: &'a RowArena,
398 order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401 I: IntoIterator<Item = Datum<'a>>,
402{
403 let temp_storage = RowArena::new();
407 let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409 callers_temp_storage.make_datum(|packer| {
410 packer.push_list(datums);
411 })
412}
413
414fn row_number_no_list<'a: 'b, 'b, I>(
417 datums: I,
418 callers_temp_storage: &'b RowArena,
419 order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422 I: IntoIterator<Item = Datum<'a>>,
423{
424 let datums = order_aggregate_datums(datums, order_by);
425
426 callers_temp_storage.reserve(datums.size_hint().0);
427 #[allow(clippy::disallowed_methods)]
428 datums
429 .into_iter()
430 .map(|d| d.unwrap_list().iter())
431 .flatten()
432 .zip(1i64..)
433 .map(|(d, i)| {
434 callers_temp_storage.make_datum(|packer| {
435 packer.push_list_with(|packer| {
436 packer.push(Datum::Int64(i));
437 packer.push(d);
438 });
439 })
440 })
441}
442
443fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448 I: IntoIterator<Item = Datum<'a>>,
449{
450 let temp_storage = RowArena::new();
451 let datums = rank_no_list(datums, &temp_storage, order_by);
452
453 callers_temp_storage.make_datum(|packer| {
454 packer.push_list(datums);
455 })
456}
457
458fn rank_no_list<'a: 'b, 'b, I>(
461 datums: I,
462 callers_temp_storage: &'b RowArena,
463 order_by: &[ColumnOrder],
464) -> impl Iterator<Item = Datum<'b>>
465where
466 I: IntoIterator<Item = Datum<'a>>,
467{
468 let datums = order_aggregate_datums_with_rank(datums, order_by);
470
471 let mut datums = datums
472 .into_iter()
473 .map(|(d0, order_row)| {
474 d0.unwrap_list()
475 .iter()
476 .map(move |d1| (d1, order_row.clone()))
477 })
478 .flatten();
479
480 callers_temp_storage.reserve(datums.size_hint().0);
481 datums
482 .next()
483 .map_or(vec![], |(first_datum, first_order_row)| {
484 datums.fold((first_order_row, 1, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
486 let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
487 *acc_row_num += 1;
488 if *acc_row != next_order_row {
490 *acc_rank = *acc_row_num;
491 *acc_row = next_order_row;
492 }
493
494 (*output).push((next_datum, *acc_rank));
495 acc
496 })
497 }.3).into_iter().map(|(d, i)| {
498 callers_temp_storage.make_datum(|packer| {
499 packer.push_list_with(|packer| {
500 packer.push(Datum::Int64(i));
501 packer.push(d);
502 });
503 })
504 })
505}
506
507fn dense_rank<'a, I>(
511 datums: I,
512 callers_temp_storage: &'a RowArena,
513 order_by: &[ColumnOrder],
514) -> Datum<'a>
515where
516 I: IntoIterator<Item = Datum<'a>>,
517{
518 let temp_storage = RowArena::new();
519 let datums = dense_rank_no_list(datums, &temp_storage, order_by);
520
521 callers_temp_storage.make_datum(|packer| {
522 packer.push_list(datums);
523 })
524}
525
526fn dense_rank_no_list<'a: 'b, 'b, I>(
529 datums: I,
530 callers_temp_storage: &'b RowArena,
531 order_by: &[ColumnOrder],
532) -> impl Iterator<Item = Datum<'b>>
533where
534 I: IntoIterator<Item = Datum<'a>>,
535{
536 let datums = order_aggregate_datums_with_rank(datums, order_by);
538
539 let mut datums = datums
540 .into_iter()
541 .map(|(d0, order_row)| {
542 d0.unwrap_list()
543 .iter()
544 .map(move |d1| (d1, order_row.clone()))
545 })
546 .flatten();
547
548 callers_temp_storage.reserve(datums.size_hint().0);
549 datums
550 .next()
551 .map_or(vec![], |(first_datum, first_order_row)| {
552 datums.fold((first_order_row, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
554 let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
555 if *acc_row != next_order_row {
557 *acc_rank += 1;
558 *acc_row = next_order_row;
559 }
560
561 (*output).push((next_datum, *acc_rank));
562 acc
563 })
564 }.2).into_iter().map(|(d, i)| {
565 callers_temp_storage.make_datum(|packer| {
566 packer.push_list_with(|packer| {
567 packer.push(Datum::Int64(i));
568 packer.push(d);
569 });
570 })
571 })
572}
573
574fn lag_lead<'a, I>(
596 datums: I,
597 callers_temp_storage: &'a RowArena,
598 order_by: &[ColumnOrder],
599 lag_lead_type: &LagLeadType,
600 ignore_nulls: &bool,
601) -> Datum<'a>
602where
603 I: IntoIterator<Item = Datum<'a>>,
604{
605 let temp_storage = RowArena::new();
606 let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
607 callers_temp_storage.make_datum(|packer| {
608 packer.push_list(iter);
609 })
610}
611
612fn lag_lead_no_list<'a: 'b, 'b, I>(
615 datums: I,
616 callers_temp_storage: &'b RowArena,
617 order_by: &[ColumnOrder],
618 lag_lead_type: &LagLeadType,
619 ignore_nulls: &bool,
620) -> impl Iterator<Item = Datum<'b>>
621where
622 I: IntoIterator<Item = Datum<'a>>,
623{
624 let datums = order_aggregate_datums(datums, order_by);
626
627 let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
631 .into_iter()
632 .map(|d| {
633 let mut iter = d.unwrap_list().iter();
634 let original_row = iter.next().unwrap();
635 let (input_value, offset, default_value) =
636 unwrap_lag_lead_encoded_args(iter.next().unwrap());
637 (original_row, (input_value, offset, default_value))
638 })
639 .unzip();
640
641 let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
642
643 callers_temp_storage.reserve(result.len());
644 result
645 .into_iter()
646 .zip_eq(orig_rows)
647 .map(|(result_value, original_row)| {
648 callers_temp_storage.make_datum(|packer| {
649 packer.push_list_with(|packer| {
650 packer.push(result_value);
651 packer.push(original_row);
652 });
653 })
654 })
655}
656
657fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
659 let mut encoded_args_iter = encoded_args.unwrap_list().iter();
660 let (input_value, offset, default_value) = (
661 encoded_args_iter.next().unwrap(),
662 encoded_args_iter.next().unwrap(),
663 encoded_args_iter.next().unwrap(),
664 );
665 (input_value, offset, default_value)
666}
667
668fn lag_lead_inner<'a>(
671 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
672 lag_lead_type: &LagLeadType,
673 ignore_nulls: &bool,
674) -> Vec<Datum<'a>> {
675 if *ignore_nulls {
676 lag_lead_inner_ignore_nulls(args, lag_lead_type)
677 } else {
678 lag_lead_inner_respect_nulls(args, lag_lead_type)
679 }
680}
681
682fn lag_lead_inner_respect_nulls<'a>(
683 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
684 lag_lead_type: &LagLeadType,
685) -> Vec<Datum<'a>> {
686 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
687 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
688 if offset.is_null() {
690 result.push(Datum::Null);
691 continue;
692 }
693
694 let idx = i64::try_from(idx).expect("Array index does not fit in i64");
695 let offset = i64::from(offset.unwrap_int32());
696 let offset = match lag_lead_type {
697 LagLeadType::Lag => -offset,
698 LagLeadType::Lead => offset,
699 };
700
701 let datums_get = |i: i64| -> Option<Datum> {
703 match u64::try_from(i) {
704 Ok(i) => args
705 .get(usize::cast_from(i))
706 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
710 };
711
712 let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
713
714 result.push(lagged_value);
715 }
716
717 result
718}
719
720#[allow(clippy::as_conversions)]
724fn lag_lead_inner_ignore_nulls<'a>(
725 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
726 lag_lead_type: &LagLeadType,
727) -> Vec<Datum<'a>> {
728 if i64::try_from(args.len()).is_err() {
731 panic!("window partition way too big")
732 }
733 let mut skip_nulls_backward = vec![None; args.len()];
736 let mut last_non_null: i64 = -1;
737 let pairs = args
738 .iter()
739 .enumerate()
740 .zip_eq(skip_nulls_backward.iter_mut());
741 for ((i, (d, _, _)), slot) in pairs {
742 if d.is_null() {
743 *slot = Some(last_non_null);
744 } else {
745 last_non_null = i as i64;
746 }
747 }
748 let mut skip_nulls_forward = vec![None; args.len()];
749 let mut last_non_null: i64 = args.len() as i64;
750 let pairs = args
751 .iter()
752 .enumerate()
753 .rev()
754 .zip_eq(skip_nulls_forward.iter_mut().rev());
755 for ((i, (d, _, _)), slot) in pairs {
756 if d.is_null() {
757 *slot = Some(last_non_null);
758 } else {
759 last_non_null = i as i64;
760 }
761 }
762
763 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
765 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
766 if offset.is_null() {
768 result.push(Datum::Null);
769 continue;
770 }
771
772 let idx = idx as i64; let offset = i64::cast_from(offset.unwrap_int32());
774 let offset = match lag_lead_type {
775 LagLeadType::Lag => -offset,
776 LagLeadType::Lead => offset,
777 };
778 let increment = offset.signum();
779
780 let datums_get = |i: i64| -> Option<Datum> {
782 match u64::try_from(i) {
783 Ok(i) => args
784 .get(usize::cast_from(i))
785 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
789 };
790
791 let lagged_value = if increment != 0 {
792 let mut j = idx;
802 for _ in 0..num::abs(offset) {
803 j += increment;
804 if datums_get(j).is_some_and(|d| d.is_null()) {
806 let ju = j as usize; if increment > 0 {
808 j = skip_nulls_forward[ju].expect("checked above that it's null");
809 } else {
810 j = skip_nulls_backward[ju].expect("checked above that it's null");
811 }
812 }
813 if datums_get(j).is_none() {
814 break;
815 }
816 }
817 match datums_get(j) {
818 Some(datum) => datum,
819 None => *default_value,
820 }
821 } else {
822 assert_eq!(offset, 0);
823 let datum = datums_get(idx).expect("known to exist");
824 if !datum.is_null() {
825 datum
826 } else {
827 panic!("0 offset in lag/lead IGNORE NULLS");
833 }
834 };
835
836 result.push(lagged_value);
837 }
838
839 result
840}
841
842fn first_value<'a, I>(
844 datums: I,
845 callers_temp_storage: &'a RowArena,
846 order_by: &[ColumnOrder],
847 window_frame: &WindowFrame,
848) -> Datum<'a>
849where
850 I: IntoIterator<Item = Datum<'a>>,
851{
852 let temp_storage = RowArena::new();
853 let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
854 callers_temp_storage.make_datum(|packer| {
855 packer.push_list(iter);
856 })
857}
858
859fn first_value_no_list<'a: 'b, 'b, I>(
862 datums: I,
863 callers_temp_storage: &'b RowArena,
864 order_by: &[ColumnOrder],
865 window_frame: &WindowFrame,
866) -> impl Iterator<Item = Datum<'b>>
867where
868 I: IntoIterator<Item = Datum<'a>>,
869{
870 let datums = order_aggregate_datums(datums, order_by);
872
873 let (orig_rows, args): (Vec<_>, Vec<_>) = datums
875 .into_iter()
876 .map(|d| {
877 let mut iter = d.unwrap_list().iter();
878 let original_row = iter.next().unwrap();
879 let arg = iter.next().unwrap();
880
881 (original_row, arg)
882 })
883 .unzip();
884
885 let results = first_value_inner(args, window_frame);
886
887 callers_temp_storage.reserve(results.len());
888 results
889 .into_iter()
890 .zip_eq(orig_rows)
891 .map(|(result_value, original_row)| {
892 callers_temp_storage.make_datum(|packer| {
893 packer.push_list_with(|packer| {
894 packer.push(result_value);
895 packer.push(original_row);
896 });
897 })
898 })
899}
900
901fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
902 let length = datums.len();
903 let mut result: Vec<Datum> = Vec::with_capacity(length);
904 for (idx, current_datum) in datums.iter().enumerate() {
905 let first_value = match &window_frame.start_bound {
906 WindowFrameBound::CurrentRow => *current_datum,
908 WindowFrameBound::UnboundedPreceding => {
909 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
910 let end_offset = usize::cast_from(*end_offset);
911
912 if idx < end_offset {
914 Datum::Null
915 } else {
916 datums[0]
917 }
918 } else {
919 datums[0]
920 }
921 }
922 WindowFrameBound::OffsetPreceding(offset) => {
923 let start_offset = usize::cast_from(*offset);
924 let start_idx = idx.saturating_sub(start_offset);
925 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
926 let end_offset = usize::cast_from(*end_offset);
927
928 if start_offset < end_offset || idx < end_offset {
930 Datum::Null
931 } else {
932 datums[start_idx]
933 }
934 } else {
935 datums[start_idx]
936 }
937 }
938 WindowFrameBound::OffsetFollowing(offset) => {
939 let start_offset = usize::cast_from(*offset);
940 let start_idx = idx.saturating_add(start_offset);
941 if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
942 if offset > end_offset || start_idx >= length {
944 Datum::Null
945 } else {
946 datums[start_idx]
947 }
948 } else {
949 datums
950 .get(start_idx)
951 .map(|d| d.clone())
952 .unwrap_or(Datum::Null)
953 }
954 }
955 WindowFrameBound::UnboundedFollowing => unreachable!(),
957 };
958 result.push(first_value);
959 }
960 result
961}
962
963fn last_value<'a, I>(
965 datums: I,
966 callers_temp_storage: &'a RowArena,
967 order_by: &[ColumnOrder],
968 window_frame: &WindowFrame,
969) -> Datum<'a>
970where
971 I: IntoIterator<Item = Datum<'a>>,
972{
973 let temp_storage = RowArena::new();
974 let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
975 callers_temp_storage.make_datum(|packer| {
976 packer.push_list(iter);
977 })
978}
979
980fn last_value_no_list<'a: 'b, 'b, I>(
983 datums: I,
984 callers_temp_storage: &'b RowArena,
985 order_by: &[ColumnOrder],
986 window_frame: &WindowFrame,
987) -> impl Iterator<Item = Datum<'b>>
988where
989 I: IntoIterator<Item = Datum<'a>>,
990{
991 let datums = order_aggregate_datums_with_rank(datums, order_by);
994
995 let size_hint = datums.size_hint().0;
997 let mut args = Vec::with_capacity(size_hint);
998 let mut original_rows = Vec::with_capacity(size_hint);
999 let mut order_by_rows = Vec::with_capacity(size_hint);
1000 for (d, order_by_row) in datums.into_iter() {
1001 let mut iter = d.unwrap_list().iter();
1002 let original_row = iter.next().unwrap();
1003 let arg = iter.next().unwrap();
1004 order_by_rows.push(order_by_row);
1005 original_rows.push(original_row);
1006 args.push(arg);
1007 }
1008
1009 let results = last_value_inner(args, &order_by_rows, window_frame);
1010
1011 callers_temp_storage.reserve(results.len());
1012 results
1013 .into_iter()
1014 .zip_eq(original_rows)
1015 .map(|(result_value, original_row)| {
1016 callers_temp_storage.make_datum(|packer| {
1017 packer.push_list_with(|packer| {
1018 packer.push(result_value);
1019 packer.push(original_row);
1020 });
1021 })
1022 })
1023}
1024
1025fn last_value_inner<'a>(
1026 args: Vec<Datum<'a>>,
1027 order_by_rows: &Vec<Row>,
1028 window_frame: &WindowFrame,
1029) -> Vec<Datum<'a>> {
1030 let length = args.len();
1031 let mut results: Vec<Datum> = Vec::with_capacity(length);
1032 for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1033 let last_value = match &window_frame.end_bound {
1034 WindowFrameBound::CurrentRow => match &window_frame.units {
1035 WindowFrameUnits::Rows => *current_datum,
1037 WindowFrameUnits::Range => {
1038 let target_idx = order_by_rows[idx..]
1043 .iter()
1044 .enumerate()
1045 .take_while(|(_, row)| *row == order_by_row)
1046 .last()
1047 .unwrap()
1048 .0
1049 + idx;
1050 args[target_idx]
1051 }
1052 WindowFrameUnits::Groups => unreachable!(),
1054 },
1055 WindowFrameBound::UnboundedFollowing => {
1056 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1057 let start_offset = usize::cast_from(*start_offset);
1058
1059 if idx + start_offset > length - 1 {
1061 Datum::Null
1062 } else {
1063 args[length - 1]
1064 }
1065 } else {
1066 args[length - 1]
1067 }
1068 }
1069 WindowFrameBound::OffsetFollowing(offset) => {
1070 let end_offset = usize::cast_from(*offset);
1071 let end_idx = idx.saturating_add(end_offset);
1072 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1073 let start_offset = usize::cast_from(*start_offset);
1074 let start_idx = idx.saturating_add(start_offset);
1075
1076 if end_offset < start_offset || start_idx >= length {
1078 Datum::Null
1079 } else {
1080 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1082 }
1083 } else {
1084 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1085 }
1086 }
1087 WindowFrameBound::OffsetPreceding(offset) => {
1088 let end_offset = usize::cast_from(*offset);
1089 let end_idx = idx.saturating_sub(end_offset);
1090 if idx < end_offset {
1091 Datum::Null
1093 } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1094 &window_frame.start_bound
1095 {
1096 if offset > start_offset {
1098 Datum::Null
1099 } else {
1100 args[end_idx]
1101 }
1102 } else {
1103 args[end_idx]
1104 }
1105 }
1106 WindowFrameBound::UnboundedPreceding => unreachable!(),
1108 };
1109 results.push(last_value);
1110 }
1111 results
1112}
1113
1114fn fused_value_window_func<'a, I>(
1120 input_datums: I,
1121 callers_temp_storage: &'a RowArena,
1122 funcs: &Vec<AggregateFunc>,
1123 order_by: &Vec<ColumnOrder>,
1124) -> Datum<'a>
1125where
1126 I: IntoIterator<Item = Datum<'a>>,
1127{
1128 let temp_storage = RowArena::new();
1129 let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1130 callers_temp_storage.make_datum(|packer| {
1131 packer.push_list(iter);
1132 })
1133}
1134
1135fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1138 input_datums: I,
1139 callers_temp_storage: &'b RowArena,
1140 funcs: &Vec<AggregateFunc>,
1141 order_by: &Vec<ColumnOrder>,
1142) -> impl Iterator<Item = Datum<'b>>
1143where
1144 I: IntoIterator<Item = Datum<'a>>,
1145{
1146 let has_last_value = funcs
1147 .iter()
1148 .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1149
1150 let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1151
1152 let size_hint = input_datums_with_ranks.size_hint().0;
1153 let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1154 let mut original_rows = Vec::with_capacity(size_hint);
1155 let mut order_by_rows = Vec::with_capacity(size_hint);
1156 for (d, order_by_row) in input_datums_with_ranks {
1157 let mut iter = d.unwrap_list().iter();
1158 let original_row = iter.next().unwrap();
1159 original_rows.push(original_row);
1160 let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1161 for i in 0..funcs.len() {
1162 let encoded_args = argss_iter.next().unwrap();
1163 encoded_argsss[i].push(encoded_args);
1164 }
1165 if has_last_value {
1166 order_by_rows.push(order_by_row);
1167 }
1168 }
1169
1170 let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1171 for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1172 let results = match func {
1173 AggregateFunc::LagLead {
1174 order_by: inner_order_by,
1175 lag_lead,
1176 ignore_nulls,
1177 } => {
1178 assert_eq!(order_by, inner_order_by);
1179 let unwrapped_argss = encoded_argss
1180 .into_iter()
1181 .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1182 .collect();
1183 lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1184 }
1185 AggregateFunc::FirstValue {
1186 order_by: inner_order_by,
1187 window_frame,
1188 } => {
1189 assert_eq!(order_by, inner_order_by);
1190 first_value_inner(encoded_argss, window_frame)
1193 }
1194 AggregateFunc::LastValue {
1195 order_by: inner_order_by,
1196 window_frame,
1197 } => {
1198 assert_eq!(order_by, inner_order_by);
1199 last_value_inner(encoded_argss, &order_by_rows, window_frame)
1202 }
1203 _ => panic!("unknown window function in FusedValueWindowFunc"),
1204 };
1205 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1206 results.push(result);
1207 }
1208 }
1209
1210 callers_temp_storage.reserve(2 * original_rows.len());
1211 results_per_row
1212 .into_iter()
1213 .enumerate()
1214 .map(move |(i, results)| {
1215 callers_temp_storage.make_datum(|packer| {
1216 packer.push_list_with(|packer| {
1217 packer
1218 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1219 packer.push(original_rows[i]);
1220 });
1221 })
1222 })
1223}
1224
1225fn window_aggr<'a, I, A>(
1234 input_datums: I,
1235 callers_temp_storage: &'a RowArena,
1236 wrapped_aggregate: &AggregateFunc,
1237 order_by: &[ColumnOrder],
1238 window_frame: &WindowFrame,
1239) -> Datum<'a>
1240where
1241 I: IntoIterator<Item = Datum<'a>>,
1242 A: OneByOneAggr,
1243{
1244 let temp_storage = RowArena::new();
1245 let iter = window_aggr_no_list::<I, A>(
1246 input_datums,
1247 &temp_storage,
1248 wrapped_aggregate,
1249 order_by,
1250 window_frame,
1251 );
1252 callers_temp_storage.make_datum(|packer| {
1253 packer.push_list(iter);
1254 })
1255}
1256
1257fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1260 input_datums: I,
1261 callers_temp_storage: &'b RowArena,
1262 wrapped_aggregate: &AggregateFunc,
1263 order_by: &[ColumnOrder],
1264 window_frame: &WindowFrame,
1265) -> impl Iterator<Item = Datum<'b>>
1266where
1267 I: IntoIterator<Item = Datum<'a>>,
1268 A: OneByOneAggr,
1269{
1270 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1273
1274 let size_hint = datums.size_hint().0;
1276 let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1277 let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1278 let mut order_by_rows = Vec::with_capacity(size_hint);
1279 for (d, order_by_row) in datums.into_iter() {
1280 let mut iter = d.unwrap_list().iter();
1281 let original_row = iter.next().unwrap();
1282 let arg = iter.next().unwrap();
1283 order_by_rows.push(order_by_row);
1284 original_rows.push(original_row);
1285 args.push(arg);
1286 }
1287
1288 let results = window_aggr_inner::<A>(
1289 args,
1290 &order_by_rows,
1291 wrapped_aggregate,
1292 order_by,
1293 window_frame,
1294 callers_temp_storage,
1295 );
1296
1297 callers_temp_storage.reserve(results.len());
1298 results
1299 .into_iter()
1300 .zip_eq(original_rows)
1301 .map(|(result_value, original_row)| {
1302 callers_temp_storage.make_datum(|packer| {
1303 packer.push_list_with(|packer| {
1304 packer.push(result_value);
1305 packer.push(original_row);
1306 });
1307 })
1308 })
1309}
1310
1311fn window_aggr_inner<'a, A>(
1312 mut args: Vec<Datum<'a>>,
1313 order_by_rows: &Vec<Row>,
1314 wrapped_aggregate: &AggregateFunc,
1315 order_by: &[ColumnOrder],
1316 window_frame: &WindowFrame,
1317 temp_storage: &'a RowArena,
1318) -> Vec<Datum<'a>>
1319where
1320 A: OneByOneAggr,
1321{
1322 let length = args.len();
1323 let mut result: Vec<Datum> = Vec::with_capacity(length);
1324
1325 soft_assert_or_log!(
1331 !((matches!(window_frame.units, WindowFrameUnits::Groups)
1332 || matches!(window_frame.units, WindowFrameUnits::Range))
1333 && !window_frame.includes_current_row()),
1334 "window frame without current row"
1335 );
1336
1337 if (matches!(
1338 window_frame.start_bound,
1339 WindowFrameBound::UnboundedPreceding
1340 ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1341 || (order_by.is_empty()
1342 && (matches!(window_frame.units, WindowFrameUnits::Groups)
1343 || matches!(window_frame.units, WindowFrameUnits::Range))
1344 && window_frame.includes_current_row())
1345 {
1346 let result_value = wrapped_aggregate.eval(args, temp_storage);
1353 for _ in 0..length {
1355 result.push(result_value);
1356 }
1357 } else {
1358 fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1359 args: Vec<Datum<'a>>,
1360 result: &mut Vec<Datum<'a>>,
1361 mut one_by_one_aggr: A,
1362 temp_storage: &'a RowArena,
1363 ) where
1364 A: OneByOneAggr,
1365 {
1366 for current_arg in args.into_iter() {
1367 one_by_one_aggr.give(¤t_arg);
1368 let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1369 result.push(result_value);
1370 }
1371 }
1372
1373 fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1374 args: Vec<Datum<'a>>,
1375 order_by_rows: &Vec<Row>,
1376 result: &mut Vec<Datum<'a>>,
1377 mut one_by_one_aggr: A,
1378 temp_storage: &'a RowArena,
1379 ) where
1380 A: OneByOneAggr,
1381 {
1382 let mut peer_group_start = 0;
1383 while peer_group_start < args.len() {
1384 let mut peer_group_end = peer_group_start + 1;
1388 while peer_group_end < args.len()
1389 && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1390 {
1391 peer_group_end += 1;
1393 }
1394 for current_arg in args[peer_group_start..peer_group_end].iter() {
1397 one_by_one_aggr.give(current_arg);
1398 }
1399 let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1400 for _ in args[peer_group_start..peer_group_end].iter() {
1402 result.push(agg_for_peer_group);
1403 }
1404 peer_group_start = peer_group_end;
1406 }
1407 }
1408
1409 fn rows_between_offset_and_offset<'a>(
1410 args: Vec<Datum<'a>>,
1411 result: &mut Vec<Datum<'a>>,
1412 wrapped_aggregate: &AggregateFunc,
1413 temp_storage: &'a RowArena,
1414 offset_start: i64,
1415 offset_end: i64,
1416 ) {
1417 let len = args
1418 .len()
1419 .to_i64()
1420 .expect("window partition's len should fit into i64");
1421 for i in 0..len {
1422 let i = i.to_i64().expect("window partition shouldn't be super big");
1423 let frame_start = max(i + offset_start, 0)
1426 .to_usize()
1427 .expect("The max made sure it's not negative");
1428 let frame_end = min(i + offset_end, len - 1).to_usize();
1431 match frame_end {
1432 Some(frame_end) => {
1433 if frame_start <= frame_end {
1434 let frame_values = args[frame_start..=frame_end].iter().cloned();
1445 let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1446 result.push(result_value);
1447 } else {
1448 let result_value = wrapped_aggregate.default();
1450 result.push(result_value);
1451 }
1452 }
1453 None => {
1454 let result_value = wrapped_aggregate.default();
1456 result.push(result_value);
1457 }
1458 }
1459 }
1460 }
1461
1462 match (
1463 &window_frame.units,
1464 &window_frame.start_bound,
1465 &window_frame.end_bound,
1466 ) {
1467 (Rows, UnboundedPreceding, CurrentRow) => {
1472 rows_between_unbounded_preceding_and_current_row::<A>(
1473 args,
1474 &mut result,
1475 A::new(wrapped_aggregate, false),
1476 temp_storage,
1477 );
1478 }
1479 (Rows, CurrentRow, UnboundedFollowing) => {
1480 args.reverse();
1482 rows_between_unbounded_preceding_and_current_row::<A>(
1483 args,
1484 &mut result,
1485 A::new(wrapped_aggregate, true),
1486 temp_storage,
1487 );
1488 result.reverse();
1489 }
1490 (Range, UnboundedPreceding, CurrentRow) => {
1491 groups_between_unbounded_preceding_and_current_row::<A>(
1494 args,
1495 order_by_rows,
1496 &mut result,
1497 A::new(wrapped_aggregate, false),
1498 temp_storage,
1499 );
1500 }
1501 (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1505 let start_prec = start_prec.to_i64().expect(
1506 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1507 );
1508 let end_prec = end_prec.to_i64().expect(
1509 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1510 );
1511 rows_between_offset_and_offset(
1512 args,
1513 &mut result,
1514 wrapped_aggregate,
1515 temp_storage,
1516 -start_prec,
1517 -end_prec,
1518 );
1519 }
1520 (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1521 let start_prec = start_prec.to_i64().expect(
1522 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1523 );
1524 let end_fol = end_fol.to_i64().expect(
1525 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1526 );
1527 rows_between_offset_and_offset(
1528 args,
1529 &mut result,
1530 wrapped_aggregate,
1531 temp_storage,
1532 -start_prec,
1533 end_fol,
1534 );
1535 }
1536 (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1537 let start_fol = start_fol.to_i64().expect(
1538 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1539 );
1540 let end_fol = end_fol.to_i64().expect(
1541 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1542 );
1543 rows_between_offset_and_offset(
1544 args,
1545 &mut result,
1546 wrapped_aggregate,
1547 temp_storage,
1548 start_fol,
1549 end_fol,
1550 );
1551 }
1552 (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1553 unreachable!() }
1555 (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1556 let start_prec = start_prec.to_i64().expect(
1557 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1558 );
1559 let end_fol = 0;
1560 rows_between_offset_and_offset(
1561 args,
1562 &mut result,
1563 wrapped_aggregate,
1564 temp_storage,
1565 -start_prec,
1566 end_fol,
1567 );
1568 }
1569 (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1570 let start_fol = 0;
1571 let end_fol = end_fol.to_i64().expect(
1572 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1573 );
1574 rows_between_offset_and_offset(
1575 args,
1576 &mut result,
1577 wrapped_aggregate,
1578 temp_storage,
1579 start_fol,
1580 end_fol,
1581 );
1582 }
1583 (Rows, CurrentRow, CurrentRow) => {
1584 let start_fol = 0;
1587 let end_fol = 0;
1588 rows_between_offset_and_offset(
1589 args,
1590 &mut result,
1591 wrapped_aggregate,
1592 temp_storage,
1593 start_fol,
1594 end_fol,
1595 );
1596 }
1597 (Rows, CurrentRow, OffsetPreceding(_))
1598 | (Rows, UnboundedFollowing, _)
1599 | (Rows, _, UnboundedPreceding)
1600 | (Rows, OffsetFollowing(..), CurrentRow) => {
1601 unreachable!() }
1603 (Rows, UnboundedPreceding, UnboundedFollowing) => {
1604 unreachable!()
1607 }
1608 (Rows, UnboundedPreceding, OffsetPreceding(_))
1609 | (Rows, UnboundedPreceding, OffsetFollowing(_))
1610 | (Rows, OffsetPreceding(..), UnboundedFollowing)
1611 | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1612 unreachable!()
1615 }
1616 (Range, _, _) => {
1617 unreachable!()
1624 }
1625 (Groups, _, _) => {
1626 unreachable!()
1630 }
1631 }
1632 }
1633
1634 result
1635}
1636
1637fn fused_window_aggr<'a, I, A>(
1641 input_datums: I,
1642 callers_temp_storage: &'a RowArena,
1643 wrapped_aggregates: &Vec<AggregateFunc>,
1644 order_by: &Vec<ColumnOrder>,
1645 window_frame: &WindowFrame,
1646) -> Datum<'a>
1647where
1648 I: IntoIterator<Item = Datum<'a>>,
1649 A: OneByOneAggr,
1650{
1651 let temp_storage = RowArena::new();
1652 let iter = fused_window_aggr_no_list::<_, A>(
1653 input_datums,
1654 &temp_storage,
1655 wrapped_aggregates,
1656 order_by,
1657 window_frame,
1658 );
1659 callers_temp_storage.make_datum(|packer| {
1660 packer.push_list(iter);
1661 })
1662}
1663
1664fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1667 input_datums: I,
1668 callers_temp_storage: &'b RowArena,
1669 wrapped_aggregates: &Vec<AggregateFunc>,
1670 order_by: &Vec<ColumnOrder>,
1671 window_frame: &WindowFrame,
1672) -> impl Iterator<Item = Datum<'b>>
1673where
1674 I: IntoIterator<Item = Datum<'a>>,
1675 A: OneByOneAggr,
1676{
1677 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1680
1681 let size_hint = datums.size_hint().0;
1682 let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1683 let mut original_rows = Vec::with_capacity(size_hint);
1684 let mut order_by_rows = Vec::with_capacity(size_hint);
1685 for (d, order_by_row) in datums {
1686 let mut iter = d.unwrap_list().iter();
1687 let original_row = iter.next().unwrap();
1688 original_rows.push(original_row);
1689 let args_iter = iter.next().unwrap().unwrap_list().iter();
1690 for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1692 args.push(arg);
1693 }
1694 order_by_rows.push(order_by_row);
1695 }
1696
1697 let mut results_per_row =
1698 vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1699 for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1700 let results = window_aggr_inner::<A>(
1701 args,
1702 &order_by_rows,
1703 wrapped_aggr,
1704 order_by,
1705 window_frame,
1706 callers_temp_storage,
1707 );
1708 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1709 results.push(result);
1710 }
1711 }
1712
1713 callers_temp_storage.reserve(2 * original_rows.len());
1714 results_per_row
1715 .into_iter()
1716 .enumerate()
1717 .map(move |(i, results)| {
1718 callers_temp_storage.make_datum(|packer| {
1719 packer.push_list_with(|packer| {
1720 packer
1721 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1722 packer.push(original_rows[i]);
1723 });
1724 })
1725 })
1726}
1727
1728pub trait OneByOneAggr {
1732 fn new(agg: &AggregateFunc, reverse: bool) -> Self;
1737 fn give(&mut self, d: &Datum);
1739 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
1741}
1742
/// A simple [`OneByOneAggr`]. It stores every input and, whenever the current aggregate is
/// requested, re-runs the wrapped aggregate over all stored inputs.
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    /// The aggregate function to evaluate.
    agg: AggregateFunc,
    /// The inputs given so far, in `give` order. Each is packed into its own single-datum `Row`.
    input: Vec<Row>,
    /// Whether inputs arrive in reverse order. If so, they are evaluated back to front.
    reverse: bool,
}
1754
1755impl OneByOneAggr for NaiveOneByOneAggr {
1756 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1757 NaiveOneByOneAggr {
1758 agg: agg.clone(),
1759 input: Vec::new(),
1760 reverse,
1761 }
1762 }
1763
1764 fn give(&mut self, d: &Datum) {
1765 let mut row = Row::default();
1766 row.packer().push(d);
1767 self.input.push(row);
1768 }
1769
1770 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1771 temp_storage.make_datum(|packer| {
1772 packer.push(if !self.reverse {
1773 self.agg
1774 .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1775 } else {
1776 self.agg.eval(
1777 self.input.iter().rev().map(|r| r.unpack_first()),
1778 temp_storage,
1779 )
1780 });
1781 })
1782 }
1783}
1784
/// Selects which offset window function [`AggregateFunc::LagLead`] computes.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum LagLeadType {
    /// The `lag` window function.
    Lag,
    /// The `lead` window function.
    Lead,
}
1792
/// An aggregate function, or a window function that is evaluated like an aggregate over all
/// rows of a window partition at once.
///
/// Values are computed by [`AggregateFunc::eval`].
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum AggregateFunc {
    /// Maximum of `Numeric` values.
    MaxNumeric,
    /// Maximum of `i16` values.
    MaxInt16,
    /// Maximum of `i32` values.
    MaxInt32,
    /// Maximum of `i64` values.
    MaxInt64,
    /// Maximum of `u16` values.
    MaxUInt16,
    /// Maximum of `u32` values.
    MaxUInt32,
    /// Maximum of `u64` values.
    MaxUInt64,
    /// Maximum of `mz_repr::Timestamp` values.
    MaxMzTimestamp,
    /// Maximum of `f32` values (compared as `OrderedFloat`).
    MaxFloat32,
    /// Maximum of `f64` values (compared as `OrderedFloat`).
    MaxFloat64,
    /// Maximum of booleans.
    MaxBool,
    /// Maximum of strings.
    MaxString,
    /// Maximum of dates.
    MaxDate,
    /// Maximum of timestamps without time zone.
    MaxTimestamp,
    /// Maximum of timestamps with time zone.
    MaxTimestampTz,
    /// Maximum of intervals.
    MaxInterval,
    /// Maximum of times of day.
    MaxTime,
    /// Minimum of `Numeric` values.
    MinNumeric,
    /// Minimum of `i16` values.
    MinInt16,
    /// Minimum of `i32` values.
    MinInt32,
    /// Minimum of `i64` values.
    MinInt64,
    /// Minimum of `u16` values.
    MinUInt16,
    /// Minimum of `u32` values.
    MinUInt32,
    /// Minimum of `u64` values.
    MinUInt64,
    /// Minimum of `mz_repr::Timestamp` values.
    MinMzTimestamp,
    /// Minimum of `f32` values (compared as `OrderedFloat`).
    MinFloat32,
    /// Minimum of `f64` values (compared as `OrderedFloat`).
    MinFloat64,
    /// Minimum of booleans.
    MinBool,
    /// Minimum of strings.
    MinString,
    /// Minimum of dates.
    MinDate,
    /// Minimum of timestamps without time zone.
    MinTimestamp,
    /// Minimum of timestamps with time zone.
    MinTimestampTz,
    /// Minimum of intervals.
    MinInterval,
    /// Minimum of times of day.
    MinTime,
    /// Sum of `i16` values. The result type is `Int64`.
    SumInt16,
    /// Sum of `i32` values. The result type is `Int64`.
    SumInt32,
    /// Sum of `i64` values. The result type is `Numeric` with scale 0.
    SumInt64,
    /// Sum of `u16` values. The result type is `UInt64`.
    SumUInt16,
    /// Sum of `u32` values. The result type is `UInt64`.
    SumUInt32,
    /// Sum of `u64` values. The result type is `Numeric` with scale 0.
    SumUInt64,
    /// Sum of `f32` values.
    SumFloat32,
    /// Sum of `f64` values.
    SumFloat64,
    /// Sum of `Numeric` values.
    SumNumeric,
    /// Row count. The result type is `Int64`, and the value for no input is `0`.
    Count,
    /// Boolean "any". The value for no input is `false`.
    Any,
    /// Boolean "all". The value for no input is `true`.
    All,
    /// Aggregates inputs into a `jsonb` value.
    JsonbAgg {
        /// ORDER BY of the aggregation, passed to the implementation.
        order_by: Vec<ColumnOrder>,
    },
    /// Builds a `jsonb` object. This is evaluated by the same helper as `MapAgg`.
    JsonbObjectAgg {
        /// ORDER BY of the aggregation, passed to the implementation.
        order_by: Vec<ColumnOrder>,
    },
    /// Builds a `map` value.
    MapAgg {
        /// ORDER BY of the aggregation, passed to the implementation.
        order_by: Vec<ColumnOrder>,
        /// The type of the resulting map's values.
        value_type: SqlScalarType,
    },
    /// Concatenates arrays. Its identity is the empty array.
    ArrayConcat {
        /// ORDER BY of the aggregation, passed to the implementation.
        order_by: Vec<ColumnOrder>,
    },
    /// Concatenates lists. Its identity is the empty list.
    ListConcat {
        /// ORDER BY of the aggregation, passed to the implementation.
        order_by: Vec<ColumnOrder>,
    },
    /// Aggregates inputs into a string.
    StringAgg {
        /// ORDER BY of the aggregation, passed to the implementation.
        order_by: Vec<ColumnOrder>,
    },
    /// The `row_number` window function.
    RowNumber {
        /// ORDER BY of the window.
        order_by: Vec<ColumnOrder>,
    },
    /// The `rank` window function.
    Rank {
        /// ORDER BY of the window.
        order_by: Vec<ColumnOrder>,
    },
    /// The `dense_rank` window function.
    DenseRank {
        /// ORDER BY of the window.
        order_by: Vec<ColumnOrder>,
    },
    /// The `lag` or `lead` window function.
    LagLead {
        /// ORDER BY of the window.
        order_by: Vec<ColumnOrder>,
        /// Whether this is `lag` or `lead`.
        lag_lead: LagLeadType,
        /// Presumably corresponds to SQL `IGNORE NULLS`. Confirm in `lag_lead`.
        ignore_nulls: bool,
    },
    /// The `first_value` window function.
    FirstValue {
        /// ORDER BY of the window.
        order_by: Vec<ColumnOrder>,
        /// The window frame the value is taken from.
        window_frame: WindowFrame,
    },
    /// The `last_value` window function.
    LastValue {
        /// ORDER BY of the window.
        order_by: Vec<ColumnOrder>,
        /// The window frame the value is taken from.
        window_frame: WindowFrame,
    },
    /// Several value window functions (`LagLead`, `FirstValue`, `LastValue`) that share the
    /// same ORDER BY and are evaluated together.
    FusedValueWindowFunc {
        /// The fused window functions, in output order.
        funcs: Vec<AggregateFunc>,
        /// ORDER BY shared by all of `funcs`.
        order_by: Vec<ColumnOrder>,
    },
    /// An ordinary aggregate used as a window function. It is evaluated over each row's
    /// window frame.
    WindowAggregate {
        /// The aggregate evaluated over each frame.
        wrapped_aggregate: Box<AggregateFunc>,
        /// ORDER BY of the window.
        order_by: Vec<ColumnOrder>,
        /// The window frame.
        window_frame: WindowFrame,
    },
    /// Several window aggregates that share the same ORDER BY and window frame, evaluated
    /// together.
    FusedWindowAggregate {
        /// The fused aggregates, in output order.
        wrapped_aggregates: Vec<AggregateFunc>,
        /// ORDER BY shared by all of `wrapped_aggregates`.
        order_by: Vec<ColumnOrder>,
        /// Window frame shared by all of `wrapped_aggregates`.
        window_frame: WindowFrame,
    },
    /// Placeholder that always evaluates to `Datum::Dummy`.
    Dummy,
}
1924
1925impl AggregateFunc {
1926 pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
1927 where
1928 I: IntoIterator<Item = Datum<'a>>,
1929 {
1930 match self {
1931 AggregateFunc::MaxNumeric => {
1932 max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1933 }
1934 AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
1935 AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
1936 AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
1937 AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
1938 AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
1939 AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
1940 AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
1941 AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
1942 AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
1943 AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
1944 AggregateFunc::MaxString => max_string(datums),
1945 AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
1946 AggregateFunc::MaxTimestamp => {
1947 max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1948 }
1949 AggregateFunc::MaxTimestampTz => {
1950 max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
1951 }
1952 AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
1953 AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
1954 AggregateFunc::MinNumeric => {
1955 min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1956 }
1957 AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
1958 AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
1959 AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
1960 AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
1961 AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
1962 AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
1963 AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
1964 AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
1965 AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
1966 AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
1967 AggregateFunc::MinString => min_string(datums),
1968 AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
1969 AggregateFunc::MinTimestamp => {
1970 min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1971 }
1972 AggregateFunc::MinTimestampTz => {
1973 min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
1974 }
1975 AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
1976 AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
1977 AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
1978 AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
1979 AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
1980 AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
1981 AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
1982 AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
1983 AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
1984 AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
1985 AggregateFunc::SumNumeric => sum_numeric(datums),
1986 AggregateFunc::Count => count(datums),
1987 AggregateFunc::Any => any(datums),
1988 AggregateFunc::All => all(datums),
1989 AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
1990 AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
1991 dict_agg(datums, temp_storage, order_by)
1992 }
1993 AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
1994 AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
1995 AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
1996 AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
1997 AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
1998 AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
1999 AggregateFunc::LagLead {
2000 order_by,
2001 lag_lead: lag_lead_type,
2002 ignore_nulls,
2003 } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2004 AggregateFunc::FirstValue {
2005 order_by,
2006 window_frame,
2007 } => first_value(datums, temp_storage, order_by, window_frame),
2008 AggregateFunc::LastValue {
2009 order_by,
2010 window_frame,
2011 } => last_value(datums, temp_storage, order_by, window_frame),
2012 AggregateFunc::WindowAggregate {
2013 wrapped_aggregate,
2014 order_by,
2015 window_frame,
2016 } => window_aggr::<_, NaiveOneByOneAggr>(
2017 datums,
2018 temp_storage,
2019 wrapped_aggregate,
2020 order_by,
2021 window_frame,
2022 ),
2023 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2024 fused_value_window_func(datums, temp_storage, funcs, order_by)
2025 }
2026 AggregateFunc::FusedWindowAggregate {
2027 wrapped_aggregates,
2028 order_by,
2029 window_frame,
2030 } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2031 datums,
2032 temp_storage,
2033 wrapped_aggregates,
2034 order_by,
2035 window_frame,
2036 ),
2037 AggregateFunc::Dummy => Datum::Dummy,
2038 }
2039 }
2040
2041 pub fn eval_with_fast_window_agg<'a, I, W>(
2045 &self,
2046 datums: I,
2047 temp_storage: &'a RowArena,
2048 ) -> Datum<'a>
2049 where
2050 I: IntoIterator<Item = Datum<'a>>,
2051 W: OneByOneAggr,
2052 {
2053 match self {
2054 AggregateFunc::WindowAggregate {
2055 wrapped_aggregate,
2056 order_by,
2057 window_frame,
2058 } => window_aggr::<_, W>(
2059 datums,
2060 temp_storage,
2061 wrapped_aggregate,
2062 order_by,
2063 window_frame,
2064 ),
2065 AggregateFunc::FusedWindowAggregate {
2066 wrapped_aggregates,
2067 order_by,
2068 window_frame,
2069 } => fused_window_aggr::<_, W>(
2070 datums,
2071 temp_storage,
2072 wrapped_aggregates,
2073 order_by,
2074 window_frame,
2075 ),
2076 _ => self.eval(datums, temp_storage),
2077 }
2078 }
2079
2080 pub fn eval_with_unnest_list<'a, I, W>(
2081 &self,
2082 datums: I,
2083 temp_storage: &'a RowArena,
2084 ) -> impl Iterator<Item = Datum<'a>>
2085 where
2086 I: IntoIterator<Item = Datum<'a>>,
2087 W: OneByOneAggr,
2088 {
2089 assert!(self.can_fuse_with_unnest_list());
2091 match self {
2092 AggregateFunc::RowNumber { order_by } => {
2093 row_number_no_list(datums, temp_storage, order_by).collect_vec()
2094 }
2095 AggregateFunc::Rank { order_by } => {
2096 rank_no_list(datums, temp_storage, order_by).collect_vec()
2097 }
2098 AggregateFunc::DenseRank { order_by } => {
2099 dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2100 }
2101 AggregateFunc::LagLead {
2102 order_by,
2103 lag_lead: lag_lead_type,
2104 ignore_nulls,
2105 } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2106 .collect_vec(),
2107 AggregateFunc::FirstValue {
2108 order_by,
2109 window_frame,
2110 } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2111 AggregateFunc::LastValue {
2112 order_by,
2113 window_frame,
2114 } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2115 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2116 fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2117 }
2118 AggregateFunc::WindowAggregate {
2119 wrapped_aggregate,
2120 order_by,
2121 window_frame,
2122 } => window_aggr_no_list::<_, W>(
2123 datums,
2124 temp_storage,
2125 wrapped_aggregate,
2126 order_by,
2127 window_frame,
2128 )
2129 .collect_vec(),
2130 AggregateFunc::FusedWindowAggregate {
2131 wrapped_aggregates,
2132 order_by,
2133 window_frame,
2134 } => fused_window_aggr_no_list::<_, W>(
2135 datums,
2136 temp_storage,
2137 wrapped_aggregates,
2138 order_by,
2139 window_frame,
2140 )
2141 .collect_vec(),
2142 _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2143 }
2144 .into_iter()
2145 }
2146
2147 pub fn default(&self) -> Datum<'static> {
2150 match self {
2151 AggregateFunc::Count => Datum::Int64(0),
2152 AggregateFunc::Any => Datum::False,
2153 AggregateFunc::All => Datum::True,
2154 AggregateFunc::Dummy => Datum::Dummy,
2155 _ => Datum::Null,
2156 }
2157 }
2158
2159 pub fn identity_datum(&self) -> Datum<'static> {
2162 match self {
2163 AggregateFunc::Any => Datum::False,
2164 AggregateFunc::All => Datum::True,
2165 AggregateFunc::Dummy => Datum::Dummy,
2166 AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2167 AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2168 AggregateFunc::RowNumber { .. }
2169 | AggregateFunc::Rank { .. }
2170 | AggregateFunc::DenseRank { .. }
2171 | AggregateFunc::LagLead { .. }
2172 | AggregateFunc::FirstValue { .. }
2173 | AggregateFunc::LastValue { .. }
2174 | AggregateFunc::WindowAggregate { .. }
2175 | AggregateFunc::FusedValueWindowFunc { .. }
2176 | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2177 AggregateFunc::MaxNumeric
2178 | AggregateFunc::MaxInt16
2179 | AggregateFunc::MaxInt32
2180 | AggregateFunc::MaxInt64
2181 | AggregateFunc::MaxUInt16
2182 | AggregateFunc::MaxUInt32
2183 | AggregateFunc::MaxUInt64
2184 | AggregateFunc::MaxMzTimestamp
2185 | AggregateFunc::MaxFloat32
2186 | AggregateFunc::MaxFloat64
2187 | AggregateFunc::MaxBool
2188 | AggregateFunc::MaxString
2189 | AggregateFunc::MaxDate
2190 | AggregateFunc::MaxTimestamp
2191 | AggregateFunc::MaxTimestampTz
2192 | AggregateFunc::MaxInterval
2193 | AggregateFunc::MaxTime
2194 | AggregateFunc::MinNumeric
2195 | AggregateFunc::MinInt16
2196 | AggregateFunc::MinInt32
2197 | AggregateFunc::MinInt64
2198 | AggregateFunc::MinUInt16
2199 | AggregateFunc::MinUInt32
2200 | AggregateFunc::MinUInt64
2201 | AggregateFunc::MinMzTimestamp
2202 | AggregateFunc::MinFloat32
2203 | AggregateFunc::MinFloat64
2204 | AggregateFunc::MinBool
2205 | AggregateFunc::MinString
2206 | AggregateFunc::MinDate
2207 | AggregateFunc::MinTimestamp
2208 | AggregateFunc::MinTimestampTz
2209 | AggregateFunc::MinInterval
2210 | AggregateFunc::MinTime
2211 | AggregateFunc::SumInt16
2212 | AggregateFunc::SumInt32
2213 | AggregateFunc::SumInt64
2214 | AggregateFunc::SumUInt16
2215 | AggregateFunc::SumUInt32
2216 | AggregateFunc::SumUInt64
2217 | AggregateFunc::SumFloat32
2218 | AggregateFunc::SumFloat64
2219 | AggregateFunc::SumNumeric
2220 | AggregateFunc::Count
2221 | AggregateFunc::JsonbAgg { .. }
2222 | AggregateFunc::JsonbObjectAgg { .. }
2223 | AggregateFunc::MapAgg { .. }
2224 | AggregateFunc::StringAgg { .. } => Datum::Null,
2225 }
2226 }
2227
2228 pub fn can_fuse_with_unnest_list(&self) -> bool {
2229 match self {
2230 AggregateFunc::RowNumber { .. }
2231 | AggregateFunc::Rank { .. }
2232 | AggregateFunc::DenseRank { .. }
2233 | AggregateFunc::LagLead { .. }
2234 | AggregateFunc::FirstValue { .. }
2235 | AggregateFunc::LastValue { .. }
2236 | AggregateFunc::WindowAggregate { .. }
2237 | AggregateFunc::FusedValueWindowFunc { .. }
2238 | AggregateFunc::FusedWindowAggregate { .. } => true,
2239 AggregateFunc::ArrayConcat { .. }
2240 | AggregateFunc::ListConcat { .. }
2241 | AggregateFunc::Any
2242 | AggregateFunc::All
2243 | AggregateFunc::Dummy
2244 | AggregateFunc::MaxNumeric
2245 | AggregateFunc::MaxInt16
2246 | AggregateFunc::MaxInt32
2247 | AggregateFunc::MaxInt64
2248 | AggregateFunc::MaxUInt16
2249 | AggregateFunc::MaxUInt32
2250 | AggregateFunc::MaxUInt64
2251 | AggregateFunc::MaxMzTimestamp
2252 | AggregateFunc::MaxFloat32
2253 | AggregateFunc::MaxFloat64
2254 | AggregateFunc::MaxBool
2255 | AggregateFunc::MaxString
2256 | AggregateFunc::MaxDate
2257 | AggregateFunc::MaxTimestamp
2258 | AggregateFunc::MaxTimestampTz
2259 | AggregateFunc::MaxInterval
2260 | AggregateFunc::MaxTime
2261 | AggregateFunc::MinNumeric
2262 | AggregateFunc::MinInt16
2263 | AggregateFunc::MinInt32
2264 | AggregateFunc::MinInt64
2265 | AggregateFunc::MinUInt16
2266 | AggregateFunc::MinUInt32
2267 | AggregateFunc::MinUInt64
2268 | AggregateFunc::MinMzTimestamp
2269 | AggregateFunc::MinFloat32
2270 | AggregateFunc::MinFloat64
2271 | AggregateFunc::MinBool
2272 | AggregateFunc::MinString
2273 | AggregateFunc::MinDate
2274 | AggregateFunc::MinTimestamp
2275 | AggregateFunc::MinTimestampTz
2276 | AggregateFunc::MinInterval
2277 | AggregateFunc::MinTime
2278 | AggregateFunc::SumInt16
2279 | AggregateFunc::SumInt32
2280 | AggregateFunc::SumInt64
2281 | AggregateFunc::SumUInt16
2282 | AggregateFunc::SumUInt32
2283 | AggregateFunc::SumUInt64
2284 | AggregateFunc::SumFloat32
2285 | AggregateFunc::SumFloat64
2286 | AggregateFunc::SumNumeric
2287 | AggregateFunc::Count
2288 | AggregateFunc::JsonbAgg { .. }
2289 | AggregateFunc::JsonbObjectAgg { .. }
2290 | AggregateFunc::MapAgg { .. }
2291 | AggregateFunc::StringAgg { .. } => false,
2292 }
2293 }
2294
2295 pub fn output_type(&self, input_type: SqlColumnType) -> SqlColumnType {
2301 let scalar_type = match self {
2302 AggregateFunc::Count => SqlScalarType::Int64,
2303 AggregateFunc::Any => SqlScalarType::Bool,
2304 AggregateFunc::All => SqlScalarType::Bool,
2305 AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
2306 AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
2307 AggregateFunc::SumInt16 => SqlScalarType::Int64,
2308 AggregateFunc::SumInt32 => SqlScalarType::Int64,
2309 AggregateFunc::SumInt64 => SqlScalarType::Numeric {
2310 max_scale: Some(NumericMaxScale::ZERO),
2311 },
2312 AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
2313 AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
2314 AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
2315 max_scale: Some(NumericMaxScale::ZERO),
2316 },
2317 AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
2318 value_type: Box::new(value_type.clone()),
2319 custom_id: None,
2320 },
2321 AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2322 match input_type.scalar_type {
2323 SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2325 _ => unreachable!(),
2326 }
2327 }
2328 AggregateFunc::StringAgg { .. } => SqlScalarType::String,
2329 AggregateFunc::RowNumber { .. } => {
2330 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2331 }
2332 AggregateFunc::Rank { .. } => {
2333 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2334 }
2335 AggregateFunc::DenseRank { .. } => {
2336 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2337 }
2338 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2339 let fields = input_type.scalar_type.unwrap_record_element_type();
2341 let original_row_type = fields[0].unwrap_record_element_type()[0]
2342 .clone()
2343 .nullable(false);
2344 let output_type_inner = Self::lag_lead_output_type_inner_from_encoded_args(fields[0].unwrap_record_element_type()[1]);
2345 let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2346
2347 SqlScalarType::List {
2348 element_type: Box::new(SqlScalarType::Record {
2349 fields: [
2350 (column_name, output_type_inner),
2351 (ColumnName::from("?orig_row?"), original_row_type),
2352 ].into(),
2353 custom_id: None,
2354 }),
2355 custom_id: None,
2356 }
2357 }
2358 AggregateFunc::FirstValue { .. } => {
2359 let fields = input_type.scalar_type.unwrap_record_element_type();
2361 let original_row_type = fields[0].unwrap_record_element_type()[0]
2362 .clone()
2363 .nullable(false);
2364 let value_type = fields[0].unwrap_record_element_type()[1]
2365 .clone()
2366 .nullable(true); SqlScalarType::List {
2369 element_type: Box::new(SqlScalarType::Record {
2370 fields: [
2371 (ColumnName::from("?first_value?"), value_type),
2372 (ColumnName::from("?orig_row?"), original_row_type),
2373 ].into(),
2374 custom_id: None,
2375 }),
2376 custom_id: None,
2377 }
2378 }
2379 AggregateFunc::LastValue { .. } => {
2380 let fields = input_type.scalar_type.unwrap_record_element_type();
2382 let original_row_type = fields[0].unwrap_record_element_type()[0]
2383 .clone()
2384 .nullable(false);
2385 let value_type = fields[0].unwrap_record_element_type()[1]
2386 .clone()
2387 .nullable(true); SqlScalarType::List {
2390 element_type: Box::new(SqlScalarType::Record {
2391 fields: [
2392 (ColumnName::from("?last_value?"), value_type),
2393 (ColumnName::from("?orig_row?"), original_row_type),
2394 ].into(),
2395 custom_id: None,
2396 }),
2397 custom_id: None,
2398 }
2399 }
2400 AggregateFunc::WindowAggregate {
2401 wrapped_aggregate, ..
2402 } => {
2403 let fields = input_type.scalar_type.unwrap_record_element_type();
2405 let original_row_type = fields[0].unwrap_record_element_type()[0]
2406 .clone()
2407 .nullable(false);
2408 let arg_type = fields[0].unwrap_record_element_type()[1]
2409 .clone()
2410 .nullable(true);
2411 let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);
2412
2413 SqlScalarType::List {
2414 element_type: Box::new(SqlScalarType::Record {
2415 fields: [
2416 (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2417 (ColumnName::from("?orig_row?"), original_row_type),
2418 ].into(),
2419 custom_id: None,
2420 }),
2421 custom_id: None,
2422 }
2423 }
2424 AggregateFunc::FusedWindowAggregate {
2425 wrapped_aggregates, ..
2426 } => {
2427 let fields = input_type.scalar_type.unwrap_record_element_type();
2430 let original_row_type = fields[0].unwrap_record_element_type()[0]
2431 .clone()
2432 .nullable(false);
2433 let args_type = fields[0].unwrap_record_element_type()[1];
2434 let arg_types = args_type.unwrap_record_element_type();
2435 let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(|(arg_type, wrapped_agg)| {
2436 (
2437 ColumnName::from(wrapped_agg.name()),
2438 wrapped_agg.output_type((**arg_type).clone().nullable(true)),
2439 )
2440 }).collect_vec();
2441
2442 SqlScalarType::List {
2443 element_type: Box::new(SqlScalarType::Record {
2444 fields: [
2445 (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
2446 fields: out_fields.into(),
2447 custom_id: None,
2448 }.nullable(false)),
2449 (ColumnName::from("?orig_row?"), original_row_type),
2450 ].into(),
2451 custom_id: None,
2452 }),
2453 custom_id: None,
2454 }
2455 }
2456 AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2457 let fields = input_type.scalar_type.unwrap_record_element_type();
2462 let original_row_type = fields[0].unwrap_record_element_type()[0]
2463 .clone()
2464 .nullable(false);
2465 let encoded_args_type = fields[0].unwrap_record_element_type()[1].unwrap_record_element_type();
2466
2467 SqlScalarType::List {
2468 element_type: Box::new(SqlScalarType::Record {
2469 fields: [
2470 (ColumnName::from("?fused_value_window_func?"), SqlScalarType::Record {
2471 fields: encoded_args_type.into_iter().zip_eq(funcs).map(|(arg_type, func)| {
2472 match func {
2473 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2474 (
2475 Self::lag_lead_result_column_name(lag_lead_type),
2476 Self::lag_lead_output_type_inner_from_encoded_args(arg_type)
2477 )
2478 },
2479 AggregateFunc::FirstValue { .. } => {
2480 (
2481 ColumnName::from("?first_value?"),
2482 arg_type.clone().nullable(true),
2483 )
2484 }
2485 AggregateFunc::LastValue { .. } => {
2486 (
2487 ColumnName::from("?last_value?"),
2488 arg_type.clone().nullable(true),
2489 )
2490 }
2491 _ => panic!("FusedValueWindowFunc has an unknown function"),
2492 }
2493 }).collect(),
2494 custom_id: None,
2495 }.nullable(false)),
2496 (ColumnName::from("?orig_row?"), original_row_type),
2497 ].into(),
2498 custom_id: None,
2499 }),
2500 custom_id: None,
2501 }
2502 }
2503 AggregateFunc::Dummy
2504 | AggregateFunc::MaxNumeric
2505 | AggregateFunc::MaxInt16
2506 | AggregateFunc::MaxInt32
2507 | AggregateFunc::MaxInt64
2508 | AggregateFunc::MaxUInt16
2509 | AggregateFunc::MaxUInt32
2510 | AggregateFunc::MaxUInt64
2511 | AggregateFunc::MaxMzTimestamp
2512 | AggregateFunc::MaxFloat32
2513 | AggregateFunc::MaxFloat64
2514 | AggregateFunc::MaxBool
2515 | AggregateFunc::MaxString
2519 | AggregateFunc::MaxDate
2520 | AggregateFunc::MaxTimestamp
2521 | AggregateFunc::MaxTimestampTz
2522 | AggregateFunc::MaxInterval
2523 | AggregateFunc::MaxTime
2524 | AggregateFunc::MinNumeric
2525 | AggregateFunc::MinInt16
2526 | AggregateFunc::MinInt32
2527 | AggregateFunc::MinInt64
2528 | AggregateFunc::MinUInt16
2529 | AggregateFunc::MinUInt32
2530 | AggregateFunc::MinUInt64
2531 | AggregateFunc::MinMzTimestamp
2532 | AggregateFunc::MinFloat32
2533 | AggregateFunc::MinFloat64
2534 | AggregateFunc::MinBool
2535 | AggregateFunc::MinString
2536 | AggregateFunc::MinDate
2537 | AggregateFunc::MinTimestamp
2538 | AggregateFunc::MinTimestampTz
2539 | AggregateFunc::MinInterval
2540 | AggregateFunc::MinTime
2541 | AggregateFunc::SumFloat32
2542 | AggregateFunc::SumFloat64
2543 | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2544 };
2545 let nullable = match self {
2548 AggregateFunc::Count => false,
2549 AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2551 SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2553 SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
2555 _ => unreachable!(),
2556 },
2557 _ => unreachable!(),
2558 },
2559 _ => input_type.nullable,
2560 };
2561 scalar_type.nullable(nullable)
2562 }
2563
2564 fn output_type_ranking_window_funcs(
2566 input_type: &SqlColumnType,
2567 col_name: &str,
2568 ) -> SqlScalarType {
2569 match input_type.scalar_type {
2570 SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2571 element_type: Box::new(SqlScalarType::Record {
2572 fields: [
2573 (
2574 ColumnName::from(col_name),
2575 SqlScalarType::Int64.nullable(false),
2576 ),
2577 (ColumnName::from("?orig_row?"), {
2578 let inner = match &fields[0].1.scalar_type {
2579 SqlScalarType::List { element_type, .. } => element_type.clone(),
2580 _ => unreachable!(),
2581 };
2582 inner.nullable(false)
2583 }),
2584 ]
2585 .into(),
2586 custom_id: None,
2587 }),
2588 custom_id: None,
2589 },
2590 _ => unreachable!(),
2591 }
2592 }
2593
2594 fn lag_lead_output_type_inner_from_encoded_args(
2598 encoded_args_type: &SqlScalarType,
2599 ) -> SqlColumnType {
2600 encoded_args_type.unwrap_record_element_type()[0]
2604 .clone()
2605 .nullable(true)
2606 }
2607
2608 fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2609 ColumnName::from(match lag_lead_type {
2610 LagLeadType::Lag => "?lag?",
2611 LagLeadType::Lead => "?lead?",
2612 })
2613 }
2614
2615 pub fn propagates_nonnull_constraint(&self) -> bool {
2620 match self {
2621 AggregateFunc::MaxNumeric
2622 | AggregateFunc::MaxInt16
2623 | AggregateFunc::MaxInt32
2624 | AggregateFunc::MaxInt64
2625 | AggregateFunc::MaxUInt16
2626 | AggregateFunc::MaxUInt32
2627 | AggregateFunc::MaxUInt64
2628 | AggregateFunc::MaxMzTimestamp
2629 | AggregateFunc::MaxFloat32
2630 | AggregateFunc::MaxFloat64
2631 | AggregateFunc::MaxBool
2632 | AggregateFunc::MaxString
2633 | AggregateFunc::MaxDate
2634 | AggregateFunc::MaxTimestamp
2635 | AggregateFunc::MaxTimestampTz
2636 | AggregateFunc::MinNumeric
2637 | AggregateFunc::MinInt16
2638 | AggregateFunc::MinInt32
2639 | AggregateFunc::MinInt64
2640 | AggregateFunc::MinUInt16
2641 | AggregateFunc::MinUInt32
2642 | AggregateFunc::MinUInt64
2643 | AggregateFunc::MinMzTimestamp
2644 | AggregateFunc::MinFloat32
2645 | AggregateFunc::MinFloat64
2646 | AggregateFunc::MinBool
2647 | AggregateFunc::MinString
2648 | AggregateFunc::MinDate
2649 | AggregateFunc::MinTimestamp
2650 | AggregateFunc::MinTimestampTz
2651 | AggregateFunc::SumInt16
2652 | AggregateFunc::SumInt32
2653 | AggregateFunc::SumInt64
2654 | AggregateFunc::SumUInt16
2655 | AggregateFunc::SumUInt32
2656 | AggregateFunc::SumUInt64
2657 | AggregateFunc::SumFloat32
2658 | AggregateFunc::SumFloat64
2659 | AggregateFunc::SumNumeric
2660 | AggregateFunc::StringAgg { .. } => true,
2661 AggregateFunc::Count => false,
2663 _ => false,
2664 }
2665 }
2666}
2667
2668fn jsonb_each<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2669 let map = match a {
2671 Datum::Map(dict) => dict,
2672 _ => mz_repr::DatumMap::empty(),
2673 };
2674
2675 map.iter()
2676 .map(move |(k, v)| (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE))
2677}
2678
2679fn jsonb_each_stringify<'a>(
2680 a: Datum<'a>,
2681 temp_storage: &'a RowArena,
2682) -> impl Iterator<Item = (Row, Diff)> + 'a {
2683 let map = match a {
2685 Datum::Map(dict) => dict,
2686 _ => mz_repr::DatumMap::empty(),
2687 };
2688
2689 map.iter().map(move |(k, mut v)| {
2690 v = jsonb_stringify(v, temp_storage)
2691 .map(Datum::String)
2692 .unwrap_or(Datum::Null);
2693 (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2694 })
2695}
2696
2697fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2698 let map = match a {
2699 Datum::Map(dict) => dict,
2700 _ => mz_repr::DatumMap::empty(),
2701 };
2702
2703 map.iter()
2704 .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2705}
2706
2707fn jsonb_array_elements<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2708 let list = match a {
2709 Datum::List(list) => list,
2710 _ => mz_repr::DatumList::empty(),
2711 };
2712 list.iter().map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2713}
2714
2715fn jsonb_array_elements_stringify<'a>(
2716 a: Datum<'a>,
2717 temp_storage: &'a RowArena,
2718) -> impl Iterator<Item = (Row, Diff)> + 'a {
2719 let list = match a {
2720 Datum::List(list) => list,
2721 _ => mz_repr::DatumList::empty(),
2722 };
2723 list.iter().map(move |mut e| {
2724 e = jsonb_stringify(e, temp_storage)
2725 .map(Datum::String)
2726 .unwrap_or(Datum::Null);
2727 (Row::pack_slice(&[e]), Diff::ONE)
2728 })
2729}
2730
2731fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2732 let r = r.inner();
2733 let a = a.unwrap_str();
2734 let captures = r.captures(a)?;
2735 let datums = captures
2736 .iter()
2737 .skip(1)
2738 .map(|m| Datum::from(m.map(|m| m.as_str())));
2739 Some((Row::pack(datums), Diff::ONE))
2740}
2741
2742fn regexp_matches<'a, 'r: 'a>(
2743 exprs: &[Datum<'a>],
2744) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2745 assert!(exprs.len() == 2 || exprs.len() == 3);
2749 let a = exprs[0].unwrap_str();
2750 let r = exprs[1].unwrap_str();
2751
2752 let (regex, opts) = if exprs.len() == 3 {
2753 let flag = exprs[2].unwrap_str();
2754 let opts = AnalyzedRegexOpts::from_str(flag)?;
2755 (AnalyzedRegex::new(r, opts)?, opts)
2756 } else {
2757 let opts = AnalyzedRegexOpts::default();
2758 (AnalyzedRegex::new(r, opts)?, opts)
2759 };
2760
2761 let regex = regex.inner().clone();
2762
2763 let iter = regex.captures_iter(a).map(move |captures| {
2764 let matches = captures
2765 .iter()
2766 .skip(1)
2768 .map(|m| Datum::from(m.map(|m| m.as_str())))
2769 .collect::<Vec<_>>();
2770
2771 let mut binding = SharedRow::get();
2772 let mut packer = binding.packer();
2773
2774 let dimension = ArrayDimension {
2775 lower_bound: 1,
2776 length: matches.len(),
2777 };
2778 packer
2779 .try_push_array(&[dimension], matches)
2780 .expect("generated dimensions above");
2781
2782 (binding.clone(), Diff::ONE)
2783 });
2784
2785 let out = iter.collect::<SmallVec<[_; 3]>>();
2790
2791 if opts.global {
2792 Ok(Either::Left(out.into_iter()))
2793 } else {
2794 Ok(Either::Right(out.into_iter().take(1)))
2795 }
2796}
2797
2798fn generate_series<N>(
2799 start: N,
2800 stop: N,
2801 step: N,
2802) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2803where
2804 N: Integer + Signed + CheckedAdd + Clone,
2805 Datum<'static>: From<N>,
2806{
2807 if step == N::zero() {
2808 return Err(EvalError::InvalidParameterValue(
2809 "step size cannot equal zero".into(),
2810 ));
2811 }
2812 Ok(num::range_step_inclusive(start, stop, step)
2813 .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2814}
2815
/// An inclusive iterator over timestamps that starts at an initial value and
/// moves towards `stop`, advancing by an [`Interval`] after each yielded value.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to yield.
    state: CheckedTimestamp<T>,
    /// The inclusive bound at which iteration ends.
    stop: CheckedTimestamp<T>,
    /// Added to `state` (months first, then the remaining duration) after
    /// each yielded value.
    step: Interval,
    /// If true the range is descending (iterate while `state >= stop`);
    /// otherwise it is ascending (iterate while `state <= stop`).
    rev: bool,
    /// Set once advancing `state` fails. Iteration then stops after the value
    /// that was just yielded.
    done: bool,
}
2827
2828impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
2829 type Item = CheckedTimestamp<T>;
2830
2831 #[inline]
2832 fn next(&mut self) -> Option<CheckedTimestamp<T>> {
2833 if !self.done
2834 && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
2835 {
2836 let result = self.state.clone();
2837 match add_timestamp_months(self.state.deref(), self.step.months) {
2838 Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
2839 Some(v) => match CheckedTimestamp::from_timestamplike(v) {
2840 Ok(v) => self.state = v,
2841 Err(_) => self.done = true,
2842 },
2843 None => self.done = true,
2844 },
2845 Err(..) => {
2846 self.done = true;
2847 }
2848 }
2849
2850 Some(result)
2851 } else {
2852 None
2853 }
2854 }
2855}
2856
2857fn generate_series_ts<T: TimestampLike>(
2858 start: CheckedTimestamp<T>,
2859 stop: CheckedTimestamp<T>,
2860 step: Interval,
2861 conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2862) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2863 let normalized_step = step.as_microseconds();
2864 if normalized_step == 0 {
2865 return Err(EvalError::InvalidParameterValue(
2866 "step size cannot equal zero".into(),
2867 ));
2868 }
2869 let rev = normalized_step < 0;
2870
2871 let trsi = TimestampRangeStepInclusive {
2872 state: start,
2873 stop,
2874 step,
2875 rev,
2876 done: false,
2877 };
2878
2879 Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2880}
2881
2882fn generate_subscripts_array(
2883 a: Datum,
2884 dim: i32,
2885) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2886 if dim <= 0 {
2887 return Ok(Box::new(iter::empty()));
2888 }
2889
2890 match a.unwrap_array().dims().into_iter().nth(
2891 (dim - 1)
2892 .try_into()
2893 .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2894 ) {
2895 Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
2896 requested_dim.lower_bound.try_into().map_err(|_| {
2897 EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2898 })?,
2899 requested_dim
2900 .length
2901 .try_into()
2902 .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
2903 1,
2904 )?)),
2905 None => Ok(Box::new(iter::empty())),
2906 }
2907}
2908
2909fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2910 a.unwrap_array()
2911 .elements()
2912 .iter()
2913 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2914}
2915
2916fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2917 a.unwrap_list()
2918 .iter()
2919 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2920}
2921
2922fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2923 a.unwrap_map()
2924 .iter()
2925 .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
2926}
2927
2928impl AggregateFunc {
2929 pub fn name(&self) -> &'static str {
2932 match self {
2933 Self::MaxNumeric => "max",
2934 Self::MaxInt16 => "max",
2935 Self::MaxInt32 => "max",
2936 Self::MaxInt64 => "max",
2937 Self::MaxUInt16 => "max",
2938 Self::MaxUInt32 => "max",
2939 Self::MaxUInt64 => "max",
2940 Self::MaxMzTimestamp => "max",
2941 Self::MaxFloat32 => "max",
2942 Self::MaxFloat64 => "max",
2943 Self::MaxBool => "max",
2944 Self::MaxString => "max",
2945 Self::MaxDate => "max",
2946 Self::MaxTimestamp => "max",
2947 Self::MaxTimestampTz => "max",
2948 Self::MaxInterval => "max",
2949 Self::MaxTime => "max",
2950 Self::MinNumeric => "min",
2951 Self::MinInt16 => "min",
2952 Self::MinInt32 => "min",
2953 Self::MinInt64 => "min",
2954 Self::MinUInt16 => "min",
2955 Self::MinUInt32 => "min",
2956 Self::MinUInt64 => "min",
2957 Self::MinMzTimestamp => "min",
2958 Self::MinFloat32 => "min",
2959 Self::MinFloat64 => "min",
2960 Self::MinBool => "min",
2961 Self::MinString => "min",
2962 Self::MinDate => "min",
2963 Self::MinTimestamp => "min",
2964 Self::MinTimestampTz => "min",
2965 Self::MinInterval => "min",
2966 Self::MinTime => "min",
2967 Self::SumInt16 => "sum",
2968 Self::SumInt32 => "sum",
2969 Self::SumInt64 => "sum",
2970 Self::SumUInt16 => "sum",
2971 Self::SumUInt32 => "sum",
2972 Self::SumUInt64 => "sum",
2973 Self::SumFloat32 => "sum",
2974 Self::SumFloat64 => "sum",
2975 Self::SumNumeric => "sum",
2976 Self::Count => "count",
2977 Self::Any => "any",
2978 Self::All => "all",
2979 Self::JsonbAgg { .. } => "jsonb_agg",
2980 Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
2981 Self::MapAgg { .. } => "map_agg",
2982 Self::ArrayConcat { .. } => "array_agg",
2983 Self::ListConcat { .. } => "list_agg",
2984 Self::StringAgg { .. } => "string_agg",
2985 Self::RowNumber { .. } => "row_number",
2986 Self::Rank { .. } => "rank",
2987 Self::DenseRank { .. } => "dense_rank",
2988 Self::LagLead {
2989 lag_lead: LagLeadType::Lag,
2990 ..
2991 } => "lag",
2992 Self::LagLead {
2993 lag_lead: LagLeadType::Lead,
2994 ..
2995 } => "lead",
2996 Self::FirstValue { .. } => "first_value",
2997 Self::LastValue { .. } => "last_value",
2998 Self::WindowAggregate { .. } => "window_agg",
2999 Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3000 Self::FusedWindowAggregate { .. } => "fused_window_agg",
3001 Self::Dummy => "dummy",
3002 }
3003 }
3004}
3005
3006impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3007where
3008 M: HumanizerMode,
3009{
3010 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3011 use AggregateFunc::*;
3012 let name = self.expr.name();
3013 match self.expr {
3014 JsonbAgg { order_by }
3015 | JsonbObjectAgg { order_by }
3016 | MapAgg { order_by, .. }
3017 | ArrayConcat { order_by }
3018 | ListConcat { order_by }
3019 | StringAgg { order_by }
3020 | RowNumber { order_by }
3021 | Rank { order_by }
3022 | DenseRank { order_by } => {
3023 let order_by = order_by.iter().map(|col| self.child(col));
3024 write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3025 }
3026 LagLead {
3027 lag_lead: _,
3028 ignore_nulls,
3029 order_by,
3030 } => {
3031 let order_by = order_by.iter().map(|col| self.child(col));
3032 f.write_str(name)?;
3033 f.write_str("[")?;
3034 if *ignore_nulls {
3035 f.write_str("ignore_nulls=true, ")?;
3036 }
3037 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3038 f.write_str("]")
3039 }
3040 FirstValue {
3041 order_by,
3042 window_frame,
3043 } => {
3044 let order_by = order_by.iter().map(|col| self.child(col));
3045 f.write_str(name)?;
3046 f.write_str("[")?;
3047 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3048 if *window_frame != WindowFrame::default() {
3049 write!(f, " {}", window_frame)?;
3050 }
3051 f.write_str("]")
3052 }
3053 LastValue {
3054 order_by,
3055 window_frame,
3056 } => {
3057 let order_by = order_by.iter().map(|col| self.child(col));
3058 f.write_str(name)?;
3059 f.write_str("[")?;
3060 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3061 if *window_frame != WindowFrame::default() {
3062 write!(f, " {}", window_frame)?;
3063 }
3064 f.write_str("]")
3065 }
3066 WindowAggregate {
3067 wrapped_aggregate,
3068 order_by,
3069 window_frame,
3070 } => {
3071 let order_by = order_by.iter().map(|col| self.child(col));
3072 let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3073 f.write_str(name)?;
3074 f.write_str("[")?;
3075 write!(f, "{} ", wrapped_aggregate)?;
3076 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3077 if *window_frame != WindowFrame::default() {
3078 write!(f, " {}", window_frame)?;
3079 }
3080 f.write_str("]")
3081 }
3082 FusedValueWindowFunc { funcs, order_by } => {
3083 let order_by = order_by.iter().map(|col| self.child(col));
3084 let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3085 f.write_str(name)?;
3086 f.write_str("[")?;
3087 write!(f, "{} ", funcs)?;
3088 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3089 f.write_str("]")
3090 }
3091 _ => f.write_str(name),
3092 }
3093 }
3094}
3095
/// Describes one explicit capture group of an [`AnalyzedRegex`].
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct CaptureGroupDesc {
    /// Position of the group in the regex. Groups are numbered from 1, since
    /// group 0 (the whole match) is not described.
    pub index: u32,
    /// The group's name, if it is a named group.
    pub name: Option<String>,
    /// Whether the group's value may be NULL. `regexp_extract` uses this as
    /// the nullability of the corresponding output column.
    pub nullable: bool,
}
3102
/// Options for compiling and applying a regex, parsed from a flag string
/// (see the `FromStr` impl). The default has every option disabled.
#[derive(
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default,
)]
pub struct AnalyzedRegexOpts {
    /// Match case-insensitively (flag `i`).
    pub case_insensitive: bool,
    /// Report every match instead of only the first (flag `g`); consulted by
    /// `regexp_matches`.
    pub global: bool,
}
3121
3122impl FromStr for AnalyzedRegexOpts {
3123 type Err = EvalError;
3124
3125 fn from_str(s: &str) -> Result<Self, Self::Err> {
3126 let mut opts = AnalyzedRegexOpts::default();
3127 for c in s.chars() {
3128 match c {
3129 'i' => opts.case_insensitive = true,
3130 'g' => opts.global = true,
3131 _ => return Err(EvalError::InvalidRegexFlag(c)),
3132 }
3133 }
3134 Ok(opts)
3135 }
3136}
3137
/// A compiled regex bundled with descriptors of its capture groups and the
/// options it was built with.
///
/// Fields: `.0` is the compiled regex, `.1` holds one [`CaptureGroupDesc`] per
/// explicit capture group, and `.2` holds the options.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3140
3141impl AnalyzedRegex {
3142 pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, RegexCompilationError> {
3143 let r = ReprRegex::new(s, opts.case_insensitive)?;
3144 #[allow(clippy::as_conversions)]
3146 let descs: Vec<_> = r
3147 .capture_names()
3148 .enumerate()
3149 .skip(1)
3154 .map(|(i, name)| CaptureGroupDesc {
3155 index: i as u32,
3156 name: name.map(String::from),
3157 nullable: true,
3160 })
3161 .collect();
3162 Ok(Self(r, descs, opts))
3163 }
3164 pub fn capture_groups_len(&self) -> usize {
3165 self.1.len()
3166 }
3167 pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3168 self.1.iter()
3169 }
3170 pub fn inner(&self) -> &Regex {
3171 &(self.0).regex
3172 }
3173 pub fn opts(&self) -> &AnalyzedRegexOpts {
3174 &self.2
3175 }
3176}
3177
3178pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3179 let bytes = a.unwrap_str().as_bytes();
3180 let mut row = Row::default();
3181 let csv_reader = csv::ReaderBuilder::new()
3182 .has_headers(false)
3183 .from_reader(bytes);
3184 csv_reader.into_records().filter_map(move |res| match res {
3185 Ok(sr) if sr.len() == n_cols => {
3186 row.packer().extend(sr.iter().map(Datum::String));
3187 Some((row.clone(), Diff::ONE))
3188 }
3189 _ => None,
3190 })
3191}
3192
3193pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3194 let n = a.unwrap_int64();
3195 if n != 0 {
3196 Some((Row::default(), n.into()))
3197 } else {
3198 None
3199 }
3200}
3201
3202fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3203 datums
3204 .chunks(width)
3205 .map(|chunk| (Row::pack(chunk), Diff::ONE))
3206}
3207
3208fn acl_explode<'a>(
3209 acl_items: Datum<'a>,
3210 temp_storage: &'a RowArena,
3211) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3212 let acl_items = acl_items.unwrap_array();
3213 let mut res = Vec::new();
3214 for acl_item in acl_items.elements().iter() {
3215 if acl_item.is_null() {
3216 return Err(EvalError::AclArrayNullElement);
3217 }
3218 let acl_item = acl_item.unwrap_acl_item();
3219 for privilege in acl_item.acl_mode.explode() {
3220 let row = [
3221 Datum::UInt32(acl_item.grantor.0),
3222 Datum::UInt32(acl_item.grantee.0),
3223 Datum::String(temp_storage.push_string(privilege.to_string())),
3224 Datum::False,
3226 ];
3227 res.push((Row::pack_slice(&row), Diff::ONE));
3228 }
3229 }
3230 Ok(res.into_iter())
3231}
3232
3233fn mz_acl_explode<'a>(
3234 mz_acl_items: Datum<'a>,
3235 temp_storage: &'a RowArena,
3236) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3237 let mz_acl_items = mz_acl_items.unwrap_array();
3238 let mut res = Vec::new();
3239 for mz_acl_item in mz_acl_items.elements().iter() {
3240 if mz_acl_item.is_null() {
3241 return Err(EvalError::MzAclArrayNullElement);
3242 }
3243 let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3244 for privilege in mz_acl_item.acl_mode.explode() {
3245 let row = [
3246 Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3247 Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3248 Datum::String(temp_storage.push_string(privilege.to_string())),
3249 Datum::False,
3251 ];
3252 res.push((Row::pack_slice(&row), Diff::ONE));
3253 }
3254 }
3255 Ok(res.into_iter())
3256}
3257
/// A function that maps one row of argument datums to zero or more output rows,
/// each paired with a [`Diff`].
///
/// See [`TableFunc::eval`] for evaluation and [`TableFunc::output_type`] for
/// the shape of the produced rows.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum TableFunc {
    /// One `(grantor oid, grantee oid, privilege, false)` row per privilege of
    /// each element of an `aclitem[]`.
    AclExplode,
    /// Like `AclExplode`, for `mz_aclitem[]`; grantor and grantee are strings.
    MzAclExplode,
    /// One `(key, value)` row per entry of a JSON object.
    JsonbEach,
    /// Like `JsonbEach`, with each value rendered by `jsonb_stringify`.
    JsonbEachStringify,
    /// One row per key of a JSON object.
    JsonbObjectKeys,
    /// One row per element of a JSON array.
    JsonbArrayElements,
    /// Like `JsonbArrayElements`, with each element rendered by `jsonb_stringify`.
    JsonbArrayElementsStringify,
    /// At most one row holding the capture groups of the regex's first match.
    RegexpExtract(AnalyzedRegex),
    /// One row per header-less CSV record that has exactly this many fields.
    CsvExtract(usize),
    /// `generate_series(start, stop, step)` over `Int32`.
    GenerateSeriesInt32,
    /// `generate_series(start, stop, step)` over `Int64`.
    GenerateSeriesInt64,
    /// `generate_series(start, stop, interval)` over `Timestamp`.
    GenerateSeriesTimestamp,
    /// `generate_series(start, stop, interval)` over `TimestampTz`.
    GenerateSeriesTimestampTz,
    /// Takes a row count and errors with `MultipleRowsFromSubquery` unless it is
    /// exactly 1; never emits rows.
    GuardSubquerySize {
        /// Scalar type of the declared output column.
        column_type: SqlScalarType,
    },
    /// Emits the empty row with a diff equal to its `Int64` argument (nothing if 0).
    Repeat,
    /// One row per element of an array.
    UnnestArray {
        /// Element type of the array.
        el_typ: SqlScalarType,
    },
    /// One row per element of a list.
    UnnestList {
        /// Element type of the list.
        el_typ: SqlScalarType,
    },
    /// One `(key, value)` row per entry of a map.
    UnnestMap {
        /// Type of the map's values.
        value_type: SqlScalarType,
    },
    /// Packs consecutive groups of `width` arguments into output rows.
    Wrap {
        /// Types of the output columns.
        types: Vec<SqlColumnType>,
        /// Number of arguments per output row.
        width: usize,
    },
    /// `generate_subscripts(array, dim)`.
    GenerateSubscriptsArray,
    /// Emits its arguments as a single row of the given relation type.
    TabletizedScalar {
        /// NOTE(review): not read in this chunk; presumably the name of the
        /// wrapped scalar function, for display.
        name: String,
        /// The relation type of the single output row.
        relation: SqlRelationType,
    },
    /// `regexp_matches(string, pattern [, flags])`: one text-array row per match.
    RegexpMatches,
    // `WithOrdinality` is a private struct exposed through this public enum;
    // construct it via `TableFunc::with_ordinality`.
    /// Wraps another table function and appends a non-null `Int64` ordinality column.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3331
/// Payload of [`TableFunc::WithOrdinality`].
///
/// Its output is the output of `inner` plus one trailing non-null `Int64`
/// column. That column is declared as a key of the result.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
struct WithOrdinality {
    /// The wrapped table function; never itself a `WithOrdinality`.
    inner: Box<TableFunc>,
}
3343
3344impl TableFunc {
3345 pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3347 match inner {
3348 TableFunc::AclExplode
3349 | TableFunc::MzAclExplode
3350 | TableFunc::JsonbEach
3351 | TableFunc::JsonbEachStringify
3352 | TableFunc::JsonbObjectKeys
3353 | TableFunc::JsonbArrayElements
3354 | TableFunc::JsonbArrayElementsStringify
3355 | TableFunc::RegexpExtract(_)
3356 | TableFunc::CsvExtract(_)
3357 | TableFunc::GenerateSeriesInt32
3358 | TableFunc::GenerateSeriesInt64
3359 | TableFunc::GenerateSeriesTimestamp
3360 | TableFunc::GenerateSeriesTimestampTz
3361 | TableFunc::GuardSubquerySize { .. }
3362 | TableFunc::Repeat
3363 | TableFunc::UnnestArray { .. }
3364 | TableFunc::UnnestList { .. }
3365 | TableFunc::UnnestMap { .. }
3366 | TableFunc::Wrap { .. }
3367 | TableFunc::GenerateSubscriptsArray
3368 | TableFunc::TabletizedScalar { .. }
3369 | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3370 inner: Box::new(inner),
3371 })),
3372 TableFunc::WithOrdinality(_) => None,
3375 }
3376 }
3377}
3378
3379impl TableFunc {
3380 pub fn eval<'a>(
3382 &'a self,
3383 datums: &'a [Datum<'a>],
3384 temp_storage: &'a RowArena,
3385 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3386 if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3387 return Ok(Box::new(vec![].into_iter()));
3388 }
3389 match self {
3390 TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3391 TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3392 TableFunc::JsonbEach => Ok(Box::new(jsonb_each(datums[0]))),
3393 TableFunc::JsonbEachStringify => {
3394 Ok(Box::new(jsonb_each_stringify(datums[0], temp_storage)))
3395 }
3396 TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3397 TableFunc::JsonbArrayElements => Ok(Box::new(jsonb_array_elements(datums[0]))),
3398 TableFunc::JsonbArrayElementsStringify => Ok(Box::new(jsonb_array_elements_stringify(
3399 datums[0],
3400 temp_storage,
3401 ))),
3402 TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3403 TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3404 TableFunc::GenerateSeriesInt32 => {
3405 let res = generate_series(
3406 datums[0].unwrap_int32(),
3407 datums[1].unwrap_int32(),
3408 datums[2].unwrap_int32(),
3409 )?;
3410 Ok(Box::new(res))
3411 }
3412 TableFunc::GenerateSeriesInt64 => {
3413 let res = generate_series(
3414 datums[0].unwrap_int64(),
3415 datums[1].unwrap_int64(),
3416 datums[2].unwrap_int64(),
3417 )?;
3418 Ok(Box::new(res))
3419 }
3420 TableFunc::GenerateSeriesTimestamp => {
3421 fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
3422 Datum::from(d)
3423 }
3424 let res = generate_series_ts(
3425 datums[0].unwrap_timestamp(),
3426 datums[1].unwrap_timestamp(),
3427 datums[2].unwrap_interval(),
3428 pass_through,
3429 )?;
3430 Ok(Box::new(res))
3431 }
3432 TableFunc::GenerateSeriesTimestampTz => {
3433 fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
3434 Datum::from(d)
3435 }
3436 let res = generate_series_ts(
3437 datums[0].unwrap_timestamptz(),
3438 datums[1].unwrap_timestamptz(),
3439 datums[2].unwrap_interval(),
3440 gen_ts_tz,
3441 )?;
3442 Ok(Box::new(res))
3443 }
3444 TableFunc::GenerateSubscriptsArray => {
3445 generate_subscripts_array(datums[0], datums[1].unwrap_int32())
3446 }
3447 TableFunc::GuardSubquerySize { column_type: _ } => {
3448 let count = datums[0].unwrap_int64();
3451 if count != 1 {
3452 Err(EvalError::MultipleRowsFromSubquery)
3453 } else {
3454 Ok(Box::new([].into_iter()))
3455 }
3456 }
3457 TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
3458 TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
3459 TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
3460 TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
3461 TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
3462 TableFunc::TabletizedScalar { .. } => {
3463 let r = Row::pack_slice(datums);
3464 Ok(Box::new(std::iter::once((r, Diff::ONE))))
3465 }
3466 TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
3467 TableFunc::WithOrdinality(func_with_ordinality) => {
3468 func_with_ordinality.eval(datums, temp_storage)
3469 }
3470 }
3471 }
3472
3473 pub fn output_type(&self) -> SqlRelationType {
3474 let (column_types, keys) = match self {
3475 TableFunc::AclExplode => {
3476 let column_types = vec![
3477 SqlScalarType::Oid.nullable(false),
3478 SqlScalarType::Oid.nullable(false),
3479 SqlScalarType::String.nullable(false),
3480 SqlScalarType::Bool.nullable(false),
3481 ];
3482 let keys = vec![];
3483 (column_types, keys)
3484 }
3485 TableFunc::MzAclExplode => {
3486 let column_types = vec![
3487 SqlScalarType::String.nullable(false),
3488 SqlScalarType::String.nullable(false),
3489 SqlScalarType::String.nullable(false),
3490 SqlScalarType::Bool.nullable(false),
3491 ];
3492 let keys = vec![];
3493 (column_types, keys)
3494 }
3495 TableFunc::JsonbEach => {
3496 let column_types = vec![
3497 SqlScalarType::String.nullable(false),
3498 SqlScalarType::Jsonb.nullable(false),
3499 ];
3500 let keys = vec![];
3501 (column_types, keys)
3502 }
3503 TableFunc::JsonbEachStringify => {
3504 let column_types = vec![
3505 SqlScalarType::String.nullable(false),
3506 SqlScalarType::String.nullable(true),
3507 ];
3508 let keys = vec![];
3509 (column_types, keys)
3510 }
3511 TableFunc::JsonbObjectKeys => {
3512 let column_types = vec![SqlScalarType::String.nullable(false)];
3513 let keys = vec![];
3514 (column_types, keys)
3515 }
3516 TableFunc::JsonbArrayElements => {
3517 let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
3518 let keys = vec![];
3519 (column_types, keys)
3520 }
3521 TableFunc::JsonbArrayElementsStringify => {
3522 let column_types = vec![SqlScalarType::String.nullable(true)];
3523 let keys = vec![];
3524 (column_types, keys)
3525 }
3526 TableFunc::RegexpExtract(a) => {
3527 let column_types = a
3528 .capture_groups_iter()
3529 .map(|cg| SqlScalarType::String.nullable(cg.nullable))
3530 .collect();
3531 let keys = vec![];
3532 (column_types, keys)
3533 }
3534 TableFunc::CsvExtract(n_cols) => {
3535 let column_types = iter::repeat(SqlScalarType::String.nullable(false))
3536 .take(*n_cols)
3537 .collect();
3538 let keys = vec![];
3539 (column_types, keys)
3540 }
3541 TableFunc::GenerateSeriesInt32 => {
3542 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3543 let keys = vec![vec![0]];
3544 (column_types, keys)
3545 }
3546 TableFunc::GenerateSeriesInt64 => {
3547 let column_types = vec![SqlScalarType::Int64.nullable(false)];
3548 let keys = vec![vec![0]];
3549 (column_types, keys)
3550 }
3551 TableFunc::GenerateSeriesTimestamp => {
3552 let column_types =
3553 vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
3554 let keys = vec![vec![0]];
3555 (column_types, keys)
3556 }
3557 TableFunc::GenerateSeriesTimestampTz => {
3558 let column_types =
3559 vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
3560 let keys = vec![vec![0]];
3561 (column_types, keys)
3562 }
3563 TableFunc::GenerateSubscriptsArray => {
3564 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3565 let keys = vec![vec![0]];
3566 (column_types, keys)
3567 }
3568 TableFunc::GuardSubquerySize { column_type } => {
3569 let column_types = vec![column_type.clone().nullable(false)];
3570 let keys = vec![];
3571 (column_types, keys)
3572 }
3573 TableFunc::Repeat => {
3574 let column_types = vec![];
3575 let keys = vec![];
3576 (column_types, keys)
3577 }
3578 TableFunc::UnnestArray { el_typ } => {
3579 let column_types = vec![el_typ.clone().nullable(true)];
3580 let keys = vec![];
3581 (column_types, keys)
3582 }
3583 TableFunc::UnnestList { el_typ } => {
3584 let column_types = vec![el_typ.clone().nullable(true)];
3585 let keys = vec![];
3586 (column_types, keys)
3587 }
3588 TableFunc::UnnestMap { value_type } => {
3589 let column_types = vec![
3590 SqlScalarType::String.nullable(false),
3591 value_type.clone().nullable(true),
3592 ];
3593 let keys = vec![vec![0]];
3594 (column_types, keys)
3595 }
3596 TableFunc::Wrap { types, .. } => {
3597 let column_types = types.clone();
3598 let keys = vec![];
3599 (column_types, keys)
3600 }
3601 TableFunc::TabletizedScalar { relation, .. } => {
3602 return relation.clone();
3603 }
3604 TableFunc::RegexpMatches => {
3605 let column_types =
3606 vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
3607 let keys = vec![];
3608
3609 (column_types, keys)
3610 }
3611 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3612 let mut typ = inner.output_type();
3613 typ.column_types.push(SqlScalarType::Int64.nullable(false));
3615 typ.keys.push(vec![typ.column_types.len() - 1]);
3617 (typ.column_types, typ.keys)
3618 }
3619 };
3620
3621 soft_assert_eq_no_log!(column_types.len(), self.output_arity());
3622
3623 if !keys.is_empty() {
3624 SqlRelationType::new(column_types).with_keys(keys)
3625 } else {
3626 SqlRelationType::new(column_types)
3627 }
3628 }
3629
3630 pub fn output_arity(&self) -> usize {
3631 match self {
3632 TableFunc::AclExplode => 4,
3633 TableFunc::MzAclExplode => 4,
3634 TableFunc::JsonbEach => 2,
3635 TableFunc::JsonbEachStringify => 2,
3636 TableFunc::JsonbObjectKeys => 1,
3637 TableFunc::JsonbArrayElements => 1,
3638 TableFunc::JsonbArrayElementsStringify => 1,
3639 TableFunc::RegexpExtract(a) => a.capture_groups_len(),
3640 TableFunc::CsvExtract(n_cols) => *n_cols,
3641 TableFunc::GenerateSeriesInt32 => 1,
3642 TableFunc::GenerateSeriesInt64 => 1,
3643 TableFunc::GenerateSeriesTimestamp => 1,
3644 TableFunc::GenerateSeriesTimestampTz => 1,
3645 TableFunc::GenerateSubscriptsArray => 1,
3646 TableFunc::GuardSubquerySize { .. } => 1,
3647 TableFunc::Repeat => 0,
3648 TableFunc::UnnestArray { .. } => 1,
3649 TableFunc::UnnestList { .. } => 1,
3650 TableFunc::UnnestMap { .. } => 2,
3651 TableFunc::Wrap { width, .. } => *width,
3652 TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
3653 TableFunc::RegexpMatches => 1,
3654 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
3655 }
3656 }
3657
3658 pub fn empty_on_null_input(&self) -> bool {
3659 match self {
3660 TableFunc::AclExplode
3661 | TableFunc::MzAclExplode
3662 | TableFunc::JsonbEach
3663 | TableFunc::JsonbEachStringify
3664 | TableFunc::JsonbObjectKeys
3665 | TableFunc::JsonbArrayElements
3666 | TableFunc::JsonbArrayElementsStringify
3667 | TableFunc::GenerateSeriesInt32
3668 | TableFunc::GenerateSeriesInt64
3669 | TableFunc::GenerateSeriesTimestamp
3670 | TableFunc::GenerateSeriesTimestampTz
3671 | TableFunc::GenerateSubscriptsArray
3672 | TableFunc::RegexpExtract(_)
3673 | TableFunc::CsvExtract(_)
3674 | TableFunc::Repeat
3675 | TableFunc::UnnestArray { .. }
3676 | TableFunc::UnnestList { .. }
3677 | TableFunc::UnnestMap { .. }
3678 | TableFunc::RegexpMatches => true,
3679 TableFunc::GuardSubquerySize { .. } => false,
3680 TableFunc::Wrap { .. } => false,
3681 TableFunc::TabletizedScalar { .. } => false,
3682 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
3683 }
3684 }
3685
3686 pub fn preserves_monotonicity(&self) -> bool {
3688 match self {
3691 TableFunc::AclExplode => false,
3692 TableFunc::MzAclExplode => false,
3693 TableFunc::JsonbEach => true,
3694 TableFunc::JsonbEachStringify => true,
3695 TableFunc::JsonbObjectKeys => true,
3696 TableFunc::JsonbArrayElements => true,
3697 TableFunc::JsonbArrayElementsStringify => true,
3698 TableFunc::RegexpExtract(_) => true,
3699 TableFunc::CsvExtract(_) => true,
3700 TableFunc::GenerateSeriesInt32 => true,
3701 TableFunc::GenerateSeriesInt64 => true,
3702 TableFunc::GenerateSeriesTimestamp => true,
3703 TableFunc::GenerateSeriesTimestampTz => true,
3704 TableFunc::GenerateSubscriptsArray => true,
3705 TableFunc::Repeat => false,
3706 TableFunc::UnnestArray { .. } => true,
3707 TableFunc::UnnestList { .. } => true,
3708 TableFunc::UnnestMap { .. } => true,
3709 TableFunc::Wrap { .. } => true,
3710 TableFunc::TabletizedScalar { .. } => true,
3711 TableFunc::RegexpMatches => true,
3712 TableFunc::GuardSubquerySize { .. } => false,
3713 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
3714 }
3715 }
3716}
3717
3718impl fmt::Display for TableFunc {
3719 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3720 match self {
3721 TableFunc::AclExplode => f.write_str("aclexplode"),
3722 TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3723 TableFunc::JsonbEach => f.write_str("jsonb_each"),
3724 TableFunc::JsonbEachStringify => f.write_str("jsonb_each_text"),
3725 TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3726 TableFunc::JsonbArrayElements => f.write_str("jsonb_array_elements"),
3727 TableFunc::JsonbArrayElementsStringify => f.write_str("jsonb_array_elements_text"),
3728 TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3729 TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3730 TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3731 TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3732 TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3733 TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3734 TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3735 TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3736 TableFunc::Repeat => f.write_str("repeat_row"),
3737 TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3738 TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3739 TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3740 TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3741 TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3742 TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3743 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3744 write!(f, "{}[with_ordinality]", inner)
3745 }
3746 }
3747 }
3748}
3749
impl WithOrdinality {
    /// Evaluates the wrapped table function on `datums` and appends a 1-based `Int64`
    /// ordinal column to every row it produces.
    ///
    /// Each `(row, diff)` emitted by `self.inner` is expanded into `diff` separate rows.
    /// Each of those rows carries its own ordinal and a diff of `Diff::ONE`. Ordinals start
    /// at 1 and keep counting across all rows of the inner output, in the order the inner
    /// iterator yields them. An inner row with a diff of 0 produces no output.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `self.inner.eval`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the inner function yields a negative diff.
    fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Next ordinal to hand out; captured by the `flat_map` closure and shared across
        // every row produced by `self.inner`.
        let mut next_ordinal: i64 = 1;
        let it = self
            .inner
            .eval(datums, temp_storage)?
            .flat_map(move |(mut row, diff)| {
                let diff = diff.into_inner();
                // Ordinals are assigned per unit of multiplicity, so negative diffs are
                // rejected outright.
                assert!(diff >= 0);
                // Reserve one ordinal for each of this row's `diff` copies.
                let mut ordinals = next_ordinal..(next_ordinal + diff);
                next_ordinal += diff;
                // Capacity for each cloned row: the inner row's data plus one `Int64`.
                // `next_ordinal` has already been advanced, so it is one past the largest
                // ordinal in `ordinals`. NOTE(review): presumably meant as a size upper
                // bound; that holds only if `datum_size` of an `Int64` never shrinks as
                // the value grows. Confirm.
                let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
                iter::from_fn(move || {
                    let ordinal = ordinals.next()?;
                    let mut row = if ordinals.is_empty() {
                        // Last copy: move the inner row out instead of cloning it.
                        std::mem::take(&mut row)
                    } else {
                        // Earlier copies: clone into a buffer sized for the extra datum.
                        let mut new_row = Row::with_capacity(cap);
                        new_row.clone_from(&row);
                        new_row
                    };
                    RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
                    Some((row, Diff::ONE))
                })
            });
        Ok(Box::new(it))
    }
}