1#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log};
26use mz_repr::adt::array::ArrayDimension;
27use mz_repr::adt::date::Date;
28use mz_repr::adt::interval::Interval;
29use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
30use mz_repr::adt::regex::Regex as ReprRegex;
31use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
32use mz_repr::{
33 ColumnName, Datum, Diff, Row, RowArena, RowPacker, SharedRow, SqlColumnType, SqlRelationType,
34 SqlScalarType, datum_size,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use regex::Regex;
39use serde::{Deserialize, Serialize};
40use smallvec::SmallVec;
41
42use crate::EvalError;
43use crate::WindowFrameBound::{
44 CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
45};
46use crate::WindowFrameUnits::{Groups, Range, Rows};
47use crate::explain::{HumanizedExpr, HumanizerMode};
48use crate::relation::{
49 ColumnOrder, WindowFrame, WindowFrameBound, WindowFrameUnits, compare_columns,
50};
51use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
52
53fn max_string<'a, I>(datums: I) -> Datum<'a>
57where
58 I: IntoIterator<Item = Datum<'a>>,
59{
60 match datums
61 .into_iter()
62 .filter(|d| !d.is_null())
63 .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
64 {
65 Some(datum) => datum,
66 None => Datum::Null,
67 }
68}
69
70fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
71where
72 I: IntoIterator<Item = Datum<'a>>,
73 DatumType: TryFrom<Datum<'a>> + Ord,
74 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
75 Datum<'a>: From<Option<DatumType>>,
76{
77 let x: Option<DatumType> = datums
78 .into_iter()
79 .filter(|d| !d.is_null())
80 .map(|d| DatumType::try_from(d).expect("unexpected type"))
81 .max();
82
83 x.into()
84}
85
86fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
87where
88 I: IntoIterator<Item = Datum<'a>>,
89 DatumType: TryFrom<Datum<'a>> + Ord,
90 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
91 Datum<'a>: From<Option<DatumType>>,
92{
93 let x: Option<DatumType> = datums
94 .into_iter()
95 .filter(|d| !d.is_null())
96 .map(|d| DatumType::try_from(d).expect("unexpected type"))
97 .min();
98
99 x.into()
100}
101
102fn min_string<'a, I>(datums: I) -> Datum<'a>
103where
104 I: IntoIterator<Item = Datum<'a>>,
105{
106 match datums
107 .into_iter()
108 .filter(|d| !d.is_null())
109 .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
110 {
111 Some(datum) => datum,
112 None => Datum::Null,
113 }
114}
115
116fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
117where
118 I: IntoIterator<Item = Datum<'a>>,
119 DatumType: TryFrom<Datum<'a>>,
120 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
121 ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
122{
123 let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
124 if datums.peek().is_none() {
125 Datum::Null
126 } else {
127 let x = datums
128 .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
129 .sum::<ResultType>();
130 x.into()
131 }
132}
133
134fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
135where
136 I: IntoIterator<Item = Datum<'a>>,
137{
138 let mut cx = numeric::cx_datum();
139 let mut sum = Numeric::zero();
140 let mut empty = true;
141 for d in datums {
142 if !d.is_null() {
143 empty = false;
144 cx.add(&mut sum, &d.unwrap_numeric().0);
145 }
146 }
147 match empty {
148 true => Datum::Null,
149 false => Datum::from(sum),
150 }
151}
152
153#[allow(clippy::as_conversions)]
155fn count<'a, I>(datums: I) -> Datum<'a>
156where
157 I: IntoIterator<Item = Datum<'a>>,
158{
159 let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
161 Datum::from(x)
162}
163
164fn any<'a, I>(datums: I) -> Datum<'a>
165where
166 I: IntoIterator<Item = Datum<'a>>,
167{
168 datums
169 .into_iter()
170 .fold(Datum::False, |state, next| match (state, next) {
171 (Datum::True, _) | (_, Datum::True) => Datum::True,
172 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
173 _ => Datum::False,
174 })
175}
176
177fn all<'a, I>(datums: I) -> Datum<'a>
178where
179 I: IntoIterator<Item = Datum<'a>>,
180{
181 datums
182 .into_iter()
183 .fold(Datum::True, |state, next| match (state, next) {
184 (Datum::False, _) | (_, Datum::False) => Datum::False,
185 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
186 _ => Datum::True,
187 })
188}
189
190fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
191where
192 I: IntoIterator<Item = Datum<'a>>,
193{
194 const EMPTY_SEP: &str = "";
195
196 let datums = order_aggregate_datums(datums, order_by);
197 let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
198 if d.is_null() {
199 return None;
200 }
201 let mut value_sep = d.unwrap_list().iter();
202 match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
203 (Datum::Null, _) => None,
204 (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
205 (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
206 _ => unreachable!(),
207 }
208 });
209
210 let mut s = String::default();
211 match sep_value_pairs.next() {
212 Some((_, value)) => s.push_str(value),
214 None => return Datum::Null,
216 }
217
218 for (sep, value) in sep_value_pairs {
219 s.push_str(sep);
220 s.push_str(value);
221 }
222
223 Datum::String(temp_storage.push_string(s))
224}
225
226fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
227where
228 I: IntoIterator<Item = Datum<'a>>,
229{
230 let datums = order_aggregate_datums(datums, order_by);
231 temp_storage.make_datum(|packer| {
232 packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
233 })
234}
235
236fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238 I: IntoIterator<Item = Datum<'a>>,
239{
240 let datums = order_aggregate_datums(datums, order_by);
241 temp_storage.make_datum(|packer| {
242 let mut datums: Vec<_> = datums
243 .into_iter()
244 .filter_map(|d| {
245 if d.is_null() {
246 return None;
247 }
248 let mut list = d.unwrap_list().iter();
249 let key = list.next().unwrap();
250 let val = list.next().unwrap();
251 if key.is_null() {
252 None
255 } else {
256 Some((key.unwrap_str(), val))
257 }
258 })
259 .collect();
260 datums.sort_by_key(|(k, _v)| *k);
266 datums.reverse();
267 datums.dedup_by_key(|(k, _v)| *k);
268 datums.reverse();
269 packer.push_dict(datums);
270 })
271}
272
273pub fn order_aggregate_datums<'a: 'b, 'b, I>(
283 datums: I,
284 order_by: &[ColumnOrder],
285) -> impl Iterator<Item = Datum<'b>>
286where
287 I: IntoIterator<Item = Datum<'a>>,
288{
289 order_aggregate_datums_with_rank_inner(datums, order_by)
290 .into_iter()
291 .map(|(payload, _order_datums)| payload)
293}
294
295fn order_aggregate_datums_with_rank<'a, I>(
298 datums: I,
299 order_by: &[ColumnOrder],
300) -> impl Iterator<Item = (Datum<'a>, Row)>
301where
302 I: IntoIterator<Item = Datum<'a>>,
303{
304 order_aggregate_datums_with_rank_inner(datums, order_by)
305 .into_iter()
306 .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
307}
308
309fn order_aggregate_datums_with_rank_inner<'a, I>(
310 datums: I,
311 order_by: &[ColumnOrder],
312) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
313where
314 I: IntoIterator<Item = Datum<'a>>,
315{
316 let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
317 .into_iter()
318 .map(|d| {
319 let list = d.unwrap_list();
320 let mut list_it = list.iter();
321 let payload = list_it.next().unwrap();
322
323 let mut order_by_datums = Vec::with_capacity(order_by.len());
333 for _ in 0..order_by.len() {
334 order_by_datums.push(
335 list_it
336 .next()
337 .expect("must have exactly the same number of Datums as `order_by`"),
338 );
339 }
340
341 (payload, order_by_datums)
342 })
343 .collect();
344
345 let mut sort_by =
346 |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
347 (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
348 compare_columns(
349 order_by,
350 left_order_by_datums,
351 right_order_by_datums,
352 || payload_left.cmp(payload_right),
353 )
354 };
355 decoded.sort_unstable_by(&mut sort_by);
360 decoded
361}
362
363fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
364where
365 I: IntoIterator<Item = Datum<'a>>,
366{
367 let datums = order_aggregate_datums(datums, order_by);
368 let datums: Vec<_> = datums
369 .into_iter()
370 .map(|d| d.unwrap_array().elements().iter())
371 .flatten()
372 .collect();
373 let dims = ArrayDimension {
374 lower_bound: 1,
375 length: datums.len(),
376 };
377 temp_storage.make_datum(|packer| {
378 packer.try_push_array(&[dims], datums).unwrap();
379 })
380}
381
382fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
383where
384 I: IntoIterator<Item = Datum<'a>>,
385{
386 let datums = order_aggregate_datums(datums, order_by);
387 temp_storage.make_datum(|packer| {
388 packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
389 })
390}
391
392fn row_number<'a, I>(
396 datums: I,
397 callers_temp_storage: &'a RowArena,
398 order_by: &[ColumnOrder],
399) -> Datum<'a>
400where
401 I: IntoIterator<Item = Datum<'a>>,
402{
403 let temp_storage = RowArena::new();
407 let datums = row_number_no_list(datums, &temp_storage, order_by);
408
409 callers_temp_storage.make_datum(|packer| {
410 packer.push_list(datums);
411 })
412}
413
414fn row_number_no_list<'a: 'b, 'b, I>(
417 datums: I,
418 callers_temp_storage: &'b RowArena,
419 order_by: &[ColumnOrder],
420) -> impl Iterator<Item = Datum<'b>>
421where
422 I: IntoIterator<Item = Datum<'a>>,
423{
424 let datums = order_aggregate_datums(datums, order_by);
425
426 callers_temp_storage.reserve(datums.size_hint().0);
427 #[allow(clippy::disallowed_methods)]
428 datums
429 .into_iter()
430 .map(|d| d.unwrap_list().iter())
431 .flatten()
432 .zip(1i64..)
433 .map(|(d, i)| {
434 callers_temp_storage.make_datum(|packer| {
435 packer.push_list_with(|packer| {
436 packer.push(Datum::Int64(i));
437 packer.push(d);
438 });
439 })
440 })
441}
442
443fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
447where
448 I: IntoIterator<Item = Datum<'a>>,
449{
450 let temp_storage = RowArena::new();
451 let datums = rank_no_list(datums, &temp_storage, order_by);
452
453 callers_temp_storage.make_datum(|packer| {
454 packer.push_list(datums);
455 })
456}
457
458fn rank_no_list<'a: 'b, 'b, I>(
461 datums: I,
462 callers_temp_storage: &'b RowArena,
463 order_by: &[ColumnOrder],
464) -> impl Iterator<Item = Datum<'b>>
465where
466 I: IntoIterator<Item = Datum<'a>>,
467{
468 let datums = order_aggregate_datums_with_rank(datums, order_by);
470
471 let mut datums = datums
472 .into_iter()
473 .map(|(d0, order_row)| {
474 d0.unwrap_list()
475 .iter()
476 .map(move |d1| (d1, order_row.clone()))
477 })
478 .flatten();
479
480 callers_temp_storage.reserve(datums.size_hint().0);
481 datums
482 .next()
483 .map_or(vec![], |(first_datum, first_order_row)| {
484 datums.fold((first_order_row, 1, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
486 let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
487 *acc_row_num += 1;
488 if *acc_row != next_order_row {
490 *acc_rank = *acc_row_num;
491 *acc_row = next_order_row;
492 }
493
494 (*output).push((next_datum, *acc_rank));
495 acc
496 })
497 }.3).into_iter().map(|(d, i)| {
498 callers_temp_storage.make_datum(|packer| {
499 packer.push_list_with(|packer| {
500 packer.push(Datum::Int64(i));
501 packer.push(d);
502 });
503 })
504 })
505}
506
507fn dense_rank<'a, I>(
511 datums: I,
512 callers_temp_storage: &'a RowArena,
513 order_by: &[ColumnOrder],
514) -> Datum<'a>
515where
516 I: IntoIterator<Item = Datum<'a>>,
517{
518 let temp_storage = RowArena::new();
519 let datums = dense_rank_no_list(datums, &temp_storage, order_by);
520
521 callers_temp_storage.make_datum(|packer| {
522 packer.push_list(datums);
523 })
524}
525
526fn dense_rank_no_list<'a: 'b, 'b, I>(
529 datums: I,
530 callers_temp_storage: &'b RowArena,
531 order_by: &[ColumnOrder],
532) -> impl Iterator<Item = Datum<'b>>
533where
534 I: IntoIterator<Item = Datum<'a>>,
535{
536 let datums = order_aggregate_datums_with_rank(datums, order_by);
538
539 let mut datums = datums
540 .into_iter()
541 .map(|(d0, order_row)| {
542 d0.unwrap_list()
543 .iter()
544 .map(move |d1| (d1, order_row.clone()))
545 })
546 .flatten();
547
548 callers_temp_storage.reserve(datums.size_hint().0);
549 datums
550 .next()
551 .map_or(vec![], |(first_datum, first_order_row)| {
552 datums.fold((first_order_row, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
554 let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
555 if *acc_row != next_order_row {
557 *acc_rank += 1;
558 *acc_row = next_order_row;
559 }
560
561 (*output).push((next_datum, *acc_rank));
562 acc
563 })
564 }.2).into_iter().map(|(d, i)| {
565 callers_temp_storage.make_datum(|packer| {
566 packer.push_list_with(|packer| {
567 packer.push(Datum::Int64(i));
568 packer.push(d);
569 });
570 })
571 })
572}
573
574fn lag_lead<'a, I>(
596 datums: I,
597 callers_temp_storage: &'a RowArena,
598 order_by: &[ColumnOrder],
599 lag_lead_type: &LagLeadType,
600 ignore_nulls: &bool,
601) -> Datum<'a>
602where
603 I: IntoIterator<Item = Datum<'a>>,
604{
605 let temp_storage = RowArena::new();
606 let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
607 callers_temp_storage.make_datum(|packer| {
608 packer.push_list(iter);
609 })
610}
611
612fn lag_lead_no_list<'a: 'b, 'b, I>(
615 datums: I,
616 callers_temp_storage: &'b RowArena,
617 order_by: &[ColumnOrder],
618 lag_lead_type: &LagLeadType,
619 ignore_nulls: &bool,
620) -> impl Iterator<Item = Datum<'b>>
621where
622 I: IntoIterator<Item = Datum<'a>>,
623{
624 let datums = order_aggregate_datums(datums, order_by);
626
627 let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
631 .into_iter()
632 .map(|d| {
633 let mut iter = d.unwrap_list().iter();
634 let original_row = iter.next().unwrap();
635 let (input_value, offset, default_value) =
636 unwrap_lag_lead_encoded_args(iter.next().unwrap());
637 (original_row, (input_value, offset, default_value))
638 })
639 .unzip();
640
641 let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
642
643 callers_temp_storage.reserve(result.len());
644 result
645 .into_iter()
646 .zip_eq(orig_rows)
647 .map(|(result_value, original_row)| {
648 callers_temp_storage.make_datum(|packer| {
649 packer.push_list_with(|packer| {
650 packer.push(result_value);
651 packer.push(original_row);
652 });
653 })
654 })
655}
656
657fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
659 let mut encoded_args_iter = encoded_args.unwrap_list().iter();
660 let (input_value, offset, default_value) = (
661 encoded_args_iter.next().unwrap(),
662 encoded_args_iter.next().unwrap(),
663 encoded_args_iter.next().unwrap(),
664 );
665 (input_value, offset, default_value)
666}
667
668fn lag_lead_inner<'a>(
671 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
672 lag_lead_type: &LagLeadType,
673 ignore_nulls: &bool,
674) -> Vec<Datum<'a>> {
675 if *ignore_nulls {
676 lag_lead_inner_ignore_nulls(args, lag_lead_type)
677 } else {
678 lag_lead_inner_respect_nulls(args, lag_lead_type)
679 }
680}
681
682fn lag_lead_inner_respect_nulls<'a>(
683 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
684 lag_lead_type: &LagLeadType,
685) -> Vec<Datum<'a>> {
686 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
687 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
688 if offset.is_null() {
690 result.push(Datum::Null);
691 continue;
692 }
693
694 let idx = i64::try_from(idx).expect("Array index does not fit in i64");
695 let offset = i64::from(offset.unwrap_int32());
696 let offset = match lag_lead_type {
697 LagLeadType::Lag => -offset,
698 LagLeadType::Lead => offset,
699 };
700
701 let datums_get = |i: i64| -> Option<Datum> {
703 match u64::try_from(i) {
704 Ok(i) => args
705 .get(usize::cast_from(i))
706 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
710 };
711
712 let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
713
714 result.push(lagged_value);
715 }
716
717 result
718}
719
720#[allow(clippy::as_conversions)]
724fn lag_lead_inner_ignore_nulls<'a>(
725 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
726 lag_lead_type: &LagLeadType,
727) -> Vec<Datum<'a>> {
728 if i64::try_from(args.len()).is_err() {
731 panic!("window partition way too big")
732 }
733 let mut skip_nulls_backward = vec![None; args.len()];
736 let mut last_non_null: i64 = -1;
737 let pairs = args
738 .iter()
739 .enumerate()
740 .zip_eq(skip_nulls_backward.iter_mut());
741 for ((i, (d, _, _)), slot) in pairs {
742 if d.is_null() {
743 *slot = Some(last_non_null);
744 } else {
745 last_non_null = i as i64;
746 }
747 }
748 let mut skip_nulls_forward = vec![None; args.len()];
749 let mut last_non_null: i64 = args.len() as i64;
750 let pairs = args
751 .iter()
752 .enumerate()
753 .rev()
754 .zip_eq(skip_nulls_forward.iter_mut().rev());
755 for ((i, (d, _, _)), slot) in pairs {
756 if d.is_null() {
757 *slot = Some(last_non_null);
758 } else {
759 last_non_null = i as i64;
760 }
761 }
762
763 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
765 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
766 if offset.is_null() {
768 result.push(Datum::Null);
769 continue;
770 }
771
772 let idx = idx as i64; let offset = i64::cast_from(offset.unwrap_int32());
774 let offset = match lag_lead_type {
775 LagLeadType::Lag => -offset,
776 LagLeadType::Lead => offset,
777 };
778 let increment = offset.signum();
779
780 let datums_get = |i: i64| -> Option<Datum> {
782 match u64::try_from(i) {
783 Ok(i) => args
784 .get(usize::cast_from(i))
785 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
789 };
790
791 let lagged_value = if increment != 0 {
792 let mut j = idx;
802 for _ in 0..num::abs(offset) {
803 j += increment;
804 if datums_get(j).is_some_and(|d| d.is_null()) {
806 let ju = j as usize; if increment > 0 {
808 j = skip_nulls_forward[ju].expect("checked above that it's null");
809 } else {
810 j = skip_nulls_backward[ju].expect("checked above that it's null");
811 }
812 }
813 if datums_get(j).is_none() {
814 break;
815 }
816 }
817 match datums_get(j) {
818 Some(datum) => datum,
819 None => *default_value,
820 }
821 } else {
822 assert_eq!(offset, 0);
823 let datum = datums_get(idx).expect("known to exist");
824 if !datum.is_null() {
825 datum
826 } else {
827 panic!("0 offset in lag/lead IGNORE NULLS");
833 }
834 };
835
836 result.push(lagged_value);
837 }
838
839 result
840}
841
842fn first_value<'a, I>(
844 datums: I,
845 callers_temp_storage: &'a RowArena,
846 order_by: &[ColumnOrder],
847 window_frame: &WindowFrame,
848) -> Datum<'a>
849where
850 I: IntoIterator<Item = Datum<'a>>,
851{
852 let temp_storage = RowArena::new();
853 let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
854 callers_temp_storage.make_datum(|packer| {
855 packer.push_list(iter);
856 })
857}
858
859fn first_value_no_list<'a: 'b, 'b, I>(
862 datums: I,
863 callers_temp_storage: &'b RowArena,
864 order_by: &[ColumnOrder],
865 window_frame: &WindowFrame,
866) -> impl Iterator<Item = Datum<'b>>
867where
868 I: IntoIterator<Item = Datum<'a>>,
869{
870 let datums = order_aggregate_datums(datums, order_by);
872
873 let (orig_rows, args): (Vec<_>, Vec<_>) = datums
875 .into_iter()
876 .map(|d| {
877 let mut iter = d.unwrap_list().iter();
878 let original_row = iter.next().unwrap();
879 let arg = iter.next().unwrap();
880
881 (original_row, arg)
882 })
883 .unzip();
884
885 let results = first_value_inner(args, window_frame);
886
887 callers_temp_storage.reserve(results.len());
888 results
889 .into_iter()
890 .zip_eq(orig_rows)
891 .map(|(result_value, original_row)| {
892 callers_temp_storage.make_datum(|packer| {
893 packer.push_list_with(|packer| {
894 packer.push(result_value);
895 packer.push(original_row);
896 });
897 })
898 })
899}
900
901fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
902 let length = datums.len();
903 let mut result: Vec<Datum> = Vec::with_capacity(length);
904 for (idx, current_datum) in datums.iter().enumerate() {
905 let first_value = match &window_frame.start_bound {
906 WindowFrameBound::CurrentRow => *current_datum,
908 WindowFrameBound::UnboundedPreceding => {
909 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
910 let end_offset = usize::cast_from(*end_offset);
911
912 if idx < end_offset {
914 Datum::Null
915 } else {
916 datums[0]
917 }
918 } else {
919 datums[0]
920 }
921 }
922 WindowFrameBound::OffsetPreceding(offset) => {
923 let start_offset = usize::cast_from(*offset);
924 let start_idx = idx.saturating_sub(start_offset);
925 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
926 let end_offset = usize::cast_from(*end_offset);
927
928 if start_offset < end_offset || idx < end_offset {
930 Datum::Null
931 } else {
932 datums[start_idx]
933 }
934 } else {
935 datums[start_idx]
936 }
937 }
938 WindowFrameBound::OffsetFollowing(offset) => {
939 let start_offset = usize::cast_from(*offset);
940 let start_idx = idx.saturating_add(start_offset);
941 if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
942 if offset > end_offset || start_idx >= length {
944 Datum::Null
945 } else {
946 datums[start_idx]
947 }
948 } else {
949 datums
950 .get(start_idx)
951 .map(|d| d.clone())
952 .unwrap_or(Datum::Null)
953 }
954 }
955 WindowFrameBound::UnboundedFollowing => unreachable!(),
957 };
958 result.push(first_value);
959 }
960 result
961}
962
963fn last_value<'a, I>(
965 datums: I,
966 callers_temp_storage: &'a RowArena,
967 order_by: &[ColumnOrder],
968 window_frame: &WindowFrame,
969) -> Datum<'a>
970where
971 I: IntoIterator<Item = Datum<'a>>,
972{
973 let temp_storage = RowArena::new();
974 let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
975 callers_temp_storage.make_datum(|packer| {
976 packer.push_list(iter);
977 })
978}
979
980fn last_value_no_list<'a: 'b, 'b, I>(
983 datums: I,
984 callers_temp_storage: &'b RowArena,
985 order_by: &[ColumnOrder],
986 window_frame: &WindowFrame,
987) -> impl Iterator<Item = Datum<'b>>
988where
989 I: IntoIterator<Item = Datum<'a>>,
990{
991 let datums = order_aggregate_datums_with_rank(datums, order_by);
994
995 let size_hint = datums.size_hint().0;
997 let mut args = Vec::with_capacity(size_hint);
998 let mut original_rows = Vec::with_capacity(size_hint);
999 let mut order_by_rows = Vec::with_capacity(size_hint);
1000 for (d, order_by_row) in datums.into_iter() {
1001 let mut iter = d.unwrap_list().iter();
1002 let original_row = iter.next().unwrap();
1003 let arg = iter.next().unwrap();
1004 order_by_rows.push(order_by_row);
1005 original_rows.push(original_row);
1006 args.push(arg);
1007 }
1008
1009 let results = last_value_inner(args, &order_by_rows, window_frame);
1010
1011 callers_temp_storage.reserve(results.len());
1012 results
1013 .into_iter()
1014 .zip_eq(original_rows)
1015 .map(|(result_value, original_row)| {
1016 callers_temp_storage.make_datum(|packer| {
1017 packer.push_list_with(|packer| {
1018 packer.push(result_value);
1019 packer.push(original_row);
1020 });
1021 })
1022 })
1023}
1024
1025fn last_value_inner<'a>(
1026 args: Vec<Datum<'a>>,
1027 order_by_rows: &Vec<Row>,
1028 window_frame: &WindowFrame,
1029) -> Vec<Datum<'a>> {
1030 let length = args.len();
1031 let mut results: Vec<Datum> = Vec::with_capacity(length);
1032 for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1033 let last_value = match &window_frame.end_bound {
1034 WindowFrameBound::CurrentRow => match &window_frame.units {
1035 WindowFrameUnits::Rows => *current_datum,
1037 WindowFrameUnits::Range => {
1038 let target_idx = order_by_rows[idx..]
1043 .iter()
1044 .enumerate()
1045 .take_while(|(_, row)| *row == order_by_row)
1046 .last()
1047 .unwrap()
1048 .0
1049 + idx;
1050 args[target_idx]
1051 }
1052 WindowFrameUnits::Groups => unreachable!(),
1054 },
1055 WindowFrameBound::UnboundedFollowing => {
1056 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1057 let start_offset = usize::cast_from(*start_offset);
1058
1059 if idx + start_offset > length - 1 {
1061 Datum::Null
1062 } else {
1063 args[length - 1]
1064 }
1065 } else {
1066 args[length - 1]
1067 }
1068 }
1069 WindowFrameBound::OffsetFollowing(offset) => {
1070 let end_offset = usize::cast_from(*offset);
1071 let end_idx = idx.saturating_add(end_offset);
1072 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1073 let start_offset = usize::cast_from(*start_offset);
1074 let start_idx = idx.saturating_add(start_offset);
1075
1076 if end_offset < start_offset || start_idx >= length {
1078 Datum::Null
1079 } else {
1080 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1082 }
1083 } else {
1084 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1085 }
1086 }
1087 WindowFrameBound::OffsetPreceding(offset) => {
1088 let end_offset = usize::cast_from(*offset);
1089 let end_idx = idx.saturating_sub(end_offset);
1090 if idx < end_offset {
1091 Datum::Null
1093 } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1094 &window_frame.start_bound
1095 {
1096 if offset > start_offset {
1098 Datum::Null
1099 } else {
1100 args[end_idx]
1101 }
1102 } else {
1103 args[end_idx]
1104 }
1105 }
1106 WindowFrameBound::UnboundedPreceding => unreachable!(),
1108 };
1109 results.push(last_value);
1110 }
1111 results
1112}
1113
1114fn fused_value_window_func<'a, I>(
1120 input_datums: I,
1121 callers_temp_storage: &'a RowArena,
1122 funcs: &Vec<AggregateFunc>,
1123 order_by: &Vec<ColumnOrder>,
1124) -> Datum<'a>
1125where
1126 I: IntoIterator<Item = Datum<'a>>,
1127{
1128 let temp_storage = RowArena::new();
1129 let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1130 callers_temp_storage.make_datum(|packer| {
1131 packer.push_list(iter);
1132 })
1133}
1134
1135fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1138 input_datums: I,
1139 callers_temp_storage: &'b RowArena,
1140 funcs: &Vec<AggregateFunc>,
1141 order_by: &Vec<ColumnOrder>,
1142) -> impl Iterator<Item = Datum<'b>>
1143where
1144 I: IntoIterator<Item = Datum<'a>>,
1145{
1146 let has_last_value = funcs
1147 .iter()
1148 .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1149
1150 let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1151
1152 let size_hint = input_datums_with_ranks.size_hint().0;
1153 let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1154 let mut original_rows = Vec::with_capacity(size_hint);
1155 let mut order_by_rows = Vec::with_capacity(size_hint);
1156 for (d, order_by_row) in input_datums_with_ranks {
1157 let mut iter = d.unwrap_list().iter();
1158 let original_row = iter.next().unwrap();
1159 original_rows.push(original_row);
1160 let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1161 for i in 0..funcs.len() {
1162 let encoded_args = argss_iter.next().unwrap();
1163 encoded_argsss[i].push(encoded_args);
1164 }
1165 if has_last_value {
1166 order_by_rows.push(order_by_row);
1167 }
1168 }
1169
1170 let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1171 for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1172 let results = match func {
1173 AggregateFunc::LagLead {
1174 order_by: inner_order_by,
1175 lag_lead,
1176 ignore_nulls,
1177 } => {
1178 assert_eq!(order_by, inner_order_by);
1179 let unwrapped_argss = encoded_argss
1180 .into_iter()
1181 .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1182 .collect();
1183 lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1184 }
1185 AggregateFunc::FirstValue {
1186 order_by: inner_order_by,
1187 window_frame,
1188 } => {
1189 assert_eq!(order_by, inner_order_by);
1190 first_value_inner(encoded_argss, window_frame)
1193 }
1194 AggregateFunc::LastValue {
1195 order_by: inner_order_by,
1196 window_frame,
1197 } => {
1198 assert_eq!(order_by, inner_order_by);
1199 last_value_inner(encoded_argss, &order_by_rows, window_frame)
1202 }
1203 _ => panic!("unknown window function in FusedValueWindowFunc"),
1204 };
1205 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1206 results.push(result);
1207 }
1208 }
1209
1210 callers_temp_storage.reserve(2 * original_rows.len());
1211 results_per_row
1212 .into_iter()
1213 .enumerate()
1214 .map(move |(i, results)| {
1215 callers_temp_storage.make_datum(|packer| {
1216 packer.push_list_with(|packer| {
1217 packer
1218 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1219 packer.push(original_rows[i]);
1220 });
1221 })
1222 })
1223}
1224
1225fn window_aggr<'a, I, A>(
1234 input_datums: I,
1235 callers_temp_storage: &'a RowArena,
1236 wrapped_aggregate: &AggregateFunc,
1237 order_by: &[ColumnOrder],
1238 window_frame: &WindowFrame,
1239) -> Datum<'a>
1240where
1241 I: IntoIterator<Item = Datum<'a>>,
1242 A: OneByOneAggr,
1243{
1244 let temp_storage = RowArena::new();
1245 let iter = window_aggr_no_list::<I, A>(
1246 input_datums,
1247 &temp_storage,
1248 wrapped_aggregate,
1249 order_by,
1250 window_frame,
1251 );
1252 callers_temp_storage.make_datum(|packer| {
1253 packer.push_list(iter);
1254 })
1255}
1256
1257fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1260 input_datums: I,
1261 callers_temp_storage: &'b RowArena,
1262 wrapped_aggregate: &AggregateFunc,
1263 order_by: &[ColumnOrder],
1264 window_frame: &WindowFrame,
1265) -> impl Iterator<Item = Datum<'b>>
1266where
1267 I: IntoIterator<Item = Datum<'a>>,
1268 A: OneByOneAggr,
1269{
1270 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1273
1274 let size_hint = datums.size_hint().0;
1276 let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1277 let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1278 let mut order_by_rows = Vec::with_capacity(size_hint);
1279 for (d, order_by_row) in datums.into_iter() {
1280 let mut iter = d.unwrap_list().iter();
1281 let original_row = iter.next().unwrap();
1282 let arg = iter.next().unwrap();
1283 order_by_rows.push(order_by_row);
1284 original_rows.push(original_row);
1285 args.push(arg);
1286 }
1287
1288 let results = window_aggr_inner::<A>(
1289 args,
1290 &order_by_rows,
1291 wrapped_aggregate,
1292 order_by,
1293 window_frame,
1294 callers_temp_storage,
1295 );
1296
1297 callers_temp_storage.reserve(results.len());
1298 results
1299 .into_iter()
1300 .zip_eq(original_rows)
1301 .map(|(result_value, original_row)| {
1302 callers_temp_storage.make_datum(|packer| {
1303 packer.push_list_with(|packer| {
1304 packer.push(result_value);
1305 packer.push(original_row);
1306 });
1307 })
1308 })
1309}
1310
1311fn window_aggr_inner<'a, A>(
1312 mut args: Vec<Datum<'a>>,
1313 order_by_rows: &Vec<Row>,
1314 wrapped_aggregate: &AggregateFunc,
1315 order_by: &[ColumnOrder],
1316 window_frame: &WindowFrame,
1317 temp_storage: &'a RowArena,
1318) -> Vec<Datum<'a>>
1319where
1320 A: OneByOneAggr,
1321{
1322 let length = args.len();
1323 let mut result: Vec<Datum> = Vec::with_capacity(length);
1324
1325 soft_assert_or_log!(
1331 !((matches!(window_frame.units, WindowFrameUnits::Groups)
1332 || matches!(window_frame.units, WindowFrameUnits::Range))
1333 && !window_frame.includes_current_row()),
1334 "window frame without current row"
1335 );
1336
1337 if (matches!(
1338 window_frame.start_bound,
1339 WindowFrameBound::UnboundedPreceding
1340 ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1341 || (order_by.is_empty()
1342 && (matches!(window_frame.units, WindowFrameUnits::Groups)
1343 || matches!(window_frame.units, WindowFrameUnits::Range))
1344 && window_frame.includes_current_row())
1345 {
1346 let result_value = wrapped_aggregate.eval(args, temp_storage);
1353 for _ in 0..length {
1355 result.push(result_value);
1356 }
1357 } else {
1358 fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1359 args: Vec<Datum<'a>>,
1360 result: &mut Vec<Datum<'a>>,
1361 mut one_by_one_aggr: A,
1362 temp_storage: &'a RowArena,
1363 ) where
1364 A: OneByOneAggr,
1365 {
1366 for current_arg in args.into_iter() {
1367 one_by_one_aggr.give(¤t_arg);
1368 let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1369 result.push(result_value);
1370 }
1371 }
1372
1373 fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1374 args: Vec<Datum<'a>>,
1375 order_by_rows: &Vec<Row>,
1376 result: &mut Vec<Datum<'a>>,
1377 mut one_by_one_aggr: A,
1378 temp_storage: &'a RowArena,
1379 ) where
1380 A: OneByOneAggr,
1381 {
1382 let mut peer_group_start = 0;
1383 while peer_group_start < args.len() {
1384 let mut peer_group_end = peer_group_start + 1;
1388 while peer_group_end < args.len()
1389 && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1390 {
1391 peer_group_end += 1;
1393 }
1394 for current_arg in args[peer_group_start..peer_group_end].iter() {
1397 one_by_one_aggr.give(current_arg);
1398 }
1399 let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1400 for _ in args[peer_group_start..peer_group_end].iter() {
1402 result.push(agg_for_peer_group);
1403 }
1404 peer_group_start = peer_group_end;
1406 }
1407 }
1408
1409 fn rows_between_offset_and_offset<'a>(
1410 args: Vec<Datum<'a>>,
1411 result: &mut Vec<Datum<'a>>,
1412 wrapped_aggregate: &AggregateFunc,
1413 temp_storage: &'a RowArena,
1414 offset_start: i64,
1415 offset_end: i64,
1416 ) {
1417 let len = args
1418 .len()
1419 .to_i64()
1420 .expect("window partition's len should fit into i64");
1421 for i in 0..len {
1422 let i = i.to_i64().expect("window partition shouldn't be super big");
1423 let frame_start = max(i + offset_start, 0)
1426 .to_usize()
1427 .expect("The max made sure it's not negative");
1428 let frame_end = min(i + offset_end, len - 1).to_usize();
1431 match frame_end {
1432 Some(frame_end) => {
1433 if frame_start <= frame_end {
1434 let frame_values = args[frame_start..=frame_end].iter().cloned();
1445 let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1446 result.push(result_value);
1447 } else {
1448 let result_value = wrapped_aggregate.default();
1450 result.push(result_value);
1451 }
1452 }
1453 None => {
1454 let result_value = wrapped_aggregate.default();
1456 result.push(result_value);
1457 }
1458 }
1459 }
1460 }
1461
1462 match (
1463 &window_frame.units,
1464 &window_frame.start_bound,
1465 &window_frame.end_bound,
1466 ) {
1467 (Rows, UnboundedPreceding, CurrentRow) => {
1472 rows_between_unbounded_preceding_and_current_row::<A>(
1473 args,
1474 &mut result,
1475 A::new(wrapped_aggregate, false),
1476 temp_storage,
1477 );
1478 }
1479 (Rows, CurrentRow, UnboundedFollowing) => {
1480 args.reverse();
1482 rows_between_unbounded_preceding_and_current_row::<A>(
1483 args,
1484 &mut result,
1485 A::new(wrapped_aggregate, true),
1486 temp_storage,
1487 );
1488 result.reverse();
1489 }
1490 (Range, UnboundedPreceding, CurrentRow) => {
1491 groups_between_unbounded_preceding_and_current_row::<A>(
1494 args,
1495 order_by_rows,
1496 &mut result,
1497 A::new(wrapped_aggregate, false),
1498 temp_storage,
1499 );
1500 }
1501 (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1505 let start_prec = start_prec.to_i64().expect(
1506 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1507 );
1508 let end_prec = end_prec.to_i64().expect(
1509 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1510 );
1511 rows_between_offset_and_offset(
1512 args,
1513 &mut result,
1514 wrapped_aggregate,
1515 temp_storage,
1516 -start_prec,
1517 -end_prec,
1518 );
1519 }
1520 (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1521 let start_prec = start_prec.to_i64().expect(
1522 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1523 );
1524 let end_fol = end_fol.to_i64().expect(
1525 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1526 );
1527 rows_between_offset_and_offset(
1528 args,
1529 &mut result,
1530 wrapped_aggregate,
1531 temp_storage,
1532 -start_prec,
1533 end_fol,
1534 );
1535 }
1536 (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1537 let start_fol = start_fol.to_i64().expect(
1538 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1539 );
1540 let end_fol = end_fol.to_i64().expect(
1541 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1542 );
1543 rows_between_offset_and_offset(
1544 args,
1545 &mut result,
1546 wrapped_aggregate,
1547 temp_storage,
1548 start_fol,
1549 end_fol,
1550 );
1551 }
1552 (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1553 unreachable!() }
1555 (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1556 let start_prec = start_prec.to_i64().expect(
1557 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1558 );
1559 let end_fol = 0;
1560 rows_between_offset_and_offset(
1561 args,
1562 &mut result,
1563 wrapped_aggregate,
1564 temp_storage,
1565 -start_prec,
1566 end_fol,
1567 );
1568 }
1569 (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1570 let start_fol = 0;
1571 let end_fol = end_fol.to_i64().expect(
1572 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1573 );
1574 rows_between_offset_and_offset(
1575 args,
1576 &mut result,
1577 wrapped_aggregate,
1578 temp_storage,
1579 start_fol,
1580 end_fol,
1581 );
1582 }
1583 (Rows, CurrentRow, CurrentRow) => {
1584 let start_fol = 0;
1587 let end_fol = 0;
1588 rows_between_offset_and_offset(
1589 args,
1590 &mut result,
1591 wrapped_aggregate,
1592 temp_storage,
1593 start_fol,
1594 end_fol,
1595 );
1596 }
1597 (Rows, CurrentRow, OffsetPreceding(_))
1598 | (Rows, UnboundedFollowing, _)
1599 | (Rows, _, UnboundedPreceding)
1600 | (Rows, OffsetFollowing(..), CurrentRow) => {
1601 unreachable!() }
1603 (Rows, UnboundedPreceding, UnboundedFollowing) => {
1604 unreachable!()
1607 }
1608 (Rows, UnboundedPreceding, OffsetPreceding(_))
1609 | (Rows, UnboundedPreceding, OffsetFollowing(_))
1610 | (Rows, OffsetPreceding(..), UnboundedFollowing)
1611 | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1612 unreachable!()
1615 }
1616 (Range, _, _) => {
1617 unreachable!()
1624 }
1625 (Groups, _, _) => {
1626 unreachable!()
1630 }
1631 }
1632 }
1633
1634 result
1635}
1636
1637fn fused_window_aggr<'a, I, A>(
1641 input_datums: I,
1642 callers_temp_storage: &'a RowArena,
1643 wrapped_aggregates: &Vec<AggregateFunc>,
1644 order_by: &Vec<ColumnOrder>,
1645 window_frame: &WindowFrame,
1646) -> Datum<'a>
1647where
1648 I: IntoIterator<Item = Datum<'a>>,
1649 A: OneByOneAggr,
1650{
1651 let temp_storage = RowArena::new();
1652 let iter = fused_window_aggr_no_list::<_, A>(
1653 input_datums,
1654 &temp_storage,
1655 wrapped_aggregates,
1656 order_by,
1657 window_frame,
1658 );
1659 callers_temp_storage.make_datum(|packer| {
1660 packer.push_list(iter);
1661 })
1662}
1663
1664fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1667 input_datums: I,
1668 callers_temp_storage: &'b RowArena,
1669 wrapped_aggregates: &Vec<AggregateFunc>,
1670 order_by: &Vec<ColumnOrder>,
1671 window_frame: &WindowFrame,
1672) -> impl Iterator<Item = Datum<'b>>
1673where
1674 I: IntoIterator<Item = Datum<'a>>,
1675 A: OneByOneAggr,
1676{
1677 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1680
1681 let size_hint = datums.size_hint().0;
1682 let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1683 let mut original_rows = Vec::with_capacity(size_hint);
1684 let mut order_by_rows = Vec::with_capacity(size_hint);
1685 for (d, order_by_row) in datums {
1686 let mut iter = d.unwrap_list().iter();
1687 let original_row = iter.next().unwrap();
1688 original_rows.push(original_row);
1689 let args_iter = iter.next().unwrap().unwrap_list().iter();
1690 for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1692 args.push(arg);
1693 }
1694 order_by_rows.push(order_by_row);
1695 }
1696
1697 let mut results_per_row =
1698 vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1699 for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1700 let results = window_aggr_inner::<A>(
1701 args,
1702 &order_by_rows,
1703 wrapped_aggr,
1704 order_by,
1705 window_frame,
1706 callers_temp_storage,
1707 );
1708 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1709 results.push(result);
1710 }
1711 }
1712
1713 callers_temp_storage.reserve(2 * original_rows.len());
1714 results_per_row
1715 .into_iter()
1716 .enumerate()
1717 .map(move |(i, results)| {
1718 callers_temp_storage.make_datum(|packer| {
1719 packer.push_list_with(|packer| {
1720 packer
1721 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1722 packer.push(original_rows[i]);
1723 });
1724 })
1725 })
1726}
1727
/// An aggregation that accepts its input elements one at a time and can report the current
/// aggregate at any point in between.
///
/// `window_aggr_inner` uses implementations of this trait to compute running aggregates over
/// growing window frames.
pub trait OneByOneAggr {
    /// Creates an aggregator for `agg` that has not received any input yet.
    ///
    /// `window_aggr_inner` passes `reverse = true` when it feeds the elements in reversed
    /// order. `NaiveOneByOneAggr` then evaluates over its input in the opposite order to the
    /// one in which the elements were given.
    fn new(agg: &AggregateFunc, reverse: bool) -> Self;
    /// Adds one more input element to the aggregation.
    fn give(&mut self, d: &Datum);
    /// Returns the aggregate of all the elements given so far.
    fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
}
1742
/// A [`OneByOneAggr`] that stores every input element and re-evaluates the whole aggregate
/// from scratch each time the current aggregate is requested.
#[derive(Debug)]
pub struct NaiveOneByOneAggr {
    // The aggregate function to evaluate (a clone of the one passed to `new`).
    agg: AggregateFunc,
    // Every element given so far, each packed into its own single-datum row.
    input: Vec<Row>,
    // Whether `input` is traversed back to front when evaluating.
    reverse: bool,
}
1754
1755impl OneByOneAggr for NaiveOneByOneAggr {
1756 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1757 NaiveOneByOneAggr {
1758 agg: agg.clone(),
1759 input: Vec::new(),
1760 reverse,
1761 }
1762 }
1763
1764 fn give(&mut self, d: &Datum) {
1765 let mut row = Row::default();
1766 row.packer().push(d);
1767 self.input.push(row);
1768 }
1769
1770 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1771 temp_storage.make_datum(|packer| {
1772 packer.push(if !self.reverse {
1773 self.agg
1774 .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1775 } else {
1776 self.agg.eval(
1777 self.input.iter().rev().map(|r| r.unpack_first()),
1778 temp_storage,
1779 )
1780 });
1781 })
1782 }
1783}
1784
/// Selects which of the two functions an `AggregateFunc::LagLead` stands for.
///
/// Note that the derived `Ord`/`PartialOrd` follow the declaration order of the variants.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum LagLeadType {
    /// The `lag` function.
    Lag,
    /// The `lead` function.
    Lead,
}
1792
/// The aggregate and window functions that `AggregateFunc::eval` can evaluate over a
/// collection of datums.
///
/// Note that the derived `Ord`/`PartialOrd` follow the declaration order of the variants, so
/// reordering variants changes how values of this type compare.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum AggregateFunc {
    // Maximum, one variant per input type (evaluated by `max_datum` / `max_string`).
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    // Minimum, one variant per input type (evaluated by `min_datum` / `min_string`).
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    // Sum, one variant per input type (evaluated by `sum_datum` / `sum_numeric`). The integer
    // sums have a wider output type than their input; see `output_type`.
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    /// Evaluated by `count`; the output type is `Int64` and `default()` is `Int64(0)`.
    Count,
    /// Evaluated by `any`; the output type is `Bool` and `default()` is `False`.
    Any,
    /// Evaluated by `all`; the output type is `Bool` and `default()` is `True`.
    All,
    /// Evaluated by `jsonb_agg`; the output type is `Jsonb`.
    JsonbAgg {
        /// The ordering handed to `jsonb_agg`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dict_agg`; the output type is `Jsonb`.
    JsonbObjectAgg {
        /// The ordering handed to `dict_agg`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dict_agg`; the output type is a map with values of `value_type`.
    MapAgg {
        /// The ordering handed to `dict_agg`.
        order_by: Vec<ColumnOrder>,
        /// The value type of the resulting map.
        value_type: SqlScalarType,
    },
    /// Evaluated by `array_concat`; `identity_datum()` is the empty array.
    ArrayConcat {
        /// The ordering handed to `array_concat`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `list_concat`; `identity_datum()` is the empty list.
    ListConcat {
        /// The ordering handed to `list_concat`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `string_agg`; the output type is `String`.
    StringAgg {
        /// The ordering handed to `string_agg`.
        order_by: Vec<ColumnOrder>,
    },
    // All remaining variants except `Dummy` are window functions. For each of them
    // `can_fuse_with_unnest_list()` is true and `identity_datum()` is the empty list.
    /// Evaluated by `row_number`.
    RowNumber {
        /// The ordering handed to `row_number`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `rank`.
    Rank {
        /// The ordering handed to `rank`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dense_rank`.
    DenseRank {
        /// The ordering handed to `dense_rank`.
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `lag_lead`.
    LagLead {
        /// The ordering handed to `lag_lead`.
        order_by: Vec<ColumnOrder>,
        /// Whether this is `lag` or `lead`.
        lag_lead: LagLeadType,
        /// Passed through to `lag_lead`.
        // NOTE(review): presumably corresponds to SQL's `IGNORE NULLS` option -- confirm.
        ignore_nulls: bool,
    },
    /// Evaluated by `first_value`.
    FirstValue {
        /// The ordering handed to `first_value`.
        order_by: Vec<ColumnOrder>,
        /// The window frame handed to `first_value`.
        window_frame: WindowFrame,
    },
    /// Evaluated by `last_value`.
    LastValue {
        /// The ordering handed to `last_value`.
        order_by: Vec<ColumnOrder>,
        /// The window frame handed to `last_value`.
        window_frame: WindowFrame,
    },
    /// Several value window functions evaluated together by `fused_value_window_func`.
    FusedValueWindowFunc {
        /// The fused functions. `output_type` panics if any of them is not a `LagLead`,
        /// `FirstValue` or `LastValue`.
        funcs: Vec<AggregateFunc>,
        /// The ordering shared by all of `funcs`.
        order_by: Vec<ColumnOrder>,
    },
    /// An aggregate function applied over a window frame, evaluated by `window_aggr`.
    WindowAggregate {
        /// The aggregate evaluated over each frame.
        wrapped_aggregate: Box<AggregateFunc>,
        /// The window's ordering.
        order_by: Vec<ColumnOrder>,
        /// The window frame.
        window_frame: WindowFrame,
    },
    /// Several window aggregations evaluated together by `fused_window_aggr`.
    FusedWindowAggregate {
        /// The aggregates evaluated over each frame.
        wrapped_aggregates: Vec<AggregateFunc>,
        /// The ordering shared by all of `wrapped_aggregates`.
        order_by: Vec<ColumnOrder>,
        /// The window frame shared by all of `wrapped_aggregates`.
        window_frame: WindowFrame,
    },
    /// Always evaluates to `Datum::Dummy`.
    // NOTE(review): presumably a placeholder for aggregates whose result is never used --
    // confirm.
    Dummy,
}
1924
1925impl AggregateFunc {
1926 pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
1927 where
1928 I: IntoIterator<Item = Datum<'a>>,
1929 {
1930 match self {
1931 AggregateFunc::MaxNumeric => {
1932 max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1933 }
1934 AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
1935 AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
1936 AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
1937 AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
1938 AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
1939 AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
1940 AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
1941 AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
1942 AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
1943 AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
1944 AggregateFunc::MaxString => max_string(datums),
1945 AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
1946 AggregateFunc::MaxTimestamp => {
1947 max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1948 }
1949 AggregateFunc::MaxTimestampTz => {
1950 max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
1951 }
1952 AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
1953 AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
1954 AggregateFunc::MinNumeric => {
1955 min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
1956 }
1957 AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
1958 AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
1959 AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
1960 AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
1961 AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
1962 AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
1963 AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
1964 AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
1965 AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
1966 AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
1967 AggregateFunc::MinString => min_string(datums),
1968 AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
1969 AggregateFunc::MinTimestamp => {
1970 min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
1971 }
1972 AggregateFunc::MinTimestampTz => {
1973 min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
1974 }
1975 AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
1976 AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
1977 AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
1978 AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
1979 AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
1980 AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
1981 AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
1982 AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
1983 AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
1984 AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
1985 AggregateFunc::SumNumeric => sum_numeric(datums),
1986 AggregateFunc::Count => count(datums),
1987 AggregateFunc::Any => any(datums),
1988 AggregateFunc::All => all(datums),
1989 AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
1990 AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
1991 dict_agg(datums, temp_storage, order_by)
1992 }
1993 AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
1994 AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
1995 AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
1996 AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
1997 AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
1998 AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
1999 AggregateFunc::LagLead {
2000 order_by,
2001 lag_lead: lag_lead_type,
2002 ignore_nulls,
2003 } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2004 AggregateFunc::FirstValue {
2005 order_by,
2006 window_frame,
2007 } => first_value(datums, temp_storage, order_by, window_frame),
2008 AggregateFunc::LastValue {
2009 order_by,
2010 window_frame,
2011 } => last_value(datums, temp_storage, order_by, window_frame),
2012 AggregateFunc::WindowAggregate {
2013 wrapped_aggregate,
2014 order_by,
2015 window_frame,
2016 } => window_aggr::<_, NaiveOneByOneAggr>(
2017 datums,
2018 temp_storage,
2019 wrapped_aggregate,
2020 order_by,
2021 window_frame,
2022 ),
2023 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2024 fused_value_window_func(datums, temp_storage, funcs, order_by)
2025 }
2026 AggregateFunc::FusedWindowAggregate {
2027 wrapped_aggregates,
2028 order_by,
2029 window_frame,
2030 } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2031 datums,
2032 temp_storage,
2033 wrapped_aggregates,
2034 order_by,
2035 window_frame,
2036 ),
2037 AggregateFunc::Dummy => Datum::Dummy,
2038 }
2039 }
2040
2041 pub fn eval_with_fast_window_agg<'a, I, W>(
2045 &self,
2046 datums: I,
2047 temp_storage: &'a RowArena,
2048 ) -> Datum<'a>
2049 where
2050 I: IntoIterator<Item = Datum<'a>>,
2051 W: OneByOneAggr,
2052 {
2053 match self {
2054 AggregateFunc::WindowAggregate {
2055 wrapped_aggregate,
2056 order_by,
2057 window_frame,
2058 } => window_aggr::<_, W>(
2059 datums,
2060 temp_storage,
2061 wrapped_aggregate,
2062 order_by,
2063 window_frame,
2064 ),
2065 AggregateFunc::FusedWindowAggregate {
2066 wrapped_aggregates,
2067 order_by,
2068 window_frame,
2069 } => fused_window_aggr::<_, W>(
2070 datums,
2071 temp_storage,
2072 wrapped_aggregates,
2073 order_by,
2074 window_frame,
2075 ),
2076 _ => self.eval(datums, temp_storage),
2077 }
2078 }
2079
2080 pub fn eval_with_unnest_list<'a, I, W>(
2081 &self,
2082 datums: I,
2083 temp_storage: &'a RowArena,
2084 ) -> impl Iterator<Item = Datum<'a>>
2085 where
2086 I: IntoIterator<Item = Datum<'a>>,
2087 W: OneByOneAggr,
2088 {
2089 assert!(self.can_fuse_with_unnest_list());
2091 match self {
2092 AggregateFunc::RowNumber { order_by } => {
2093 row_number_no_list(datums, temp_storage, order_by).collect_vec()
2094 }
2095 AggregateFunc::Rank { order_by } => {
2096 rank_no_list(datums, temp_storage, order_by).collect_vec()
2097 }
2098 AggregateFunc::DenseRank { order_by } => {
2099 dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2100 }
2101 AggregateFunc::LagLead {
2102 order_by,
2103 lag_lead: lag_lead_type,
2104 ignore_nulls,
2105 } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2106 .collect_vec(),
2107 AggregateFunc::FirstValue {
2108 order_by,
2109 window_frame,
2110 } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2111 AggregateFunc::LastValue {
2112 order_by,
2113 window_frame,
2114 } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2115 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2116 fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2117 }
2118 AggregateFunc::WindowAggregate {
2119 wrapped_aggregate,
2120 order_by,
2121 window_frame,
2122 } => window_aggr_no_list::<_, W>(
2123 datums,
2124 temp_storage,
2125 wrapped_aggregate,
2126 order_by,
2127 window_frame,
2128 )
2129 .collect_vec(),
2130 AggregateFunc::FusedWindowAggregate {
2131 wrapped_aggregates,
2132 order_by,
2133 window_frame,
2134 } => fused_window_aggr_no_list::<_, W>(
2135 datums,
2136 temp_storage,
2137 wrapped_aggregates,
2138 order_by,
2139 window_frame,
2140 )
2141 .collect_vec(),
2142 _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2143 }
2144 .into_iter()
2145 }
2146
2147 pub fn default(&self) -> Datum<'static> {
2150 match self {
2151 AggregateFunc::Count => Datum::Int64(0),
2152 AggregateFunc::Any => Datum::False,
2153 AggregateFunc::All => Datum::True,
2154 AggregateFunc::Dummy => Datum::Dummy,
2155 _ => Datum::Null,
2156 }
2157 }
2158
2159 pub fn identity_datum(&self) -> Datum<'static> {
2162 match self {
2163 AggregateFunc::Any => Datum::False,
2164 AggregateFunc::All => Datum::True,
2165 AggregateFunc::Dummy => Datum::Dummy,
2166 AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2167 AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2168 AggregateFunc::RowNumber { .. }
2169 | AggregateFunc::Rank { .. }
2170 | AggregateFunc::DenseRank { .. }
2171 | AggregateFunc::LagLead { .. }
2172 | AggregateFunc::FirstValue { .. }
2173 | AggregateFunc::LastValue { .. }
2174 | AggregateFunc::WindowAggregate { .. }
2175 | AggregateFunc::FusedValueWindowFunc { .. }
2176 | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2177 AggregateFunc::MaxNumeric
2178 | AggregateFunc::MaxInt16
2179 | AggregateFunc::MaxInt32
2180 | AggregateFunc::MaxInt64
2181 | AggregateFunc::MaxUInt16
2182 | AggregateFunc::MaxUInt32
2183 | AggregateFunc::MaxUInt64
2184 | AggregateFunc::MaxMzTimestamp
2185 | AggregateFunc::MaxFloat32
2186 | AggregateFunc::MaxFloat64
2187 | AggregateFunc::MaxBool
2188 | AggregateFunc::MaxString
2189 | AggregateFunc::MaxDate
2190 | AggregateFunc::MaxTimestamp
2191 | AggregateFunc::MaxTimestampTz
2192 | AggregateFunc::MaxInterval
2193 | AggregateFunc::MaxTime
2194 | AggregateFunc::MinNumeric
2195 | AggregateFunc::MinInt16
2196 | AggregateFunc::MinInt32
2197 | AggregateFunc::MinInt64
2198 | AggregateFunc::MinUInt16
2199 | AggregateFunc::MinUInt32
2200 | AggregateFunc::MinUInt64
2201 | AggregateFunc::MinMzTimestamp
2202 | AggregateFunc::MinFloat32
2203 | AggregateFunc::MinFloat64
2204 | AggregateFunc::MinBool
2205 | AggregateFunc::MinString
2206 | AggregateFunc::MinDate
2207 | AggregateFunc::MinTimestamp
2208 | AggregateFunc::MinTimestampTz
2209 | AggregateFunc::MinInterval
2210 | AggregateFunc::MinTime
2211 | AggregateFunc::SumInt16
2212 | AggregateFunc::SumInt32
2213 | AggregateFunc::SumInt64
2214 | AggregateFunc::SumUInt16
2215 | AggregateFunc::SumUInt32
2216 | AggregateFunc::SumUInt64
2217 | AggregateFunc::SumFloat32
2218 | AggregateFunc::SumFloat64
2219 | AggregateFunc::SumNumeric
2220 | AggregateFunc::Count
2221 | AggregateFunc::JsonbAgg { .. }
2222 | AggregateFunc::JsonbObjectAgg { .. }
2223 | AggregateFunc::MapAgg { .. }
2224 | AggregateFunc::StringAgg { .. } => Datum::Null,
2225 }
2226 }
2227
2228 pub fn can_fuse_with_unnest_list(&self) -> bool {
2229 match self {
2230 AggregateFunc::RowNumber { .. }
2231 | AggregateFunc::Rank { .. }
2232 | AggregateFunc::DenseRank { .. }
2233 | AggregateFunc::LagLead { .. }
2234 | AggregateFunc::FirstValue { .. }
2235 | AggregateFunc::LastValue { .. }
2236 | AggregateFunc::WindowAggregate { .. }
2237 | AggregateFunc::FusedValueWindowFunc { .. }
2238 | AggregateFunc::FusedWindowAggregate { .. } => true,
2239 AggregateFunc::ArrayConcat { .. }
2240 | AggregateFunc::ListConcat { .. }
2241 | AggregateFunc::Any
2242 | AggregateFunc::All
2243 | AggregateFunc::Dummy
2244 | AggregateFunc::MaxNumeric
2245 | AggregateFunc::MaxInt16
2246 | AggregateFunc::MaxInt32
2247 | AggregateFunc::MaxInt64
2248 | AggregateFunc::MaxUInt16
2249 | AggregateFunc::MaxUInt32
2250 | AggregateFunc::MaxUInt64
2251 | AggregateFunc::MaxMzTimestamp
2252 | AggregateFunc::MaxFloat32
2253 | AggregateFunc::MaxFloat64
2254 | AggregateFunc::MaxBool
2255 | AggregateFunc::MaxString
2256 | AggregateFunc::MaxDate
2257 | AggregateFunc::MaxTimestamp
2258 | AggregateFunc::MaxTimestampTz
2259 | AggregateFunc::MaxInterval
2260 | AggregateFunc::MaxTime
2261 | AggregateFunc::MinNumeric
2262 | AggregateFunc::MinInt16
2263 | AggregateFunc::MinInt32
2264 | AggregateFunc::MinInt64
2265 | AggregateFunc::MinUInt16
2266 | AggregateFunc::MinUInt32
2267 | AggregateFunc::MinUInt64
2268 | AggregateFunc::MinMzTimestamp
2269 | AggregateFunc::MinFloat32
2270 | AggregateFunc::MinFloat64
2271 | AggregateFunc::MinBool
2272 | AggregateFunc::MinString
2273 | AggregateFunc::MinDate
2274 | AggregateFunc::MinTimestamp
2275 | AggregateFunc::MinTimestampTz
2276 | AggregateFunc::MinInterval
2277 | AggregateFunc::MinTime
2278 | AggregateFunc::SumInt16
2279 | AggregateFunc::SumInt32
2280 | AggregateFunc::SumInt64
2281 | AggregateFunc::SumUInt16
2282 | AggregateFunc::SumUInt32
2283 | AggregateFunc::SumUInt64
2284 | AggregateFunc::SumFloat32
2285 | AggregateFunc::SumFloat64
2286 | AggregateFunc::SumNumeric
2287 | AggregateFunc::Count
2288 | AggregateFunc::JsonbAgg { .. }
2289 | AggregateFunc::JsonbObjectAgg { .. }
2290 | AggregateFunc::MapAgg { .. }
2291 | AggregateFunc::StringAgg { .. } => false,
2292 }
2293 }
2294
2295 pub fn output_type(&self, input_type: SqlColumnType) -> SqlColumnType {
2301 let scalar_type = match self {
2302 AggregateFunc::Count => SqlScalarType::Int64,
2303 AggregateFunc::Any => SqlScalarType::Bool,
2304 AggregateFunc::All => SqlScalarType::Bool,
2305 AggregateFunc::JsonbAgg { .. } => SqlScalarType::Jsonb,
2306 AggregateFunc::JsonbObjectAgg { .. } => SqlScalarType::Jsonb,
2307 AggregateFunc::SumInt16 => SqlScalarType::Int64,
2308 AggregateFunc::SumInt32 => SqlScalarType::Int64,
2309 AggregateFunc::SumInt64 => SqlScalarType::Numeric {
2310 max_scale: Some(NumericMaxScale::ZERO),
2311 },
2312 AggregateFunc::SumUInt16 => SqlScalarType::UInt64,
2313 AggregateFunc::SumUInt32 => SqlScalarType::UInt64,
2314 AggregateFunc::SumUInt64 => SqlScalarType::Numeric {
2315 max_scale: Some(NumericMaxScale::ZERO),
2316 },
2317 AggregateFunc::MapAgg { value_type, .. } => SqlScalarType::Map {
2318 value_type: Box::new(value_type.clone()),
2319 custom_id: None,
2320 },
2321 AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2322 match input_type.scalar_type {
2323 SqlScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2325 _ => unreachable!(),
2326 }
2327 }
2328 AggregateFunc::StringAgg { .. } => SqlScalarType::String,
2329 AggregateFunc::RowNumber { .. } => {
2330 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2331 }
2332 AggregateFunc::Rank { .. } => {
2333 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2334 }
2335 AggregateFunc::DenseRank { .. } => {
2336 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2337 }
2338 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2339 let fields = input_type.scalar_type.unwrap_record_element_type();
2341 let original_row_type = fields[0].unwrap_record_element_type()[0]
2342 .clone()
2343 .nullable(false);
2344 let output_type_inner = Self::lag_lead_output_type_inner_from_encoded_args(fields[0].unwrap_record_element_type()[1]);
2345 let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2346
2347 SqlScalarType::List {
2348 element_type: Box::new(SqlScalarType::Record {
2349 fields: [
2350 (column_name, output_type_inner),
2351 (ColumnName::from("?orig_row?"), original_row_type),
2352 ].into(),
2353 custom_id: None,
2354 }),
2355 custom_id: None,
2356 }
2357 }
2358 AggregateFunc::FirstValue { .. } => {
2359 let fields = input_type.scalar_type.unwrap_record_element_type();
2361 let original_row_type = fields[0].unwrap_record_element_type()[0]
2362 .clone()
2363 .nullable(false);
2364 let value_type = fields[0].unwrap_record_element_type()[1]
2365 .clone()
2366 .nullable(true); SqlScalarType::List {
2369 element_type: Box::new(SqlScalarType::Record {
2370 fields: [
2371 (ColumnName::from("?first_value?"), value_type),
2372 (ColumnName::from("?orig_row?"), original_row_type),
2373 ].into(),
2374 custom_id: None,
2375 }),
2376 custom_id: None,
2377 }
2378 }
2379 AggregateFunc::LastValue { .. } => {
2380 let fields = input_type.scalar_type.unwrap_record_element_type();
2382 let original_row_type = fields[0].unwrap_record_element_type()[0]
2383 .clone()
2384 .nullable(false);
2385 let value_type = fields[0].unwrap_record_element_type()[1]
2386 .clone()
2387 .nullable(true); SqlScalarType::List {
2390 element_type: Box::new(SqlScalarType::Record {
2391 fields: [
2392 (ColumnName::from("?last_value?"), value_type),
2393 (ColumnName::from("?orig_row?"), original_row_type),
2394 ].into(),
2395 custom_id: None,
2396 }),
2397 custom_id: None,
2398 }
2399 }
2400 AggregateFunc::WindowAggregate {
2401 wrapped_aggregate, ..
2402 } => {
2403 let fields = input_type.scalar_type.unwrap_record_element_type();
2405 let original_row_type = fields[0].unwrap_record_element_type()[0]
2406 .clone()
2407 .nullable(false);
2408 let arg_type = fields[0].unwrap_record_element_type()[1]
2409 .clone()
2410 .nullable(true);
2411 let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);
2412
2413 SqlScalarType::List {
2414 element_type: Box::new(SqlScalarType::Record {
2415 fields: [
2416 (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2417 (ColumnName::from("?orig_row?"), original_row_type),
2418 ].into(),
2419 custom_id: None,
2420 }),
2421 custom_id: None,
2422 }
2423 }
2424 AggregateFunc::FusedWindowAggregate {
2425 wrapped_aggregates, ..
2426 } => {
2427 let fields = input_type.scalar_type.unwrap_record_element_type();
2430 let original_row_type = fields[0].unwrap_record_element_type()[0]
2431 .clone()
2432 .nullable(false);
2433 let args_type = fields[0].unwrap_record_element_type()[1];
2434 let arg_types = args_type.unwrap_record_element_type();
2435 let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(|(arg_type, wrapped_agg)| {
2436 (
2437 ColumnName::from(wrapped_agg.name()),
2438 wrapped_agg.output_type((**arg_type).clone().nullable(true)),
2439 )
2440 }).collect_vec();
2441
2442 SqlScalarType::List {
2443 element_type: Box::new(SqlScalarType::Record {
2444 fields: [
2445 (ColumnName::from("?fused_window_agg?"), SqlScalarType::Record {
2446 fields: out_fields.into(),
2447 custom_id: None,
2448 }.nullable(false)),
2449 (ColumnName::from("?orig_row?"), original_row_type),
2450 ].into(),
2451 custom_id: None,
2452 }),
2453 custom_id: None,
2454 }
2455 }
2456 AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2457 let fields = input_type.scalar_type.unwrap_record_element_type();
2462 let original_row_type = fields[0].unwrap_record_element_type()[0]
2463 .clone()
2464 .nullable(false);
2465 let encoded_args_type = fields[0].unwrap_record_element_type()[1].unwrap_record_element_type();
2466
2467 SqlScalarType::List {
2468 element_type: Box::new(SqlScalarType::Record {
2469 fields: [
2470 (ColumnName::from("?fused_value_window_func?"), SqlScalarType::Record {
2471 fields: encoded_args_type.into_iter().zip_eq(funcs).map(|(arg_type, func)| {
2472 match func {
2473 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2474 (
2475 Self::lag_lead_result_column_name(lag_lead_type),
2476 Self::lag_lead_output_type_inner_from_encoded_args(arg_type)
2477 )
2478 },
2479 AggregateFunc::FirstValue { .. } => {
2480 (
2481 ColumnName::from("?first_value?"),
2482 arg_type.clone().nullable(true),
2483 )
2484 }
2485 AggregateFunc::LastValue { .. } => {
2486 (
2487 ColumnName::from("?last_value?"),
2488 arg_type.clone().nullable(true),
2489 )
2490 }
2491 _ => panic!("FusedValueWindowFunc has an unknown function"),
2492 }
2493 }).collect(),
2494 custom_id: None,
2495 }.nullable(false)),
2496 (ColumnName::from("?orig_row?"), original_row_type),
2497 ].into(),
2498 custom_id: None,
2499 }),
2500 custom_id: None,
2501 }
2502 }
2503 AggregateFunc::Dummy
2504 | AggregateFunc::MaxNumeric
2505 | AggregateFunc::MaxInt16
2506 | AggregateFunc::MaxInt32
2507 | AggregateFunc::MaxInt64
2508 | AggregateFunc::MaxUInt16
2509 | AggregateFunc::MaxUInt32
2510 | AggregateFunc::MaxUInt64
2511 | AggregateFunc::MaxMzTimestamp
2512 | AggregateFunc::MaxFloat32
2513 | AggregateFunc::MaxFloat64
2514 | AggregateFunc::MaxBool
2515 | AggregateFunc::MaxString
2519 | AggregateFunc::MaxDate
2520 | AggregateFunc::MaxTimestamp
2521 | AggregateFunc::MaxTimestampTz
2522 | AggregateFunc::MaxInterval
2523 | AggregateFunc::MaxTime
2524 | AggregateFunc::MinNumeric
2525 | AggregateFunc::MinInt16
2526 | AggregateFunc::MinInt32
2527 | AggregateFunc::MinInt64
2528 | AggregateFunc::MinUInt16
2529 | AggregateFunc::MinUInt32
2530 | AggregateFunc::MinUInt64
2531 | AggregateFunc::MinMzTimestamp
2532 | AggregateFunc::MinFloat32
2533 | AggregateFunc::MinFloat64
2534 | AggregateFunc::MinBool
2535 | AggregateFunc::MinString
2536 | AggregateFunc::MinDate
2537 | AggregateFunc::MinTimestamp
2538 | AggregateFunc::MinTimestampTz
2539 | AggregateFunc::MinInterval
2540 | AggregateFunc::MinTime
2541 | AggregateFunc::SumFloat32
2542 | AggregateFunc::SumFloat64
2543 | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2544 };
2545 let nullable = match self {
2548 AggregateFunc::Count => false,
2549 AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2551 SqlScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2553 SqlScalarType::Record { fields, .. } => fields[0].1.nullable,
2555 _ => unreachable!(),
2556 },
2557 _ => unreachable!(),
2558 },
2559 _ => input_type.nullable,
2560 };
2561 scalar_type.nullable(nullable)
2562 }
2563
2564 fn output_type_ranking_window_funcs(
2566 input_type: &SqlColumnType,
2567 col_name: &str,
2568 ) -> SqlScalarType {
2569 match input_type.scalar_type {
2570 SqlScalarType::Record { ref fields, .. } => SqlScalarType::List {
2571 element_type: Box::new(SqlScalarType::Record {
2572 fields: [
2573 (
2574 ColumnName::from(col_name),
2575 SqlScalarType::Int64.nullable(false),
2576 ),
2577 (ColumnName::from("?orig_row?"), {
2578 let inner = match &fields[0].1.scalar_type {
2579 SqlScalarType::List { element_type, .. } => element_type.clone(),
2580 _ => unreachable!(),
2581 };
2582 inner.nullable(false)
2583 }),
2584 ]
2585 .into(),
2586 custom_id: None,
2587 }),
2588 custom_id: None,
2589 },
2590 _ => unreachable!(),
2591 }
2592 }
2593
2594 fn lag_lead_output_type_inner_from_encoded_args(
2598 encoded_args_type: &SqlScalarType,
2599 ) -> SqlColumnType {
2600 encoded_args_type.unwrap_record_element_type()[0]
2604 .clone()
2605 .nullable(true)
2606 }
2607
2608 fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
2609 ColumnName::from(match lag_lead_type {
2610 LagLeadType::Lag => "?lag?",
2611 LagLeadType::Lead => "?lead?",
2612 })
2613 }
2614
2615 pub fn propagates_nonnull_constraint(&self) -> bool {
2620 match self {
2621 AggregateFunc::MaxNumeric
2622 | AggregateFunc::MaxInt16
2623 | AggregateFunc::MaxInt32
2624 | AggregateFunc::MaxInt64
2625 | AggregateFunc::MaxUInt16
2626 | AggregateFunc::MaxUInt32
2627 | AggregateFunc::MaxUInt64
2628 | AggregateFunc::MaxMzTimestamp
2629 | AggregateFunc::MaxFloat32
2630 | AggregateFunc::MaxFloat64
2631 | AggregateFunc::MaxBool
2632 | AggregateFunc::MaxString
2633 | AggregateFunc::MaxDate
2634 | AggregateFunc::MaxTimestamp
2635 | AggregateFunc::MaxTimestampTz
2636 | AggregateFunc::MinNumeric
2637 | AggregateFunc::MinInt16
2638 | AggregateFunc::MinInt32
2639 | AggregateFunc::MinInt64
2640 | AggregateFunc::MinUInt16
2641 | AggregateFunc::MinUInt32
2642 | AggregateFunc::MinUInt64
2643 | AggregateFunc::MinMzTimestamp
2644 | AggregateFunc::MinFloat32
2645 | AggregateFunc::MinFloat64
2646 | AggregateFunc::MinBool
2647 | AggregateFunc::MinString
2648 | AggregateFunc::MinDate
2649 | AggregateFunc::MinTimestamp
2650 | AggregateFunc::MinTimestampTz
2651 | AggregateFunc::SumInt16
2652 | AggregateFunc::SumInt32
2653 | AggregateFunc::SumInt64
2654 | AggregateFunc::SumUInt16
2655 | AggregateFunc::SumUInt32
2656 | AggregateFunc::SumUInt64
2657 | AggregateFunc::SumFloat32
2658 | AggregateFunc::SumFloat64
2659 | AggregateFunc::SumNumeric
2660 | AggregateFunc::StringAgg { .. } => true,
2661 AggregateFunc::Count => false,
2663 _ => false,
2664 }
2665 }
2666}
2667
2668fn jsonb_each<'a>(
2669 a: Datum<'a>,
2670 temp_storage: &'a RowArena,
2671 stringify: bool,
2672) -> impl Iterator<Item = (Row, Diff)> + 'a {
2673 let map = match a {
2675 Datum::Map(dict) => dict,
2676 _ => mz_repr::DatumMap::empty(),
2677 };
2678
2679 map.iter().map(move |(k, mut v)| {
2680 if stringify {
2681 v = jsonb_stringify(v, temp_storage);
2682 }
2683 (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
2684 })
2685}
2686
2687fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2688 let map = match a {
2689 Datum::Map(dict) => dict,
2690 _ => mz_repr::DatumMap::empty(),
2691 };
2692
2693 map.iter()
2694 .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
2695}
2696
2697fn jsonb_array_elements<'a>(
2698 a: Datum<'a>,
2699 temp_storage: &'a RowArena,
2700 stringify: bool,
2701) -> impl Iterator<Item = (Row, Diff)> + 'a {
2702 let list = match a {
2703 Datum::List(list) => list,
2704 _ => mz_repr::DatumList::empty(),
2705 };
2706 list.iter().map(move |mut e| {
2707 if stringify {
2708 e = jsonb_stringify(e, temp_storage);
2709 }
2710 (Row::pack_slice(&[e]), Diff::ONE)
2711 })
2712}
2713
2714fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
2715 let r = r.inner();
2716 let a = a.unwrap_str();
2717 let captures = r.captures(a)?;
2718 let datums = captures
2719 .iter()
2720 .skip(1)
2721 .map(|m| Datum::from(m.map(|m| m.as_str())));
2722 Some((Row::pack(datums), Diff::ONE))
2723}
2724
2725fn regexp_matches<'a, 'r: 'a>(
2726 exprs: &[Datum<'a>],
2727) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
2728 assert!(exprs.len() == 2 || exprs.len() == 3);
2732 let a = exprs[0].unwrap_str();
2733 let r = exprs[1].unwrap_str();
2734
2735 let (regex, opts) = if exprs.len() == 3 {
2736 let flag = exprs[2].unwrap_str();
2737 let opts = AnalyzedRegexOpts::from_str(flag)?;
2738 (AnalyzedRegex::new(r, opts)?, opts)
2739 } else {
2740 let opts = AnalyzedRegexOpts::default();
2741 (AnalyzedRegex::new(r, opts)?, opts)
2742 };
2743
2744 let regex = regex.inner().clone();
2745
2746 let iter = regex.captures_iter(a).map(move |captures| {
2747 let matches = captures
2748 .iter()
2749 .skip(1)
2751 .map(|m| Datum::from(m.map(|m| m.as_str())))
2752 .collect::<Vec<_>>();
2753
2754 let mut binding = SharedRow::get();
2755 let mut packer = binding.packer();
2756
2757 let dimension = ArrayDimension {
2758 lower_bound: 1,
2759 length: matches.len(),
2760 };
2761 packer
2762 .try_push_array(&[dimension], matches)
2763 .expect("generated dimensions above");
2764
2765 (binding.clone(), Diff::ONE)
2766 });
2767
2768 let out = iter.collect::<SmallVec<[_; 3]>>();
2773
2774 if opts.global {
2775 Ok(Either::Left(out.into_iter()))
2776 } else {
2777 Ok(Either::Right(out.into_iter().take(1)))
2778 }
2779}
2780
2781fn generate_series<N>(
2782 start: N,
2783 stop: N,
2784 step: N,
2785) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
2786where
2787 N: Integer + Signed + CheckedAdd + Clone,
2788 Datum<'static>: From<N>,
2789{
2790 if step == N::zero() {
2791 return Err(EvalError::InvalidParameterValue(
2792 "step size cannot equal zero".into(),
2793 ));
2794 }
2795 Ok(num::range_step_inclusive(start, stop, step)
2796 .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
2797}
2798
2799#[derive(Clone)]
2803pub struct TimestampRangeStepInclusive<T> {
2804 state: CheckedTimestamp<T>,
2805 stop: CheckedTimestamp<T>,
2806 step: Interval,
2807 rev: bool,
2808 done: bool,
2809}
2810
2811impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
2812 type Item = CheckedTimestamp<T>;
2813
2814 #[inline]
2815 fn next(&mut self) -> Option<CheckedTimestamp<T>> {
2816 if !self.done
2817 && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
2818 {
2819 let result = self.state.clone();
2820 match add_timestamp_months(self.state.deref(), self.step.months) {
2821 Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
2822 Some(v) => match CheckedTimestamp::from_timestamplike(v) {
2823 Ok(v) => self.state = v,
2824 Err(_) => self.done = true,
2825 },
2826 None => self.done = true,
2827 },
2828 Err(..) => {
2829 self.done = true;
2830 }
2831 }
2832
2833 Some(result)
2834 } else {
2835 None
2836 }
2837 }
2838}
2839
2840fn generate_series_ts<T: TimestampLike>(
2841 start: CheckedTimestamp<T>,
2842 stop: CheckedTimestamp<T>,
2843 step: Interval,
2844 conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
2845) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
2846 let normalized_step = step.as_microseconds();
2847 if normalized_step == 0 {
2848 return Err(EvalError::InvalidParameterValue(
2849 "step size cannot equal zero".into(),
2850 ));
2851 }
2852 let rev = normalized_step < 0;
2853
2854 let trsi = TimestampRangeStepInclusive {
2855 state: start,
2856 stop,
2857 step,
2858 rev,
2859 done: false,
2860 };
2861
2862 Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
2863}
2864
2865fn generate_subscripts_array(
2866 a: Datum,
2867 dim: i32,
2868) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
2869 if dim <= 0 {
2870 return Ok(Box::new(iter::empty()));
2871 }
2872
2873 match a.unwrap_array().dims().into_iter().nth(
2874 (dim - 1)
2875 .try_into()
2876 .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
2877 ) {
2878 Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
2879 requested_dim.lower_bound.try_into().map_err(|_| {
2880 EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
2881 })?,
2882 requested_dim
2883 .length
2884 .try_into()
2885 .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
2886 1,
2887 )?)),
2888 None => Ok(Box::new(iter::empty())),
2889 }
2890}
2891
2892fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2893 a.unwrap_array()
2894 .elements()
2895 .iter()
2896 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2897}
2898
2899fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2900 a.unwrap_list()
2901 .iter()
2902 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
2903}
2904
2905fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
2906 a.unwrap_map()
2907 .iter()
2908 .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
2909}
2910
2911impl AggregateFunc {
2912 pub fn name(&self) -> &'static str {
2915 match self {
2916 Self::MaxNumeric => "max",
2917 Self::MaxInt16 => "max",
2918 Self::MaxInt32 => "max",
2919 Self::MaxInt64 => "max",
2920 Self::MaxUInt16 => "max",
2921 Self::MaxUInt32 => "max",
2922 Self::MaxUInt64 => "max",
2923 Self::MaxMzTimestamp => "max",
2924 Self::MaxFloat32 => "max",
2925 Self::MaxFloat64 => "max",
2926 Self::MaxBool => "max",
2927 Self::MaxString => "max",
2928 Self::MaxDate => "max",
2929 Self::MaxTimestamp => "max",
2930 Self::MaxTimestampTz => "max",
2931 Self::MaxInterval => "max",
2932 Self::MaxTime => "max",
2933 Self::MinNumeric => "min",
2934 Self::MinInt16 => "min",
2935 Self::MinInt32 => "min",
2936 Self::MinInt64 => "min",
2937 Self::MinUInt16 => "min",
2938 Self::MinUInt32 => "min",
2939 Self::MinUInt64 => "min",
2940 Self::MinMzTimestamp => "min",
2941 Self::MinFloat32 => "min",
2942 Self::MinFloat64 => "min",
2943 Self::MinBool => "min",
2944 Self::MinString => "min",
2945 Self::MinDate => "min",
2946 Self::MinTimestamp => "min",
2947 Self::MinTimestampTz => "min",
2948 Self::MinInterval => "min",
2949 Self::MinTime => "min",
2950 Self::SumInt16 => "sum",
2951 Self::SumInt32 => "sum",
2952 Self::SumInt64 => "sum",
2953 Self::SumUInt16 => "sum",
2954 Self::SumUInt32 => "sum",
2955 Self::SumUInt64 => "sum",
2956 Self::SumFloat32 => "sum",
2957 Self::SumFloat64 => "sum",
2958 Self::SumNumeric => "sum",
2959 Self::Count => "count",
2960 Self::Any => "any",
2961 Self::All => "all",
2962 Self::JsonbAgg { .. } => "jsonb_agg",
2963 Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
2964 Self::MapAgg { .. } => "map_agg",
2965 Self::ArrayConcat { .. } => "array_agg",
2966 Self::ListConcat { .. } => "list_agg",
2967 Self::StringAgg { .. } => "string_agg",
2968 Self::RowNumber { .. } => "row_number",
2969 Self::Rank { .. } => "rank",
2970 Self::DenseRank { .. } => "dense_rank",
2971 Self::LagLead {
2972 lag_lead: LagLeadType::Lag,
2973 ..
2974 } => "lag",
2975 Self::LagLead {
2976 lag_lead: LagLeadType::Lead,
2977 ..
2978 } => "lead",
2979 Self::FirstValue { .. } => "first_value",
2980 Self::LastValue { .. } => "last_value",
2981 Self::WindowAggregate { .. } => "window_agg",
2982 Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
2983 Self::FusedWindowAggregate { .. } => "fused_window_agg",
2984 Self::Dummy => "dummy",
2985 }
2986 }
2987}
2988
2989impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
2990where
2991 M: HumanizerMode,
2992{
2993 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
2994 use AggregateFunc::*;
2995 let name = self.expr.name();
2996 match self.expr {
2997 JsonbAgg { order_by }
2998 | JsonbObjectAgg { order_by }
2999 | MapAgg { order_by, .. }
3000 | ArrayConcat { order_by }
3001 | ListConcat { order_by }
3002 | StringAgg { order_by }
3003 | RowNumber { order_by }
3004 | Rank { order_by }
3005 | DenseRank { order_by } => {
3006 let order_by = order_by.iter().map(|col| self.child(col));
3007 write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3008 }
3009 LagLead {
3010 lag_lead: _,
3011 ignore_nulls,
3012 order_by,
3013 } => {
3014 let order_by = order_by.iter().map(|col| self.child(col));
3015 f.write_str(name)?;
3016 f.write_str("[")?;
3017 if *ignore_nulls {
3018 f.write_str("ignore_nulls=true, ")?;
3019 }
3020 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3021 f.write_str("]")
3022 }
3023 FirstValue {
3024 order_by,
3025 window_frame,
3026 } => {
3027 let order_by = order_by.iter().map(|col| self.child(col));
3028 f.write_str(name)?;
3029 f.write_str("[")?;
3030 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3031 if *window_frame != WindowFrame::default() {
3032 write!(f, " {}", window_frame)?;
3033 }
3034 f.write_str("]")
3035 }
3036 LastValue {
3037 order_by,
3038 window_frame,
3039 } => {
3040 let order_by = order_by.iter().map(|col| self.child(col));
3041 f.write_str(name)?;
3042 f.write_str("[")?;
3043 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3044 if *window_frame != WindowFrame::default() {
3045 write!(f, " {}", window_frame)?;
3046 }
3047 f.write_str("]")
3048 }
3049 WindowAggregate {
3050 wrapped_aggregate,
3051 order_by,
3052 window_frame,
3053 } => {
3054 let order_by = order_by.iter().map(|col| self.child(col));
3055 let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3056 f.write_str(name)?;
3057 f.write_str("[")?;
3058 write!(f, "{} ", wrapped_aggregate)?;
3059 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3060 if *window_frame != WindowFrame::default() {
3061 write!(f, " {}", window_frame)?;
3062 }
3063 f.write_str("]")
3064 }
3065 FusedValueWindowFunc { funcs, order_by } => {
3066 let order_by = order_by.iter().map(|col| self.child(col));
3067 let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3068 f.write_str(name)?;
3069 f.write_str("[")?;
3070 write!(f, "{} ", funcs)?;
3071 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3072 f.write_str("]")
3073 }
3074 _ => f.write_str(name),
3075 }
3076 }
3077}
3078
3079#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
3080pub struct CaptureGroupDesc {
3081 pub index: u32,
3082 pub name: Option<String>,
3083 pub nullable: bool,
3084}
3085
3086#[derive(
3087 Clone,
3088 Copy,
3089 Debug,
3090 Eq,
3091 PartialEq,
3092 Ord,
3093 PartialOrd,
3094 Serialize,
3095 Deserialize,
3096 Hash,
3097 MzReflect,
3098 Default,
3099)]
3100pub struct AnalyzedRegexOpts {
3101 pub case_insensitive: bool,
3102 pub global: bool,
3103}
3104
3105impl FromStr for AnalyzedRegexOpts {
3106 type Err = EvalError;
3107
3108 fn from_str(s: &str) -> Result<Self, Self::Err> {
3109 let mut opts = AnalyzedRegexOpts::default();
3110 for c in s.chars() {
3111 match c {
3112 'i' => opts.case_insensitive = true,
3113 'g' => opts.global = true,
3114 _ => return Err(EvalError::InvalidRegexFlag(c)),
3115 }
3116 }
3117 Ok(opts)
3118 }
3119}
3120
3121#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
3122pub struct AnalyzedRegex(ReprRegex, Vec<CaptureGroupDesc>, AnalyzedRegexOpts);
3123
3124impl AnalyzedRegex {
3125 pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, regex::Error> {
3126 let r = ReprRegex::new(s, opts.case_insensitive)?;
3127 #[allow(clippy::as_conversions)]
3129 let descs: Vec<_> = r
3130 .capture_names()
3131 .enumerate()
3132 .skip(1)
3137 .map(|(i, name)| CaptureGroupDesc {
3138 index: i as u32,
3139 name: name.map(String::from),
3140 nullable: true,
3143 })
3144 .collect();
3145 Ok(Self(r, descs, opts))
3146 }
3147 pub fn capture_groups_len(&self) -> usize {
3148 self.1.len()
3149 }
3150 pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3151 self.1.iter()
3152 }
3153 pub fn inner(&self) -> &Regex {
3154 &(self.0).regex
3155 }
3156 pub fn opts(&self) -> &AnalyzedRegexOpts {
3157 &self.2
3158 }
3159}
3160
3161pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3162 let bytes = a.unwrap_str().as_bytes();
3163 let mut row = Row::default();
3164 let csv_reader = csv::ReaderBuilder::new()
3165 .has_headers(false)
3166 .from_reader(bytes);
3167 csv_reader.into_records().filter_map(move |res| match res {
3168 Ok(sr) if sr.len() == n_cols => {
3169 row.packer().extend(sr.iter().map(Datum::String));
3170 Some((row.clone(), Diff::ONE))
3171 }
3172 _ => None,
3173 })
3174}
3175
3176pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3177 let n = a.unwrap_int64();
3178 if n != 0 {
3179 Some((Row::default(), n.into()))
3180 } else {
3181 None
3182 }
3183}
3184
3185fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3186 datums
3187 .chunks(width)
3188 .map(|chunk| (Row::pack(chunk), Diff::ONE))
3189}
3190
3191fn acl_explode<'a>(
3192 acl_items: Datum<'a>,
3193 temp_storage: &'a RowArena,
3194) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3195 let acl_items = acl_items.unwrap_array();
3196 let mut res = Vec::new();
3197 for acl_item in acl_items.elements().iter() {
3198 if acl_item.is_null() {
3199 return Err(EvalError::AclArrayNullElement);
3200 }
3201 let acl_item = acl_item.unwrap_acl_item();
3202 for privilege in acl_item.acl_mode.explode() {
3203 let row = [
3204 Datum::UInt32(acl_item.grantor.0),
3205 Datum::UInt32(acl_item.grantee.0),
3206 Datum::String(temp_storage.push_string(privilege.to_string())),
3207 Datum::False,
3209 ];
3210 res.push((Row::pack_slice(&row), Diff::ONE));
3211 }
3212 }
3213 Ok(res.into_iter())
3214}
3215
3216fn mz_acl_explode<'a>(
3217 mz_acl_items: Datum<'a>,
3218 temp_storage: &'a RowArena,
3219) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3220 let mz_acl_items = mz_acl_items.unwrap_array();
3221 let mut res = Vec::new();
3222 for mz_acl_item in mz_acl_items.elements().iter() {
3223 if mz_acl_item.is_null() {
3224 return Err(EvalError::MzAclArrayNullElement);
3225 }
3226 let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3227 for privilege in mz_acl_item.acl_mode.explode() {
3228 let row = [
3229 Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3230 Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3231 Datum::String(temp_storage.push_string(privilege.to_string())),
3232 Datum::False,
3234 ];
3235 res.push((Row::pack_slice(&row), Diff::ONE));
3236 }
3237 }
3238 Ok(res.into_iter())
3239}
3240
3241#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
3244pub enum TableFunc {
3245 AclExplode,
3246 MzAclExplode,
3247 JsonbEach {
3248 stringify: bool,
3249 },
3250 JsonbObjectKeys,
3251 JsonbArrayElements {
3252 stringify: bool,
3253 },
3254 RegexpExtract(AnalyzedRegex),
3255 CsvExtract(usize),
3256 GenerateSeriesInt32,
3257 GenerateSeriesInt64,
3258 GenerateSeriesTimestamp,
3259 GenerateSeriesTimestampTz,
3260 GuardSubquerySize {
3281 column_type: SqlScalarType,
3282 },
3283 Repeat,
3284 UnnestArray {
3285 el_typ: SqlScalarType,
3286 },
3287 UnnestList {
3288 el_typ: SqlScalarType,
3289 },
3290 UnnestMap {
3291 value_type: SqlScalarType,
3292 },
3293 Wrap {
3299 types: Vec<SqlColumnType>,
3300 width: usize,
3301 },
3302 GenerateSubscriptsArray,
3303 TabletizedScalar {
3305 name: String,
3306 relation: SqlRelationType,
3307 },
3308 RegexpMatches,
3309 #[allow(private_interfaces)]
3314 WithOrdinality(WithOrdinality),
3315}
3316
3317#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
3325struct WithOrdinality {
3326 inner: Box<TableFunc>,
3327}
3328
3329impl TableFunc {
3330 pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3332 match inner {
3333 TableFunc::AclExplode
3334 | TableFunc::MzAclExplode
3335 | TableFunc::JsonbEach { .. }
3336 | TableFunc::JsonbObjectKeys
3337 | TableFunc::JsonbArrayElements { .. }
3338 | TableFunc::RegexpExtract(_)
3339 | TableFunc::CsvExtract(_)
3340 | TableFunc::GenerateSeriesInt32
3341 | TableFunc::GenerateSeriesInt64
3342 | TableFunc::GenerateSeriesTimestamp
3343 | TableFunc::GenerateSeriesTimestampTz
3344 | TableFunc::GuardSubquerySize { .. }
3345 | TableFunc::Repeat
3346 | TableFunc::UnnestArray { .. }
3347 | TableFunc::UnnestList { .. }
3348 | TableFunc::UnnestMap { .. }
3349 | TableFunc::Wrap { .. }
3350 | TableFunc::GenerateSubscriptsArray
3351 | TableFunc::TabletizedScalar { .. }
3352 | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3353 inner: Box::new(inner),
3354 })),
3355 _ => None,
3358 }
3359 }
3360}
3361
3362impl TableFunc {
3363 pub fn eval<'a>(
3365 &'a self,
3366 datums: &'a [Datum<'a>],
3367 temp_storage: &'a RowArena,
3368 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3369 if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3370 return Ok(Box::new(vec![].into_iter()));
3371 }
3372 match self {
3373 TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3374 TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3375 TableFunc::JsonbEach { stringify } => {
3376 Ok(Box::new(jsonb_each(datums[0], temp_storage, *stringify)))
3377 }
3378 TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3379 TableFunc::JsonbArrayElements { stringify } => Ok(Box::new(jsonb_array_elements(
3380 datums[0],
3381 temp_storage,
3382 *stringify,
3383 ))),
3384 TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3385 TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3386 TableFunc::GenerateSeriesInt32 => {
3387 let res = generate_series(
3388 datums[0].unwrap_int32(),
3389 datums[1].unwrap_int32(),
3390 datums[2].unwrap_int32(),
3391 )?;
3392 Ok(Box::new(res))
3393 }
3394 TableFunc::GenerateSeriesInt64 => {
3395 let res = generate_series(
3396 datums[0].unwrap_int64(),
3397 datums[1].unwrap_int64(),
3398 datums[2].unwrap_int64(),
3399 )?;
3400 Ok(Box::new(res))
3401 }
3402 TableFunc::GenerateSeriesTimestamp => {
3403 fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
3404 Datum::from(d)
3405 }
3406 let res = generate_series_ts(
3407 datums[0].unwrap_timestamp(),
3408 datums[1].unwrap_timestamp(),
3409 datums[2].unwrap_interval(),
3410 pass_through,
3411 )?;
3412 Ok(Box::new(res))
3413 }
3414 TableFunc::GenerateSeriesTimestampTz => {
3415 fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
3416 Datum::from(d)
3417 }
3418 let res = generate_series_ts(
3419 datums[0].unwrap_timestamptz(),
3420 datums[1].unwrap_timestamptz(),
3421 datums[2].unwrap_interval(),
3422 gen_ts_tz,
3423 )?;
3424 Ok(Box::new(res))
3425 }
3426 TableFunc::GenerateSubscriptsArray => {
3427 generate_subscripts_array(datums[0], datums[1].unwrap_int32())
3428 }
3429 TableFunc::GuardSubquerySize { column_type: _ } => {
3430 let count = datums[0].unwrap_int64();
3433 if count != 1 {
3434 Err(EvalError::MultipleRowsFromSubquery)
3435 } else {
3436 Ok(Box::new([].into_iter()))
3437 }
3438 }
3439 TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
3440 TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
3441 TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
3442 TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
3443 TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
3444 TableFunc::TabletizedScalar { .. } => {
3445 let r = Row::pack_slice(datums);
3446 Ok(Box::new(std::iter::once((r, Diff::ONE))))
3447 }
3448 TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
3449 TableFunc::WithOrdinality(func_with_ordinality) => {
3450 func_with_ordinality.eval(datums, temp_storage)
3451 }
3452 }
3453 }
3454
3455 pub fn output_type(&self) -> SqlRelationType {
3456 let (column_types, keys) = match self {
3457 TableFunc::AclExplode => {
3458 let column_types = vec![
3459 SqlScalarType::Oid.nullable(false),
3460 SqlScalarType::Oid.nullable(false),
3461 SqlScalarType::String.nullable(false),
3462 SqlScalarType::Bool.nullable(false),
3463 ];
3464 let keys = vec![];
3465 (column_types, keys)
3466 }
3467 TableFunc::MzAclExplode => {
3468 let column_types = vec![
3469 SqlScalarType::String.nullable(false),
3470 SqlScalarType::String.nullable(false),
3471 SqlScalarType::String.nullable(false),
3472 SqlScalarType::Bool.nullable(false),
3473 ];
3474 let keys = vec![];
3475 (column_types, keys)
3476 }
3477 TableFunc::JsonbEach { stringify: true } => {
3478 let column_types = vec![
3479 SqlScalarType::String.nullable(false),
3480 SqlScalarType::String.nullable(true),
3481 ];
3482 let keys = vec![];
3483 (column_types, keys)
3484 }
3485 TableFunc::JsonbEach { stringify: false } => {
3486 let column_types = vec![
3487 SqlScalarType::String.nullable(false),
3488 SqlScalarType::Jsonb.nullable(false),
3489 ];
3490 let keys = vec![];
3491 (column_types, keys)
3492 }
3493 TableFunc::JsonbObjectKeys => {
3494 let column_types = vec![SqlScalarType::String.nullable(false)];
3495 let keys = vec![];
3496 (column_types, keys)
3497 }
3498 TableFunc::JsonbArrayElements { stringify: true } => {
3499 let column_types = vec![SqlScalarType::String.nullable(true)];
3500 let keys = vec![];
3501 (column_types, keys)
3502 }
3503 TableFunc::JsonbArrayElements { stringify: false } => {
3504 let column_types = vec![SqlScalarType::Jsonb.nullable(false)];
3505 let keys = vec![];
3506 (column_types, keys)
3507 }
3508 TableFunc::RegexpExtract(a) => {
3509 let column_types = a
3510 .capture_groups_iter()
3511 .map(|cg| SqlScalarType::String.nullable(cg.nullable))
3512 .collect();
3513 let keys = vec![];
3514 (column_types, keys)
3515 }
3516 TableFunc::CsvExtract(n_cols) => {
3517 let column_types = iter::repeat(SqlScalarType::String.nullable(false))
3518 .take(*n_cols)
3519 .collect();
3520 let keys = vec![];
3521 (column_types, keys)
3522 }
3523 TableFunc::GenerateSeriesInt32 => {
3524 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3525 let keys = vec![vec![0]];
3526 (column_types, keys)
3527 }
3528 TableFunc::GenerateSeriesInt64 => {
3529 let column_types = vec![SqlScalarType::Int64.nullable(false)];
3530 let keys = vec![vec![0]];
3531 (column_types, keys)
3532 }
3533 TableFunc::GenerateSeriesTimestamp => {
3534 let column_types =
3535 vec![SqlScalarType::Timestamp { precision: None }.nullable(false)];
3536 let keys = vec![vec![0]];
3537 (column_types, keys)
3538 }
3539 TableFunc::GenerateSeriesTimestampTz => {
3540 let column_types =
3541 vec![SqlScalarType::TimestampTz { precision: None }.nullable(false)];
3542 let keys = vec![vec![0]];
3543 (column_types, keys)
3544 }
3545 TableFunc::GenerateSubscriptsArray => {
3546 let column_types = vec![SqlScalarType::Int32.nullable(false)];
3547 let keys = vec![vec![0]];
3548 (column_types, keys)
3549 }
3550 TableFunc::GuardSubquerySize { column_type } => {
3551 let column_types = vec![column_type.clone().nullable(false)];
3552 let keys = vec![];
3553 (column_types, keys)
3554 }
3555 TableFunc::Repeat => {
3556 let column_types = vec![];
3557 let keys = vec![];
3558 (column_types, keys)
3559 }
3560 TableFunc::UnnestArray { el_typ } => {
3561 let column_types = vec![el_typ.clone().nullable(true)];
3562 let keys = vec![];
3563 (column_types, keys)
3564 }
3565 TableFunc::UnnestList { el_typ } => {
3566 let column_types = vec![el_typ.clone().nullable(true)];
3567 let keys = vec![];
3568 (column_types, keys)
3569 }
3570 TableFunc::UnnestMap { value_type } => {
3571 let column_types = vec![
3572 SqlScalarType::String.nullable(false),
3573 value_type.clone().nullable(true),
3574 ];
3575 let keys = vec![vec![0]];
3576 (column_types, keys)
3577 }
3578 TableFunc::Wrap { types, .. } => {
3579 let column_types = types.clone();
3580 let keys = vec![];
3581 (column_types, keys)
3582 }
3583 TableFunc::TabletizedScalar { relation, .. } => {
3584 return relation.clone();
3585 }
3586 TableFunc::RegexpMatches => {
3587 let column_types =
3588 vec![SqlScalarType::Array(Box::new(SqlScalarType::String)).nullable(false)];
3589 let keys = vec![];
3590
3591 (column_types, keys)
3592 }
3593 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3594 let mut typ = inner.output_type();
3595 typ.column_types.push(SqlScalarType::Int64.nullable(false));
3597 typ.keys.push(vec![typ.column_types.len() - 1]);
3599 (typ.column_types, typ.keys)
3600 }
3601 };
3602
3603 soft_assert_eq_no_log!(column_types.len(), self.output_arity());
3604
3605 if !keys.is_empty() {
3606 SqlRelationType::new(column_types).with_keys(keys)
3607 } else {
3608 SqlRelationType::new(column_types)
3609 }
3610 }
3611
3612 pub fn output_arity(&self) -> usize {
3613 match self {
3614 TableFunc::AclExplode => 4,
3615 TableFunc::MzAclExplode => 4,
3616 TableFunc::JsonbEach { .. } => 2,
3617 TableFunc::JsonbObjectKeys => 1,
3618 TableFunc::JsonbArrayElements { .. } => 1,
3619 TableFunc::RegexpExtract(a) => a.capture_groups_len(),
3620 TableFunc::CsvExtract(n_cols) => *n_cols,
3621 TableFunc::GenerateSeriesInt32 => 1,
3622 TableFunc::GenerateSeriesInt64 => 1,
3623 TableFunc::GenerateSeriesTimestamp => 1,
3624 TableFunc::GenerateSeriesTimestampTz => 1,
3625 TableFunc::GenerateSubscriptsArray => 1,
3626 TableFunc::GuardSubquerySize { .. } => 1,
3627 TableFunc::Repeat => 0,
3628 TableFunc::UnnestArray { .. } => 1,
3629 TableFunc::UnnestList { .. } => 1,
3630 TableFunc::UnnestMap { .. } => 2,
3631 TableFunc::Wrap { width, .. } => *width,
3632 TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
3633 TableFunc::RegexpMatches => 1,
3634 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
3635 }
3636 }
3637
3638 pub fn empty_on_null_input(&self) -> bool {
3639 match self {
3640 TableFunc::AclExplode
3641 | TableFunc::MzAclExplode
3642 | TableFunc::JsonbEach { .. }
3643 | TableFunc::JsonbObjectKeys
3644 | TableFunc::JsonbArrayElements { .. }
3645 | TableFunc::GenerateSeriesInt32
3646 | TableFunc::GenerateSeriesInt64
3647 | TableFunc::GenerateSeriesTimestamp
3648 | TableFunc::GenerateSeriesTimestampTz
3649 | TableFunc::GenerateSubscriptsArray
3650 | TableFunc::RegexpExtract(_)
3651 | TableFunc::CsvExtract(_)
3652 | TableFunc::Repeat
3653 | TableFunc::UnnestArray { .. }
3654 | TableFunc::UnnestList { .. }
3655 | TableFunc::UnnestMap { .. }
3656 | TableFunc::RegexpMatches => true,
3657 TableFunc::GuardSubquerySize { .. } => false,
3658 TableFunc::Wrap { .. } => false,
3659 TableFunc::TabletizedScalar { .. } => false,
3660 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
3661 }
3662 }
3663
3664 pub fn preserves_monotonicity(&self) -> bool {
3666 match self {
3669 TableFunc::AclExplode => false,
3670 TableFunc::MzAclExplode => false,
3671 TableFunc::JsonbEach { .. } => true,
3672 TableFunc::JsonbObjectKeys => true,
3673 TableFunc::JsonbArrayElements { .. } => true,
3674 TableFunc::RegexpExtract(_) => true,
3675 TableFunc::CsvExtract(_) => true,
3676 TableFunc::GenerateSeriesInt32 => true,
3677 TableFunc::GenerateSeriesInt64 => true,
3678 TableFunc::GenerateSeriesTimestamp => true,
3679 TableFunc::GenerateSeriesTimestampTz => true,
3680 TableFunc::GenerateSubscriptsArray => true,
3681 TableFunc::Repeat => false,
3682 TableFunc::UnnestArray { .. } => true,
3683 TableFunc::UnnestList { .. } => true,
3684 TableFunc::UnnestMap { .. } => true,
3685 TableFunc::Wrap { .. } => true,
3686 TableFunc::TabletizedScalar { .. } => true,
3687 TableFunc::RegexpMatches => true,
3688 TableFunc::GuardSubquerySize { .. } => false,
3689 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
3690 }
3691 }
3692}
3693
3694impl fmt::Display for TableFunc {
3695 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3696 match self {
3697 TableFunc::AclExplode => f.write_str("aclexplode"),
3698 TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
3699 TableFunc::JsonbEach { .. } => f.write_str("jsonb_each"),
3700 TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
3701 TableFunc::JsonbArrayElements { .. } => f.write_str("jsonb_array_elements"),
3702 TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
3703 TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
3704 TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
3705 TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
3706 TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
3707 TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
3708 TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
3709 TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
3710 TableFunc::Repeat => f.write_str("repeat_row"),
3711 TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
3712 TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
3713 TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
3714 TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
3715 TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
3716 TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
3717 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3718 write!(f, "{}[with_ordinality]", inner)
3719 }
3720 }
3721 }
3722}
3723
3724impl WithOrdinality {
3725 fn eval<'a>(
3734 &'a self,
3735 datums: &'a [Datum<'a>],
3736 temp_storage: &'a RowArena,
3737 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3738 let mut next_ordinal: i64 = 1;
3739 let it = self
3740 .inner
3741 .eval(datums, temp_storage)?
3742 .flat_map(move |(mut row, diff)| {
3743 let diff = diff.into_inner();
3744 assert!(diff >= 0);
3751 let mut ordinals = next_ordinal..(next_ordinal + diff);
3753 next_ordinal += diff;
3754 let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
3756 iter::from_fn(move || {
3757 let ordinal = ordinals.next()?;
3758 let mut row = if ordinals.is_empty() {
3759 std::mem::take(&mut row)
3762 } else {
3763 let mut new_row = Row::with_capacity(cap);
3764 new_row.clone_from(&row);
3765 new_row
3766 };
3767 RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
3768 Some((row, Diff::ONE))
3769 })
3770 });
3771 Ok(Box::new(it))
3772 }
3773}