1#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::str::separated;
25use mz_ore::{soft_assert_eq_no_log, soft_assert_or_log};
26use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
27use mz_repr::adt::array::ArrayDimension;
28use mz_repr::adt::date::Date;
29use mz_repr::adt::interval::Interval;
30use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
31use mz_repr::adt::regex::Regex as ReprRegex;
32use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
33use mz_repr::{
34 ColumnName, ColumnType, Datum, Diff, RelationType, Row, RowArena, RowPacker, ScalarType,
35 SharedRow, datum_size,
36};
37use num::{CheckedAdd, Integer, Signed, ToPrimitive};
38use ordered_float::OrderedFloat;
39use proptest::prelude::{Arbitrary, Just};
40use proptest::strategy::{BoxedStrategy, Strategy, Union};
41use proptest_derive::Arbitrary;
42use regex::Regex;
43use serde::{Deserialize, Serialize};
44use smallvec::SmallVec;
45
46use crate::EvalError;
47use crate::WindowFrameBound::{
48 CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
49};
50use crate::WindowFrameUnits::{Groups, Range, Rows};
51use crate::explain::{HumanizedExpr, HumanizerMode};
52use crate::relation::proto_aggregate_func::{
53 self, ProtoColumnOrders, ProtoFusedValueWindowFunc, ProtoFusedWindowAggregate,
54};
55use crate::relation::proto_table_func::{ProtoTabletizedScalar, ProtoWithOrdinality};
56use crate::relation::{
57 ColumnOrder, ProtoAggregateFunc, ProtoTableFunc, WindowFrame, WindowFrameBound,
58 WindowFrameUnits, compare_columns, proto_table_func,
59};
60use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
61
62include!(concat!(env!("OUT_DIR"), "/mz_expr.relation.func.rs"));
63
64fn max_string<'a, I>(datums: I) -> Datum<'a>
68where
69 I: IntoIterator<Item = Datum<'a>>,
70{
71 match datums
72 .into_iter()
73 .filter(|d| !d.is_null())
74 .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
75 {
76 Some(datum) => datum,
77 None => Datum::Null,
78 }
79}
80
81fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
82where
83 I: IntoIterator<Item = Datum<'a>>,
84 DatumType: TryFrom<Datum<'a>> + Ord,
85 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
86 Datum<'a>: From<Option<DatumType>>,
87{
88 let x: Option<DatumType> = datums
89 .into_iter()
90 .filter(|d| !d.is_null())
91 .map(|d| DatumType::try_from(d).expect("unexpected type"))
92 .max();
93
94 x.into()
95}
96
97fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
98where
99 I: IntoIterator<Item = Datum<'a>>,
100 DatumType: TryFrom<Datum<'a>> + Ord,
101 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
102 Datum<'a>: From<Option<DatumType>>,
103{
104 let x: Option<DatumType> = datums
105 .into_iter()
106 .filter(|d| !d.is_null())
107 .map(|d| DatumType::try_from(d).expect("unexpected type"))
108 .min();
109
110 x.into()
111}
112
113fn min_string<'a, I>(datums: I) -> Datum<'a>
114where
115 I: IntoIterator<Item = Datum<'a>>,
116{
117 match datums
118 .into_iter()
119 .filter(|d| !d.is_null())
120 .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
121 {
122 Some(datum) => datum,
123 None => Datum::Null,
124 }
125}
126
127fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
128where
129 I: IntoIterator<Item = Datum<'a>>,
130 DatumType: TryFrom<Datum<'a>>,
131 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
132 ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
133{
134 let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
135 if datums.peek().is_none() {
136 Datum::Null
137 } else {
138 let x = datums
139 .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
140 .sum::<ResultType>();
141 x.into()
142 }
143}
144
145fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
146where
147 I: IntoIterator<Item = Datum<'a>>,
148{
149 let mut cx = numeric::cx_datum();
150 let mut sum = Numeric::zero();
151 let mut empty = true;
152 for d in datums {
153 if !d.is_null() {
154 empty = false;
155 cx.add(&mut sum, &d.unwrap_numeric().0);
156 }
157 }
158 match empty {
159 true => Datum::Null,
160 false => Datum::from(sum),
161 }
162}
163
164#[allow(clippy::as_conversions)]
166fn count<'a, I>(datums: I) -> Datum<'a>
167where
168 I: IntoIterator<Item = Datum<'a>>,
169{
170 let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
172 Datum::from(x)
173}
174
175fn any<'a, I>(datums: I) -> Datum<'a>
176where
177 I: IntoIterator<Item = Datum<'a>>,
178{
179 datums
180 .into_iter()
181 .fold(Datum::False, |state, next| match (state, next) {
182 (Datum::True, _) | (_, Datum::True) => Datum::True,
183 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
184 _ => Datum::False,
185 })
186}
187
188fn all<'a, I>(datums: I) -> Datum<'a>
189where
190 I: IntoIterator<Item = Datum<'a>>,
191{
192 datums
193 .into_iter()
194 .fold(Datum::True, |state, next| match (state, next) {
195 (Datum::False, _) | (_, Datum::False) => Datum::False,
196 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
197 _ => Datum::True,
198 })
199}
200
201fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
202where
203 I: IntoIterator<Item = Datum<'a>>,
204{
205 const EMPTY_SEP: &str = "";
206
207 let datums = order_aggregate_datums(datums, order_by);
208 let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
209 if d.is_null() {
210 return None;
211 }
212 let mut value_sep = d.unwrap_list().iter();
213 match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
214 (Datum::Null, _) => None,
215 (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
216 (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
217 _ => unreachable!(),
218 }
219 });
220
221 let mut s = String::default();
222 match sep_value_pairs.next() {
223 Some((_, value)) => s.push_str(value),
225 None => return Datum::Null,
227 }
228
229 for (sep, value) in sep_value_pairs {
230 s.push_str(sep);
231 s.push_str(value);
232 }
233
234 Datum::String(temp_storage.push_string(s))
235}
236
237fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
238where
239 I: IntoIterator<Item = Datum<'a>>,
240{
241 let datums = order_aggregate_datums(datums, order_by);
242 temp_storage.make_datum(|packer| {
243 packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
244 })
245}
246
/// Aggregates `(key, value)` pairs into a single dict datum allocated in
/// `temp_storage`.
///
/// After ordering by `order_by`, each non-null input is expected to be a list
/// whose first element is the key and whose second element is the value.
/// - Null inputs, and inputs with a null key, are skipped.
/// - Keys are read with `unwrap_str`, so non-null keys are assumed to be strings.
/// - When a key appears more than once, the entry that comes last in the
///   `order_by` ordering wins.
fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let datums = order_aggregate_datums(datums, order_by);
    temp_storage.make_datum(|packer| {
        let mut datums: Vec<_> = datums
            .into_iter()
            .filter_map(|d| {
                if d.is_null() {
                    return None;
                }
                let mut list = d.unwrap_list().iter();
                let key = list.next().unwrap();
                let val = list.next().unwrap();
                if key.is_null() {
                    // Entries with a null key are dropped.
                    None
                } else {
                    Some((key.unwrap_str(), val))
                }
            })
            .collect();
        // `sort_by_key` is stable, so entries that share a key keep their `order_by` order.
        datums.sort_by_key(|(k, _v)| *k);
        // `dedup_by_key` keeps the first element of each run of equal keys. Reversing before
        // and after makes it keep the *last* entry per key instead, and restores ascending
        // key order (presumably what `push_dict` expects — verify).
        datums.reverse();
        datums.dedup_by_key(|(k, _v)| *k);
        datums.reverse();
        packer.push_dict(datums);
    })
}
283
284pub fn order_aggregate_datums<'a: 'b, 'b, I>(
294 datums: I,
295 order_by: &[ColumnOrder],
296) -> impl Iterator<Item = Datum<'b>>
297where
298 I: IntoIterator<Item = Datum<'a>>,
299{
300 order_aggregate_datums_with_rank_inner(datums, order_by)
301 .into_iter()
302 .map(|(payload, _order_datums)| payload)
304}
305
306fn order_aggregate_datums_with_rank<'a, I>(
309 datums: I,
310 order_by: &[ColumnOrder],
311) -> impl Iterator<Item = (Datum<'a>, Row)>
312where
313 I: IntoIterator<Item = Datum<'a>>,
314{
315 order_aggregate_datums_with_rank_inner(datums, order_by)
316 .into_iter()
317 .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
318}
319
320fn order_aggregate_datums_with_rank_inner<'a, I>(
321 datums: I,
322 order_by: &[ColumnOrder],
323) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
324where
325 I: IntoIterator<Item = Datum<'a>>,
326{
327 let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
328 .into_iter()
329 .map(|d| {
330 let list = d.unwrap_list();
331 let mut list_it = list.iter();
332 let payload = list_it.next().unwrap();
333
334 let mut order_by_datums = Vec::with_capacity(order_by.len());
344 for _ in 0..order_by.len() {
345 order_by_datums.push(
346 list_it
347 .next()
348 .expect("must have exactly the same number of Datums as `order_by`"),
349 );
350 }
351
352 (payload, order_by_datums)
353 })
354 .collect();
355
356 let mut sort_by =
357 |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
358 (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
359 compare_columns(
360 order_by,
361 left_order_by_datums,
362 right_order_by_datums,
363 || payload_left.cmp(payload_right),
364 )
365 };
366 decoded.sort_unstable_by(&mut sort_by);
371 decoded
372}
373
374fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
375where
376 I: IntoIterator<Item = Datum<'a>>,
377{
378 let datums = order_aggregate_datums(datums, order_by);
379 let datums: Vec<_> = datums
380 .into_iter()
381 .map(|d| d.unwrap_array().elements().iter())
382 .flatten()
383 .collect();
384 let dims = ArrayDimension {
385 lower_bound: 1,
386 length: datums.len(),
387 };
388 temp_storage.make_datum(|packer| {
389 packer.try_push_array(&[dims], datums).unwrap();
390 })
391}
392
393fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
394where
395 I: IntoIterator<Item = Datum<'a>>,
396{
397 let datums = order_aggregate_datums(datums, order_by);
398 temp_storage.make_datum(|packer| {
399 packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
400 })
401}
402
403fn row_number<'a, I>(
407 datums: I,
408 callers_temp_storage: &'a RowArena,
409 order_by: &[ColumnOrder],
410) -> Datum<'a>
411where
412 I: IntoIterator<Item = Datum<'a>>,
413{
414 let temp_storage = RowArena::new();
418 let datums = row_number_no_list(datums, &temp_storage, order_by);
419
420 callers_temp_storage.make_datum(|packer| {
421 packer.push_list(datums);
422 })
423}
424
425fn row_number_no_list<'a: 'b, 'b, I>(
428 datums: I,
429 callers_temp_storage: &'b RowArena,
430 order_by: &[ColumnOrder],
431) -> impl Iterator<Item = Datum<'b>>
432where
433 I: IntoIterator<Item = Datum<'a>>,
434{
435 let datums = order_aggregate_datums(datums, order_by);
436
437 callers_temp_storage.reserve(datums.size_hint().0);
438 datums
439 .into_iter()
440 .map(|d| d.unwrap_list().iter())
441 .flatten()
442 .zip(1i64..)
443 .map(|(d, i)| {
444 callers_temp_storage.make_datum(|packer| {
445 packer.push_list_with(|packer| {
446 packer.push(Datum::Int64(i));
447 packer.push(d);
448 });
449 })
450 })
451}
452
453fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
457where
458 I: IntoIterator<Item = Datum<'a>>,
459{
460 let temp_storage = RowArena::new();
461 let datums = rank_no_list(datums, &temp_storage, order_by);
462
463 callers_temp_storage.make_datum(|packer| {
464 packer.push_list(datums);
465 })
466}
467
468fn rank_no_list<'a: 'b, 'b, I>(
471 datums: I,
472 callers_temp_storage: &'b RowArena,
473 order_by: &[ColumnOrder],
474) -> impl Iterator<Item = Datum<'b>>
475where
476 I: IntoIterator<Item = Datum<'a>>,
477{
478 let datums = order_aggregate_datums_with_rank(datums, order_by);
480
481 let mut datums = datums
482 .into_iter()
483 .map(|(d0, order_row)| {
484 d0.unwrap_list()
485 .iter()
486 .map(move |d1| (d1, order_row.clone()))
487 })
488 .flatten();
489
490 callers_temp_storage.reserve(datums.size_hint().0);
491 datums
492 .next()
493 .map_or(vec![], |(first_datum, first_order_row)| {
494 datums.fold((first_order_row, 1, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
496 let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
497 *acc_row_num += 1;
498 if *acc_row != next_order_row {
500 *acc_rank = *acc_row_num;
501 *acc_row = next_order_row;
502 }
503
504 (*output).push((next_datum, *acc_rank));
505 acc
506 })
507 }.3).into_iter().map(|(d, i)| {
508 callers_temp_storage.make_datum(|packer| {
509 packer.push_list_with(|packer| {
510 packer.push(Datum::Int64(i));
511 packer.push(d);
512 });
513 })
514 })
515}
516
517fn dense_rank<'a, I>(
521 datums: I,
522 callers_temp_storage: &'a RowArena,
523 order_by: &[ColumnOrder],
524) -> Datum<'a>
525where
526 I: IntoIterator<Item = Datum<'a>>,
527{
528 let temp_storage = RowArena::new();
529 let datums = dense_rank_no_list(datums, &temp_storage, order_by);
530
531 callers_temp_storage.make_datum(|packer| {
532 packer.push_list(datums);
533 })
534}
535
536fn dense_rank_no_list<'a: 'b, 'b, I>(
539 datums: I,
540 callers_temp_storage: &'b RowArena,
541 order_by: &[ColumnOrder],
542) -> impl Iterator<Item = Datum<'b>>
543where
544 I: IntoIterator<Item = Datum<'a>>,
545{
546 let datums = order_aggregate_datums_with_rank(datums, order_by);
548
549 let mut datums = datums
550 .into_iter()
551 .map(|(d0, order_row)| {
552 d0.unwrap_list()
553 .iter()
554 .map(move |d1| (d1, order_row.clone()))
555 })
556 .flatten();
557
558 callers_temp_storage.reserve(datums.size_hint().0);
559 datums
560 .next()
561 .map_or(vec![], |(first_datum, first_order_row)| {
562 datums.fold((first_order_row, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
564 let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
565 if *acc_row != next_order_row {
567 *acc_rank += 1;
568 *acc_row = next_order_row;
569 }
570
571 (*output).push((next_datum, *acc_rank));
572 acc
573 })
574 }.2).into_iter().map(|(d, i)| {
575 callers_temp_storage.make_datum(|packer| {
576 packer.push_list_with(|packer| {
577 packer.push(Datum::Int64(i));
578 packer.push(d);
579 });
580 })
581 })
582}
583
584fn lag_lead<'a, I>(
606 datums: I,
607 callers_temp_storage: &'a RowArena,
608 order_by: &[ColumnOrder],
609 lag_lead_type: &LagLeadType,
610 ignore_nulls: &bool,
611) -> Datum<'a>
612where
613 I: IntoIterator<Item = Datum<'a>>,
614{
615 let temp_storage = RowArena::new();
616 let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
617 callers_temp_storage.make_datum(|packer| {
618 packer.push_list(iter);
619 })
620}
621
622fn lag_lead_no_list<'a: 'b, 'b, I>(
625 datums: I,
626 callers_temp_storage: &'b RowArena,
627 order_by: &[ColumnOrder],
628 lag_lead_type: &LagLeadType,
629 ignore_nulls: &bool,
630) -> impl Iterator<Item = Datum<'b>>
631where
632 I: IntoIterator<Item = Datum<'a>>,
633{
634 let datums = order_aggregate_datums(datums, order_by);
636
637 let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
641 .into_iter()
642 .map(|d| {
643 let mut iter = d.unwrap_list().iter();
644 let original_row = iter.next().unwrap();
645 let (input_value, offset, default_value) =
646 unwrap_lag_lead_encoded_args(iter.next().unwrap());
647 (original_row, (input_value, offset, default_value))
648 })
649 .unzip();
650
651 let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
652
653 callers_temp_storage.reserve(result.len());
654 result
655 .into_iter()
656 .zip_eq(orig_rows)
657 .map(|(result_value, original_row)| {
658 callers_temp_storage.make_datum(|packer| {
659 packer.push_list_with(|packer| {
660 packer.push(result_value);
661 packer.push(original_row);
662 });
663 })
664 })
665}
666
667fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
669 let mut encoded_args_iter = encoded_args.unwrap_list().iter();
670 let (input_value, offset, default_value) = (
671 encoded_args_iter.next().unwrap(),
672 encoded_args_iter.next().unwrap(),
673 encoded_args_iter.next().unwrap(),
674 );
675 (input_value, offset, default_value)
676}
677
678fn lag_lead_inner<'a>(
681 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
682 lag_lead_type: &LagLeadType,
683 ignore_nulls: &bool,
684) -> Vec<Datum<'a>> {
685 if *ignore_nulls {
686 lag_lead_inner_ignore_nulls(args, lag_lead_type)
687 } else {
688 lag_lead_inner_respect_nulls(args, lag_lead_type)
689 }
690}
691
692fn lag_lead_inner_respect_nulls<'a>(
693 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
694 lag_lead_type: &LagLeadType,
695) -> Vec<Datum<'a>> {
696 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
697 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
698 if offset.is_null() {
700 result.push(Datum::Null);
701 continue;
702 }
703
704 let idx = i64::try_from(idx).expect("Array index does not fit in i64");
705 let offset = i64::from(offset.unwrap_int32());
706 let offset = match lag_lead_type {
707 LagLeadType::Lag => -offset,
708 LagLeadType::Lead => offset,
709 };
710
711 let datums_get = |i: i64| -> Option<Datum> {
713 match u64::try_from(i) {
714 Ok(i) => args
715 .get(usize::cast_from(i))
716 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
720 };
721
722 let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
723
724 result.push(lagged_value);
725 }
726
727 result
728}
729
/// Computes `lag`/`lead` with IGNORE NULLS semantics for each row of an
/// already-ordered partition.
///
/// `args` holds `(input_value, offset, default_value)` per row. For each row:
/// - a null `offset` yields `Datum::Null`;
/// - otherwise the walk direction is set by the sign of the offset after
///   negating it for `Lag`. The result is the `|offset|`-th row in that
///   direction whose input value is non-null. If the walk leaves the partition
///   first, `default_value` is returned.
/// - an offset of 0 returns the current row's input value.
///
/// # Panics
/// - "window partition way too big" if `args.len()` does not fit in an `i64`;
/// - "0 offset in lag/lead IGNORE NULLS" if the offset is 0 and the current
///   row's input value is null.
// The `as` casts below are lossless: the partition length is checked up front to
// fit in an `i64`, and `j as usize` is only evaluated when `j` indexes an existing row.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // For each row whose input value is null: the index of the nearest non-null row
    // before it, or -1 if there is none. Rows with a non-null value stay `None`.
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // Mirror image: for each null row, the index of the nearest non-null row after
    // it, or `args.len()` if there is none. Filled by scanning in reverse.
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        if offset.is_null() {
            // A null offset produces a null result.
            result.push(Datum::Null);
            continue;
        }

        let idx = idx as i64;
        let offset = i64::cast_from(offset.unwrap_int32());
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        // Walk direction: -1 (backward), +1 (forward), or 0 for a zero offset.
        let increment = offset.signum();

        // Input value at position `i`, or `None` if `i` is outside the partition.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0))
                    .unwrap_or(None),
                Err(_) => None,
            }
        };

        let lagged_value = if increment != 0 {
            let mut j = idx;
            // Take |offset| steps. Each step lands on the next row in the walk
            // direction that has a non-null value, or leaves the partition.
            for _ in 0..num::abs(offset) {
                j += increment;
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    // Jump over the whole run of nulls using the precomputed tables.
                    let ju = j as usize;
                    if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                // Walked off the partition (index -1 or len): the default value is used.
                if datums_get(j).is_none() {
                    break;
                }
            }
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // A zero offset on a row whose value is null has no defined result here;
                // this implementation panics.
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
851
852fn first_value<'a, I>(
854 datums: I,
855 callers_temp_storage: &'a RowArena,
856 order_by: &[ColumnOrder],
857 window_frame: &WindowFrame,
858) -> Datum<'a>
859where
860 I: IntoIterator<Item = Datum<'a>>,
861{
862 let temp_storage = RowArena::new();
863 let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
864 callers_temp_storage.make_datum(|packer| {
865 packer.push_list(iter);
866 })
867}
868
869fn first_value_no_list<'a: 'b, 'b, I>(
872 datums: I,
873 callers_temp_storage: &'b RowArena,
874 order_by: &[ColumnOrder],
875 window_frame: &WindowFrame,
876) -> impl Iterator<Item = Datum<'b>>
877where
878 I: IntoIterator<Item = Datum<'a>>,
879{
880 let datums = order_aggregate_datums(datums, order_by);
882
883 let (orig_rows, args): (Vec<_>, Vec<_>) = datums
885 .into_iter()
886 .map(|d| {
887 let mut iter = d.unwrap_list().iter();
888 let original_row = iter.next().unwrap();
889 let arg = iter.next().unwrap();
890
891 (original_row, arg)
892 })
893 .unzip();
894
895 let results = first_value_inner(args, window_frame);
896
897 callers_temp_storage.reserve(results.len());
898 results
899 .into_iter()
900 .zip_eq(orig_rows)
901 .map(|(result_value, original_row)| {
902 callers_temp_storage.make_datum(|packer| {
903 packer.push_list_with(|packer| {
904 packer.push(result_value);
905 packer.push(original_row);
906 });
907 })
908 })
909}
910
911fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
912 let length = datums.len();
913 let mut result: Vec<Datum> = Vec::with_capacity(length);
914 for (idx, current_datum) in datums.iter().enumerate() {
915 let first_value = match &window_frame.start_bound {
916 WindowFrameBound::CurrentRow => *current_datum,
918 WindowFrameBound::UnboundedPreceding => {
919 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
920 let end_offset = usize::cast_from(*end_offset);
921
922 if idx < end_offset {
924 Datum::Null
925 } else {
926 datums[0]
927 }
928 } else {
929 datums[0]
930 }
931 }
932 WindowFrameBound::OffsetPreceding(offset) => {
933 let start_offset = usize::cast_from(*offset);
934 let start_idx = idx.saturating_sub(start_offset);
935 if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
936 let end_offset = usize::cast_from(*end_offset);
937
938 if start_offset < end_offset || idx < end_offset {
940 Datum::Null
941 } else {
942 datums[start_idx]
943 }
944 } else {
945 datums[start_idx]
946 }
947 }
948 WindowFrameBound::OffsetFollowing(offset) => {
949 let start_offset = usize::cast_from(*offset);
950 let start_idx = idx.saturating_add(start_offset);
951 if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
952 if offset > end_offset || start_idx >= length {
954 Datum::Null
955 } else {
956 datums[start_idx]
957 }
958 } else {
959 datums
960 .get(start_idx)
961 .map(|d| d.clone())
962 .unwrap_or(Datum::Null)
963 }
964 }
965 WindowFrameBound::UnboundedFollowing => unreachable!(),
967 };
968 result.push(first_value);
969 }
970 result
971}
972
973fn last_value<'a, I>(
975 datums: I,
976 callers_temp_storage: &'a RowArena,
977 order_by: &[ColumnOrder],
978 window_frame: &WindowFrame,
979) -> Datum<'a>
980where
981 I: IntoIterator<Item = Datum<'a>>,
982{
983 let temp_storage = RowArena::new();
984 let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
985 callers_temp_storage.make_datum(|packer| {
986 packer.push_list(iter);
987 })
988}
989
990fn last_value_no_list<'a: 'b, 'b, I>(
993 datums: I,
994 callers_temp_storage: &'b RowArena,
995 order_by: &[ColumnOrder],
996 window_frame: &WindowFrame,
997) -> impl Iterator<Item = Datum<'b>>
998where
999 I: IntoIterator<Item = Datum<'a>>,
1000{
1001 let datums = order_aggregate_datums_with_rank(datums, order_by);
1004
1005 let size_hint = datums.size_hint().0;
1007 let mut args = Vec::with_capacity(size_hint);
1008 let mut original_rows = Vec::with_capacity(size_hint);
1009 let mut order_by_rows = Vec::with_capacity(size_hint);
1010 for (d, order_by_row) in datums.into_iter() {
1011 let mut iter = d.unwrap_list().iter();
1012 let original_row = iter.next().unwrap();
1013 let arg = iter.next().unwrap();
1014 order_by_rows.push(order_by_row);
1015 original_rows.push(original_row);
1016 args.push(arg);
1017 }
1018
1019 let results = last_value_inner(args, &order_by_rows, window_frame);
1020
1021 callers_temp_storage.reserve(results.len());
1022 results
1023 .into_iter()
1024 .zip_eq(original_rows)
1025 .map(|(result_value, original_row)| {
1026 callers_temp_storage.make_datum(|packer| {
1027 packer.push_list_with(|packer| {
1028 packer.push(result_value);
1029 packer.push(original_row);
1030 });
1031 })
1032 })
1033}
1034
1035fn last_value_inner<'a>(
1036 args: Vec<Datum<'a>>,
1037 order_by_rows: &Vec<Row>,
1038 window_frame: &WindowFrame,
1039) -> Vec<Datum<'a>> {
1040 let length = args.len();
1041 let mut results: Vec<Datum> = Vec::with_capacity(length);
1042 for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1043 let last_value = match &window_frame.end_bound {
1044 WindowFrameBound::CurrentRow => match &window_frame.units {
1045 WindowFrameUnits::Rows => *current_datum,
1047 WindowFrameUnits::Range => {
1048 let target_idx = order_by_rows[idx..]
1053 .iter()
1054 .enumerate()
1055 .take_while(|(_, row)| *row == order_by_row)
1056 .last()
1057 .unwrap()
1058 .0
1059 + idx;
1060 args[target_idx]
1061 }
1062 WindowFrameUnits::Groups => unreachable!(),
1064 },
1065 WindowFrameBound::UnboundedFollowing => {
1066 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1067 let start_offset = usize::cast_from(*start_offset);
1068
1069 if idx + start_offset > length - 1 {
1071 Datum::Null
1072 } else {
1073 args[length - 1]
1074 }
1075 } else {
1076 args[length - 1]
1077 }
1078 }
1079 WindowFrameBound::OffsetFollowing(offset) => {
1080 let end_offset = usize::cast_from(*offset);
1081 let end_idx = idx.saturating_add(end_offset);
1082 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1083 let start_offset = usize::cast_from(*start_offset);
1084 let start_idx = idx.saturating_add(start_offset);
1085
1086 if end_offset < start_offset || start_idx >= length {
1088 Datum::Null
1089 } else {
1090 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1092 }
1093 } else {
1094 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1095 }
1096 }
1097 WindowFrameBound::OffsetPreceding(offset) => {
1098 let end_offset = usize::cast_from(*offset);
1099 let end_idx = idx.saturating_sub(end_offset);
1100 if idx < end_offset {
1101 Datum::Null
1103 } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1104 &window_frame.start_bound
1105 {
1106 if offset > start_offset {
1108 Datum::Null
1109 } else {
1110 args[end_idx]
1111 }
1112 } else {
1113 args[end_idx]
1114 }
1115 }
1116 WindowFrameBound::UnboundedPreceding => unreachable!(),
1118 };
1119 results.push(last_value);
1120 }
1121 results
1122}
1123
1124fn fused_value_window_func<'a, I>(
1130 input_datums: I,
1131 callers_temp_storage: &'a RowArena,
1132 funcs: &Vec<AggregateFunc>,
1133 order_by: &Vec<ColumnOrder>,
1134) -> Datum<'a>
1135where
1136 I: IntoIterator<Item = Datum<'a>>,
1137{
1138 let temp_storage = RowArena::new();
1139 let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1140 callers_temp_storage.make_datum(|packer| {
1141 packer.push_list(iter);
1142 })
1143}
1144
1145fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1148 input_datums: I,
1149 callers_temp_storage: &'b RowArena,
1150 funcs: &Vec<AggregateFunc>,
1151 order_by: &Vec<ColumnOrder>,
1152) -> impl Iterator<Item = Datum<'b>>
1153where
1154 I: IntoIterator<Item = Datum<'a>>,
1155{
1156 let has_last_value = funcs
1157 .iter()
1158 .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1159
1160 let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1161
1162 let size_hint = input_datums_with_ranks.size_hint().0;
1163 let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1164 let mut original_rows = Vec::with_capacity(size_hint);
1165 let mut order_by_rows = Vec::with_capacity(size_hint);
1166 for (d, order_by_row) in input_datums_with_ranks {
1167 let mut iter = d.unwrap_list().iter();
1168 let original_row = iter.next().unwrap();
1169 original_rows.push(original_row);
1170 let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1171 for i in 0..funcs.len() {
1172 let encoded_args = argss_iter.next().unwrap();
1173 encoded_argsss[i].push(encoded_args);
1174 }
1175 if has_last_value {
1176 order_by_rows.push(order_by_row);
1177 }
1178 }
1179
1180 let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1181 for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1182 let results = match func {
1183 AggregateFunc::LagLead {
1184 order_by: inner_order_by,
1185 lag_lead,
1186 ignore_nulls,
1187 } => {
1188 assert_eq!(order_by, inner_order_by);
1189 let unwrapped_argss = encoded_argss
1190 .into_iter()
1191 .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1192 .collect();
1193 lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1194 }
1195 AggregateFunc::FirstValue {
1196 order_by: inner_order_by,
1197 window_frame,
1198 } => {
1199 assert_eq!(order_by, inner_order_by);
1200 first_value_inner(encoded_argss, window_frame)
1203 }
1204 AggregateFunc::LastValue {
1205 order_by: inner_order_by,
1206 window_frame,
1207 } => {
1208 assert_eq!(order_by, inner_order_by);
1209 last_value_inner(encoded_argss, &order_by_rows, window_frame)
1212 }
1213 _ => panic!("unknown window function in FusedValueWindowFunc"),
1214 };
1215 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1216 results.push(result);
1217 }
1218 }
1219
1220 callers_temp_storage.reserve(2 * original_rows.len());
1221 results_per_row
1222 .into_iter()
1223 .enumerate()
1224 .map(move |(i, results)| {
1225 callers_temp_storage.make_datum(|packer| {
1226 packer.push_list_with(|packer| {
1227 packer
1228 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1229 packer.push(original_rows[i]);
1230 });
1231 })
1232 })
1233}
1234
1235fn window_aggr<'a, I, A>(
1244 input_datums: I,
1245 callers_temp_storage: &'a RowArena,
1246 wrapped_aggregate: &AggregateFunc,
1247 order_by: &[ColumnOrder],
1248 window_frame: &WindowFrame,
1249) -> Datum<'a>
1250where
1251 I: IntoIterator<Item = Datum<'a>>,
1252 A: OneByOneAggr,
1253{
1254 let temp_storage = RowArena::new();
1255 let iter = window_aggr_no_list::<I, A>(
1256 input_datums,
1257 &temp_storage,
1258 wrapped_aggregate,
1259 order_by,
1260 window_frame,
1261 );
1262 callers_temp_storage.make_datum(|packer| {
1263 packer.push_list(iter);
1264 })
1265}
1266
1267fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1270 input_datums: I,
1271 callers_temp_storage: &'b RowArena,
1272 wrapped_aggregate: &AggregateFunc,
1273 order_by: &[ColumnOrder],
1274 window_frame: &WindowFrame,
1275) -> impl Iterator<Item = Datum<'b>>
1276where
1277 I: IntoIterator<Item = Datum<'a>>,
1278 A: OneByOneAggr,
1279{
1280 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1283
1284 let size_hint = datums.size_hint().0;
1286 let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1287 let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1288 let mut order_by_rows = Vec::with_capacity(size_hint);
1289 for (d, order_by_row) in datums.into_iter() {
1290 let mut iter = d.unwrap_list().iter();
1291 let original_row = iter.next().unwrap();
1292 let arg = iter.next().unwrap();
1293 order_by_rows.push(order_by_row);
1294 original_rows.push(original_row);
1295 args.push(arg);
1296 }
1297
1298 let results = window_aggr_inner::<A>(
1299 args,
1300 &order_by_rows,
1301 wrapped_aggregate,
1302 order_by,
1303 window_frame,
1304 callers_temp_storage,
1305 );
1306
1307 callers_temp_storage.reserve(results.len());
1308 results
1309 .into_iter()
1310 .zip_eq(original_rows)
1311 .map(|(result_value, original_row)| {
1312 callers_temp_storage.make_datum(|packer| {
1313 packer.push_list_with(|packer| {
1314 packer.push(result_value);
1315 packer.push(original_row);
1316 });
1317 })
1318 })
1319}
1320
1321fn window_aggr_inner<'a, A>(
1322 mut args: Vec<Datum<'a>>,
1323 order_by_rows: &Vec<Row>,
1324 wrapped_aggregate: &AggregateFunc,
1325 order_by: &[ColumnOrder],
1326 window_frame: &WindowFrame,
1327 temp_storage: &'a RowArena,
1328) -> Vec<Datum<'a>>
1329where
1330 A: OneByOneAggr,
1331{
1332 let length = args.len();
1333 let mut result: Vec<Datum> = Vec::with_capacity(length);
1334
1335 soft_assert_or_log!(
1341 !((matches!(window_frame.units, WindowFrameUnits::Groups)
1342 || matches!(window_frame.units, WindowFrameUnits::Range))
1343 && !window_frame.includes_current_row()),
1344 "window frame without current row"
1345 );
1346
1347 if (matches!(
1348 window_frame.start_bound,
1349 WindowFrameBound::UnboundedPreceding
1350 ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1351 || (order_by.is_empty()
1352 && (matches!(window_frame.units, WindowFrameUnits::Groups)
1353 || matches!(window_frame.units, WindowFrameUnits::Range))
1354 && window_frame.includes_current_row())
1355 {
1356 let result_value = wrapped_aggregate.eval(args, temp_storage);
1363 for _ in 0..length {
1365 result.push(result_value);
1366 }
1367 } else {
1368 fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1369 args: Vec<Datum<'a>>,
1370 result: &mut Vec<Datum<'a>>,
1371 mut one_by_one_aggr: A,
1372 temp_storage: &'a RowArena,
1373 ) where
1374 A: OneByOneAggr,
1375 {
1376 for current_arg in args.into_iter() {
1377 one_by_one_aggr.give(¤t_arg);
1378 let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1379 result.push(result_value);
1380 }
1381 }
1382
1383 fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1384 args: Vec<Datum<'a>>,
1385 order_by_rows: &Vec<Row>,
1386 result: &mut Vec<Datum<'a>>,
1387 mut one_by_one_aggr: A,
1388 temp_storage: &'a RowArena,
1389 ) where
1390 A: OneByOneAggr,
1391 {
1392 let mut peer_group_start = 0;
1393 while peer_group_start < args.len() {
1394 let mut peer_group_end = peer_group_start + 1;
1398 while peer_group_end < args.len()
1399 && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1400 {
1401 peer_group_end += 1;
1403 }
1404 for current_arg in args[peer_group_start..peer_group_end].iter() {
1407 one_by_one_aggr.give(current_arg);
1408 }
1409 let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1410 for _ in args[peer_group_start..peer_group_end].iter() {
1412 result.push(agg_for_peer_group);
1413 }
1414 peer_group_start = peer_group_end;
1416 }
1417 }
1418
1419 fn rows_between_offset_and_offset<'a>(
1420 args: Vec<Datum<'a>>,
1421 result: &mut Vec<Datum<'a>>,
1422 wrapped_aggregate: &AggregateFunc,
1423 temp_storage: &'a RowArena,
1424 offset_start: i64,
1425 offset_end: i64,
1426 ) {
1427 let len = args
1428 .len()
1429 .to_i64()
1430 .expect("window partition's len should fit into i64");
1431 for i in 0..len {
1432 let i = i.to_i64().expect("window partition shouldn't be super big");
1433 let frame_start = max(i + offset_start, 0)
1436 .to_usize()
1437 .expect("The max made sure it's not negative");
1438 let frame_end = min(i + offset_end, len - 1).to_usize();
1441 match frame_end {
1442 Some(frame_end) => {
1443 if frame_start <= frame_end {
1444 let frame_values = args[frame_start..=frame_end].iter().cloned();
1455 let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1456 result.push(result_value);
1457 } else {
1458 let result_value = wrapped_aggregate.default();
1460 result.push(result_value);
1461 }
1462 }
1463 None => {
1464 let result_value = wrapped_aggregate.default();
1466 result.push(result_value);
1467 }
1468 }
1469 }
1470 }
1471
1472 match (
1473 &window_frame.units,
1474 &window_frame.start_bound,
1475 &window_frame.end_bound,
1476 ) {
1477 (Rows, UnboundedPreceding, CurrentRow) => {
1482 rows_between_unbounded_preceding_and_current_row::<A>(
1483 args,
1484 &mut result,
1485 A::new(wrapped_aggregate, false),
1486 temp_storage,
1487 );
1488 }
1489 (Rows, CurrentRow, UnboundedFollowing) => {
1490 args.reverse();
1492 rows_between_unbounded_preceding_and_current_row::<A>(
1493 args,
1494 &mut result,
1495 A::new(wrapped_aggregate, true),
1496 temp_storage,
1497 );
1498 result.reverse();
1499 }
1500 (Range, UnboundedPreceding, CurrentRow) => {
1501 groups_between_unbounded_preceding_and_current_row::<A>(
1504 args,
1505 order_by_rows,
1506 &mut result,
1507 A::new(wrapped_aggregate, false),
1508 temp_storage,
1509 );
1510 }
1511 (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1515 let start_prec = start_prec.to_i64().expect(
1516 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1517 );
1518 let end_prec = end_prec.to_i64().expect(
1519 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1520 );
1521 rows_between_offset_and_offset(
1522 args,
1523 &mut result,
1524 wrapped_aggregate,
1525 temp_storage,
1526 -start_prec,
1527 -end_prec,
1528 );
1529 }
1530 (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1531 let start_prec = start_prec.to_i64().expect(
1532 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1533 );
1534 let end_fol = end_fol.to_i64().expect(
1535 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1536 );
1537 rows_between_offset_and_offset(
1538 args,
1539 &mut result,
1540 wrapped_aggregate,
1541 temp_storage,
1542 -start_prec,
1543 end_fol,
1544 );
1545 }
1546 (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1547 let start_fol = start_fol.to_i64().expect(
1548 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1549 );
1550 let end_fol = end_fol.to_i64().expect(
1551 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1552 );
1553 rows_between_offset_and_offset(
1554 args,
1555 &mut result,
1556 wrapped_aggregate,
1557 temp_storage,
1558 start_fol,
1559 end_fol,
1560 );
1561 }
1562 (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1563 unreachable!() }
1565 (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1566 let start_prec = start_prec.to_i64().expect(
1567 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1568 );
1569 let end_fol = 0;
1570 rows_between_offset_and_offset(
1571 args,
1572 &mut result,
1573 wrapped_aggregate,
1574 temp_storage,
1575 -start_prec,
1576 end_fol,
1577 );
1578 }
1579 (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1580 let start_fol = 0;
1581 let end_fol = end_fol.to_i64().expect(
1582 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1583 );
1584 rows_between_offset_and_offset(
1585 args,
1586 &mut result,
1587 wrapped_aggregate,
1588 temp_storage,
1589 start_fol,
1590 end_fol,
1591 );
1592 }
1593 (Rows, CurrentRow, CurrentRow) => {
1594 let start_fol = 0;
1597 let end_fol = 0;
1598 rows_between_offset_and_offset(
1599 args,
1600 &mut result,
1601 wrapped_aggregate,
1602 temp_storage,
1603 start_fol,
1604 end_fol,
1605 );
1606 }
1607 (Rows, CurrentRow, OffsetPreceding(_))
1608 | (Rows, UnboundedFollowing, _)
1609 | (Rows, _, UnboundedPreceding)
1610 | (Rows, OffsetFollowing(..), CurrentRow) => {
1611 unreachable!() }
1613 (Rows, UnboundedPreceding, UnboundedFollowing) => {
1614 unreachable!()
1617 }
1618 (Rows, UnboundedPreceding, OffsetPreceding(_))
1619 | (Rows, UnboundedPreceding, OffsetFollowing(_))
1620 | (Rows, OffsetPreceding(..), UnboundedFollowing)
1621 | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1622 unreachable!()
1625 }
1626 (Range, _, _) => {
1627 unreachable!()
1634 }
1635 (Groups, _, _) => {
1636 unreachable!()
1640 }
1641 }
1642 }
1643
1644 result
1645}
1646
1647fn fused_window_aggr<'a, I, A>(
1651 input_datums: I,
1652 callers_temp_storage: &'a RowArena,
1653 wrapped_aggregates: &Vec<AggregateFunc>,
1654 order_by: &Vec<ColumnOrder>,
1655 window_frame: &WindowFrame,
1656) -> Datum<'a>
1657where
1658 I: IntoIterator<Item = Datum<'a>>,
1659 A: OneByOneAggr,
1660{
1661 let temp_storage = RowArena::new();
1662 let iter = fused_window_aggr_no_list::<_, A>(
1663 input_datums,
1664 &temp_storage,
1665 wrapped_aggregates,
1666 order_by,
1667 window_frame,
1668 );
1669 callers_temp_storage.make_datum(|packer| {
1670 packer.push_list(iter);
1671 })
1672}
1673
1674fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1677 input_datums: I,
1678 callers_temp_storage: &'b RowArena,
1679 wrapped_aggregates: &Vec<AggregateFunc>,
1680 order_by: &Vec<ColumnOrder>,
1681 window_frame: &WindowFrame,
1682) -> impl Iterator<Item = Datum<'b>>
1683where
1684 I: IntoIterator<Item = Datum<'a>>,
1685 A: OneByOneAggr,
1686{
1687 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1690
1691 let size_hint = datums.size_hint().0;
1692 let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1693 let mut original_rows = Vec::with_capacity(size_hint);
1694 let mut order_by_rows = Vec::with_capacity(size_hint);
1695 for (d, order_by_row) in datums {
1696 let mut iter = d.unwrap_list().iter();
1697 let original_row = iter.next().unwrap();
1698 original_rows.push(original_row);
1699 let args_iter = iter.next().unwrap().unwrap_list().iter();
1700 for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1702 args.push(arg);
1703 }
1704 order_by_rows.push(order_by_row);
1705 }
1706
1707 let mut results_per_row =
1708 vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1709 for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1710 let results = window_aggr_inner::<A>(
1711 args,
1712 &order_by_rows,
1713 wrapped_aggr,
1714 order_by,
1715 window_frame,
1716 callers_temp_storage,
1717 );
1718 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1719 results.push(result);
1720 }
1721 }
1722
1723 callers_temp_storage.reserve(2 * original_rows.len());
1724 results_per_row
1725 .into_iter()
1726 .enumerate()
1727 .map(move |(i, results)| {
1728 callers_temp_storage.make_datum(|packer| {
1729 packer.push_list_with(|packer| {
1730 packer
1731 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1732 packer.push(original_rows[i]);
1733 });
1734 })
1735 })
1736}
1737
1738pub trait OneByOneAggr {
1742 fn new(agg: &AggregateFunc, reverse: bool) -> Self;
1747 fn give(&mut self, d: &Datum);
1749 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
1751}
1752
1753#[derive(Debug)]
1759pub struct NaiveOneByOneAggr {
1760 agg: AggregateFunc,
1761 input: Vec<Row>,
1762 reverse: bool,
1763}
1764
1765impl OneByOneAggr for NaiveOneByOneAggr {
1766 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1767 NaiveOneByOneAggr {
1768 agg: agg.clone(),
1769 input: Vec::new(),
1770 reverse,
1771 }
1772 }
1773
1774 fn give(&mut self, d: &Datum) {
1775 let mut row = Row::default();
1776 row.packer().push(d);
1777 self.input.push(row);
1778 }
1779
1780 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1781 temp_storage.make_datum(|packer| {
1782 packer.push(if !self.reverse {
1783 self.agg
1784 .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1785 } else {
1786 self.agg.eval(
1787 self.input.iter().rev().map(|r| r.unpack_first()),
1788 temp_storage,
1789 )
1790 });
1791 })
1792 }
1793}
1794
/// Which of the two SQL offset window functions an `AggregateFunc::LagLead` computes.
///
/// The variant order matters: `Ord`/`PartialOrd` are derived.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub enum LagLeadType {
    /// The SQL `lag` window function.
    Lag,
    /// The SQL `lead` window function.
    Lead,
}
1804
/// An aggregate or window function, evaluated over a collection of datums by
/// `AggregateFunc::eval`.
///
/// The variant order matters: `Ord`/`PartialOrd` are derived.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum AggregateFunc {
    // `Max*`: maximum of the inputs, compared as the type named by the suffix
    // (see the `max_datum` instantiations in `AggregateFunc::eval`).
    MaxNumeric,
    MaxInt16,
    MaxInt32,
    MaxInt64,
    MaxUInt16,
    MaxUInt32,
    MaxUInt64,
    MaxMzTimestamp,
    MaxFloat32,
    MaxFloat64,
    MaxBool,
    MaxString,
    MaxDate,
    MaxTimestamp,
    MaxTimestampTz,
    MaxInterval,
    MaxTime,
    // `Min*`: minimum of the inputs, compared as the type named by the suffix.
    MinNumeric,
    MinInt16,
    MinInt32,
    MinInt64,
    MinUInt16,
    MinUInt32,
    MinUInt64,
    MinMzTimestamp,
    MinFloat32,
    MinFloat64,
    MinBool,
    MinString,
    MinDate,
    MinTimestamp,
    MinTimestampTz,
    MinInterval,
    MinTime,
    // `Sum*`: sum of the inputs. `eval` accumulates 16/32-bit integers in 64 bits and
    // 64-bit integers in 128 bits.
    SumInt16,
    SumInt32,
    SumInt64,
    SumUInt16,
    SumUInt32,
    SumUInt64,
    SumFloat32,
    SumFloat64,
    SumNumeric,
    Count,
    Any,
    All,
    JsonbAgg {
        /// Ordering passed to the aggregation.
        order_by: Vec<ColumnOrder>,
    },
    JsonbObjectAgg {
        /// Ordering passed to the aggregation.
        order_by: Vec<ColumnOrder>,
    },
    MapAgg {
        /// Ordering passed to the aggregation.
        order_by: Vec<ColumnOrder>,
        /// Not used by `eval`. NOTE(review): presumably the type of the map's values.
        value_type: ScalarType,
    },
    ArrayConcat {
        /// Ordering passed to the aggregation.
        order_by: Vec<ColumnOrder>,
    },
    ListConcat {
        /// Ordering passed to the aggregation.
        order_by: Vec<ColumnOrder>,
    },
    StringAgg {
        /// Ordering passed to the aggregation.
        order_by: Vec<ColumnOrder>,
    },
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    LagLead {
        order_by: Vec<ColumnOrder>,
        /// Whether this is `lag` or `lead`.
        lag_lead: LagLeadType,
        ignore_nulls: bool,
    },
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions sharing one ORDER BY.
    FusedValueWindowFunc {
        funcs: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
    },
    /// An ordinary aggregate evaluated as a window aggregation over `window_frame`.
    WindowAggregate {
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several window aggregations sharing one ORDER BY and window frame.
    FusedWindowAggregate {
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// NOTE(review): semantics are not visible in this part of the file.
    Dummy,
}
1936
1937impl Arbitrary for AggregateFunc {
1944 type Parameters = ();
1945
1946 type Strategy = Union<BoxedStrategy<Self>>;
1947
1948 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1949 use proptest::collection::vec;
1950 use proptest::prelude::any as proptest_any;
1951 Union::new(vec![
1952 Just(AggregateFunc::MaxNumeric).boxed(),
1953 Just(AggregateFunc::MaxInt16).boxed(),
1954 Just(AggregateFunc::MaxInt32).boxed(),
1955 Just(AggregateFunc::MaxInt64).boxed(),
1956 Just(AggregateFunc::MaxUInt16).boxed(),
1957 Just(AggregateFunc::MaxUInt32).boxed(),
1958 Just(AggregateFunc::MaxUInt64).boxed(),
1959 Just(AggregateFunc::MaxMzTimestamp).boxed(),
1960 Just(AggregateFunc::MaxFloat32).boxed(),
1961 Just(AggregateFunc::MaxFloat64).boxed(),
1962 Just(AggregateFunc::MaxBool).boxed(),
1963 Just(AggregateFunc::MaxString).boxed(),
1964 Just(AggregateFunc::MaxTimestamp).boxed(),
1965 Just(AggregateFunc::MaxDate).boxed(),
1966 Just(AggregateFunc::MaxTimestampTz).boxed(),
1967 Just(AggregateFunc::MaxInterval).boxed(),
1968 Just(AggregateFunc::MaxTime).boxed(),
1969 Just(AggregateFunc::MinNumeric).boxed(),
1970 Just(AggregateFunc::MinInt16).boxed(),
1971 Just(AggregateFunc::MinInt32).boxed(),
1972 Just(AggregateFunc::MinInt64).boxed(),
1973 Just(AggregateFunc::MinUInt16).boxed(),
1974 Just(AggregateFunc::MinUInt32).boxed(),
1975 Just(AggregateFunc::MinUInt64).boxed(),
1976 Just(AggregateFunc::MinMzTimestamp).boxed(),
1977 Just(AggregateFunc::MinFloat32).boxed(),
1978 Just(AggregateFunc::MinFloat64).boxed(),
1979 Just(AggregateFunc::MinBool).boxed(),
1980 Just(AggregateFunc::MinString).boxed(),
1981 Just(AggregateFunc::MinDate).boxed(),
1982 Just(AggregateFunc::MinTimestamp).boxed(),
1983 Just(AggregateFunc::MinTimestampTz).boxed(),
1984 Just(AggregateFunc::MinInterval).boxed(),
1985 Just(AggregateFunc::MinTime).boxed(),
1986 Just(AggregateFunc::SumInt16).boxed(),
1987 Just(AggregateFunc::SumInt32).boxed(),
1988 Just(AggregateFunc::SumInt64).boxed(),
1989 Just(AggregateFunc::SumUInt16).boxed(),
1990 Just(AggregateFunc::SumUInt32).boxed(),
1991 Just(AggregateFunc::SumUInt64).boxed(),
1992 Just(AggregateFunc::SumFloat32).boxed(),
1993 Just(AggregateFunc::SumFloat64).boxed(),
1994 Just(AggregateFunc::SumNumeric).boxed(),
1995 Just(AggregateFunc::Count).boxed(),
1996 Just(AggregateFunc::Any).boxed(),
1997 Just(AggregateFunc::All).boxed(),
1998 vec(proptest_any::<ColumnOrder>(), 1..4)
1999 .prop_map(|order_by| AggregateFunc::JsonbAgg { order_by })
2000 .boxed(),
2001 vec(proptest_any::<ColumnOrder>(), 1..4)
2002 .prop_map(|order_by| AggregateFunc::JsonbObjectAgg { order_by })
2003 .boxed(),
2004 (
2005 vec(proptest_any::<ColumnOrder>(), 1..4),
2006 proptest_any::<ScalarType>(),
2007 )
2008 .prop_map(|(order_by, value_type)| AggregateFunc::MapAgg {
2009 order_by,
2010 value_type,
2011 })
2012 .boxed(),
2013 vec(proptest_any::<ColumnOrder>(), 1..4)
2014 .prop_map(|order_by| AggregateFunc::ArrayConcat { order_by })
2015 .boxed(),
2016 vec(proptest_any::<ColumnOrder>(), 1..4)
2017 .prop_map(|order_by| AggregateFunc::ListConcat { order_by })
2018 .boxed(),
2019 vec(proptest_any::<ColumnOrder>(), 1..4)
2020 .prop_map(|order_by| AggregateFunc::StringAgg { order_by })
2021 .boxed(),
2022 vec(proptest_any::<ColumnOrder>(), 1..4)
2023 .prop_map(|order_by| AggregateFunc::RowNumber { order_by })
2024 .boxed(),
2025 vec(proptest_any::<ColumnOrder>(), 1..4)
2026 .prop_map(|order_by| AggregateFunc::DenseRank { order_by })
2027 .boxed(),
2028 (
2029 vec(proptest_any::<ColumnOrder>(), 1..4),
2030 proptest_any::<LagLeadType>(),
2031 proptest_any::<bool>(),
2032 )
2033 .prop_map(
2034 |(order_by, lag_lead, ignore_nulls)| AggregateFunc::LagLead {
2035 order_by,
2036 lag_lead,
2037 ignore_nulls,
2038 },
2039 )
2040 .boxed(),
2041 (
2042 vec(proptest_any::<ColumnOrder>(), 1..4),
2043 proptest_any::<WindowFrame>(),
2044 )
2045 .prop_map(|(order_by, window_frame)| AggregateFunc::FirstValue {
2046 order_by,
2047 window_frame,
2048 })
2049 .boxed(),
2050 (
2051 vec(proptest_any::<ColumnOrder>(), 1..4),
2052 proptest_any::<WindowFrame>(),
2053 )
2054 .prop_map(|(order_by, window_frame)| AggregateFunc::LastValue {
2055 order_by,
2056 window_frame,
2057 })
2058 .boxed(),
2059 Just(AggregateFunc::Dummy).boxed(),
2060 ])
2061 }
2062}
2063
2064impl RustType<ProtoColumnOrders> for Vec<ColumnOrder> {
2065 fn into_proto(&self) -> ProtoColumnOrders {
2066 ProtoColumnOrders {
2067 orders: self.into_proto(),
2068 }
2069 }
2070
2071 fn from_proto(proto: ProtoColumnOrders) -> Result<Self, TryFromProtoError> {
2072 proto.orders.into_rust()
2073 }
2074}
2075
2076impl RustType<ProtoAggregateFunc> for AggregateFunc {
2077 fn into_proto(&self) -> ProtoAggregateFunc {
2078 use proto_aggregate_func::Kind;
2079 ProtoAggregateFunc {
2080 kind: Some(match self {
2081 AggregateFunc::MaxNumeric => Kind::MaxNumeric(()),
2082 AggregateFunc::MaxInt16 => Kind::MaxInt16(()),
2083 AggregateFunc::MaxInt32 => Kind::MaxInt32(()),
2084 AggregateFunc::MaxInt64 => Kind::MaxInt64(()),
2085 AggregateFunc::MaxUInt16 => Kind::MaxUint16(()),
2086 AggregateFunc::MaxUInt32 => Kind::MaxUint32(()),
2087 AggregateFunc::MaxUInt64 => Kind::MaxUint64(()),
2088 AggregateFunc::MaxMzTimestamp => Kind::MaxMzTimestamp(()),
2089 AggregateFunc::MaxFloat32 => Kind::MaxFloat32(()),
2090 AggregateFunc::MaxFloat64 => Kind::MaxFloat64(()),
2091 AggregateFunc::MaxBool => Kind::MaxBool(()),
2092 AggregateFunc::MaxString => Kind::MaxString(()),
2093 AggregateFunc::MaxDate => Kind::MaxDate(()),
2094 AggregateFunc::MaxTimestamp => Kind::MaxTimestamp(()),
2095 AggregateFunc::MaxTimestampTz => Kind::MaxTimestampTz(()),
2096 AggregateFunc::MinNumeric => Kind::MinNumeric(()),
2097 AggregateFunc::MaxInterval => Kind::MaxInterval(()),
2098 AggregateFunc::MaxTime => Kind::MaxTime(()),
2099 AggregateFunc::MinInt16 => Kind::MinInt16(()),
2100 AggregateFunc::MinInt32 => Kind::MinInt32(()),
2101 AggregateFunc::MinInt64 => Kind::MinInt64(()),
2102 AggregateFunc::MinUInt16 => Kind::MinUint16(()),
2103 AggregateFunc::MinUInt32 => Kind::MinUint32(()),
2104 AggregateFunc::MinUInt64 => Kind::MinUint64(()),
2105 AggregateFunc::MinMzTimestamp => Kind::MinMzTimestamp(()),
2106 AggregateFunc::MinFloat32 => Kind::MinFloat32(()),
2107 AggregateFunc::MinFloat64 => Kind::MinFloat64(()),
2108 AggregateFunc::MinBool => Kind::MinBool(()),
2109 AggregateFunc::MinString => Kind::MinString(()),
2110 AggregateFunc::MinDate => Kind::MinDate(()),
2111 AggregateFunc::MinTimestamp => Kind::MinTimestamp(()),
2112 AggregateFunc::MinTimestampTz => Kind::MinTimestampTz(()),
2113 AggregateFunc::MinInterval => Kind::MinInterval(()),
2114 AggregateFunc::MinTime => Kind::MinTime(()),
2115 AggregateFunc::SumInt16 => Kind::SumInt16(()),
2116 AggregateFunc::SumInt32 => Kind::SumInt32(()),
2117 AggregateFunc::SumInt64 => Kind::SumInt64(()),
2118 AggregateFunc::SumUInt16 => Kind::SumUint16(()),
2119 AggregateFunc::SumUInt32 => Kind::SumUint32(()),
2120 AggregateFunc::SumUInt64 => Kind::SumUint64(()),
2121 AggregateFunc::SumFloat32 => Kind::SumFloat32(()),
2122 AggregateFunc::SumFloat64 => Kind::SumFloat64(()),
2123 AggregateFunc::SumNumeric => Kind::SumNumeric(()),
2124 AggregateFunc::Count => Kind::Count(()),
2125 AggregateFunc::Any => Kind::Any(()),
2126 AggregateFunc::All => Kind::All(()),
2127 AggregateFunc::JsonbAgg { order_by } => Kind::JsonbAgg(order_by.into_proto()),
2128 AggregateFunc::JsonbObjectAgg { order_by } => {
2129 Kind::JsonbObjectAgg(order_by.into_proto())
2130 }
2131 AggregateFunc::MapAgg {
2132 order_by,
2133 value_type,
2134 } => Kind::MapAgg(proto_aggregate_func::ProtoMapAgg {
2135 order_by: Some(order_by.into_proto()),
2136 value_type: Some(value_type.into_proto()),
2137 }),
2138 AggregateFunc::ArrayConcat { order_by } => Kind::ArrayConcat(order_by.into_proto()),
2139 AggregateFunc::ListConcat { order_by } => Kind::ListConcat(order_by.into_proto()),
2140 AggregateFunc::StringAgg { order_by } => Kind::StringAgg(order_by.into_proto()),
2141 AggregateFunc::RowNumber { order_by } => Kind::RowNumber(order_by.into_proto()),
2142 AggregateFunc::Rank { order_by } => Kind::Rank(order_by.into_proto()),
2143 AggregateFunc::DenseRank { order_by } => Kind::DenseRank(order_by.into_proto()),
2144 AggregateFunc::LagLead {
2145 order_by,
2146 lag_lead,
2147 ignore_nulls,
2148 } => Kind::LagLead(proto_aggregate_func::ProtoLagLead {
2149 order_by: Some(order_by.into_proto()),
2150 lag_lead: Some(match lag_lead {
2151 LagLeadType::Lag => proto_aggregate_func::proto_lag_lead::LagLead::Lag(()),
2152 LagLeadType::Lead => {
2153 proto_aggregate_func::proto_lag_lead::LagLead::Lead(())
2154 }
2155 }),
2156 ignore_nulls: *ignore_nulls,
2157 }),
2158 AggregateFunc::FirstValue {
2159 order_by,
2160 window_frame,
2161 } => Kind::FirstValue(proto_aggregate_func::ProtoFramedWindowFunc {
2162 order_by: Some(order_by.into_proto()),
2163 window_frame: Some(window_frame.into_proto()),
2164 }),
2165 AggregateFunc::LastValue {
2166 order_by,
2167 window_frame,
2168 } => Kind::LastValue(proto_aggregate_func::ProtoFramedWindowFunc {
2169 order_by: Some(order_by.into_proto()),
2170 window_frame: Some(window_frame.into_proto()),
2171 }),
2172 AggregateFunc::WindowAggregate {
2173 wrapped_aggregate,
2174 order_by,
2175 window_frame,
2176 } => Kind::WindowAggregate(Box::new(proto_aggregate_func::ProtoWindowAggregate {
2177 wrapped_aggregate: Some(wrapped_aggregate.into_proto()),
2178 order_by: Some(order_by.into_proto()),
2179 window_frame: Some(window_frame.into_proto()),
2180 })),
2181 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2182 Kind::FusedValueWindowFunc(ProtoFusedValueWindowFunc {
2183 funcs: funcs.into_proto(),
2184 order_by: Some(order_by.into_proto()),
2185 })
2186 }
2187 AggregateFunc::FusedWindowAggregate {
2188 wrapped_aggregates,
2189 order_by,
2190 window_frame,
2191 } => Kind::FusedWindowAggregate(ProtoFusedWindowAggregate {
2192 wrapped_aggregates: wrapped_aggregates.into_proto(),
2193 order_by: Some(order_by.into_proto()),
2194 window_frame: Some(window_frame.into_proto()),
2195 }),
2196 AggregateFunc::Dummy => Kind::Dummy(()),
2197 }),
2198 }
2199 }
2200
2201 fn from_proto(proto: ProtoAggregateFunc) -> Result<Self, TryFromProtoError> {
2202 use proto_aggregate_func::Kind;
2203 let kind = proto
2204 .kind
2205 .ok_or_else(|| TryFromProtoError::missing_field("ProtoAggregateFunc::kind"))?;
2206 Ok(match kind {
2207 Kind::MaxNumeric(()) => AggregateFunc::MaxNumeric,
2208 Kind::MaxInt16(()) => AggregateFunc::MaxInt16,
2209 Kind::MaxInt32(()) => AggregateFunc::MaxInt32,
2210 Kind::MaxInt64(()) => AggregateFunc::MaxInt64,
2211 Kind::MaxUint16(()) => AggregateFunc::MaxUInt16,
2212 Kind::MaxUint32(()) => AggregateFunc::MaxUInt32,
2213 Kind::MaxUint64(()) => AggregateFunc::MaxUInt64,
2214 Kind::MaxMzTimestamp(()) => AggregateFunc::MaxMzTimestamp,
2215 Kind::MaxFloat32(()) => AggregateFunc::MaxFloat32,
2216 Kind::MaxFloat64(()) => AggregateFunc::MaxFloat64,
2217 Kind::MaxBool(()) => AggregateFunc::MaxBool,
2218 Kind::MaxString(()) => AggregateFunc::MaxString,
2219 Kind::MaxDate(()) => AggregateFunc::MaxDate,
2220 Kind::MaxTimestamp(()) => AggregateFunc::MaxTimestamp,
2221 Kind::MaxTimestampTz(()) => AggregateFunc::MaxTimestampTz,
2222 Kind::MaxInterval(()) => AggregateFunc::MaxInterval,
2223 Kind::MaxTime(()) => AggregateFunc::MaxTime,
2224 Kind::MinNumeric(()) => AggregateFunc::MinNumeric,
2225 Kind::MinInt16(()) => AggregateFunc::MinInt16,
2226 Kind::MinInt32(()) => AggregateFunc::MinInt32,
2227 Kind::MinInt64(()) => AggregateFunc::MinInt64,
2228 Kind::MinUint16(()) => AggregateFunc::MinUInt16,
2229 Kind::MinUint32(()) => AggregateFunc::MinUInt32,
2230 Kind::MinUint64(()) => AggregateFunc::MinUInt64,
2231 Kind::MinMzTimestamp(()) => AggregateFunc::MinMzTimestamp,
2232 Kind::MinFloat32(()) => AggregateFunc::MinFloat32,
2233 Kind::MinFloat64(()) => AggregateFunc::MinFloat64,
2234 Kind::MinBool(()) => AggregateFunc::MinBool,
2235 Kind::MinString(()) => AggregateFunc::MinString,
2236 Kind::MinDate(()) => AggregateFunc::MinDate,
2237 Kind::MinTimestamp(()) => AggregateFunc::MinTimestamp,
2238 Kind::MinTimestampTz(()) => AggregateFunc::MinTimestampTz,
2239 Kind::MinInterval(()) => AggregateFunc::MinInterval,
2240 Kind::MinTime(()) => AggregateFunc::MinTime,
2241 Kind::SumInt16(()) => AggregateFunc::SumInt16,
2242 Kind::SumInt32(()) => AggregateFunc::SumInt32,
2243 Kind::SumInt64(()) => AggregateFunc::SumInt64,
2244 Kind::SumUint16(()) => AggregateFunc::SumUInt16,
2245 Kind::SumUint32(()) => AggregateFunc::SumUInt32,
2246 Kind::SumUint64(()) => AggregateFunc::SumUInt64,
2247 Kind::SumFloat32(()) => AggregateFunc::SumFloat32,
2248 Kind::SumFloat64(()) => AggregateFunc::SumFloat64,
2249 Kind::SumNumeric(()) => AggregateFunc::SumNumeric,
2250 Kind::Count(()) => AggregateFunc::Count,
2251 Kind::Any(()) => AggregateFunc::Any,
2252 Kind::All(()) => AggregateFunc::All,
2253 Kind::JsonbAgg(order_by) => AggregateFunc::JsonbAgg {
2254 order_by: order_by.into_rust()?,
2255 },
2256 Kind::JsonbObjectAgg(order_by) => AggregateFunc::JsonbObjectAgg {
2257 order_by: order_by.into_rust()?,
2258 },
2259 Kind::MapAgg(pma) => AggregateFunc::MapAgg {
2260 order_by: pma.order_by.into_rust_if_some("ProtoMapAgg::order_by")?,
2261 value_type: pma
2262 .value_type
2263 .into_rust_if_some("ProtoMapAgg::value_type")?,
2264 },
2265 Kind::ArrayConcat(order_by) => AggregateFunc::ArrayConcat {
2266 order_by: order_by.into_rust()?,
2267 },
2268 Kind::ListConcat(order_by) => AggregateFunc::ListConcat {
2269 order_by: order_by.into_rust()?,
2270 },
2271 Kind::StringAgg(order_by) => AggregateFunc::StringAgg {
2272 order_by: order_by.into_rust()?,
2273 },
2274 Kind::RowNumber(order_by) => AggregateFunc::RowNumber {
2275 order_by: order_by.into_rust()?,
2276 },
2277 Kind::Rank(order_by) => AggregateFunc::Rank {
2278 order_by: order_by.into_rust()?,
2279 },
2280 Kind::DenseRank(order_by) => AggregateFunc::DenseRank {
2281 order_by: order_by.into_rust()?,
2282 },
2283 Kind::LagLead(pll) => AggregateFunc::LagLead {
2284 order_by: pll.order_by.into_rust_if_some("ProtoLagLead::order_by")?,
2285 lag_lead: match pll.lag_lead {
2286 Some(proto_aggregate_func::proto_lag_lead::LagLead::Lag(())) => {
2287 LagLeadType::Lag
2288 }
2289 Some(proto_aggregate_func::proto_lag_lead::LagLead::Lead(())) => {
2290 LagLeadType::Lead
2291 }
2292 None => {
2293 return Err(TryFromProtoError::MissingField(
2294 "ProtoLagLead::lag_lead".into(),
2295 ));
2296 }
2297 },
2298 ignore_nulls: pll.ignore_nulls,
2299 },
2300 Kind::FirstValue(pfv) => AggregateFunc::FirstValue {
2301 order_by: pfv
2302 .order_by
2303 .into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
2304 window_frame: pfv
2305 .window_frame
2306 .into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
2307 },
2308 Kind::LastValue(pfv) => AggregateFunc::LastValue {
2309 order_by: pfv
2310 .order_by
2311 .into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
2312 window_frame: pfv
2313 .window_frame
2314 .into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
2315 },
2316 Kind::WindowAggregate(paf) => AggregateFunc::WindowAggregate {
2317 wrapped_aggregate: paf
2318 .wrapped_aggregate
2319 .into_rust_if_some("ProtoWindowAggregate::wrapped_aggregate")?,
2320 order_by: paf
2321 .order_by
2322 .into_rust_if_some("ProtoWindowAggregate::order_by")?,
2323 window_frame: paf
2324 .window_frame
2325 .into_rust_if_some("ProtoWindowAggregate::window_frame")?,
2326 },
2327 Kind::FusedValueWindowFunc(fvwf) => AggregateFunc::FusedValueWindowFunc {
2328 funcs: fvwf.funcs.into_rust()?,
2329 order_by: fvwf
2330 .order_by
2331 .into_rust_if_some("ProtoFusedValueWindowFunc::order_by")?,
2332 },
2333 Kind::FusedWindowAggregate(fwa) => AggregateFunc::FusedWindowAggregate {
2334 wrapped_aggregates: fwa.wrapped_aggregates.into_rust()?,
2335 order_by: fwa
2336 .order_by
2337 .into_rust_if_some("ProtoFusedWindowAggregate::order_by")?,
2338 window_frame: fwa
2339 .window_frame
2340 .into_rust_if_some("ProtoFusedWindowAggregate::window_frame")?,
2341 },
2342 Kind::Dummy(()) => AggregateFunc::Dummy,
2343 })
2344 }
2345}
2346
2347impl AggregateFunc {
2348 pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
2349 where
2350 I: IntoIterator<Item = Datum<'a>>,
2351 {
2352 match self {
2353 AggregateFunc::MaxNumeric => {
2354 max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
2355 }
2356 AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
2357 AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
2358 AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
2359 AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
2360 AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
2361 AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
2362 AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
2363 AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
2364 AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
2365 AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
2366 AggregateFunc::MaxString => max_string(datums),
2367 AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
2368 AggregateFunc::MaxTimestamp => {
2369 max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
2370 }
2371 AggregateFunc::MaxTimestampTz => {
2372 max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2373 }
2374 AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
2375 AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
2376 AggregateFunc::MinNumeric => {
2377 min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
2378 }
2379 AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
2380 AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
2381 AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
2382 AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
2383 AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
2384 AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
2385 AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
2386 AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
2387 AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
2388 AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
2389 AggregateFunc::MinString => min_string(datums),
2390 AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
2391 AggregateFunc::MinTimestamp => {
2392 min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
2393 }
2394 AggregateFunc::MinTimestampTz => {
2395 min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2396 }
2397 AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
2398 AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
2399 AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
2400 AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
2401 AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
2402 AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
2403 AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
2404 AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
2405 AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
2406 AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
2407 AggregateFunc::SumNumeric => sum_numeric(datums),
2408 AggregateFunc::Count => count(datums),
2409 AggregateFunc::Any => any(datums),
2410 AggregateFunc::All => all(datums),
2411 AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
2412 AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
2413 dict_agg(datums, temp_storage, order_by)
2414 }
2415 AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
2416 AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
2417 AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
2418 AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
2419 AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
2420 AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
2421 AggregateFunc::LagLead {
2422 order_by,
2423 lag_lead: lag_lead_type,
2424 ignore_nulls,
2425 } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2426 AggregateFunc::FirstValue {
2427 order_by,
2428 window_frame,
2429 } => first_value(datums, temp_storage, order_by, window_frame),
2430 AggregateFunc::LastValue {
2431 order_by,
2432 window_frame,
2433 } => last_value(datums, temp_storage, order_by, window_frame),
2434 AggregateFunc::WindowAggregate {
2435 wrapped_aggregate,
2436 order_by,
2437 window_frame,
2438 } => window_aggr::<_, NaiveOneByOneAggr>(
2439 datums,
2440 temp_storage,
2441 wrapped_aggregate,
2442 order_by,
2443 window_frame,
2444 ),
2445 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2446 fused_value_window_func(datums, temp_storage, funcs, order_by)
2447 }
2448 AggregateFunc::FusedWindowAggregate {
2449 wrapped_aggregates,
2450 order_by,
2451 window_frame,
2452 } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2453 datums,
2454 temp_storage,
2455 wrapped_aggregates,
2456 order_by,
2457 window_frame,
2458 ),
2459 AggregateFunc::Dummy => Datum::Dummy,
2460 }
2461 }
2462
2463 pub fn eval_with_fast_window_agg<'a, I, W>(
2467 &self,
2468 datums: I,
2469 temp_storage: &'a RowArena,
2470 ) -> Datum<'a>
2471 where
2472 I: IntoIterator<Item = Datum<'a>>,
2473 W: OneByOneAggr,
2474 {
2475 match self {
2476 AggregateFunc::WindowAggregate {
2477 wrapped_aggregate,
2478 order_by,
2479 window_frame,
2480 } => window_aggr::<_, W>(
2481 datums,
2482 temp_storage,
2483 wrapped_aggregate,
2484 order_by,
2485 window_frame,
2486 ),
2487 AggregateFunc::FusedWindowAggregate {
2488 wrapped_aggregates,
2489 order_by,
2490 window_frame,
2491 } => fused_window_aggr::<_, W>(
2492 datums,
2493 temp_storage,
2494 wrapped_aggregates,
2495 order_by,
2496 window_frame,
2497 ),
2498 _ => self.eval(datums, temp_storage),
2499 }
2500 }
2501
2502 pub fn eval_with_unnest_list<'a, I, W>(
2503 &self,
2504 datums: I,
2505 temp_storage: &'a RowArena,
2506 ) -> impl Iterator<Item = Datum<'a>>
2507 where
2508 I: IntoIterator<Item = Datum<'a>>,
2509 W: OneByOneAggr,
2510 {
2511 assert!(self.can_fuse_with_unnest_list());
2513 match self {
2514 AggregateFunc::RowNumber { order_by } => {
2515 row_number_no_list(datums, temp_storage, order_by).collect_vec()
2516 }
2517 AggregateFunc::Rank { order_by } => {
2518 rank_no_list(datums, temp_storage, order_by).collect_vec()
2519 }
2520 AggregateFunc::DenseRank { order_by } => {
2521 dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2522 }
2523 AggregateFunc::LagLead {
2524 order_by,
2525 lag_lead: lag_lead_type,
2526 ignore_nulls,
2527 } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2528 .collect_vec(),
2529 AggregateFunc::FirstValue {
2530 order_by,
2531 window_frame,
2532 } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2533 AggregateFunc::LastValue {
2534 order_by,
2535 window_frame,
2536 } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2537 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2538 fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2539 }
2540 AggregateFunc::WindowAggregate {
2541 wrapped_aggregate,
2542 order_by,
2543 window_frame,
2544 } => window_aggr_no_list::<_, W>(
2545 datums,
2546 temp_storage,
2547 wrapped_aggregate,
2548 order_by,
2549 window_frame,
2550 )
2551 .collect_vec(),
2552 AggregateFunc::FusedWindowAggregate {
2553 wrapped_aggregates,
2554 order_by,
2555 window_frame,
2556 } => fused_window_aggr_no_list::<_, W>(
2557 datums,
2558 temp_storage,
2559 wrapped_aggregates,
2560 order_by,
2561 window_frame,
2562 )
2563 .collect_vec(),
2564 _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2565 }
2566 .into_iter()
2567 }
2568
2569 pub fn default(&self) -> Datum<'static> {
2572 match self {
2573 AggregateFunc::Count => Datum::Int64(0),
2574 AggregateFunc::Any => Datum::False,
2575 AggregateFunc::All => Datum::True,
2576 AggregateFunc::Dummy => Datum::Dummy,
2577 _ => Datum::Null,
2578 }
2579 }
2580
2581 pub fn identity_datum(&self) -> Datum<'static> {
2584 match self {
2585 AggregateFunc::Any => Datum::False,
2586 AggregateFunc::All => Datum::True,
2587 AggregateFunc::Dummy => Datum::Dummy,
2588 AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2589 AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2590 AggregateFunc::RowNumber { .. }
2591 | AggregateFunc::Rank { .. }
2592 | AggregateFunc::DenseRank { .. }
2593 | AggregateFunc::LagLead { .. }
2594 | AggregateFunc::FirstValue { .. }
2595 | AggregateFunc::LastValue { .. }
2596 | AggregateFunc::WindowAggregate { .. }
2597 | AggregateFunc::FusedValueWindowFunc { .. }
2598 | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2599 AggregateFunc::MaxNumeric
2600 | AggregateFunc::MaxInt16
2601 | AggregateFunc::MaxInt32
2602 | AggregateFunc::MaxInt64
2603 | AggregateFunc::MaxUInt16
2604 | AggregateFunc::MaxUInt32
2605 | AggregateFunc::MaxUInt64
2606 | AggregateFunc::MaxMzTimestamp
2607 | AggregateFunc::MaxFloat32
2608 | AggregateFunc::MaxFloat64
2609 | AggregateFunc::MaxBool
2610 | AggregateFunc::MaxString
2611 | AggregateFunc::MaxDate
2612 | AggregateFunc::MaxTimestamp
2613 | AggregateFunc::MaxTimestampTz
2614 | AggregateFunc::MaxInterval
2615 | AggregateFunc::MaxTime
2616 | AggregateFunc::MinNumeric
2617 | AggregateFunc::MinInt16
2618 | AggregateFunc::MinInt32
2619 | AggregateFunc::MinInt64
2620 | AggregateFunc::MinUInt16
2621 | AggregateFunc::MinUInt32
2622 | AggregateFunc::MinUInt64
2623 | AggregateFunc::MinMzTimestamp
2624 | AggregateFunc::MinFloat32
2625 | AggregateFunc::MinFloat64
2626 | AggregateFunc::MinBool
2627 | AggregateFunc::MinString
2628 | AggregateFunc::MinDate
2629 | AggregateFunc::MinTimestamp
2630 | AggregateFunc::MinTimestampTz
2631 | AggregateFunc::MinInterval
2632 | AggregateFunc::MinTime
2633 | AggregateFunc::SumInt16
2634 | AggregateFunc::SumInt32
2635 | AggregateFunc::SumInt64
2636 | AggregateFunc::SumUInt16
2637 | AggregateFunc::SumUInt32
2638 | AggregateFunc::SumUInt64
2639 | AggregateFunc::SumFloat32
2640 | AggregateFunc::SumFloat64
2641 | AggregateFunc::SumNumeric
2642 | AggregateFunc::Count
2643 | AggregateFunc::JsonbAgg { .. }
2644 | AggregateFunc::JsonbObjectAgg { .. }
2645 | AggregateFunc::MapAgg { .. }
2646 | AggregateFunc::StringAgg { .. } => Datum::Null,
2647 }
2648 }
2649
2650 pub fn can_fuse_with_unnest_list(&self) -> bool {
2651 match self {
2652 AggregateFunc::RowNumber { .. }
2653 | AggregateFunc::Rank { .. }
2654 | AggregateFunc::DenseRank { .. }
2655 | AggregateFunc::LagLead { .. }
2656 | AggregateFunc::FirstValue { .. }
2657 | AggregateFunc::LastValue { .. }
2658 | AggregateFunc::WindowAggregate { .. }
2659 | AggregateFunc::FusedValueWindowFunc { .. }
2660 | AggregateFunc::FusedWindowAggregate { .. } => true,
2661 AggregateFunc::ArrayConcat { .. }
2662 | AggregateFunc::ListConcat { .. }
2663 | AggregateFunc::Any
2664 | AggregateFunc::All
2665 | AggregateFunc::Dummy
2666 | AggregateFunc::MaxNumeric
2667 | AggregateFunc::MaxInt16
2668 | AggregateFunc::MaxInt32
2669 | AggregateFunc::MaxInt64
2670 | AggregateFunc::MaxUInt16
2671 | AggregateFunc::MaxUInt32
2672 | AggregateFunc::MaxUInt64
2673 | AggregateFunc::MaxMzTimestamp
2674 | AggregateFunc::MaxFloat32
2675 | AggregateFunc::MaxFloat64
2676 | AggregateFunc::MaxBool
2677 | AggregateFunc::MaxString
2678 | AggregateFunc::MaxDate
2679 | AggregateFunc::MaxTimestamp
2680 | AggregateFunc::MaxTimestampTz
2681 | AggregateFunc::MaxInterval
2682 | AggregateFunc::MaxTime
2683 | AggregateFunc::MinNumeric
2684 | AggregateFunc::MinInt16
2685 | AggregateFunc::MinInt32
2686 | AggregateFunc::MinInt64
2687 | AggregateFunc::MinUInt16
2688 | AggregateFunc::MinUInt32
2689 | AggregateFunc::MinUInt64
2690 | AggregateFunc::MinMzTimestamp
2691 | AggregateFunc::MinFloat32
2692 | AggregateFunc::MinFloat64
2693 | AggregateFunc::MinBool
2694 | AggregateFunc::MinString
2695 | AggregateFunc::MinDate
2696 | AggregateFunc::MinTimestamp
2697 | AggregateFunc::MinTimestampTz
2698 | AggregateFunc::MinInterval
2699 | AggregateFunc::MinTime
2700 | AggregateFunc::SumInt16
2701 | AggregateFunc::SumInt32
2702 | AggregateFunc::SumInt64
2703 | AggregateFunc::SumUInt16
2704 | AggregateFunc::SumUInt32
2705 | AggregateFunc::SumUInt64
2706 | AggregateFunc::SumFloat32
2707 | AggregateFunc::SumFloat64
2708 | AggregateFunc::SumNumeric
2709 | AggregateFunc::Count
2710 | AggregateFunc::JsonbAgg { .. }
2711 | AggregateFunc::JsonbObjectAgg { .. }
2712 | AggregateFunc::MapAgg { .. }
2713 | AggregateFunc::StringAgg { .. } => false,
2714 }
2715 }
2716
2717 pub fn output_type(&self, input_type: ColumnType) -> ColumnType {
2723 let scalar_type = match self {
2724 AggregateFunc::Count => ScalarType::Int64,
2725 AggregateFunc::Any => ScalarType::Bool,
2726 AggregateFunc::All => ScalarType::Bool,
2727 AggregateFunc::JsonbAgg { .. } => ScalarType::Jsonb,
2728 AggregateFunc::JsonbObjectAgg { .. } => ScalarType::Jsonb,
2729 AggregateFunc::SumInt16 => ScalarType::Int64,
2730 AggregateFunc::SumInt32 => ScalarType::Int64,
2731 AggregateFunc::SumInt64 => ScalarType::Numeric {
2732 max_scale: Some(NumericMaxScale::ZERO),
2733 },
2734 AggregateFunc::SumUInt16 => ScalarType::UInt64,
2735 AggregateFunc::SumUInt32 => ScalarType::UInt64,
2736 AggregateFunc::SumUInt64 => ScalarType::Numeric {
2737 max_scale: Some(NumericMaxScale::ZERO),
2738 },
2739 AggregateFunc::MapAgg { value_type, .. } => ScalarType::Map {
2740 value_type: Box::new(value_type.clone()),
2741 custom_id: None,
2742 },
2743 AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
2744 match input_type.scalar_type {
2745 ScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
2747 _ => unreachable!(),
2748 }
2749 }
2750 AggregateFunc::StringAgg { .. } => ScalarType::String,
2751 AggregateFunc::RowNumber { .. } => {
2752 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
2753 }
2754 AggregateFunc::Rank { .. } => {
2755 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
2756 }
2757 AggregateFunc::DenseRank { .. } => {
2758 AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
2759 }
2760 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2761 let fields = input_type.scalar_type.unwrap_record_element_type();
2763 let original_row_type = fields[0].unwrap_record_element_type()[0]
2764 .clone()
2765 .nullable(false);
2766 let output_type_inner = Self::lag_lead_output_type_inner_from_encoded_args(fields[0].unwrap_record_element_type()[1]);
2767 let column_name = Self::lag_lead_result_column_name(lag_lead_type);
2768
2769 ScalarType::List {
2770 element_type: Box::new(ScalarType::Record {
2771 fields: [
2772 (column_name, output_type_inner),
2773 (ColumnName::from("?orig_row?"), original_row_type),
2774 ].into(),
2775 custom_id: None,
2776 }),
2777 custom_id: None,
2778 }
2779 }
2780 AggregateFunc::FirstValue { .. } => {
2781 let fields = input_type.scalar_type.unwrap_record_element_type();
2783 let original_row_type = fields[0].unwrap_record_element_type()[0]
2784 .clone()
2785 .nullable(false);
2786 let value_type = fields[0].unwrap_record_element_type()[1]
2787 .clone()
2788 .nullable(true); ScalarType::List {
2791 element_type: Box::new(ScalarType::Record {
2792 fields: [
2793 (ColumnName::from("?first_value?"), value_type),
2794 (ColumnName::from("?orig_row?"), original_row_type),
2795 ].into(),
2796 custom_id: None,
2797 }),
2798 custom_id: None,
2799 }
2800 }
2801 AggregateFunc::LastValue { .. } => {
2802 let fields = input_type.scalar_type.unwrap_record_element_type();
2804 let original_row_type = fields[0].unwrap_record_element_type()[0]
2805 .clone()
2806 .nullable(false);
2807 let value_type = fields[0].unwrap_record_element_type()[1]
2808 .clone()
2809 .nullable(true); ScalarType::List {
2812 element_type: Box::new(ScalarType::Record {
2813 fields: [
2814 (ColumnName::from("?last_value?"), value_type),
2815 (ColumnName::from("?orig_row?"), original_row_type),
2816 ].into(),
2817 custom_id: None,
2818 }),
2819 custom_id: None,
2820 }
2821 }
2822 AggregateFunc::WindowAggregate {
2823 wrapped_aggregate, ..
2824 } => {
2825 let fields = input_type.scalar_type.unwrap_record_element_type();
2827 let original_row_type = fields[0].unwrap_record_element_type()[0]
2828 .clone()
2829 .nullable(false);
2830 let arg_type = fields[0].unwrap_record_element_type()[1]
2831 .clone()
2832 .nullable(true);
2833 let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);
2834
2835 ScalarType::List {
2836 element_type: Box::new(ScalarType::Record {
2837 fields: [
2838 (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
2839 (ColumnName::from("?orig_row?"), original_row_type),
2840 ].into(),
2841 custom_id: None,
2842 }),
2843 custom_id: None,
2844 }
2845 }
2846 AggregateFunc::FusedWindowAggregate {
2847 wrapped_aggregates, ..
2848 } => {
2849 let fields = input_type.scalar_type.unwrap_record_element_type();
2852 let original_row_type = fields[0].unwrap_record_element_type()[0]
2853 .clone()
2854 .nullable(false);
2855 let args_type = fields[0].unwrap_record_element_type()[1];
2856 let arg_types = args_type.unwrap_record_element_type();
2857 let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(|(arg_type, wrapped_agg)| {
2858 (
2859 ColumnName::from(wrapped_agg.name()),
2860 wrapped_agg.output_type((**arg_type).clone().nullable(true)),
2861 )
2862 }).collect_vec();
2863
2864 ScalarType::List {
2865 element_type: Box::new(ScalarType::Record {
2866 fields: [
2867 (ColumnName::from("?fused_window_agg?"), ScalarType::Record {
2868 fields: out_fields.into(),
2869 custom_id: None,
2870 }.nullable(false)),
2871 (ColumnName::from("?orig_row?"), original_row_type),
2872 ].into(),
2873 custom_id: None,
2874 }),
2875 custom_id: None,
2876 }
2877 }
2878 AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
2879 let fields = input_type.scalar_type.unwrap_record_element_type();
2884 let original_row_type = fields[0].unwrap_record_element_type()[0]
2885 .clone()
2886 .nullable(false);
2887 let encoded_args_type = fields[0].unwrap_record_element_type()[1].unwrap_record_element_type();
2888
2889 ScalarType::List {
2890 element_type: Box::new(ScalarType::Record {
2891 fields: [
2892 (ColumnName::from("?fused_value_window_func?"), ScalarType::Record {
2893 fields: encoded_args_type.into_iter().zip_eq(funcs).map(|(arg_type, func)| {
2894 match func {
2895 AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
2896 (
2897 Self::lag_lead_result_column_name(lag_lead_type),
2898 Self::lag_lead_output_type_inner_from_encoded_args(arg_type)
2899 )
2900 },
2901 AggregateFunc::FirstValue { .. } => {
2902 (
2903 ColumnName::from("?first_value?"),
2904 arg_type.clone().nullable(true),
2905 )
2906 }
2907 AggregateFunc::LastValue { .. } => {
2908 (
2909 ColumnName::from("?last_value?"),
2910 arg_type.clone().nullable(true),
2911 )
2912 }
2913 _ => panic!("FusedValueWindowFunc has an unknown function"),
2914 }
2915 }).collect(),
2916 custom_id: None,
2917 }.nullable(false)),
2918 (ColumnName::from("?orig_row?"), original_row_type),
2919 ].into(),
2920 custom_id: None,
2921 }),
2922 custom_id: None,
2923 }
2924 }
2925 AggregateFunc::Dummy
2926 | AggregateFunc::MaxNumeric
2927 | AggregateFunc::MaxInt16
2928 | AggregateFunc::MaxInt32
2929 | AggregateFunc::MaxInt64
2930 | AggregateFunc::MaxUInt16
2931 | AggregateFunc::MaxUInt32
2932 | AggregateFunc::MaxUInt64
2933 | AggregateFunc::MaxMzTimestamp
2934 | AggregateFunc::MaxFloat32
2935 | AggregateFunc::MaxFloat64
2936 | AggregateFunc::MaxBool
2937 | AggregateFunc::MaxString
2941 | AggregateFunc::MaxDate
2942 | AggregateFunc::MaxTimestamp
2943 | AggregateFunc::MaxTimestampTz
2944 | AggregateFunc::MaxInterval
2945 | AggregateFunc::MaxTime
2946 | AggregateFunc::MinNumeric
2947 | AggregateFunc::MinInt16
2948 | AggregateFunc::MinInt32
2949 | AggregateFunc::MinInt64
2950 | AggregateFunc::MinUInt16
2951 | AggregateFunc::MinUInt32
2952 | AggregateFunc::MinUInt64
2953 | AggregateFunc::MinMzTimestamp
2954 | AggregateFunc::MinFloat32
2955 | AggregateFunc::MinFloat64
2956 | AggregateFunc::MinBool
2957 | AggregateFunc::MinString
2958 | AggregateFunc::MinDate
2959 | AggregateFunc::MinTimestamp
2960 | AggregateFunc::MinTimestampTz
2961 | AggregateFunc::MinInterval
2962 | AggregateFunc::MinTime
2963 | AggregateFunc::SumFloat32
2964 | AggregateFunc::SumFloat64
2965 | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
2966 };
2967 let nullable = match self {
2970 AggregateFunc::Count => false,
2971 AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
2973 ScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
2975 ScalarType::Record { fields, .. } => fields[0].1.nullable,
2977 _ => unreachable!(),
2978 },
2979 _ => unreachable!(),
2980 },
2981 _ => input_type.nullable,
2982 };
2983 scalar_type.nullable(nullable)
2984 }
2985
2986 fn output_type_ranking_window_funcs(input_type: &ColumnType, col_name: &str) -> ScalarType {
2988 match input_type.scalar_type {
2989 ScalarType::Record { ref fields, .. } => ScalarType::List {
2990 element_type: Box::new(ScalarType::Record {
2991 fields: [
2992 (
2993 ColumnName::from(col_name),
2994 ScalarType::Int64.nullable(false),
2995 ),
2996 (ColumnName::from("?orig_row?"), {
2997 let inner = match &fields[0].1.scalar_type {
2998 ScalarType::List { element_type, .. } => element_type.clone(),
2999 _ => unreachable!(),
3000 };
3001 inner.nullable(false)
3002 }),
3003 ]
3004 .into(),
3005 custom_id: None,
3006 }),
3007 custom_id: None,
3008 },
3009 _ => unreachable!(),
3010 }
3011 }
3012
3013 fn lag_lead_output_type_inner_from_encoded_args(encoded_args_type: &ScalarType) -> ColumnType {
3017 encoded_args_type.unwrap_record_element_type()[0]
3021 .clone()
3022 .nullable(true)
3023 }
3024
3025 fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
3026 ColumnName::from(match lag_lead_type {
3027 LagLeadType::Lag => "?lag?",
3028 LagLeadType::Lead => "?lead?",
3029 })
3030 }
3031
3032 pub fn propagates_nonnull_constraint(&self) -> bool {
3037 match self {
3038 AggregateFunc::MaxNumeric
3039 | AggregateFunc::MaxInt16
3040 | AggregateFunc::MaxInt32
3041 | AggregateFunc::MaxInt64
3042 | AggregateFunc::MaxUInt16
3043 | AggregateFunc::MaxUInt32
3044 | AggregateFunc::MaxUInt64
3045 | AggregateFunc::MaxMzTimestamp
3046 | AggregateFunc::MaxFloat32
3047 | AggregateFunc::MaxFloat64
3048 | AggregateFunc::MaxBool
3049 | AggregateFunc::MaxString
3050 | AggregateFunc::MaxDate
3051 | AggregateFunc::MaxTimestamp
3052 | AggregateFunc::MaxTimestampTz
3053 | AggregateFunc::MinNumeric
3054 | AggregateFunc::MinInt16
3055 | AggregateFunc::MinInt32
3056 | AggregateFunc::MinInt64
3057 | AggregateFunc::MinUInt16
3058 | AggregateFunc::MinUInt32
3059 | AggregateFunc::MinUInt64
3060 | AggregateFunc::MinMzTimestamp
3061 | AggregateFunc::MinFloat32
3062 | AggregateFunc::MinFloat64
3063 | AggregateFunc::MinBool
3064 | AggregateFunc::MinString
3065 | AggregateFunc::MinDate
3066 | AggregateFunc::MinTimestamp
3067 | AggregateFunc::MinTimestampTz
3068 | AggregateFunc::SumInt16
3069 | AggregateFunc::SumInt32
3070 | AggregateFunc::SumInt64
3071 | AggregateFunc::SumUInt16
3072 | AggregateFunc::SumUInt32
3073 | AggregateFunc::SumUInt64
3074 | AggregateFunc::SumFloat32
3075 | AggregateFunc::SumFloat64
3076 | AggregateFunc::SumNumeric
3077 | AggregateFunc::StringAgg { .. } => true,
3078 AggregateFunc::Count => false,
3080 _ => false,
3081 }
3082 }
3083}
3084
3085fn jsonb_each<'a>(
3086 a: Datum<'a>,
3087 temp_storage: &'a RowArena,
3088 stringify: bool,
3089) -> impl Iterator<Item = (Row, Diff)> + 'a {
3090 let map = match a {
3092 Datum::Map(dict) => dict,
3093 _ => mz_repr::DatumMap::empty(),
3094 };
3095
3096 map.iter().map(move |(k, mut v)| {
3097 if stringify {
3098 v = jsonb_stringify(v, temp_storage);
3099 }
3100 (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
3101 })
3102}
3103
3104fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3105 let map = match a {
3106 Datum::Map(dict) => dict,
3107 _ => mz_repr::DatumMap::empty(),
3108 };
3109
3110 map.iter()
3111 .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
3112}
3113
3114fn jsonb_array_elements<'a>(
3115 a: Datum<'a>,
3116 temp_storage: &'a RowArena,
3117 stringify: bool,
3118) -> impl Iterator<Item = (Row, Diff)> + 'a {
3119 let list = match a {
3120 Datum::List(list) => list,
3121 _ => mz_repr::DatumList::empty(),
3122 };
3123 list.iter().map(move |mut e| {
3124 if stringify {
3125 e = jsonb_stringify(e, temp_storage);
3126 }
3127 (Row::pack_slice(&[e]), Diff::ONE)
3128 })
3129}
3130
3131fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
3132 let r = r.inner();
3133 let a = a.unwrap_str();
3134 let captures = r.captures(a)?;
3135 let datums = captures
3136 .iter()
3137 .skip(1)
3138 .map(|m| Datum::from(m.map(|m| m.as_str())));
3139 Some((Row::pack(datums), Diff::ONE))
3140}
3141
3142fn regexp_matches<'a, 'r: 'a>(
3143 exprs: &[Datum<'a>],
3144) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3145 assert!(exprs.len() == 2 || exprs.len() == 3);
3149 let a = exprs[0].unwrap_str();
3150 let r = exprs[1].unwrap_str();
3151
3152 let (regex, opts) = if exprs.len() == 3 {
3153 let flag = exprs[2].unwrap_str();
3154 let opts = AnalyzedRegexOpts::from_str(flag)?;
3155 (AnalyzedRegex::new(r, opts)?, opts)
3156 } else {
3157 let opts = AnalyzedRegexOpts::default();
3158 (AnalyzedRegex::new(r, opts)?, opts)
3159 };
3160
3161 let regex = regex.inner().clone();
3162
3163 let iter = regex.captures_iter(a).map(move |captures| {
3164 let matches = captures
3165 .iter()
3166 .skip(1)
3168 .map(|m| Datum::from(m.map(|m| m.as_str())))
3169 .collect::<Vec<_>>();
3170
3171 let mut binding = SharedRow::get();
3172 let mut packer = binding.packer();
3173
3174 let dimension = ArrayDimension {
3175 lower_bound: 1,
3176 length: matches.len(),
3177 };
3178 packer
3179 .try_push_array(&[dimension], matches)
3180 .expect("generated dimensions above");
3181
3182 (binding.clone(), Diff::ONE)
3183 });
3184
3185 let out = iter.collect::<SmallVec<[_; 3]>>();
3190
3191 if opts.global {
3192 Ok(Either::Left(out.into_iter()))
3193 } else {
3194 Ok(Either::Right(out.into_iter().take(1)))
3195 }
3196}
3197
3198fn generate_series<N>(
3199 start: N,
3200 stop: N,
3201 step: N,
3202) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
3203where
3204 N: Integer + Signed + CheckedAdd + Clone,
3205 Datum<'static>: From<N>,
3206{
3207 if step == N::zero() {
3208 return Err(EvalError::InvalidParameterValue(
3209 "step size cannot equal zero".into(),
3210 ));
3211 }
3212 Ok(num::range_step_inclusive(start, stop, step)
3213 .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
3214}
3215
3216#[derive(Clone)]
3220pub struct TimestampRangeStepInclusive<T> {
3221 state: CheckedTimestamp<T>,
3222 stop: CheckedTimestamp<T>,
3223 step: Interval,
3224 rev: bool,
3225 done: bool,
3226}
3227
3228impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
3229 type Item = CheckedTimestamp<T>;
3230
3231 #[inline]
3232 fn next(&mut self) -> Option<CheckedTimestamp<T>> {
3233 if !self.done
3234 && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
3235 {
3236 let result = self.state.clone();
3237 match add_timestamp_months(self.state.deref(), self.step.months) {
3238 Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
3239 Some(v) => match CheckedTimestamp::from_timestamplike(v) {
3240 Ok(v) => self.state = v,
3241 Err(_) => self.done = true,
3242 },
3243 None => self.done = true,
3244 },
3245 Err(..) => {
3246 self.done = true;
3247 }
3248 }
3249
3250 Some(result)
3251 } else {
3252 None
3253 }
3254 }
3255}
3256
3257fn generate_series_ts<T: TimestampLike>(
3258 start: CheckedTimestamp<T>,
3259 stop: CheckedTimestamp<T>,
3260 step: Interval,
3261 conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
3262) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
3263 let normalized_step = step.as_microseconds();
3264 if normalized_step == 0 {
3265 return Err(EvalError::InvalidParameterValue(
3266 "step size cannot equal zero".into(),
3267 ));
3268 }
3269 let rev = normalized_step < 0;
3270
3271 let trsi = TimestampRangeStepInclusive {
3272 state: start,
3273 stop,
3274 step,
3275 rev,
3276 done: false,
3277 };
3278
3279 Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
3280}
3281
3282fn generate_subscripts_array(
3283 a: Datum,
3284 dim: i32,
3285) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
3286 if dim <= 0 {
3287 return Ok(Box::new(iter::empty()));
3288 }
3289
3290 match a.unwrap_array().dims().into_iter().nth(
3291 (dim - 1)
3292 .try_into()
3293 .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
3294 ) {
3295 Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
3296 requested_dim.lower_bound.try_into().map_err(|_| {
3297 EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
3298 })?,
3299 requested_dim
3300 .length
3301 .try_into()
3302 .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
3303 1,
3304 )?)),
3305 None => Ok(Box::new(iter::empty())),
3306 }
3307}
3308
3309fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3310 a.unwrap_array()
3311 .elements()
3312 .iter()
3313 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
3314}
3315
3316fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3317 a.unwrap_list()
3318 .iter()
3319 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
3320}
3321
3322fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3323 a.unwrap_map()
3324 .iter()
3325 .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
3326}
3327
3328impl AggregateFunc {
3329 pub fn name(&self) -> &'static str {
3332 match self {
3333 Self::MaxNumeric => "max",
3334 Self::MaxInt16 => "max",
3335 Self::MaxInt32 => "max",
3336 Self::MaxInt64 => "max",
3337 Self::MaxUInt16 => "max",
3338 Self::MaxUInt32 => "max",
3339 Self::MaxUInt64 => "max",
3340 Self::MaxMzTimestamp => "max",
3341 Self::MaxFloat32 => "max",
3342 Self::MaxFloat64 => "max",
3343 Self::MaxBool => "max",
3344 Self::MaxString => "max",
3345 Self::MaxDate => "max",
3346 Self::MaxTimestamp => "max",
3347 Self::MaxTimestampTz => "max",
3348 Self::MaxInterval => "max",
3349 Self::MaxTime => "max",
3350 Self::MinNumeric => "min",
3351 Self::MinInt16 => "min",
3352 Self::MinInt32 => "min",
3353 Self::MinInt64 => "min",
3354 Self::MinUInt16 => "min",
3355 Self::MinUInt32 => "min",
3356 Self::MinUInt64 => "min",
3357 Self::MinMzTimestamp => "min",
3358 Self::MinFloat32 => "min",
3359 Self::MinFloat64 => "min",
3360 Self::MinBool => "min",
3361 Self::MinString => "min",
3362 Self::MinDate => "min",
3363 Self::MinTimestamp => "min",
3364 Self::MinTimestampTz => "min",
3365 Self::MinInterval => "min",
3366 Self::MinTime => "min",
3367 Self::SumInt16 => "sum",
3368 Self::SumInt32 => "sum",
3369 Self::SumInt64 => "sum",
3370 Self::SumUInt16 => "sum",
3371 Self::SumUInt32 => "sum",
3372 Self::SumUInt64 => "sum",
3373 Self::SumFloat32 => "sum",
3374 Self::SumFloat64 => "sum",
3375 Self::SumNumeric => "sum",
3376 Self::Count => "count",
3377 Self::Any => "any",
3378 Self::All => "all",
3379 Self::JsonbAgg { .. } => "jsonb_agg",
3380 Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3381 Self::MapAgg { .. } => "map_agg",
3382 Self::ArrayConcat { .. } => "array_agg",
3383 Self::ListConcat { .. } => "list_agg",
3384 Self::StringAgg { .. } => "string_agg",
3385 Self::RowNumber { .. } => "row_number",
3386 Self::Rank { .. } => "rank",
3387 Self::DenseRank { .. } => "dense_rank",
3388 Self::LagLead {
3389 lag_lead: LagLeadType::Lag,
3390 ..
3391 } => "lag",
3392 Self::LagLead {
3393 lag_lead: LagLeadType::Lead,
3394 ..
3395 } => "lead",
3396 Self::FirstValue { .. } => "first_value",
3397 Self::LastValue { .. } => "last_value",
3398 Self::WindowAggregate { .. } => "window_agg",
3399 Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3400 Self::FusedWindowAggregate { .. } => "fused_window_agg",
3401 Self::Dummy => "dummy",
3402 }
3403 }
3404}
3405
3406impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3407where
3408 M: HumanizerMode,
3409{
3410 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3411 use AggregateFunc::*;
3412 let name = self.expr.name();
3413 match self.expr {
3414 JsonbAgg { order_by }
3415 | JsonbObjectAgg { order_by }
3416 | MapAgg { order_by, .. }
3417 | ArrayConcat { order_by }
3418 | ListConcat { order_by }
3419 | StringAgg { order_by }
3420 | RowNumber { order_by }
3421 | Rank { order_by }
3422 | DenseRank { order_by } => {
3423 let order_by = order_by.iter().map(|col| self.child(col));
3424 write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3425 }
3426 LagLead {
3427 lag_lead: _,
3428 ignore_nulls,
3429 order_by,
3430 } => {
3431 let order_by = order_by.iter().map(|col| self.child(col));
3432 f.write_str(name)?;
3433 f.write_str("[")?;
3434 if *ignore_nulls {
3435 f.write_str("ignore_nulls=true, ")?;
3436 }
3437 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3438 f.write_str("]")
3439 }
3440 FirstValue {
3441 order_by,
3442 window_frame,
3443 } => {
3444 let order_by = order_by.iter().map(|col| self.child(col));
3445 f.write_str(name)?;
3446 f.write_str("[")?;
3447 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3448 if *window_frame != WindowFrame::default() {
3449 write!(f, " {}", window_frame)?;
3450 }
3451 f.write_str("]")
3452 }
3453 LastValue {
3454 order_by,
3455 window_frame,
3456 } => {
3457 let order_by = order_by.iter().map(|col| self.child(col));
3458 f.write_str(name)?;
3459 f.write_str("[")?;
3460 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3461 if *window_frame != WindowFrame::default() {
3462 write!(f, " {}", window_frame)?;
3463 }
3464 f.write_str("]")
3465 }
3466 WindowAggregate {
3467 wrapped_aggregate,
3468 order_by,
3469 window_frame,
3470 } => {
3471 let order_by = order_by.iter().map(|col| self.child(col));
3472 let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3473 f.write_str(name)?;
3474 f.write_str("[")?;
3475 write!(f, "{} ", wrapped_aggregate)?;
3476 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3477 if *window_frame != WindowFrame::default() {
3478 write!(f, " {}", window_frame)?;
3479 }
3480 f.write_str("]")
3481 }
3482 FusedValueWindowFunc { funcs, order_by } => {
3483 let order_by = order_by.iter().map(|col| self.child(col));
3484 let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3485 f.write_str(name)?;
3486 f.write_str("[")?;
3487 write!(f, "{} ", funcs)?;
3488 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3489 f.write_str("]")
3490 }
3491 _ => f.write_str(name),
3492 }
3493 }
3494}
3495
/// Description of one explicit capture group of an [`AnalyzedRegex`].
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct CaptureGroupDesc {
    /// Position of the group in the regex's `capture_names()` sequence.
    /// `AnalyzedRegex::new` skips the first entry, so indices start at 1.
    pub index: u32,
    /// The group's name, if the regex declares it as a named group.
    pub name: Option<String>,
    /// Whether the output column for this group is nullable
    /// (`AnalyzedRegex::new` currently sets this to `true` for every group).
    pub nullable: bool,
}
3504
3505impl RustType<ProtoCaptureGroupDesc> for CaptureGroupDesc {
3506 fn into_proto(&self) -> ProtoCaptureGroupDesc {
3507 ProtoCaptureGroupDesc {
3508 index: self.index,
3509 name: self.name.clone(),
3510 nullable: self.nullable,
3511 }
3512 }
3513
3514 fn from_proto(proto: ProtoCaptureGroupDesc) -> Result<Self, TryFromProtoError> {
3515 Ok(Self {
3516 index: proto.index,
3517 name: proto.name,
3518 nullable: proto.nullable,
3519 })
3520 }
3521}
3522
/// Flags attached to an [`AnalyzedRegex`].
///
/// These are parsed from a flag string by the `FromStr` impl: `i` sets
/// `case_insensitive`, `g` sets `global`, and any other character is rejected.
#[derive(
    Arbitrary,
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default,
)]
pub struct AnalyzedRegexOpts {
    /// Compile the pattern case-insensitively (passed to `ReprRegex::new`).
    pub case_insensitive: bool,
    /// Set by the `g` flag. NOTE(review): its consumers are outside this view;
    /// presumably it requests all matches instead of only the first — confirm.
    pub global: bool,
}
3542
3543impl FromStr for AnalyzedRegexOpts {
3544 type Err = EvalError;
3545
3546 fn from_str(s: &str) -> Result<Self, Self::Err> {
3547 let mut opts = AnalyzedRegexOpts::default();
3548 for c in s.chars() {
3549 match c {
3550 'i' => opts.case_insensitive = true,
3551 'g' => opts.global = true,
3552 _ => return Err(EvalError::InvalidRegexFlag(c)),
3553 }
3554 }
3555 Ok(opts)
3556 }
3557}
3558
3559impl RustType<ProtoAnalyzedRegexOpts> for AnalyzedRegexOpts {
3560 fn into_proto(&self) -> ProtoAnalyzedRegexOpts {
3561 ProtoAnalyzedRegexOpts {
3562 case_insensitive: self.case_insensitive,
3563 global: self.global,
3564 }
3565 }
3566
3567 fn from_proto(proto: ProtoAnalyzedRegexOpts) -> Result<Self, TryFromProtoError> {
3568 Ok(Self {
3569 case_insensitive: proto.case_insensitive,
3570 global: proto.global,
3571 })
3572 }
3573}
3574
/// A compiled regex together with descriptions of its capture groups and the
/// flags it was built with.
///
/// The fields are, in order:
/// 1. the compiled regex;
/// 2. one [`CaptureGroupDesc`] per explicit capture group (the first entry of
///    `capture_names()` is skipped by `AnalyzedRegex::new`);
/// 3. the [`AnalyzedRegexOpts`].
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct AnalyzedRegex(
    #[proptest(strategy = "mz_repr::adt::regex::any_regex()")] ReprRegex,
    Vec<CaptureGroupDesc>,
    AnalyzedRegexOpts,
);
3583
3584impl RustType<ProtoAnalyzedRegex> for AnalyzedRegex {
3585 fn into_proto(&self) -> ProtoAnalyzedRegex {
3586 ProtoAnalyzedRegex {
3587 regex: Some(self.0.into_proto()),
3588 groups: self.1.into_proto(),
3589 opts: Some(self.2.into_proto()),
3590 }
3591 }
3592
3593 fn from_proto(proto: ProtoAnalyzedRegex) -> Result<Self, TryFromProtoError> {
3594 Ok(AnalyzedRegex(
3595 proto.regex.into_rust_if_some("ProtoAnalyzedRegex::regex")?,
3596 proto.groups.into_rust()?,
3597 proto.opts.into_rust_if_some("ProtoAnalyzedRegex::opts")?,
3598 ))
3599 }
3600}
3601
3602impl AnalyzedRegex {
3603 pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, regex::Error> {
3604 let r = ReprRegex::new(s, opts.case_insensitive)?;
3605 #[allow(clippy::as_conversions)]
3607 let descs: Vec<_> = r
3608 .capture_names()
3609 .enumerate()
3610 .skip(1)
3615 .map(|(i, name)| CaptureGroupDesc {
3616 index: i as u32,
3617 name: name.map(String::from),
3618 nullable: true,
3621 })
3622 .collect();
3623 Ok(Self(r, descs, opts))
3624 }
3625 pub fn capture_groups_len(&self) -> usize {
3626 self.1.len()
3627 }
3628 pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3629 self.1.iter()
3630 }
3631 pub fn inner(&self) -> &Regex {
3632 &(self.0).regex
3633 }
3634 pub fn opts(&self) -> &AnalyzedRegexOpts {
3635 &self.2
3636 }
3637}
3638
3639pub fn csv_extract(a: Datum<'_>, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3640 let bytes = a.unwrap_str().as_bytes();
3641 let mut row = Row::default();
3642 let csv_reader = csv::ReaderBuilder::new()
3643 .has_headers(false)
3644 .from_reader(bytes);
3645 csv_reader.into_records().filter_map(move |res| match res {
3646 Ok(sr) if sr.len() == n_cols => {
3647 row.packer().extend(sr.iter().map(Datum::String));
3648 Some((row.clone(), Diff::ONE))
3649 }
3650 _ => None,
3651 })
3652}
3653
3654pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3655 let n = a.unwrap_int64();
3656 if n != 0 {
3657 Some((Row::default(), n.into()))
3658 } else {
3659 None
3660 }
3661}
3662
3663fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3664 datums
3665 .chunks(width)
3666 .map(|chunk| (Row::pack(chunk), Diff::ONE))
3667}
3668
3669fn acl_explode<'a>(
3670 acl_items: Datum<'a>,
3671 temp_storage: &'a RowArena,
3672) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3673 let acl_items = acl_items.unwrap_array();
3674 let mut res = Vec::new();
3675 for acl_item in acl_items.elements().iter() {
3676 if acl_item.is_null() {
3677 return Err(EvalError::AclArrayNullElement);
3678 }
3679 let acl_item = acl_item.unwrap_acl_item();
3680 for privilege in acl_item.acl_mode.explode() {
3681 let row = [
3682 Datum::UInt32(acl_item.grantor.0),
3683 Datum::UInt32(acl_item.grantee.0),
3684 Datum::String(temp_storage.push_string(privilege.to_string())),
3685 Datum::False,
3687 ];
3688 res.push((Row::pack_slice(&row), Diff::ONE));
3689 }
3690 }
3691 Ok(res.into_iter())
3692}
3693
3694fn mz_acl_explode<'a>(
3695 mz_acl_items: Datum<'a>,
3696 temp_storage: &'a RowArena,
3697) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3698 let mz_acl_items = mz_acl_items.unwrap_array();
3699 let mut res = Vec::new();
3700 for mz_acl_item in mz_acl_items.elements().iter() {
3701 if mz_acl_item.is_null() {
3702 return Err(EvalError::MzAclArrayNullElement);
3703 }
3704 let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3705 for privilege in mz_acl_item.acl_mode.explode() {
3706 let row = [
3707 Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3708 Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3709 Datum::String(temp_storage.push_string(privilege.to_string())),
3710 Datum::False,
3712 ];
3713 res.push((Row::pack_slice(&row), Diff::ONE));
3714 }
3715 }
3716 Ok(res.into_iter())
3717}
3718
/// A table function: maps a list of argument datums to a collection of output
/// rows, each paired with a multiplicity ([`Diff`]).
///
/// See `TableFunc::eval` for evaluation and `TableFunc::output_type` for the
/// type of the produced rows.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum TableFunc {
    /// Expands an array of `aclitem`s into one row per privilege:
    /// (grantor oid, grantee oid, privilege, bool). The bool column is always
    /// `false` as currently implemented. Errors on NULL array elements.
    AclExplode,
    /// Like `AclExplode`, but for arrays of `mz_aclitem`s; grantor and grantee
    /// are emitted as strings.
    MzAclExplode,
    /// Expands a jsonb object into (key, value) rows. With `stringify`, values
    /// are emitted as nullable strings; otherwise as non-nullable `jsonb`.
    JsonbEach {
        stringify: bool,
    },
    /// Emits the keys of a jsonb object as non-nullable strings.
    JsonbObjectKeys,
    /// Expands a jsonb array into element rows. With `stringify`, elements are
    /// emitted as nullable strings; otherwise as non-nullable `jsonb`.
    JsonbArrayElements {
        stringify: bool,
    },
    /// Applies the regex to its first argument, producing one string column per
    /// explicit capture group.
    RegexpExtract(AnalyzedRegex),
    /// Parses its first argument as header-less CSV and emits each record that
    /// has exactly this many fields; other records are skipped.
    CsvExtract(usize),
    /// `generate_series(start, stop, step)` over `Int32` values.
    GenerateSeriesInt32,
    /// `generate_series(start, stop, step)` over `Int64` values.
    GenerateSeriesInt64,
    /// `generate_series(start, stop, step)` over timestamps with an interval step.
    GenerateSeriesTimestamp,
    /// `generate_series(start, stop, step)` over timestamptz values with an
    /// interval step.
    GenerateSeriesTimestampTz,
    /// Emits no rows when its `Int64` count argument is exactly 1, and fails
    /// with `EvalError::MultipleRowsFromSubquery` otherwise. `column_type` is
    /// the type of its single declared output column.
    GuardSubquerySize {
        column_type: ScalarType,
    },
    /// Emits one empty row whose multiplicity is its `Int64` argument (which may
    /// be negative); emits nothing when the argument is 0.
    Repeat,
    /// Unnests an array argument; `el_typ` is the type of the (nullable) output
    /// column.
    UnnestArray {
        el_typ: ScalarType,
    },
    /// Unnests a list argument; `el_typ` is the type of the (nullable) output
    /// column.
    UnnestList {
        el_typ: ScalarType,
    },
    /// Unnests a map argument into (string key, value) rows; `value_type` is the
    /// type of the (nullable) value column.
    UnnestMap {
        value_type: ScalarType,
    },
    /// Splits the argument datums into consecutive rows of `width` columns;
    /// `types` are the declared output column types.
    Wrap {
        types: Vec<ColumnType>,
        width: usize,
    },
    /// `generate_subscripts` over an array; takes the array and an `Int32`
    /// dimension argument.
    GenerateSubscriptsArray,
    /// Emits exactly one row containing all argument datums. `relation` is
    /// returned verbatim as the output type; `name` is used for display.
    TabletizedScalar {
        name: String,
        relation: RelationType,
    },
    /// `regexp_matches`, emitting rows with one non-nullable string-array column.
    RegexpMatches,
    /// Evaluates another table function and appends a non-nullable `Int64`
    /// ordinality column that numbers its output rows from 1.
    #[allow(private_interfaces)]
    WithOrdinality(WithOrdinality),
}
3794
/// Payload of `TableFunc::WithOrdinality`: the table function whose output gets
/// an appended ordinality column.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
struct WithOrdinality {
    /// The wrapped table function. `TableFunc::with_ordinality` returns `None`
    /// rather than wrapping a function that is already `WithOrdinality`.
    inner: Box<TableFunc>,
}
3806
3807impl TableFunc {
3808 pub fn with_ordinality(inner: TableFunc) -> Option<TableFunc> {
3810 match inner {
3811 TableFunc::AclExplode
3812 | TableFunc::MzAclExplode
3813 | TableFunc::JsonbEach { .. }
3814 | TableFunc::JsonbObjectKeys
3815 | TableFunc::JsonbArrayElements { .. }
3816 | TableFunc::RegexpExtract(_)
3817 | TableFunc::CsvExtract(_)
3818 | TableFunc::GenerateSeriesInt32
3819 | TableFunc::GenerateSeriesInt64
3820 | TableFunc::GenerateSeriesTimestamp
3821 | TableFunc::GenerateSeriesTimestampTz
3822 | TableFunc::GuardSubquerySize { .. }
3823 | TableFunc::Repeat
3824 | TableFunc::UnnestArray { .. }
3825 | TableFunc::UnnestList { .. }
3826 | TableFunc::UnnestMap { .. }
3827 | TableFunc::Wrap { .. }
3828 | TableFunc::GenerateSubscriptsArray
3829 | TableFunc::TabletizedScalar { .. }
3830 | TableFunc::RegexpMatches => Some(TableFunc::WithOrdinality(WithOrdinality {
3831 inner: Box::new(inner),
3832 })),
3833 _ => None,
3836 }
3837 }
3838}
3839
3840impl Arbitrary for TableFunc {
3843 type Parameters = ();
3844 type Strategy = BoxedStrategy<Self>;
3845
3846 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
3847 let leaf = Union::new(vec![
3848 Just(TableFunc::AclExplode),
3849 Just(TableFunc::MzAclExplode),
3850 Just(TableFunc::JsonbObjectKeys),
3851 Just(TableFunc::GenerateSeriesInt32),
3852 Just(TableFunc::GenerateSeriesInt64),
3853 Just(TableFunc::GenerateSeriesTimestamp),
3854 Just(TableFunc::GenerateSeriesTimestampTz),
3855 Just(TableFunc::Repeat),
3856 Just(TableFunc::GenerateSubscriptsArray),
3857 Just(TableFunc::RegexpMatches),
3858 ])
3859 .boxed();
3860
3861 leaf.prop_recursive(2, 256, 2, |inner| {
3863 inner.clone().prop_map(|tf| {
3864 TableFunc::WithOrdinality(WithOrdinality {
3865 inner: Box::new(tf),
3866 })
3867 })
3868 })
3869 .boxed()
3870 }
3871}
3872
3873impl RustType<ProtoTableFunc> for TableFunc {
3874 fn into_proto(&self) -> ProtoTableFunc {
3875 use proto_table_func::{Kind, ProtoWrap};
3876
3877 ProtoTableFunc {
3878 kind: Some(match self {
3879 TableFunc::AclExplode => Kind::AclExplode(()),
3880 TableFunc::MzAclExplode => Kind::MzAclExplode(()),
3881 TableFunc::JsonbEach { stringify } => Kind::JsonbEach(*stringify),
3882 TableFunc::JsonbObjectKeys => Kind::JsonbObjectKeys(()),
3883 TableFunc::JsonbArrayElements { stringify } => Kind::JsonbArrayElements(*stringify),
3884 TableFunc::RegexpExtract(x) => Kind::RegexpExtract(x.into_proto()),
3885 TableFunc::CsvExtract(x) => Kind::CsvExtract(x.into_proto()),
3886 TableFunc::GenerateSeriesInt32 => Kind::GenerateSeriesInt32(()),
3887 TableFunc::GenerateSeriesInt64 => Kind::GenerateSeriesInt64(()),
3888 TableFunc::GenerateSeriesTimestamp => Kind::GenerateSeriesTimestamp(()),
3889 TableFunc::GenerateSeriesTimestampTz => Kind::GenerateSeriesTimestampTz(()),
3890 TableFunc::GuardSubquerySize { column_type } => {
3891 Kind::GuardSubquerySize(column_type.into_proto())
3892 }
3893 TableFunc::Repeat => Kind::Repeat(()),
3894 TableFunc::UnnestArray { el_typ } => Kind::UnnestArray(el_typ.into_proto()),
3895 TableFunc::UnnestList { el_typ } => Kind::UnnestList(el_typ.into_proto()),
3896 TableFunc::UnnestMap { value_type } => Kind::UnnestMap(value_type.into_proto()),
3897 TableFunc::Wrap { types, width } => Kind::Wrap(ProtoWrap {
3898 types: types.into_proto(),
3899 width: width.into_proto(),
3900 }),
3901 TableFunc::GenerateSubscriptsArray => Kind::GenerateSubscriptsArray(()),
3902 TableFunc::TabletizedScalar { name, relation } => {
3903 Kind::TabletizedScalar(ProtoTabletizedScalar {
3904 name: name.into_proto(),
3905 relation: Some(relation.into_proto()),
3906 })
3907 }
3908 TableFunc::RegexpMatches => Kind::RegexpMatches(()),
3909 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
3910 Kind::WithOrdinality(Box::new(ProtoWithOrdinality {
3911 inner: Some(inner.into_proto()),
3912 }))
3913 }
3914 }),
3915 }
3916 }
3917
3918 fn from_proto(proto: ProtoTableFunc) -> Result<Self, TryFromProtoError> {
3919 use proto_table_func::Kind;
3920
3921 let kind = proto
3922 .kind
3923 .ok_or_else(|| TryFromProtoError::missing_field("ProtoTableFunc::Kind"))?;
3924
3925 Ok(match kind {
3926 Kind::AclExplode(()) => TableFunc::AclExplode,
3927 Kind::MzAclExplode(()) => TableFunc::MzAclExplode,
3928 Kind::JsonbEach(stringify) => TableFunc::JsonbEach { stringify },
3929 Kind::JsonbObjectKeys(()) => TableFunc::JsonbObjectKeys,
3930 Kind::JsonbArrayElements(stringify) => TableFunc::JsonbArrayElements { stringify },
3931 Kind::RegexpExtract(x) => TableFunc::RegexpExtract(x.into_rust()?),
3932 Kind::CsvExtract(x) => TableFunc::CsvExtract(x.into_rust()?),
3933 Kind::GenerateSeriesInt32(()) => TableFunc::GenerateSeriesInt32,
3934 Kind::GenerateSeriesInt64(()) => TableFunc::GenerateSeriesInt64,
3935 Kind::GenerateSeriesTimestamp(()) => TableFunc::GenerateSeriesTimestamp,
3936 Kind::GenerateSeriesTimestampTz(()) => TableFunc::GenerateSeriesTimestampTz,
3937 Kind::GuardSubquerySize(x) => TableFunc::GuardSubquerySize {
3938 column_type: x.into_rust()?,
3939 },
3940 Kind::Repeat(()) => TableFunc::Repeat,
3941 Kind::UnnestArray(x) => TableFunc::UnnestArray {
3942 el_typ: x.into_rust()?,
3943 },
3944 Kind::UnnestList(x) => TableFunc::UnnestList {
3945 el_typ: x.into_rust()?,
3946 },
3947 Kind::UnnestMap(value_type) => TableFunc::UnnestMap {
3948 value_type: value_type.into_rust()?,
3949 },
3950 Kind::Wrap(x) => TableFunc::Wrap {
3951 width: x.width.into_rust()?,
3952 types: x.types.into_rust()?,
3953 },
3954 Kind::GenerateSubscriptsArray(()) => TableFunc::GenerateSubscriptsArray,
3955 Kind::TabletizedScalar(v) => TableFunc::TabletizedScalar {
3956 name: v.name,
3957 relation: v
3958 .relation
3959 .into_rust_if_some("ProtoTabletizedScalar::relation")?,
3960 },
3961 Kind::RegexpMatches(_) => TableFunc::RegexpMatches,
3962 Kind::WithOrdinality(inner) => TableFunc::WithOrdinality(WithOrdinality {
3963 inner: Box::new(
3964 inner
3965 .inner
3966 .map(|inner| *inner)
3967 .into_rust_if_some("ProtoWithOrdinality::inner")?,
3968 ),
3969 }),
3970 })
3971 }
3972}
3973
3974impl TableFunc {
3975 pub fn eval<'a>(
3977 &'a self,
3978 datums: &'a [Datum<'a>],
3979 temp_storage: &'a RowArena,
3980 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3981 if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3982 return Ok(Box::new(vec![].into_iter()));
3983 }
3984 match self {
3985 TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3986 TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3987 TableFunc::JsonbEach { stringify } => {
3988 Ok(Box::new(jsonb_each(datums[0], temp_storage, *stringify)))
3989 }
3990 TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3991 TableFunc::JsonbArrayElements { stringify } => Ok(Box::new(jsonb_array_elements(
3992 datums[0],
3993 temp_storage,
3994 *stringify,
3995 ))),
3996 TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3997 TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3998 TableFunc::GenerateSeriesInt32 => {
3999 let res = generate_series(
4000 datums[0].unwrap_int32(),
4001 datums[1].unwrap_int32(),
4002 datums[2].unwrap_int32(),
4003 )?;
4004 Ok(Box::new(res))
4005 }
4006 TableFunc::GenerateSeriesInt64 => {
4007 let res = generate_series(
4008 datums[0].unwrap_int64(),
4009 datums[1].unwrap_int64(),
4010 datums[2].unwrap_int64(),
4011 )?;
4012 Ok(Box::new(res))
4013 }
4014 TableFunc::GenerateSeriesTimestamp => {
4015 fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
4016 Datum::from(d)
4017 }
4018 let res = generate_series_ts(
4019 datums[0].unwrap_timestamp(),
4020 datums[1].unwrap_timestamp(),
4021 datums[2].unwrap_interval(),
4022 pass_through,
4023 )?;
4024 Ok(Box::new(res))
4025 }
4026 TableFunc::GenerateSeriesTimestampTz => {
4027 fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
4028 Datum::from(d)
4029 }
4030 let res = generate_series_ts(
4031 datums[0].unwrap_timestamptz(),
4032 datums[1].unwrap_timestamptz(),
4033 datums[2].unwrap_interval(),
4034 gen_ts_tz,
4035 )?;
4036 Ok(Box::new(res))
4037 }
4038 TableFunc::GenerateSubscriptsArray => {
4039 generate_subscripts_array(datums[0], datums[1].unwrap_int32())
4040 }
4041 TableFunc::GuardSubquerySize { column_type: _ } => {
4042 let count = datums[0].unwrap_int64();
4045 if count != 1 {
4046 Err(EvalError::MultipleRowsFromSubquery)
4047 } else {
4048 Ok(Box::new([].into_iter()))
4049 }
4050 }
4051 TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
4052 TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
4053 TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
4054 TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
4055 TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
4056 TableFunc::TabletizedScalar { .. } => {
4057 let r = Row::pack_slice(datums);
4058 Ok(Box::new(std::iter::once((r, Diff::ONE))))
4059 }
4060 TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
4061 TableFunc::WithOrdinality(func_with_ordinality) => {
4062 func_with_ordinality.eval(datums, temp_storage)
4063 }
4064 }
4065 }
4066
4067 pub fn output_type(&self) -> RelationType {
4068 let (column_types, keys) = match self {
4069 TableFunc::AclExplode => {
4070 let column_types = vec![
4071 ScalarType::Oid.nullable(false),
4072 ScalarType::Oid.nullable(false),
4073 ScalarType::String.nullable(false),
4074 ScalarType::Bool.nullable(false),
4075 ];
4076 let keys = vec![];
4077 (column_types, keys)
4078 }
4079 TableFunc::MzAclExplode => {
4080 let column_types = vec![
4081 ScalarType::String.nullable(false),
4082 ScalarType::String.nullable(false),
4083 ScalarType::String.nullable(false),
4084 ScalarType::Bool.nullable(false),
4085 ];
4086 let keys = vec![];
4087 (column_types, keys)
4088 }
4089 TableFunc::JsonbEach { stringify: true } => {
4090 let column_types = vec![
4091 ScalarType::String.nullable(false),
4092 ScalarType::String.nullable(true),
4093 ];
4094 let keys = vec![];
4095 (column_types, keys)
4096 }
4097 TableFunc::JsonbEach { stringify: false } => {
4098 let column_types = vec![
4099 ScalarType::String.nullable(false),
4100 ScalarType::Jsonb.nullable(false),
4101 ];
4102 let keys = vec![];
4103 (column_types, keys)
4104 }
4105 TableFunc::JsonbObjectKeys => {
4106 let column_types = vec![ScalarType::String.nullable(false)];
4107 let keys = vec![];
4108 (column_types, keys)
4109 }
4110 TableFunc::JsonbArrayElements { stringify: true } => {
4111 let column_types = vec![ScalarType::String.nullable(true)];
4112 let keys = vec![];
4113 (column_types, keys)
4114 }
4115 TableFunc::JsonbArrayElements { stringify: false } => {
4116 let column_types = vec![ScalarType::Jsonb.nullable(false)];
4117 let keys = vec![];
4118 (column_types, keys)
4119 }
4120 TableFunc::RegexpExtract(a) => {
4121 let column_types = a
4122 .capture_groups_iter()
4123 .map(|cg| ScalarType::String.nullable(cg.nullable))
4124 .collect();
4125 let keys = vec![];
4126 (column_types, keys)
4127 }
4128 TableFunc::CsvExtract(n_cols) => {
4129 let column_types = iter::repeat(ScalarType::String.nullable(false))
4130 .take(*n_cols)
4131 .collect();
4132 let keys = vec![];
4133 (column_types, keys)
4134 }
4135 TableFunc::GenerateSeriesInt32 => {
4136 let column_types = vec![ScalarType::Int32.nullable(false)];
4137 let keys = vec![vec![0]];
4138 (column_types, keys)
4139 }
4140 TableFunc::GenerateSeriesInt64 => {
4141 let column_types = vec![ScalarType::Int64.nullable(false)];
4142 let keys = vec![vec![0]];
4143 (column_types, keys)
4144 }
4145 TableFunc::GenerateSeriesTimestamp => {
4146 let column_types = vec![ScalarType::Timestamp { precision: None }.nullable(false)];
4147 let keys = vec![vec![0]];
4148 (column_types, keys)
4149 }
4150 TableFunc::GenerateSeriesTimestampTz => {
4151 let column_types =
4152 vec![ScalarType::TimestampTz { precision: None }.nullable(false)];
4153 let keys = vec![vec![0]];
4154 (column_types, keys)
4155 }
4156 TableFunc::GenerateSubscriptsArray => {
4157 let column_types = vec![ScalarType::Int32.nullable(false)];
4158 let keys = vec![vec![0]];
4159 (column_types, keys)
4160 }
4161 TableFunc::GuardSubquerySize { column_type } => {
4162 let column_types = vec![column_type.clone().nullable(false)];
4163 let keys = vec![];
4164 (column_types, keys)
4165 }
4166 TableFunc::Repeat => {
4167 let column_types = vec![];
4168 let keys = vec![];
4169 (column_types, keys)
4170 }
4171 TableFunc::UnnestArray { el_typ } => {
4172 let column_types = vec![el_typ.clone().nullable(true)];
4173 let keys = vec![];
4174 (column_types, keys)
4175 }
4176 TableFunc::UnnestList { el_typ } => {
4177 let column_types = vec![el_typ.clone().nullable(true)];
4178 let keys = vec![];
4179 (column_types, keys)
4180 }
4181 TableFunc::UnnestMap { value_type } => {
4182 let column_types = vec![
4183 ScalarType::String.nullable(false),
4184 value_type.clone().nullable(true),
4185 ];
4186 let keys = vec![vec![0]];
4187 (column_types, keys)
4188 }
4189 TableFunc::Wrap { types, .. } => {
4190 let column_types = types.clone();
4191 let keys = vec![];
4192 (column_types, keys)
4193 }
4194 TableFunc::TabletizedScalar { relation, .. } => {
4195 return relation.clone();
4196 }
4197 TableFunc::RegexpMatches => {
4198 let column_types =
4199 vec![ScalarType::Array(Box::new(ScalarType::String)).nullable(false)];
4200 let keys = vec![];
4201
4202 (column_types, keys)
4203 }
4204 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
4205 let mut typ = inner.output_type();
4206 typ.column_types.push(ScalarType::Int64.nullable(false));
4208 typ.keys.push(vec![typ.column_types.len() - 1]);
4210 (typ.column_types, typ.keys)
4211 }
4212 };
4213
4214 soft_assert_eq_no_log!(column_types.len(), self.output_arity());
4215
4216 if !keys.is_empty() {
4217 RelationType::new(column_types).with_keys(keys)
4218 } else {
4219 RelationType::new(column_types)
4220 }
4221 }
4222
4223 pub fn output_arity(&self) -> usize {
4224 match self {
4225 TableFunc::AclExplode => 4,
4226 TableFunc::MzAclExplode => 4,
4227 TableFunc::JsonbEach { .. } => 2,
4228 TableFunc::JsonbObjectKeys => 1,
4229 TableFunc::JsonbArrayElements { .. } => 1,
4230 TableFunc::RegexpExtract(a) => a.capture_groups_len(),
4231 TableFunc::CsvExtract(n_cols) => *n_cols,
4232 TableFunc::GenerateSeriesInt32 => 1,
4233 TableFunc::GenerateSeriesInt64 => 1,
4234 TableFunc::GenerateSeriesTimestamp => 1,
4235 TableFunc::GenerateSeriesTimestampTz => 1,
4236 TableFunc::GenerateSubscriptsArray => 1,
4237 TableFunc::GuardSubquerySize { .. } => 1,
4238 TableFunc::Repeat => 0,
4239 TableFunc::UnnestArray { .. } => 1,
4240 TableFunc::UnnestList { .. } => 1,
4241 TableFunc::UnnestMap { .. } => 2,
4242 TableFunc::Wrap { width, .. } => *width,
4243 TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
4244 TableFunc::RegexpMatches => 1,
4245 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.output_arity() + 1,
4246 }
4247 }
4248
4249 pub fn empty_on_null_input(&self) -> bool {
4250 match self {
4251 TableFunc::AclExplode
4252 | TableFunc::MzAclExplode
4253 | TableFunc::JsonbEach { .. }
4254 | TableFunc::JsonbObjectKeys
4255 | TableFunc::JsonbArrayElements { .. }
4256 | TableFunc::GenerateSeriesInt32
4257 | TableFunc::GenerateSeriesInt64
4258 | TableFunc::GenerateSeriesTimestamp
4259 | TableFunc::GenerateSeriesTimestampTz
4260 | TableFunc::GenerateSubscriptsArray
4261 | TableFunc::RegexpExtract(_)
4262 | TableFunc::CsvExtract(_)
4263 | TableFunc::Repeat
4264 | TableFunc::UnnestArray { .. }
4265 | TableFunc::UnnestList { .. }
4266 | TableFunc::UnnestMap { .. }
4267 | TableFunc::RegexpMatches => true,
4268 TableFunc::GuardSubquerySize { .. } => false,
4269 TableFunc::Wrap { .. } => false,
4270 TableFunc::TabletizedScalar { .. } => false,
4271 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.empty_on_null_input(),
4272 }
4273 }
4274
4275 pub fn preserves_monotonicity(&self) -> bool {
4277 match self {
4280 TableFunc::AclExplode => false,
4281 TableFunc::MzAclExplode => false,
4282 TableFunc::JsonbEach { .. } => true,
4283 TableFunc::JsonbObjectKeys => true,
4284 TableFunc::JsonbArrayElements { .. } => true,
4285 TableFunc::RegexpExtract(_) => true,
4286 TableFunc::CsvExtract(_) => true,
4287 TableFunc::GenerateSeriesInt32 => true,
4288 TableFunc::GenerateSeriesInt64 => true,
4289 TableFunc::GenerateSeriesTimestamp => true,
4290 TableFunc::GenerateSeriesTimestampTz => true,
4291 TableFunc::GenerateSubscriptsArray => true,
4292 TableFunc::Repeat => false,
4293 TableFunc::UnnestArray { .. } => true,
4294 TableFunc::UnnestList { .. } => true,
4295 TableFunc::UnnestMap { .. } => true,
4296 TableFunc::Wrap { .. } => true,
4297 TableFunc::TabletizedScalar { .. } => true,
4298 TableFunc::RegexpMatches => true,
4299 TableFunc::GuardSubquerySize { .. } => false,
4300 TableFunc::WithOrdinality(WithOrdinality { inner }) => inner.preserves_monotonicity(),
4301 }
4302 }
4303}
4304
4305impl fmt::Display for TableFunc {
4306 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4307 match self {
4308 TableFunc::AclExplode => f.write_str("aclexplode"),
4309 TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
4310 TableFunc::JsonbEach { .. } => f.write_str("jsonb_each"),
4311 TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
4312 TableFunc::JsonbArrayElements { .. } => f.write_str("jsonb_array_elements"),
4313 TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
4314 TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
4315 TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
4316 TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
4317 TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
4318 TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
4319 TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
4320 TableFunc::GuardSubquerySize { .. } => f.write_str("guard_subquery_size"),
4321 TableFunc::Repeat => f.write_str("repeat_row"),
4322 TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
4323 TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
4324 TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
4325 TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
4326 TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
4327 TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
4328 TableFunc::WithOrdinality(WithOrdinality { inner }) => {
4329 write!(f, "{}[with_ordinality]", inner)
4330 }
4331 }
4332 }
4333}
4334
impl WithOrdinality {
    /// Evaluates the inner table function and appends a 1-based `Int64`
    /// ordinality column to every output row.
    ///
    /// A row that the inner function emits with multiplicity `d` is expanded
    /// into `d` copies, each with diff 1 and each carrying the next ordinal.
    /// Every output row therefore has a distinct ordinal, and a row with
    /// multiplicity 0 produces nothing.
    ///
    /// # Panics
    ///
    /// Panics if the inner function emits a negative diff. For example,
    /// `TableFunc::Repeat` does this for a negative argument.
    fn eval<'a>(
        &'a self,
        datums: &'a [Datum<'a>],
        temp_storage: &'a RowArena,
    ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
        // Ordinals continue across all rows produced by the inner function.
        let mut next_ordinal: i64 = 1;
        let it = self
            .inner
            .eval(datums, temp_storage)?
            .flat_map(move |(mut row, diff)| {
                let diff = diff.into_inner();
                // Ordinals are not defined for negative multiplicities.
                assert!(diff >= 0);
                // The ordinals assigned to the `diff` copies of this row.
                let mut ordinals = next_ordinal..(next_ordinal + diff);
                next_ordinal += diff;
                // Capacity for one copy: the row plus one Int64 datum. The datum
                // size is measured with `next_ordinal`, one past the last ordinal
                // used for this row.
                let cap = row.data_len() + datum_size(&Datum::Int64(next_ordinal));
                iter::from_fn(move || {
                    let ordinal = ordinals.next()?;
                    // The last copy takes ownership of the original row; earlier
                    // copies are fresh clones.
                    let mut row = if ordinals.is_empty() {
                        std::mem::take(&mut row)
                    } else {
                        let mut new_row = Row::with_capacity(cap);
                        new_row.clone_from(&row);
                        new_row
                    };
                    RowPacker::for_existing_row(&mut row).push(Datum::Int64(ordinal));
                    Some((row, Diff::ONE))
                })
            });
        Ok(Box::new(it))
    }
}
4385
#[cfg(test)]
mod tests {
    use super::{AggregateFunc, ProtoAggregateFunc, ProtoTableFunc, TableFunc};
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    // Round-trip property tests: encoding to protobuf and decoding again must
    // reproduce the original value.
    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn aggregate_func_protobuf_roundtrip(expect in any::<AggregateFunc>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoAggregateFunc>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn table_func_protobuf_roundtrip(expect in any::<TableFunc>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoTableFunc>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}