1#![allow(missing_docs)]
11
12use std::cmp::{max, min};
13use std::iter::Sum;
14use std::ops::Deref;
15use std::str::FromStr;
16use std::{fmt, iter};
17
18use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
19use dec::OrderedDecimal;
20use itertools::{Either, Itertools};
21use mz_lowertest::MzReflect;
22use mz_ore::cast::CastFrom;
23
24use mz_ore::soft_assert_or_log;
25use mz_ore::str::separated;
26use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
27use mz_repr::adt::array::ArrayDimension;
28use mz_repr::adt::date::Date;
29use mz_repr::adt::interval::Interval;
30use mz_repr::adt::numeric::{self, Numeric, NumericMaxScale};
31use mz_repr::adt::regex::Regex as ReprRegex;
32use mz_repr::adt::timestamp::{CheckedTimestamp, TimestampLike};
33use mz_repr::{
34 ColumnName, ColumnType, Datum, Diff, RelationType, Row, RowArena, ScalarType, SharedRow,
35};
36use num::{CheckedAdd, Integer, Signed, ToPrimitive};
37use ordered_float::OrderedFloat;
38use proptest::prelude::{Arbitrary, Just};
39use proptest::strategy::{BoxedStrategy, Strategy, Union};
40use proptest_derive::Arbitrary;
41use regex::Regex;
42use serde::{Deserialize, Serialize};
43use smallvec::SmallVec;
44
45use crate::EvalError;
46use crate::WindowFrameBound::{
47 CurrentRow, OffsetFollowing, OffsetPreceding, UnboundedFollowing, UnboundedPreceding,
48};
49use crate::WindowFrameUnits::{Groups, Range, Rows};
50use crate::explain::{HumanizedExpr, HumanizerMode};
51use crate::relation::proto_aggregate_func::{
52 self, ProtoColumnOrders, ProtoFusedValueWindowFunc, ProtoFusedWindowAggregate,
53};
54use crate::relation::proto_table_func::ProtoTabletizedScalar;
55use crate::relation::{
56 ColumnOrder, ProtoAggregateFunc, ProtoTableFunc, WindowFrame, WindowFrameBound,
57 WindowFrameUnits, compare_columns, proto_table_func,
58};
59use crate::scalar::func::{add_timestamp_months, jsonb_stringify};
60
61include!(concat!(env!("OUT_DIR"), "/mz_expr.relation.func.rs"));
62
63fn max_string<'a, I>(datums: I) -> Datum<'a>
67where
68 I: IntoIterator<Item = Datum<'a>>,
69{
70 match datums
71 .into_iter()
72 .filter(|d| !d.is_null())
73 .max_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
74 {
75 Some(datum) => datum,
76 None => Datum::Null,
77 }
78}
79
80fn max_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
81where
82 I: IntoIterator<Item = Datum<'a>>,
83 DatumType: TryFrom<Datum<'a>> + Ord,
84 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
85 Datum<'a>: From<Option<DatumType>>,
86{
87 let x: Option<DatumType> = datums
88 .into_iter()
89 .filter(|d| !d.is_null())
90 .map(|d| DatumType::try_from(d).expect("unexpected type"))
91 .max();
92
93 x.into()
94}
95
96fn min_datum<'a, I, DatumType>(datums: I) -> Datum<'a>
97where
98 I: IntoIterator<Item = Datum<'a>>,
99 DatumType: TryFrom<Datum<'a>> + Ord,
100 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
101 Datum<'a>: From<Option<DatumType>>,
102{
103 let x: Option<DatumType> = datums
104 .into_iter()
105 .filter(|d| !d.is_null())
106 .map(|d| DatumType::try_from(d).expect("unexpected type"))
107 .min();
108
109 x.into()
110}
111
112fn min_string<'a, I>(datums: I) -> Datum<'a>
113where
114 I: IntoIterator<Item = Datum<'a>>,
115{
116 match datums
117 .into_iter()
118 .filter(|d| !d.is_null())
119 .min_by(|a, b| a.unwrap_str().cmp(b.unwrap_str()))
120 {
121 Some(datum) => datum,
122 None => Datum::Null,
123 }
124}
125
126fn sum_datum<'a, I, DatumType, ResultType>(datums: I) -> Datum<'a>
127where
128 I: IntoIterator<Item = Datum<'a>>,
129 DatumType: TryFrom<Datum<'a>>,
130 <DatumType as TryFrom<Datum<'a>>>::Error: std::fmt::Debug,
131 ResultType: From<DatumType> + Sum + Into<Datum<'a>>,
132{
133 let mut datums = datums.into_iter().filter(|d| !d.is_null()).peekable();
134 if datums.peek().is_none() {
135 Datum::Null
136 } else {
137 let x = datums
138 .map(|d| ResultType::from(DatumType::try_from(d).expect("unexpected type")))
139 .sum::<ResultType>();
140 x.into()
141 }
142}
143
144fn sum_numeric<'a, I>(datums: I) -> Datum<'a>
145where
146 I: IntoIterator<Item = Datum<'a>>,
147{
148 let mut cx = numeric::cx_datum();
149 let mut sum = Numeric::zero();
150 let mut empty = true;
151 for d in datums {
152 if !d.is_null() {
153 empty = false;
154 cx.add(&mut sum, &d.unwrap_numeric().0);
155 }
156 }
157 match empty {
158 true => Datum::Null,
159 false => Datum::from(sum),
160 }
161}
162
163#[allow(clippy::as_conversions)]
165fn count<'a, I>(datums: I) -> Datum<'a>
166where
167 I: IntoIterator<Item = Datum<'a>>,
168{
169 let x: i64 = datums.into_iter().filter(|d| !d.is_null()).count() as i64;
171 Datum::from(x)
172}
173
174fn any<'a, I>(datums: I) -> Datum<'a>
175where
176 I: IntoIterator<Item = Datum<'a>>,
177{
178 datums
179 .into_iter()
180 .fold(Datum::False, |state, next| match (state, next) {
181 (Datum::True, _) | (_, Datum::True) => Datum::True,
182 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
183 _ => Datum::False,
184 })
185}
186
187fn all<'a, I>(datums: I) -> Datum<'a>
188where
189 I: IntoIterator<Item = Datum<'a>>,
190{
191 datums
192 .into_iter()
193 .fold(Datum::True, |state, next| match (state, next) {
194 (Datum::False, _) | (_, Datum::False) => Datum::False,
195 (Datum::Null, _) | (_, Datum::Null) => Datum::Null,
196 _ => Datum::True,
197 })
198}
199
200fn string_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
201where
202 I: IntoIterator<Item = Datum<'a>>,
203{
204 const EMPTY_SEP: &str = "";
205
206 let datums = order_aggregate_datums(datums, order_by);
207 let mut sep_value_pairs = datums.into_iter().filter_map(|d| {
208 if d.is_null() {
209 return None;
210 }
211 let mut value_sep = d.unwrap_list().iter();
212 match (value_sep.next().unwrap(), value_sep.next().unwrap()) {
213 (Datum::Null, _) => None,
214 (Datum::String(val), Datum::Null) => Some((EMPTY_SEP, val)),
215 (Datum::String(val), Datum::String(sep)) => Some((sep, val)),
216 _ => unreachable!(),
217 }
218 });
219
220 let mut s = String::default();
221 match sep_value_pairs.next() {
222 Some((_, value)) => s.push_str(value),
224 None => return Datum::Null,
226 }
227
228 for (sep, value) in sep_value_pairs {
229 s.push_str(sep);
230 s.push_str(value);
231 }
232
233 Datum::String(temp_storage.push_string(s))
234}
235
236fn jsonb_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
237where
238 I: IntoIterator<Item = Datum<'a>>,
239{
240 let datums = order_aggregate_datums(datums, order_by);
241 temp_storage.make_datum(|packer| {
242 packer.push_list(datums.into_iter().filter(|d| !d.is_null()));
243 })
244}
245
/// Builds a dict datum in `temp_storage` from `datums`, which are first sorted
/// by `order_by`.
///
/// Each non-null input is expected to be a two-element list `[key, value]`
/// whose key is a string (it is read with `unwrap_str`). Null inputs and
/// entries with a null key are skipped. When a key occurs more than once, the
/// value from its last occurrence in `order_by` order is kept. Entries are
/// pushed in ascending key order.
fn dict_agg<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
where
    I: IntoIterator<Item = Datum<'a>>,
{
    let datums = order_aggregate_datums(datums, order_by);
    temp_storage.make_datum(|packer| {
        let mut datums: Vec<_> = datums
            .into_iter()
            .filter_map(|d| {
                if d.is_null() {
                    return None;
                }
                let mut list = d.unwrap_list().iter();
                let key = list.next().unwrap();
                let val = list.next().unwrap();
                if key.is_null() {
                    // Entries with a null key are dropped.
                    None
                } else {
                    Some((key.unwrap_str(), val))
                }
            })
            .collect();
        // `sort_by_key` is stable, so entries that share a key stay in their
        // `order_by` order.
        datums.sort_by_key(|(k, _v)| *k);
        // Keep only the *last* entry per key. Reversing puts the last occurrence
        // first within each run of equal keys, `dedup_by_key` keeps the first of
        // each run, and the second reverse restores ascending key order.
        datums.reverse();
        datums.dedup_by_key(|(k, _v)| *k);
        datums.reverse();
        packer.push_dict(datums);
    })
}
282
283pub fn order_aggregate_datums<'a: 'b, 'b, I>(
293 datums: I,
294 order_by: &[ColumnOrder],
295) -> impl Iterator<Item = Datum<'b>>
296where
297 I: IntoIterator<Item = Datum<'a>>,
298{
299 order_aggregate_datums_with_rank_inner(datums, order_by)
300 .into_iter()
301 .map(|(payload, _order_datums)| payload)
303}
304
305fn order_aggregate_datums_with_rank<'a, I>(
308 datums: I,
309 order_by: &[ColumnOrder],
310) -> impl Iterator<Item = (Datum<'a>, Row)>
311where
312 I: IntoIterator<Item = Datum<'a>>,
313{
314 order_aggregate_datums_with_rank_inner(datums, order_by)
315 .into_iter()
316 .map(|(payload, order_by_datums)| (payload, Row::pack(order_by_datums)))
317}
318
319fn order_aggregate_datums_with_rank_inner<'a, I>(
320 datums: I,
321 order_by: &[ColumnOrder],
322) -> Vec<(Datum<'a>, Vec<Datum<'a>>)>
323where
324 I: IntoIterator<Item = Datum<'a>>,
325{
326 let mut decoded: Vec<(Datum, Vec<Datum>)> = datums
327 .into_iter()
328 .map(|d| {
329 let list = d.unwrap_list();
330 let mut list_it = list.iter();
331 let payload = list_it.next().unwrap();
332
333 let mut order_by_datums = Vec::with_capacity(order_by.len());
343 for _ in 0..order_by.len() {
344 order_by_datums.push(
345 list_it
346 .next()
347 .expect("must have exactly the same number of Datums as `order_by`"),
348 );
349 }
350
351 (payload, order_by_datums)
352 })
353 .collect();
354
355 let mut sort_by =
356 |(payload_left, left_order_by_datums): &(Datum, Vec<Datum>),
357 (payload_right, right_order_by_datums): &(Datum, Vec<Datum>)| {
358 compare_columns(
359 order_by,
360 left_order_by_datums,
361 right_order_by_datums,
362 || payload_left.cmp(payload_right),
363 )
364 };
365 decoded.sort_unstable_by(&mut sort_by);
370 decoded
371}
372
373fn array_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
374where
375 I: IntoIterator<Item = Datum<'a>>,
376{
377 let datums = order_aggregate_datums(datums, order_by);
378 let datums: Vec<_> = datums
379 .into_iter()
380 .map(|d| d.unwrap_array().elements().iter())
381 .flatten()
382 .collect();
383 let dims = ArrayDimension {
384 lower_bound: 1,
385 length: datums.len(),
386 };
387 temp_storage.make_datum(|packer| {
388 packer.try_push_array(&[dims], datums).unwrap();
389 })
390}
391
392fn list_concat<'a, I>(datums: I, temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
393where
394 I: IntoIterator<Item = Datum<'a>>,
395{
396 let datums = order_aggregate_datums(datums, order_by);
397 temp_storage.make_datum(|packer| {
398 packer.push_list(datums.into_iter().map(|d| d.unwrap_list().iter()).flatten());
399 })
400}
401
402fn row_number<'a, I>(
406 datums: I,
407 callers_temp_storage: &'a RowArena,
408 order_by: &[ColumnOrder],
409) -> Datum<'a>
410where
411 I: IntoIterator<Item = Datum<'a>>,
412{
413 let temp_storage = RowArena::new();
417 let datums = row_number_no_list(datums, &temp_storage, order_by);
418
419 callers_temp_storage.make_datum(|packer| {
420 packer.push_list(datums);
421 })
422}
423
424fn row_number_no_list<'a: 'b, 'b, I>(
427 datums: I,
428 callers_temp_storage: &'b RowArena,
429 order_by: &[ColumnOrder],
430) -> impl Iterator<Item = Datum<'b>>
431where
432 I: IntoIterator<Item = Datum<'a>>,
433{
434 let datums = order_aggregate_datums(datums, order_by);
435
436 callers_temp_storage.reserve(datums.size_hint().0);
437 datums
438 .into_iter()
439 .map(|d| d.unwrap_list().iter())
440 .flatten()
441 .zip(1i64..)
442 .map(|(d, i)| {
443 callers_temp_storage.make_datum(|packer| {
444 packer.push_list_with(|packer| {
445 packer.push(Datum::Int64(i));
446 packer.push(d);
447 });
448 })
449 })
450}
451
452fn rank<'a, I>(datums: I, callers_temp_storage: &'a RowArena, order_by: &[ColumnOrder]) -> Datum<'a>
456where
457 I: IntoIterator<Item = Datum<'a>>,
458{
459 let temp_storage = RowArena::new();
460 let datums = rank_no_list(datums, &temp_storage, order_by);
461
462 callers_temp_storage.make_datum(|packer| {
463 packer.push_list(datums);
464 })
465}
466
467fn rank_no_list<'a: 'b, 'b, I>(
470 datums: I,
471 callers_temp_storage: &'b RowArena,
472 order_by: &[ColumnOrder],
473) -> impl Iterator<Item = Datum<'b>>
474where
475 I: IntoIterator<Item = Datum<'a>>,
476{
477 let datums = order_aggregate_datums_with_rank(datums, order_by);
479
480 let mut datums = datums
481 .into_iter()
482 .map(|(d0, order_row)| {
483 d0.unwrap_list()
484 .iter()
485 .map(move |d1| (d1, order_row.clone()))
486 })
487 .flatten();
488
489 callers_temp_storage.reserve(datums.size_hint().0);
490 datums
491 .next()
492 .map_or(vec![], |(first_datum, first_order_row)| {
493 datums.fold((first_order_row, 1, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
495 let (ref mut acc_row, ref mut acc_rank, ref mut acc_row_num, ref mut output) = acc;
496 *acc_row_num += 1;
497 if *acc_row != next_order_row {
499 *acc_rank = *acc_row_num;
500 *acc_row = next_order_row;
501 }
502
503 (*output).push((next_datum, *acc_rank));
504 acc
505 })
506 }.3).into_iter().map(|(d, i)| {
507 callers_temp_storage.make_datum(|packer| {
508 packer.push_list_with(|packer| {
509 packer.push(Datum::Int64(i));
510 packer.push(d);
511 });
512 })
513 })
514}
515
516fn dense_rank<'a, I>(
520 datums: I,
521 callers_temp_storage: &'a RowArena,
522 order_by: &[ColumnOrder],
523) -> Datum<'a>
524where
525 I: IntoIterator<Item = Datum<'a>>,
526{
527 let temp_storage = RowArena::new();
528 let datums = dense_rank_no_list(datums, &temp_storage, order_by);
529
530 callers_temp_storage.make_datum(|packer| {
531 packer.push_list(datums);
532 })
533}
534
535fn dense_rank_no_list<'a: 'b, 'b, I>(
538 datums: I,
539 callers_temp_storage: &'b RowArena,
540 order_by: &[ColumnOrder],
541) -> impl Iterator<Item = Datum<'b>>
542where
543 I: IntoIterator<Item = Datum<'a>>,
544{
545 let datums = order_aggregate_datums_with_rank(datums, order_by);
547
548 let mut datums = datums
549 .into_iter()
550 .map(|(d0, order_row)| {
551 d0.unwrap_list()
552 .iter()
553 .map(move |d1| (d1, order_row.clone()))
554 })
555 .flatten();
556
557 callers_temp_storage.reserve(datums.size_hint().0);
558 datums
559 .next()
560 .map_or(vec![], |(first_datum, first_order_row)| {
561 datums.fold((first_order_row, 1, vec![(first_datum, 1)]), |mut acc, (next_datum, next_order_row)| {
563 let (ref mut acc_row, ref mut acc_rank, ref mut output) = acc;
564 if *acc_row != next_order_row {
566 *acc_rank += 1;
567 *acc_row = next_order_row;
568 }
569
570 (*output).push((next_datum, *acc_rank));
571 acc
572 })
573 }.2).into_iter().map(|(d, i)| {
574 callers_temp_storage.make_datum(|packer| {
575 packer.push_list_with(|packer| {
576 packer.push(Datum::Int64(i));
577 packer.push(d);
578 });
579 })
580 })
581}
582
583fn lag_lead<'a, I>(
605 datums: I,
606 callers_temp_storage: &'a RowArena,
607 order_by: &[ColumnOrder],
608 lag_lead_type: &LagLeadType,
609 ignore_nulls: &bool,
610) -> Datum<'a>
611where
612 I: IntoIterator<Item = Datum<'a>>,
613{
614 let temp_storage = RowArena::new();
615 let iter = lag_lead_no_list(datums, &temp_storage, order_by, lag_lead_type, ignore_nulls);
616 callers_temp_storage.make_datum(|packer| {
617 packer.push_list(iter);
618 })
619}
620
621fn lag_lead_no_list<'a: 'b, 'b, I>(
624 datums: I,
625 callers_temp_storage: &'b RowArena,
626 order_by: &[ColumnOrder],
627 lag_lead_type: &LagLeadType,
628 ignore_nulls: &bool,
629) -> impl Iterator<Item = Datum<'b>>
630where
631 I: IntoIterator<Item = Datum<'a>>,
632{
633 let datums = order_aggregate_datums(datums, order_by);
635
636 let (orig_rows, unwrapped_args): (Vec<_>, Vec<_>) = datums
640 .into_iter()
641 .map(|d| {
642 let mut iter = d.unwrap_list().iter();
643 let original_row = iter.next().unwrap();
644 let (input_value, offset, default_value) =
645 unwrap_lag_lead_encoded_args(iter.next().unwrap());
646 (original_row, (input_value, offset, default_value))
647 })
648 .unzip();
649
650 let result = lag_lead_inner(unwrapped_args, lag_lead_type, ignore_nulls);
651
652 callers_temp_storage.reserve(result.len());
653 result
654 .into_iter()
655 .zip_eq(orig_rows)
656 .map(|(result_value, original_row)| {
657 callers_temp_storage.make_datum(|packer| {
658 packer.push_list_with(|packer| {
659 packer.push(result_value);
660 packer.push(original_row);
661 });
662 })
663 })
664}
665
666fn unwrap_lag_lead_encoded_args(encoded_args: Datum) -> (Datum, Datum, Datum) {
668 let mut encoded_args_iter = encoded_args.unwrap_list().iter();
669 let (input_value, offset, default_value) = (
670 encoded_args_iter.next().unwrap(),
671 encoded_args_iter.next().unwrap(),
672 encoded_args_iter.next().unwrap(),
673 );
674 (input_value, offset, default_value)
675}
676
677fn lag_lead_inner<'a>(
680 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
681 lag_lead_type: &LagLeadType,
682 ignore_nulls: &bool,
683) -> Vec<Datum<'a>> {
684 if *ignore_nulls {
685 lag_lead_inner_ignore_nulls(args, lag_lead_type)
686 } else {
687 lag_lead_inner_respect_nulls(args, lag_lead_type)
688 }
689}
690
691fn lag_lead_inner_respect_nulls<'a>(
692 args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
693 lag_lead_type: &LagLeadType,
694) -> Vec<Datum<'a>> {
695 let mut result: Vec<Datum> = Vec::with_capacity(args.len());
696 for (idx, (_, offset, default_value)) in args.iter().enumerate() {
697 if offset.is_null() {
699 result.push(Datum::Null);
700 continue;
701 }
702
703 let idx = i64::try_from(idx).expect("Array index does not fit in i64");
704 let offset = i64::from(offset.unwrap_int32());
705 let offset = match lag_lead_type {
706 LagLeadType::Lag => -offset,
707 LagLeadType::Lead => offset,
708 };
709
710 let datums_get = |i: i64| -> Option<Datum> {
712 match u64::try_from(i) {
713 Ok(i) => args
714 .get(usize::cast_from(i))
715 .map(|d| Some(d.0)) .unwrap_or(None), Err(_) => None, }
719 };
720
721 let lagged_value = datums_get(idx + offset).unwrap_or(*default_value);
722
723 result.push(lagged_value);
724 }
725
726 result
727}
728
/// Computes `lag`/`lead` with IGNORE NULLS for every row of an ordered
/// partition.
///
/// `args[i]` is `(input_value, offset, default_value)` for row `i`. The offset
/// counts only rows whose `input_value` is non-null. With a positive offset,
/// `lag` walks backwards and `lead` walks forwards. A negative offset flips
/// the direction. If the walk leaves the partition, `default_value` is
/// returned. A null `offset` yields `Datum::Null`.
///
/// # Panics
///
/// Panics if `args.len()` does not fit in an `i64`, or if the offset is 0 and
/// the current row's input value is null.
// The `as` conversions below are on indices: `args.len()` is checked to fit in
// an `i64` up front, and `j as usize` is only done right after `datums_get(j)`
// found an element at `j`.
#[allow(clippy::as_conversions)]
fn lag_lead_inner_ignore_nulls<'a>(
    args: Vec<(Datum<'a>, Datum<'a>, Datum<'a>)>,
    lag_lead_type: &LagLeadType,
) -> Vec<Datum<'a>> {
    if i64::try_from(args.len()).is_err() {
        panic!("window partition way too big")
    }
    // For each row whose input value is null, record the index of the nearest
    // non-null row before it (-1 if none). Non-null rows keep `None`.
    let mut skip_nulls_backward = vec![None; args.len()];
    let mut last_non_null: i64 = -1;
    let pairs = args
        .iter()
        .enumerate()
        .zip_eq(skip_nulls_backward.iter_mut());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }
    // Mirror image: for each null row, the nearest non-null row after it
    // (`args.len()` if none).
    let mut skip_nulls_forward = vec![None; args.len()];
    let mut last_non_null: i64 = args.len() as i64;
    let pairs = args
        .iter()
        .enumerate()
        .rev()
        .zip_eq(skip_nulls_forward.iter_mut().rev());
    for ((i, (d, _, _)), slot) in pairs {
        if d.is_null() {
            *slot = Some(last_non_null);
        } else {
            last_non_null = i as i64;
        }
    }

    let mut result: Vec<Datum> = Vec::with_capacity(args.len());
    for (idx, (_, offset, default_value)) in args.iter().enumerate() {
        if offset.is_null() {
            result.push(Datum::Null);
            continue;
        }

        let idx = idx as i64;
        let offset = i64::cast_from(offset.unwrap_int32());
        let offset = match lag_lead_type {
            LagLeadType::Lag => -offset,
            LagLeadType::Lead => offset,
        };
        // Direction of the walk: +1, -1, or 0 for a zero offset.
        let increment = offset.signum();

        // Input value at a signed position, or `None` if out of range.
        let datums_get = |i: i64| -> Option<Datum> {
            match u64::try_from(i) {
                Ok(i) => args
                    .get(usize::cast_from(i))
                    .map(|d| Some(d.0))
                    .unwrap_or(None),
                Err(_) => None,
            }
        };

        let lagged_value = if increment != 0 {
            let mut j = idx;
            // Take |offset| steps. Each step lands on the next non-null input
            // in the walking direction, or just outside the partition.
            for _ in 0..num::abs(offset) {
                j += increment;
                if datums_get(j).is_some_and(|d| d.is_null()) {
                    // Jump over the whole run of nulls using the precomputed
                    // skip tables.
                    let ju = j as usize;
                    if increment > 0 {
                        j = skip_nulls_forward[ju].expect("checked above that it's null");
                    } else {
                        j = skip_nulls_backward[ju].expect("checked above that it's null");
                    }
                }
                if datums_get(j).is_none() {
                    // Walked off the partition; the default value applies.
                    break;
                }
            }
            match datums_get(j) {
                Some(datum) => datum,
                None => *default_value,
            }
        } else {
            assert_eq!(offset, 0);
            let datum = datums_get(idx).expect("known to exist");
            if !datum.is_null() {
                datum
            } else {
                // No result is produced for a zero offset on a null current
                // value; this panics. NOTE(review): confirm whether this should
                // instead return `default_value` or null.
                panic!("0 offset in lag/lead IGNORE NULLS");
            }
        };

        result.push(lagged_value);
    }

    result
}
850
851fn first_value<'a, I>(
853 datums: I,
854 callers_temp_storage: &'a RowArena,
855 order_by: &[ColumnOrder],
856 window_frame: &WindowFrame,
857) -> Datum<'a>
858where
859 I: IntoIterator<Item = Datum<'a>>,
860{
861 let temp_storage = RowArena::new();
862 let iter = first_value_no_list(datums, &temp_storage, order_by, window_frame);
863 callers_temp_storage.make_datum(|packer| {
864 packer.push_list(iter);
865 })
866}
867
868fn first_value_no_list<'a: 'b, 'b, I>(
871 datums: I,
872 callers_temp_storage: &'b RowArena,
873 order_by: &[ColumnOrder],
874 window_frame: &WindowFrame,
875) -> impl Iterator<Item = Datum<'b>>
876where
877 I: IntoIterator<Item = Datum<'a>>,
878{
879 let datums = order_aggregate_datums(datums, order_by);
881
882 let (orig_rows, args): (Vec<_>, Vec<_>) = datums
884 .into_iter()
885 .map(|d| {
886 let mut iter = d.unwrap_list().iter();
887 let original_row = iter.next().unwrap();
888 let arg = iter.next().unwrap();
889
890 (original_row, arg)
891 })
892 .unzip();
893
894 let results = first_value_inner(args, window_frame);
895
896 callers_temp_storage.reserve(results.len());
897 results
898 .into_iter()
899 .zip_eq(orig_rows)
900 .map(|(result_value, original_row)| {
901 callers_temp_storage.make_datum(|packer| {
902 packer.push_list_with(|packer| {
903 packer.push(result_value);
904 packer.push(original_row);
905 });
906 })
907 })
908}
909
/// Computes `first_value` for every row of an ordered partition.
///
/// `datums[i]` is the argument of row `i`. The frame's start bound decides
/// which row's argument is returned. Offsets are treated as row counts; this
/// function does not consult `window_frame.units`. If the frame is empty or
/// lies outside the partition, the result is `Datum::Null`.
fn first_value_inner<'a>(datums: Vec<Datum<'a>>, window_frame: &WindowFrame) -> Vec<Datum<'a>> {
    let length = datums.len();
    let mut result: Vec<Datum> = Vec::with_capacity(length);
    for (idx, current_datum) in datums.iter().enumerate() {
        let first_value = match &window_frame.start_bound {
            // The frame starts at the current row.
            WindowFrameBound::CurrentRow => *current_datum,
            WindowFrameBound::UnboundedPreceding => {
                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
                    let end_offset = usize::cast_from(*end_offset);

                    // The frame ends before the first row of the partition,
                    // so it is empty.
                    if idx < end_offset {
                        Datum::Null
                    } else {
                        datums[0]
                    }
                } else {
                    datums[0]
                }
            }
            WindowFrameBound::OffsetPreceding(offset) => {
                let start_offset = usize::cast_from(*offset);
                // The start is clamped to the first row of the partition.
                let start_idx = idx.saturating_sub(start_offset);
                if let WindowFrameBound::OffsetPreceding(end_offset) = &window_frame.end_bound {
                    let end_offset = usize::cast_from(*end_offset);

                    // The frame is empty if it starts after it ends, or if it
                    // ends before the first row of the partition.
                    if start_offset < end_offset || idx < end_offset {
                        Datum::Null
                    } else {
                        datums[start_idx]
                    }
                } else {
                    datums[start_idx]
                }
            }
            WindowFrameBound::OffsetFollowing(offset) => {
                let start_offset = usize::cast_from(*offset);
                let start_idx = idx.saturating_add(start_offset);
                if let WindowFrameBound::OffsetFollowing(end_offset) = &window_frame.end_bound {
                    // The frame is empty if it starts after it ends, or if it
                    // starts past the last row of the partition.
                    if offset > end_offset || start_idx >= length {
                        Datum::Null
                    } else {
                        datums[start_idx]
                    }
                } else {
                    datums
                        .get(start_idx)
                        .map(|d| d.clone())
                        .unwrap_or(Datum::Null)
                }
            }
            // NOTE(review): presumably rejected before reaching here (e.g.
            // during planning); confirm.
            WindowFrameBound::UnboundedFollowing => unreachable!(),
        };
        result.push(first_value);
    }
    result
}
971
972fn last_value<'a, I>(
974 datums: I,
975 callers_temp_storage: &'a RowArena,
976 order_by: &[ColumnOrder],
977 window_frame: &WindowFrame,
978) -> Datum<'a>
979where
980 I: IntoIterator<Item = Datum<'a>>,
981{
982 let temp_storage = RowArena::new();
983 let iter = last_value_no_list(datums, &temp_storage, order_by, window_frame);
984 callers_temp_storage.make_datum(|packer| {
985 packer.push_list(iter);
986 })
987}
988
989fn last_value_no_list<'a: 'b, 'b, I>(
992 datums: I,
993 callers_temp_storage: &'b RowArena,
994 order_by: &[ColumnOrder],
995 window_frame: &WindowFrame,
996) -> impl Iterator<Item = Datum<'b>>
997where
998 I: IntoIterator<Item = Datum<'a>>,
999{
1000 let datums = order_aggregate_datums_with_rank(datums, order_by);
1003
1004 let size_hint = datums.size_hint().0;
1006 let mut args = Vec::with_capacity(size_hint);
1007 let mut original_rows = Vec::with_capacity(size_hint);
1008 let mut order_by_rows = Vec::with_capacity(size_hint);
1009 for (d, order_by_row) in datums.into_iter() {
1010 let mut iter = d.unwrap_list().iter();
1011 let original_row = iter.next().unwrap();
1012 let arg = iter.next().unwrap();
1013 order_by_rows.push(order_by_row);
1014 original_rows.push(original_row);
1015 args.push(arg);
1016 }
1017
1018 let results = last_value_inner(args, &order_by_rows, window_frame);
1019
1020 callers_temp_storage.reserve(results.len());
1021 results
1022 .into_iter()
1023 .zip_eq(original_rows)
1024 .map(|(result_value, original_row)| {
1025 callers_temp_storage.make_datum(|packer| {
1026 packer.push_list_with(|packer| {
1027 packer.push(result_value);
1028 packer.push(original_row);
1029 });
1030 })
1031 })
1032}
1033
1034fn last_value_inner<'a>(
1035 args: Vec<Datum<'a>>,
1036 order_by_rows: &Vec<Row>,
1037 window_frame: &WindowFrame,
1038) -> Vec<Datum<'a>> {
1039 let length = args.len();
1040 let mut results: Vec<Datum> = Vec::with_capacity(length);
1041 for (idx, (current_datum, order_by_row)) in args.iter().zip_eq(order_by_rows).enumerate() {
1042 let last_value = match &window_frame.end_bound {
1043 WindowFrameBound::CurrentRow => match &window_frame.units {
1044 WindowFrameUnits::Rows => *current_datum,
1046 WindowFrameUnits::Range => {
1047 let target_idx = order_by_rows[idx..]
1052 .iter()
1053 .enumerate()
1054 .take_while(|(_, row)| *row == order_by_row)
1055 .last()
1056 .unwrap()
1057 .0
1058 + idx;
1059 args[target_idx]
1060 }
1061 WindowFrameUnits::Groups => unreachable!(),
1063 },
1064 WindowFrameBound::UnboundedFollowing => {
1065 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1066 let start_offset = usize::cast_from(*start_offset);
1067
1068 if idx + start_offset > length - 1 {
1070 Datum::Null
1071 } else {
1072 args[length - 1]
1073 }
1074 } else {
1075 args[length - 1]
1076 }
1077 }
1078 WindowFrameBound::OffsetFollowing(offset) => {
1079 let end_offset = usize::cast_from(*offset);
1080 let end_idx = idx.saturating_add(end_offset);
1081 if let WindowFrameBound::OffsetFollowing(start_offset) = &window_frame.start_bound {
1082 let start_offset = usize::cast_from(*start_offset);
1083 let start_idx = idx.saturating_add(start_offset);
1084
1085 if end_offset < start_offset || start_idx >= length {
1087 Datum::Null
1088 } else {
1089 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1091 }
1092 } else {
1093 args.get(end_idx).unwrap_or(&args[length - 1]).clone()
1094 }
1095 }
1096 WindowFrameBound::OffsetPreceding(offset) => {
1097 let end_offset = usize::cast_from(*offset);
1098 let end_idx = idx.saturating_sub(end_offset);
1099 if idx < end_offset {
1100 Datum::Null
1102 } else if let WindowFrameBound::OffsetPreceding(start_offset) =
1103 &window_frame.start_bound
1104 {
1105 if offset > start_offset {
1107 Datum::Null
1108 } else {
1109 args[end_idx]
1110 }
1111 } else {
1112 args[end_idx]
1113 }
1114 }
1115 WindowFrameBound::UnboundedPreceding => unreachable!(),
1117 };
1118 results.push(last_value);
1119 }
1120 results
1121}
1122
1123fn fused_value_window_func<'a, I>(
1129 input_datums: I,
1130 callers_temp_storage: &'a RowArena,
1131 funcs: &Vec<AggregateFunc>,
1132 order_by: &Vec<ColumnOrder>,
1133) -> Datum<'a>
1134where
1135 I: IntoIterator<Item = Datum<'a>>,
1136{
1137 let temp_storage = RowArena::new();
1138 let iter = fused_value_window_func_no_list(input_datums, &temp_storage, funcs, order_by);
1139 callers_temp_storage.make_datum(|packer| {
1140 packer.push_list(iter);
1141 })
1142}
1143
1144fn fused_value_window_func_no_list<'a: 'b, 'b, I>(
1147 input_datums: I,
1148 callers_temp_storage: &'b RowArena,
1149 funcs: &Vec<AggregateFunc>,
1150 order_by: &Vec<ColumnOrder>,
1151) -> impl Iterator<Item = Datum<'b>>
1152where
1153 I: IntoIterator<Item = Datum<'a>>,
1154{
1155 let has_last_value = funcs
1156 .iter()
1157 .any(|f| matches!(f, AggregateFunc::LastValue { .. }));
1158
1159 let input_datums_with_ranks = order_aggregate_datums_with_rank(input_datums, order_by);
1160
1161 let size_hint = input_datums_with_ranks.size_hint().0;
1162 let mut encoded_argsss = vec![Vec::with_capacity(size_hint); funcs.len()];
1163 let mut original_rows = Vec::with_capacity(size_hint);
1164 let mut order_by_rows = Vec::with_capacity(size_hint);
1165 for (d, order_by_row) in input_datums_with_ranks {
1166 let mut iter = d.unwrap_list().iter();
1167 let original_row = iter.next().unwrap();
1168 original_rows.push(original_row);
1169 let mut argss_iter = iter.next().unwrap().unwrap_list().iter();
1170 for i in 0..funcs.len() {
1171 let encoded_args = argss_iter.next().unwrap();
1172 encoded_argsss[i].push(encoded_args);
1173 }
1174 if has_last_value {
1175 order_by_rows.push(order_by_row);
1176 }
1177 }
1178
1179 let mut results_per_row = vec![Vec::with_capacity(funcs.len()); original_rows.len()];
1180 for (func, encoded_argss) in funcs.iter().zip_eq(encoded_argsss) {
1181 let results = match func {
1182 AggregateFunc::LagLead {
1183 order_by: inner_order_by,
1184 lag_lead,
1185 ignore_nulls,
1186 } => {
1187 assert_eq!(order_by, inner_order_by);
1188 let unwrapped_argss = encoded_argss
1189 .into_iter()
1190 .map(|encoded_args| unwrap_lag_lead_encoded_args(encoded_args))
1191 .collect();
1192 lag_lead_inner(unwrapped_argss, lag_lead, ignore_nulls)
1193 }
1194 AggregateFunc::FirstValue {
1195 order_by: inner_order_by,
1196 window_frame,
1197 } => {
1198 assert_eq!(order_by, inner_order_by);
1199 first_value_inner(encoded_argss, window_frame)
1202 }
1203 AggregateFunc::LastValue {
1204 order_by: inner_order_by,
1205 window_frame,
1206 } => {
1207 assert_eq!(order_by, inner_order_by);
1208 last_value_inner(encoded_argss, &order_by_rows, window_frame)
1211 }
1212 _ => panic!("unknown window function in FusedValueWindowFunc"),
1213 };
1214 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1215 results.push(result);
1216 }
1217 }
1218
1219 callers_temp_storage.reserve(2 * original_rows.len());
1220 results_per_row
1221 .into_iter()
1222 .enumerate()
1223 .map(move |(i, results)| {
1224 callers_temp_storage.make_datum(|packer| {
1225 packer.push_list_with(|packer| {
1226 packer
1227 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1228 packer.push(original_rows[i]);
1229 });
1230 })
1231 })
1232}
1233
1234fn window_aggr<'a, I, A>(
1243 input_datums: I,
1244 callers_temp_storage: &'a RowArena,
1245 wrapped_aggregate: &AggregateFunc,
1246 order_by: &[ColumnOrder],
1247 window_frame: &WindowFrame,
1248) -> Datum<'a>
1249where
1250 I: IntoIterator<Item = Datum<'a>>,
1251 A: OneByOneAggr,
1252{
1253 let temp_storage = RowArena::new();
1254 let iter = window_aggr_no_list::<I, A>(
1255 input_datums,
1256 &temp_storage,
1257 wrapped_aggregate,
1258 order_by,
1259 window_frame,
1260 );
1261 callers_temp_storage.make_datum(|packer| {
1262 packer.push_list(iter);
1263 })
1264}
1265
1266fn window_aggr_no_list<'a: 'b, 'b, I, A>(
1269 input_datums: I,
1270 callers_temp_storage: &'b RowArena,
1271 wrapped_aggregate: &AggregateFunc,
1272 order_by: &[ColumnOrder],
1273 window_frame: &WindowFrame,
1274) -> impl Iterator<Item = Datum<'b>>
1275where
1276 I: IntoIterator<Item = Datum<'a>>,
1277 A: OneByOneAggr,
1278{
1279 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1282
1283 let size_hint = datums.size_hint().0;
1285 let mut args: Vec<Datum> = Vec::with_capacity(size_hint);
1286 let mut original_rows: Vec<Datum> = Vec::with_capacity(size_hint);
1287 let mut order_by_rows = Vec::with_capacity(size_hint);
1288 for (d, order_by_row) in datums.into_iter() {
1289 let mut iter = d.unwrap_list().iter();
1290 let original_row = iter.next().unwrap();
1291 let arg = iter.next().unwrap();
1292 order_by_rows.push(order_by_row);
1293 original_rows.push(original_row);
1294 args.push(arg);
1295 }
1296
1297 let results = window_aggr_inner::<A>(
1298 args,
1299 &order_by_rows,
1300 wrapped_aggregate,
1301 order_by,
1302 window_frame,
1303 callers_temp_storage,
1304 );
1305
1306 callers_temp_storage.reserve(results.len());
1307 results
1308 .into_iter()
1309 .zip_eq(original_rows)
1310 .map(|(result_value, original_row)| {
1311 callers_temp_storage.make_datum(|packer| {
1312 packer.push_list_with(|packer| {
1313 packer.push(result_value);
1314 packer.push(original_row);
1315 });
1316 })
1317 })
1318}
1319
1320fn window_aggr_inner<'a, A>(
1321 mut args: Vec<Datum<'a>>,
1322 order_by_rows: &Vec<Row>,
1323 wrapped_aggregate: &AggregateFunc,
1324 order_by: &[ColumnOrder],
1325 window_frame: &WindowFrame,
1326 temp_storage: &'a RowArena,
1327) -> Vec<Datum<'a>>
1328where
1329 A: OneByOneAggr,
1330{
1331 let length = args.len();
1332 let mut result: Vec<Datum> = Vec::with_capacity(length);
1333
1334 soft_assert_or_log!(
1340 !((matches!(window_frame.units, WindowFrameUnits::Groups)
1341 || matches!(window_frame.units, WindowFrameUnits::Range))
1342 && !window_frame.includes_current_row()),
1343 "window frame without current row"
1344 );
1345
1346 if (matches!(
1347 window_frame.start_bound,
1348 WindowFrameBound::UnboundedPreceding
1349 ) && matches!(window_frame.end_bound, WindowFrameBound::UnboundedFollowing))
1350 || (order_by.is_empty()
1351 && (matches!(window_frame.units, WindowFrameUnits::Groups)
1352 || matches!(window_frame.units, WindowFrameUnits::Range))
1353 && window_frame.includes_current_row())
1354 {
1355 let result_value = wrapped_aggregate.eval(args, temp_storage);
1362 for _ in 0..length {
1364 result.push(result_value);
1365 }
1366 } else {
1367 fn rows_between_unbounded_preceding_and_current_row<'a, A>(
1368 args: Vec<Datum<'a>>,
1369 result: &mut Vec<Datum<'a>>,
1370 mut one_by_one_aggr: A,
1371 temp_storage: &'a RowArena,
1372 ) where
1373 A: OneByOneAggr,
1374 {
1375 for current_arg in args.into_iter() {
1376 one_by_one_aggr.give(¤t_arg);
1377 let result_value = one_by_one_aggr.get_current_aggregate(temp_storage);
1378 result.push(result_value);
1379 }
1380 }
1381
1382 fn groups_between_unbounded_preceding_and_current_row<'a, A>(
1383 args: Vec<Datum<'a>>,
1384 order_by_rows: &Vec<Row>,
1385 result: &mut Vec<Datum<'a>>,
1386 mut one_by_one_aggr: A,
1387 temp_storage: &'a RowArena,
1388 ) where
1389 A: OneByOneAggr,
1390 {
1391 let mut peer_group_start = 0;
1392 while peer_group_start < args.len() {
1393 let mut peer_group_end = peer_group_start + 1;
1397 while peer_group_end < args.len()
1398 && order_by_rows[peer_group_start] == order_by_rows[peer_group_end]
1399 {
1400 peer_group_end += 1;
1402 }
1403 for current_arg in args[peer_group_start..peer_group_end].iter() {
1406 one_by_one_aggr.give(current_arg);
1407 }
1408 let agg_for_peer_group = one_by_one_aggr.get_current_aggregate(temp_storage);
1409 for _ in args[peer_group_start..peer_group_end].iter() {
1411 result.push(agg_for_peer_group);
1412 }
1413 peer_group_start = peer_group_end;
1415 }
1416 }
1417
1418 fn rows_between_offset_and_offset<'a>(
1419 args: Vec<Datum<'a>>,
1420 result: &mut Vec<Datum<'a>>,
1421 wrapped_aggregate: &AggregateFunc,
1422 temp_storage: &'a RowArena,
1423 offset_start: i64,
1424 offset_end: i64,
1425 ) {
1426 let len = args
1427 .len()
1428 .to_i64()
1429 .expect("window partition's len should fit into i64");
1430 for i in 0..len {
1431 let i = i.to_i64().expect("window partition shouldn't be super big");
1432 let frame_start = max(i + offset_start, 0)
1435 .to_usize()
1436 .expect("The max made sure it's not negative");
1437 let frame_end = min(i + offset_end, len - 1).to_usize();
1440 match frame_end {
1441 Some(frame_end) => {
1442 if frame_start <= frame_end {
1443 let frame_values = args[frame_start..=frame_end].iter().cloned();
1454 let result_value = wrapped_aggregate.eval(frame_values, temp_storage);
1455 result.push(result_value);
1456 } else {
1457 let result_value = wrapped_aggregate.default();
1459 result.push(result_value);
1460 }
1461 }
1462 None => {
1463 let result_value = wrapped_aggregate.default();
1465 result.push(result_value);
1466 }
1467 }
1468 }
1469 }
1470
1471 match (
1472 &window_frame.units,
1473 &window_frame.start_bound,
1474 &window_frame.end_bound,
1475 ) {
1476 (Rows, UnboundedPreceding, CurrentRow) => {
1481 rows_between_unbounded_preceding_and_current_row::<A>(
1482 args,
1483 &mut result,
1484 A::new(wrapped_aggregate, false),
1485 temp_storage,
1486 );
1487 }
1488 (Rows, CurrentRow, UnboundedFollowing) => {
1489 args.reverse();
1491 rows_between_unbounded_preceding_and_current_row::<A>(
1492 args,
1493 &mut result,
1494 A::new(wrapped_aggregate, true),
1495 temp_storage,
1496 );
1497 result.reverse();
1498 }
1499 (Range, UnboundedPreceding, CurrentRow) => {
1500 groups_between_unbounded_preceding_and_current_row::<A>(
1503 args,
1504 order_by_rows,
1505 &mut result,
1506 A::new(wrapped_aggregate, false),
1507 temp_storage,
1508 );
1509 }
1510 (Rows, OffsetPreceding(start_prec), OffsetPreceding(end_prec)) => {
1514 let start_prec = start_prec.to_i64().expect(
1515 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1516 );
1517 let end_prec = end_prec.to_i64().expect(
1518 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1519 );
1520 rows_between_offset_and_offset(
1521 args,
1522 &mut result,
1523 wrapped_aggregate,
1524 temp_storage,
1525 -start_prec,
1526 -end_prec,
1527 );
1528 }
1529 (Rows, OffsetPreceding(start_prec), OffsetFollowing(end_fol)) => {
1530 let start_prec = start_prec.to_i64().expect(
1531 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1532 );
1533 let end_fol = end_fol.to_i64().expect(
1534 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1535 );
1536 rows_between_offset_and_offset(
1537 args,
1538 &mut result,
1539 wrapped_aggregate,
1540 temp_storage,
1541 -start_prec,
1542 end_fol,
1543 );
1544 }
1545 (Rows, OffsetFollowing(start_fol), OffsetFollowing(end_fol)) => {
1546 let start_fol = start_fol.to_i64().expect(
1547 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1548 );
1549 let end_fol = end_fol.to_i64().expect(
1550 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1551 );
1552 rows_between_offset_and_offset(
1553 args,
1554 &mut result,
1555 wrapped_aggregate,
1556 temp_storage,
1557 start_fol,
1558 end_fol,
1559 );
1560 }
1561 (Rows, OffsetFollowing(_), OffsetPreceding(_)) => {
1562 unreachable!() }
1564 (Rows, OffsetPreceding(start_prec), CurrentRow) => {
1565 let start_prec = start_prec.to_i64().expect(
1566 "window frame start OFFSET shouldn't be super big (the planning ensured this)",
1567 );
1568 let end_fol = 0;
1569 rows_between_offset_and_offset(
1570 args,
1571 &mut result,
1572 wrapped_aggregate,
1573 temp_storage,
1574 -start_prec,
1575 end_fol,
1576 );
1577 }
1578 (Rows, CurrentRow, OffsetFollowing(end_fol)) => {
1579 let start_fol = 0;
1580 let end_fol = end_fol.to_i64().expect(
1581 "window frame end OFFSET shouldn't be super big (the planning ensured this)",
1582 );
1583 rows_between_offset_and_offset(
1584 args,
1585 &mut result,
1586 wrapped_aggregate,
1587 temp_storage,
1588 start_fol,
1589 end_fol,
1590 );
1591 }
1592 (Rows, CurrentRow, CurrentRow) => {
1593 let start_fol = 0;
1596 let end_fol = 0;
1597 rows_between_offset_and_offset(
1598 args,
1599 &mut result,
1600 wrapped_aggregate,
1601 temp_storage,
1602 start_fol,
1603 end_fol,
1604 );
1605 }
1606 (Rows, CurrentRow, OffsetPreceding(_))
1607 | (Rows, UnboundedFollowing, _)
1608 | (Rows, _, UnboundedPreceding)
1609 | (Rows, OffsetFollowing(..), CurrentRow) => {
1610 unreachable!() }
1612 (Rows, UnboundedPreceding, UnboundedFollowing) => {
1613 unreachable!()
1616 }
1617 (Rows, UnboundedPreceding, OffsetPreceding(_))
1618 | (Rows, UnboundedPreceding, OffsetFollowing(_))
1619 | (Rows, OffsetPreceding(..), UnboundedFollowing)
1620 | (Rows, OffsetFollowing(..), UnboundedFollowing) => {
1621 unreachable!()
1624 }
1625 (Range, _, _) => {
1626 unreachable!()
1633 }
1634 (Groups, _, _) => {
1635 unreachable!()
1639 }
1640 }
1641 }
1642
1643 result
1644}
1645
1646fn fused_window_aggr<'a, I, A>(
1650 input_datums: I,
1651 callers_temp_storage: &'a RowArena,
1652 wrapped_aggregates: &Vec<AggregateFunc>,
1653 order_by: &Vec<ColumnOrder>,
1654 window_frame: &WindowFrame,
1655) -> Datum<'a>
1656where
1657 I: IntoIterator<Item = Datum<'a>>,
1658 A: OneByOneAggr,
1659{
1660 let temp_storage = RowArena::new();
1661 let iter = fused_window_aggr_no_list::<_, A>(
1662 input_datums,
1663 &temp_storage,
1664 wrapped_aggregates,
1665 order_by,
1666 window_frame,
1667 );
1668 callers_temp_storage.make_datum(|packer| {
1669 packer.push_list(iter);
1670 })
1671}
1672
1673fn fused_window_aggr_no_list<'a: 'b, 'b, I, A>(
1676 input_datums: I,
1677 callers_temp_storage: &'b RowArena,
1678 wrapped_aggregates: &Vec<AggregateFunc>,
1679 order_by: &Vec<ColumnOrder>,
1680 window_frame: &WindowFrame,
1681) -> impl Iterator<Item = Datum<'b>>
1682where
1683 I: IntoIterator<Item = Datum<'a>>,
1684 A: OneByOneAggr,
1685{
1686 let datums = order_aggregate_datums_with_rank(input_datums, order_by);
1689
1690 let size_hint = datums.size_hint().0;
1691 let mut argss = vec![Vec::with_capacity(size_hint); wrapped_aggregates.len()];
1692 let mut original_rows = Vec::with_capacity(size_hint);
1693 let mut order_by_rows = Vec::with_capacity(size_hint);
1694 for (d, order_by_row) in datums {
1695 let mut iter = d.unwrap_list().iter();
1696 let original_row = iter.next().unwrap();
1697 original_rows.push(original_row);
1698 let args_iter = iter.next().unwrap().unwrap_list().iter();
1699 for (args, arg) in argss.iter_mut().zip_eq(args_iter) {
1701 args.push(arg);
1702 }
1703 order_by_rows.push(order_by_row);
1704 }
1705
1706 let mut results_per_row =
1707 vec![Vec::with_capacity(wrapped_aggregates.len()); original_rows.len()];
1708 for (wrapped_aggr, args) in wrapped_aggregates.iter().zip_eq(argss) {
1709 let results = window_aggr_inner::<A>(
1710 args,
1711 &order_by_rows,
1712 wrapped_aggr,
1713 order_by,
1714 window_frame,
1715 callers_temp_storage,
1716 );
1717 for (results, result) in results_per_row.iter_mut().zip_eq(results) {
1718 results.push(result);
1719 }
1720 }
1721
1722 callers_temp_storage.reserve(2 * original_rows.len());
1723 results_per_row
1724 .into_iter()
1725 .enumerate()
1726 .map(move |(i, results)| {
1727 callers_temp_storage.make_datum(|packer| {
1728 packer.push_list_with(|packer| {
1729 packer
1730 .push(callers_temp_storage.make_datum(|packer| packer.push_list(results)));
1731 packer.push(original_rows[i]);
1732 });
1733 })
1734 })
1735}
1736
1737pub trait OneByOneAggr {
1741 fn new(agg: &AggregateFunc, reverse: bool) -> Self;
1746 fn give(&mut self, d: &Datum);
1748 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a>;
1750}
1751
#[derive(Debug)]
/// A [`OneByOneAggr`] that remembers every input and re-evaluates `agg` over all of them on each
/// `get_current_aggregate` call. Each call therefore re-reads all inputs given so far, so the
/// total work grows quadratically with the number of inputs (assuming `eval` is linear).
pub struct NaiveOneByOneAggr {
    // The aggregate to evaluate.
    agg: AggregateFunc,
    // Every input given so far, each packed into its own single-datum `Row`.
    input: Vec<Row>,
    // When true, `input` holds the inputs in reverse order, so evaluation iterates it backwards.
    reverse: bool,
}
1763
1764impl OneByOneAggr for NaiveOneByOneAggr {
1765 fn new(agg: &AggregateFunc, reverse: bool) -> Self {
1766 NaiveOneByOneAggr {
1767 agg: agg.clone(),
1768 input: Vec::new(),
1769 reverse,
1770 }
1771 }
1772
1773 fn give(&mut self, d: &Datum) {
1774 let mut row = Row::default();
1775 row.packer().push(d);
1776 self.input.push(row);
1777 }
1778
1779 fn get_current_aggregate<'a>(&self, temp_storage: &'a RowArena) -> Datum<'a> {
1780 temp_storage.make_datum(|packer| {
1781 packer.push(if !self.reverse {
1782 self.agg
1783 .eval(self.input.iter().map(|r| r.unpack_first()), temp_storage)
1784 } else {
1785 self.agg.eval(
1786 self.input.iter().rev().map(|r| r.unpack_first()),
1787 temp_storage,
1788 )
1789 });
1790 })
1791 }
1792}
1793
/// Distinguishes the two flavors of [`AggregateFunc::LagLead`]: `Lag` and `Lead`
/// (presumably the SQL `lag` / `lead` window functions).
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub enum LagLeadType {
    /// The `lag` flavor.
    Lag,
    /// The `lead` flavor.
    Lead,
}
1803
/// An aggregate function, including the window functions that are evaluated through the
/// aggregation machinery (e.g. `RowNumber`, `LagLead`, `WindowAggregate`).
///
/// The derived `Ord` depends on variant order, so do not reorder variants casually.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect)]
pub enum AggregateFunc {
    /// Maximum of `Numeric` values (compared as `OrderedDecimal<Numeric>`).
    MaxNumeric,
    /// Maximum of `i16` values.
    MaxInt16,
    /// Maximum of `i32` values.
    MaxInt32,
    /// Maximum of `i64` values.
    MaxInt64,
    /// Maximum of `u16` values.
    MaxUInt16,
    /// Maximum of `u32` values.
    MaxUInt32,
    /// Maximum of `u64` values.
    MaxUInt64,
    /// Maximum of `mz_repr::Timestamp` values.
    MaxMzTimestamp,
    /// Maximum of `f32` values (compared as `OrderedFloat<f32>`).
    MaxFloat32,
    /// Maximum of `f64` values (compared as `OrderedFloat<f64>`).
    MaxFloat64,
    /// Maximum of `bool` values.
    MaxBool,
    /// Maximum of string values (see `max_string`).
    MaxString,
    /// Maximum of `Date` values.
    MaxDate,
    /// Maximum of `CheckedTimestamp<NaiveDateTime>` values.
    MaxTimestamp,
    /// Maximum of `CheckedTimestamp<DateTime<Utc>>` values.
    MaxTimestampTz,
    /// Maximum of `Interval` values.
    MaxInterval,
    /// Maximum of `NaiveTime` values.
    MaxTime,
    /// Minimum of `Numeric` values (compared as `OrderedDecimal<Numeric>`).
    MinNumeric,
    /// Minimum of `i16` values.
    MinInt16,
    /// Minimum of `i32` values.
    MinInt32,
    /// Minimum of `i64` values.
    MinInt64,
    /// Minimum of `u16` values.
    MinUInt16,
    /// Minimum of `u32` values.
    MinUInt32,
    /// Minimum of `u64` values.
    MinUInt64,
    /// Minimum of `mz_repr::Timestamp` values.
    MinMzTimestamp,
    /// Minimum of `f32` values (compared as `OrderedFloat<f32>`).
    MinFloat32,
    /// Minimum of `f64` values (compared as `OrderedFloat<f64>`).
    MinFloat64,
    /// Minimum of `bool` values.
    MinBool,
    /// Minimum of string values (see `min_string`).
    MinString,
    /// Minimum of `Date` values.
    MinDate,
    /// Minimum of `CheckedTimestamp<NaiveDateTime>` values.
    MinTimestamp,
    /// Minimum of `CheckedTimestamp<DateTime<Utc>>` values.
    MinTimestampTz,
    /// Minimum of `Interval` values.
    MinInterval,
    /// Minimum of `NaiveTime` values.
    MinTime,
    /// Sum of `i16` values (`eval` uses `sum_datum::<_, i16, i64>`).
    SumInt16,
    /// Sum of `i32` values (`eval` uses `sum_datum::<_, i32, i64>`).
    SumInt32,
    /// Sum of `i64` values (`eval` uses `sum_datum::<_, i64, i128>`).
    SumInt64,
    /// Sum of `u16` values (`eval` uses `sum_datum::<_, u16, u64>`).
    SumUInt16,
    /// Sum of `u32` values (`eval` uses `sum_datum::<_, u32, u64>`).
    SumUInt32,
    /// Sum of `u64` values (`eval` uses `sum_datum::<_, u64, u128>`).
    SumUInt64,
    /// Sum of `f32` values.
    SumFloat32,
    /// Sum of `f64` values.
    SumFloat64,
    /// Sum of `Numeric` values (see `sum_numeric`).
    SumNumeric,
    /// Evaluated by `count`.
    Count,
    /// Evaluated by `any`.
    Any,
    /// Evaluated by `all`.
    All,
    /// Evaluated by `jsonb_agg`, which receives `order_by`.
    JsonbAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dict_agg` (shared with `MapAgg`), which receives `order_by`.
    JsonbObjectAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `dict_agg` (shared with `JsonbObjectAgg`), which receives `order_by`.
    MapAgg {
        order_by: Vec<ColumnOrder>,
        /// Not read by `eval`; presumably the type of the resulting map's values.
        value_type: ScalarType,
    },
    /// Evaluated by `array_concat`, which receives `order_by`.
    ArrayConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `list_concat`, which receives `order_by`.
    ListConcat {
        order_by: Vec<ColumnOrder>,
    },
    /// Evaluated by `string_agg`, which receives `order_by`.
    StringAgg {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `row_number`.
    RowNumber {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `rank`.
    Rank {
        order_by: Vec<ColumnOrder>,
    },
    /// Window function evaluated by `dense_rank`.
    DenseRank {
        order_by: Vec<ColumnOrder>,
    },
    /// The `lag` / `lead` window functions, selected by `lag_lead`.
    LagLead {
        order_by: Vec<ColumnOrder>,
        lag_lead: LagLeadType,
        /// NOTE(review): presumably implements `IGNORE NULLS`; confirm against the evaluator.
        ignore_nulls: bool,
    },
    /// Window function with an explicit window frame (presumably SQL `first_value`).
    FirstValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Window function with an explicit window frame (presumably SQL `last_value`).
    LastValue {
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several value window functions (`funcs`) sharing one `order_by`, evaluated together.
    FusedValueWindowFunc {
        funcs: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
    },
    /// An ordinary aggregate evaluated as a window function over `window_frame` (presumably
    /// via `window_aggr`).
    WindowAggregate {
        wrapped_aggregate: Box<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// Several aggregates sharing one `order_by` and `window_frame`, evaluated together
    /// (presumably via `fused_window_aggr`).
    FusedWindowAggregate {
        wrapped_aggregates: Vec<AggregateFunc>,
        order_by: Vec<ColumnOrder>,
        window_frame: WindowFrame,
    },
    /// A placeholder variant without a payload; round-trips through `Kind::Dummy`.
    Dummy,
}
1935
1936impl Arbitrary for AggregateFunc {
1943 type Parameters = ();
1944
1945 type Strategy = Union<BoxedStrategy<Self>>;
1946
1947 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1948 use proptest::collection::vec;
1949 use proptest::prelude::any as proptest_any;
1950 Union::new(vec![
1951 Just(AggregateFunc::MaxNumeric).boxed(),
1952 Just(AggregateFunc::MaxInt16).boxed(),
1953 Just(AggregateFunc::MaxInt32).boxed(),
1954 Just(AggregateFunc::MaxInt64).boxed(),
1955 Just(AggregateFunc::MaxUInt16).boxed(),
1956 Just(AggregateFunc::MaxUInt32).boxed(),
1957 Just(AggregateFunc::MaxUInt64).boxed(),
1958 Just(AggregateFunc::MaxMzTimestamp).boxed(),
1959 Just(AggregateFunc::MaxFloat32).boxed(),
1960 Just(AggregateFunc::MaxFloat64).boxed(),
1961 Just(AggregateFunc::MaxBool).boxed(),
1962 Just(AggregateFunc::MaxString).boxed(),
1963 Just(AggregateFunc::MaxTimestamp).boxed(),
1964 Just(AggregateFunc::MaxDate).boxed(),
1965 Just(AggregateFunc::MaxTimestampTz).boxed(),
1966 Just(AggregateFunc::MaxInterval).boxed(),
1967 Just(AggregateFunc::MaxTime).boxed(),
1968 Just(AggregateFunc::MinNumeric).boxed(),
1969 Just(AggregateFunc::MinInt16).boxed(),
1970 Just(AggregateFunc::MinInt32).boxed(),
1971 Just(AggregateFunc::MinInt64).boxed(),
1972 Just(AggregateFunc::MinUInt16).boxed(),
1973 Just(AggregateFunc::MinUInt32).boxed(),
1974 Just(AggregateFunc::MinUInt64).boxed(),
1975 Just(AggregateFunc::MinMzTimestamp).boxed(),
1976 Just(AggregateFunc::MinFloat32).boxed(),
1977 Just(AggregateFunc::MinFloat64).boxed(),
1978 Just(AggregateFunc::MinBool).boxed(),
1979 Just(AggregateFunc::MinString).boxed(),
1980 Just(AggregateFunc::MinDate).boxed(),
1981 Just(AggregateFunc::MinTimestamp).boxed(),
1982 Just(AggregateFunc::MinTimestampTz).boxed(),
1983 Just(AggregateFunc::MinInterval).boxed(),
1984 Just(AggregateFunc::MinTime).boxed(),
1985 Just(AggregateFunc::SumInt16).boxed(),
1986 Just(AggregateFunc::SumInt32).boxed(),
1987 Just(AggregateFunc::SumInt64).boxed(),
1988 Just(AggregateFunc::SumUInt16).boxed(),
1989 Just(AggregateFunc::SumUInt32).boxed(),
1990 Just(AggregateFunc::SumUInt64).boxed(),
1991 Just(AggregateFunc::SumFloat32).boxed(),
1992 Just(AggregateFunc::SumFloat64).boxed(),
1993 Just(AggregateFunc::SumNumeric).boxed(),
1994 Just(AggregateFunc::Count).boxed(),
1995 Just(AggregateFunc::Any).boxed(),
1996 Just(AggregateFunc::All).boxed(),
1997 vec(proptest_any::<ColumnOrder>(), 1..4)
1998 .prop_map(|order_by| AggregateFunc::JsonbAgg { order_by })
1999 .boxed(),
2000 vec(proptest_any::<ColumnOrder>(), 1..4)
2001 .prop_map(|order_by| AggregateFunc::JsonbObjectAgg { order_by })
2002 .boxed(),
2003 (
2004 vec(proptest_any::<ColumnOrder>(), 1..4),
2005 proptest_any::<ScalarType>(),
2006 )
2007 .prop_map(|(order_by, value_type)| AggregateFunc::MapAgg {
2008 order_by,
2009 value_type,
2010 })
2011 .boxed(),
2012 vec(proptest_any::<ColumnOrder>(), 1..4)
2013 .prop_map(|order_by| AggregateFunc::ArrayConcat { order_by })
2014 .boxed(),
2015 vec(proptest_any::<ColumnOrder>(), 1..4)
2016 .prop_map(|order_by| AggregateFunc::ListConcat { order_by })
2017 .boxed(),
2018 vec(proptest_any::<ColumnOrder>(), 1..4)
2019 .prop_map(|order_by| AggregateFunc::StringAgg { order_by })
2020 .boxed(),
2021 vec(proptest_any::<ColumnOrder>(), 1..4)
2022 .prop_map(|order_by| AggregateFunc::RowNumber { order_by })
2023 .boxed(),
2024 vec(proptest_any::<ColumnOrder>(), 1..4)
2025 .prop_map(|order_by| AggregateFunc::DenseRank { order_by })
2026 .boxed(),
2027 (
2028 vec(proptest_any::<ColumnOrder>(), 1..4),
2029 proptest_any::<LagLeadType>(),
2030 proptest_any::<bool>(),
2031 )
2032 .prop_map(
2033 |(order_by, lag_lead, ignore_nulls)| AggregateFunc::LagLead {
2034 order_by,
2035 lag_lead,
2036 ignore_nulls,
2037 },
2038 )
2039 .boxed(),
2040 (
2041 vec(proptest_any::<ColumnOrder>(), 1..4),
2042 proptest_any::<WindowFrame>(),
2043 )
2044 .prop_map(|(order_by, window_frame)| AggregateFunc::FirstValue {
2045 order_by,
2046 window_frame,
2047 })
2048 .boxed(),
2049 (
2050 vec(proptest_any::<ColumnOrder>(), 1..4),
2051 proptest_any::<WindowFrame>(),
2052 )
2053 .prop_map(|(order_by, window_frame)| AggregateFunc::LastValue {
2054 order_by,
2055 window_frame,
2056 })
2057 .boxed(),
2058 Just(AggregateFunc::Dummy).boxed(),
2059 ])
2060 }
2061}
2062
2063impl RustType<ProtoColumnOrders> for Vec<ColumnOrder> {
2064 fn into_proto(&self) -> ProtoColumnOrders {
2065 ProtoColumnOrders {
2066 orders: self.into_proto(),
2067 }
2068 }
2069
2070 fn from_proto(proto: ProtoColumnOrders) -> Result<Self, TryFromProtoError> {
2071 proto.orders.into_rust()
2072 }
2073}
2074
2075impl RustType<ProtoAggregateFunc> for AggregateFunc {
2076 fn into_proto(&self) -> ProtoAggregateFunc {
2077 use proto_aggregate_func::Kind;
2078 ProtoAggregateFunc {
2079 kind: Some(match self {
2080 AggregateFunc::MaxNumeric => Kind::MaxNumeric(()),
2081 AggregateFunc::MaxInt16 => Kind::MaxInt16(()),
2082 AggregateFunc::MaxInt32 => Kind::MaxInt32(()),
2083 AggregateFunc::MaxInt64 => Kind::MaxInt64(()),
2084 AggregateFunc::MaxUInt16 => Kind::MaxUint16(()),
2085 AggregateFunc::MaxUInt32 => Kind::MaxUint32(()),
2086 AggregateFunc::MaxUInt64 => Kind::MaxUint64(()),
2087 AggregateFunc::MaxMzTimestamp => Kind::MaxMzTimestamp(()),
2088 AggregateFunc::MaxFloat32 => Kind::MaxFloat32(()),
2089 AggregateFunc::MaxFloat64 => Kind::MaxFloat64(()),
2090 AggregateFunc::MaxBool => Kind::MaxBool(()),
2091 AggregateFunc::MaxString => Kind::MaxString(()),
2092 AggregateFunc::MaxDate => Kind::MaxDate(()),
2093 AggregateFunc::MaxTimestamp => Kind::MaxTimestamp(()),
2094 AggregateFunc::MaxTimestampTz => Kind::MaxTimestampTz(()),
2095 AggregateFunc::MinNumeric => Kind::MinNumeric(()),
2096 AggregateFunc::MaxInterval => Kind::MaxInterval(()),
2097 AggregateFunc::MaxTime => Kind::MaxTime(()),
2098 AggregateFunc::MinInt16 => Kind::MinInt16(()),
2099 AggregateFunc::MinInt32 => Kind::MinInt32(()),
2100 AggregateFunc::MinInt64 => Kind::MinInt64(()),
2101 AggregateFunc::MinUInt16 => Kind::MinUint16(()),
2102 AggregateFunc::MinUInt32 => Kind::MinUint32(()),
2103 AggregateFunc::MinUInt64 => Kind::MinUint64(()),
2104 AggregateFunc::MinMzTimestamp => Kind::MinMzTimestamp(()),
2105 AggregateFunc::MinFloat32 => Kind::MinFloat32(()),
2106 AggregateFunc::MinFloat64 => Kind::MinFloat64(()),
2107 AggregateFunc::MinBool => Kind::MinBool(()),
2108 AggregateFunc::MinString => Kind::MinString(()),
2109 AggregateFunc::MinDate => Kind::MinDate(()),
2110 AggregateFunc::MinTimestamp => Kind::MinTimestamp(()),
2111 AggregateFunc::MinTimestampTz => Kind::MinTimestampTz(()),
2112 AggregateFunc::MinInterval => Kind::MinInterval(()),
2113 AggregateFunc::MinTime => Kind::MinTime(()),
2114 AggregateFunc::SumInt16 => Kind::SumInt16(()),
2115 AggregateFunc::SumInt32 => Kind::SumInt32(()),
2116 AggregateFunc::SumInt64 => Kind::SumInt64(()),
2117 AggregateFunc::SumUInt16 => Kind::SumUint16(()),
2118 AggregateFunc::SumUInt32 => Kind::SumUint32(()),
2119 AggregateFunc::SumUInt64 => Kind::SumUint64(()),
2120 AggregateFunc::SumFloat32 => Kind::SumFloat32(()),
2121 AggregateFunc::SumFloat64 => Kind::SumFloat64(()),
2122 AggregateFunc::SumNumeric => Kind::SumNumeric(()),
2123 AggregateFunc::Count => Kind::Count(()),
2124 AggregateFunc::Any => Kind::Any(()),
2125 AggregateFunc::All => Kind::All(()),
2126 AggregateFunc::JsonbAgg { order_by } => Kind::JsonbAgg(order_by.into_proto()),
2127 AggregateFunc::JsonbObjectAgg { order_by } => {
2128 Kind::JsonbObjectAgg(order_by.into_proto())
2129 }
2130 AggregateFunc::MapAgg {
2131 order_by,
2132 value_type,
2133 } => Kind::MapAgg(proto_aggregate_func::ProtoMapAgg {
2134 order_by: Some(order_by.into_proto()),
2135 value_type: Some(value_type.into_proto()),
2136 }),
2137 AggregateFunc::ArrayConcat { order_by } => Kind::ArrayConcat(order_by.into_proto()),
2138 AggregateFunc::ListConcat { order_by } => Kind::ListConcat(order_by.into_proto()),
2139 AggregateFunc::StringAgg { order_by } => Kind::StringAgg(order_by.into_proto()),
2140 AggregateFunc::RowNumber { order_by } => Kind::RowNumber(order_by.into_proto()),
2141 AggregateFunc::Rank { order_by } => Kind::Rank(order_by.into_proto()),
2142 AggregateFunc::DenseRank { order_by } => Kind::DenseRank(order_by.into_proto()),
2143 AggregateFunc::LagLead {
2144 order_by,
2145 lag_lead,
2146 ignore_nulls,
2147 } => Kind::LagLead(proto_aggregate_func::ProtoLagLead {
2148 order_by: Some(order_by.into_proto()),
2149 lag_lead: Some(match lag_lead {
2150 LagLeadType::Lag => proto_aggregate_func::proto_lag_lead::LagLead::Lag(()),
2151 LagLeadType::Lead => {
2152 proto_aggregate_func::proto_lag_lead::LagLead::Lead(())
2153 }
2154 }),
2155 ignore_nulls: *ignore_nulls,
2156 }),
2157 AggregateFunc::FirstValue {
2158 order_by,
2159 window_frame,
2160 } => Kind::FirstValue(proto_aggregate_func::ProtoFramedWindowFunc {
2161 order_by: Some(order_by.into_proto()),
2162 window_frame: Some(window_frame.into_proto()),
2163 }),
2164 AggregateFunc::LastValue {
2165 order_by,
2166 window_frame,
2167 } => Kind::LastValue(proto_aggregate_func::ProtoFramedWindowFunc {
2168 order_by: Some(order_by.into_proto()),
2169 window_frame: Some(window_frame.into_proto()),
2170 }),
2171 AggregateFunc::WindowAggregate {
2172 wrapped_aggregate,
2173 order_by,
2174 window_frame,
2175 } => Kind::WindowAggregate(Box::new(proto_aggregate_func::ProtoWindowAggregate {
2176 wrapped_aggregate: Some(wrapped_aggregate.into_proto()),
2177 order_by: Some(order_by.into_proto()),
2178 window_frame: Some(window_frame.into_proto()),
2179 })),
2180 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2181 Kind::FusedValueWindowFunc(ProtoFusedValueWindowFunc {
2182 funcs: funcs.into_proto(),
2183 order_by: Some(order_by.into_proto()),
2184 })
2185 }
2186 AggregateFunc::FusedWindowAggregate {
2187 wrapped_aggregates,
2188 order_by,
2189 window_frame,
2190 } => Kind::FusedWindowAggregate(ProtoFusedWindowAggregate {
2191 wrapped_aggregates: wrapped_aggregates.into_proto(),
2192 order_by: Some(order_by.into_proto()),
2193 window_frame: Some(window_frame.into_proto()),
2194 }),
2195 AggregateFunc::Dummy => Kind::Dummy(()),
2196 }),
2197 }
2198 }
2199
2200 fn from_proto(proto: ProtoAggregateFunc) -> Result<Self, TryFromProtoError> {
2201 use proto_aggregate_func::Kind;
2202 let kind = proto
2203 .kind
2204 .ok_or_else(|| TryFromProtoError::missing_field("ProtoAggregateFunc::kind"))?;
2205 Ok(match kind {
2206 Kind::MaxNumeric(()) => AggregateFunc::MaxNumeric,
2207 Kind::MaxInt16(()) => AggregateFunc::MaxInt16,
2208 Kind::MaxInt32(()) => AggregateFunc::MaxInt32,
2209 Kind::MaxInt64(()) => AggregateFunc::MaxInt64,
2210 Kind::MaxUint16(()) => AggregateFunc::MaxUInt16,
2211 Kind::MaxUint32(()) => AggregateFunc::MaxUInt32,
2212 Kind::MaxUint64(()) => AggregateFunc::MaxUInt64,
2213 Kind::MaxMzTimestamp(()) => AggregateFunc::MaxMzTimestamp,
2214 Kind::MaxFloat32(()) => AggregateFunc::MaxFloat32,
2215 Kind::MaxFloat64(()) => AggregateFunc::MaxFloat64,
2216 Kind::MaxBool(()) => AggregateFunc::MaxBool,
2217 Kind::MaxString(()) => AggregateFunc::MaxString,
2218 Kind::MaxDate(()) => AggregateFunc::MaxDate,
2219 Kind::MaxTimestamp(()) => AggregateFunc::MaxTimestamp,
2220 Kind::MaxTimestampTz(()) => AggregateFunc::MaxTimestampTz,
2221 Kind::MaxInterval(()) => AggregateFunc::MaxInterval,
2222 Kind::MaxTime(()) => AggregateFunc::MaxTime,
2223 Kind::MinNumeric(()) => AggregateFunc::MinNumeric,
2224 Kind::MinInt16(()) => AggregateFunc::MinInt16,
2225 Kind::MinInt32(()) => AggregateFunc::MinInt32,
2226 Kind::MinInt64(()) => AggregateFunc::MinInt64,
2227 Kind::MinUint16(()) => AggregateFunc::MinUInt16,
2228 Kind::MinUint32(()) => AggregateFunc::MinUInt32,
2229 Kind::MinUint64(()) => AggregateFunc::MinUInt64,
2230 Kind::MinMzTimestamp(()) => AggregateFunc::MinMzTimestamp,
2231 Kind::MinFloat32(()) => AggregateFunc::MinFloat32,
2232 Kind::MinFloat64(()) => AggregateFunc::MinFloat64,
2233 Kind::MinBool(()) => AggregateFunc::MinBool,
2234 Kind::MinString(()) => AggregateFunc::MinString,
2235 Kind::MinDate(()) => AggregateFunc::MinDate,
2236 Kind::MinTimestamp(()) => AggregateFunc::MinTimestamp,
2237 Kind::MinTimestampTz(()) => AggregateFunc::MinTimestampTz,
2238 Kind::MinInterval(()) => AggregateFunc::MinInterval,
2239 Kind::MinTime(()) => AggregateFunc::MinTime,
2240 Kind::SumInt16(()) => AggregateFunc::SumInt16,
2241 Kind::SumInt32(()) => AggregateFunc::SumInt32,
2242 Kind::SumInt64(()) => AggregateFunc::SumInt64,
2243 Kind::SumUint16(()) => AggregateFunc::SumUInt16,
2244 Kind::SumUint32(()) => AggregateFunc::SumUInt32,
2245 Kind::SumUint64(()) => AggregateFunc::SumUInt64,
2246 Kind::SumFloat32(()) => AggregateFunc::SumFloat32,
2247 Kind::SumFloat64(()) => AggregateFunc::SumFloat64,
2248 Kind::SumNumeric(()) => AggregateFunc::SumNumeric,
2249 Kind::Count(()) => AggregateFunc::Count,
2250 Kind::Any(()) => AggregateFunc::Any,
2251 Kind::All(()) => AggregateFunc::All,
2252 Kind::JsonbAgg(order_by) => AggregateFunc::JsonbAgg {
2253 order_by: order_by.into_rust()?,
2254 },
2255 Kind::JsonbObjectAgg(order_by) => AggregateFunc::JsonbObjectAgg {
2256 order_by: order_by.into_rust()?,
2257 },
2258 Kind::MapAgg(pma) => AggregateFunc::MapAgg {
2259 order_by: pma.order_by.into_rust_if_some("ProtoMapAgg::order_by")?,
2260 value_type: pma
2261 .value_type
2262 .into_rust_if_some("ProtoMapAgg::value_type")?,
2263 },
2264 Kind::ArrayConcat(order_by) => AggregateFunc::ArrayConcat {
2265 order_by: order_by.into_rust()?,
2266 },
2267 Kind::ListConcat(order_by) => AggregateFunc::ListConcat {
2268 order_by: order_by.into_rust()?,
2269 },
2270 Kind::StringAgg(order_by) => AggregateFunc::StringAgg {
2271 order_by: order_by.into_rust()?,
2272 },
2273 Kind::RowNumber(order_by) => AggregateFunc::RowNumber {
2274 order_by: order_by.into_rust()?,
2275 },
2276 Kind::Rank(order_by) => AggregateFunc::Rank {
2277 order_by: order_by.into_rust()?,
2278 },
2279 Kind::DenseRank(order_by) => AggregateFunc::DenseRank {
2280 order_by: order_by.into_rust()?,
2281 },
2282 Kind::LagLead(pll) => AggregateFunc::LagLead {
2283 order_by: pll.order_by.into_rust_if_some("ProtoLagLead::order_by")?,
2284 lag_lead: match pll.lag_lead {
2285 Some(proto_aggregate_func::proto_lag_lead::LagLead::Lag(())) => {
2286 LagLeadType::Lag
2287 }
2288 Some(proto_aggregate_func::proto_lag_lead::LagLead::Lead(())) => {
2289 LagLeadType::Lead
2290 }
2291 None => {
2292 return Err(TryFromProtoError::MissingField(
2293 "ProtoLagLead::lag_lead".into(),
2294 ));
2295 }
2296 },
2297 ignore_nulls: pll.ignore_nulls,
2298 },
2299 Kind::FirstValue(pfv) => AggregateFunc::FirstValue {
2300 order_by: pfv
2301 .order_by
2302 .into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
2303 window_frame: pfv
2304 .window_frame
2305 .into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
2306 },
2307 Kind::LastValue(pfv) => AggregateFunc::LastValue {
2308 order_by: pfv
2309 .order_by
2310 .into_rust_if_some("ProtoFramedWindowFunc::order_by")?,
2311 window_frame: pfv
2312 .window_frame
2313 .into_rust_if_some("ProtoFramedWindowFunc::window_frame")?,
2314 },
2315 Kind::WindowAggregate(paf) => AggregateFunc::WindowAggregate {
2316 wrapped_aggregate: paf
2317 .wrapped_aggregate
2318 .into_rust_if_some("ProtoWindowAggregate::wrapped_aggregate")?,
2319 order_by: paf
2320 .order_by
2321 .into_rust_if_some("ProtoWindowAggregate::order_by")?,
2322 window_frame: paf
2323 .window_frame
2324 .into_rust_if_some("ProtoWindowAggregate::window_frame")?,
2325 },
2326 Kind::FusedValueWindowFunc(fvwf) => AggregateFunc::FusedValueWindowFunc {
2327 funcs: fvwf.funcs.into_rust()?,
2328 order_by: fvwf
2329 .order_by
2330 .into_rust_if_some("ProtoFusedValueWindowFunc::order_by")?,
2331 },
2332 Kind::FusedWindowAggregate(fwa) => AggregateFunc::FusedWindowAggregate {
2333 wrapped_aggregates: fwa.wrapped_aggregates.into_rust()?,
2334 order_by: fwa
2335 .order_by
2336 .into_rust_if_some("ProtoFusedWindowAggregate::order_by")?,
2337 window_frame: fwa
2338 .window_frame
2339 .into_rust_if_some("ProtoFusedWindowAggregate::window_frame")?,
2340 },
2341 Kind::Dummy(()) => AggregateFunc::Dummy,
2342 })
2343 }
2344}
2345
2346impl AggregateFunc {
2347 pub fn eval<'a, I>(&self, datums: I, temp_storage: &'a RowArena) -> Datum<'a>
2348 where
2349 I: IntoIterator<Item = Datum<'a>>,
2350 {
2351 match self {
2352 AggregateFunc::MaxNumeric => {
2353 max_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
2354 }
2355 AggregateFunc::MaxInt16 => max_datum::<'a, I, i16>(datums),
2356 AggregateFunc::MaxInt32 => max_datum::<'a, I, i32>(datums),
2357 AggregateFunc::MaxInt64 => max_datum::<'a, I, i64>(datums),
2358 AggregateFunc::MaxUInt16 => max_datum::<'a, I, u16>(datums),
2359 AggregateFunc::MaxUInt32 => max_datum::<'a, I, u32>(datums),
2360 AggregateFunc::MaxUInt64 => max_datum::<'a, I, u64>(datums),
2361 AggregateFunc::MaxMzTimestamp => max_datum::<'a, I, mz_repr::Timestamp>(datums),
2362 AggregateFunc::MaxFloat32 => max_datum::<'a, I, OrderedFloat<f32>>(datums),
2363 AggregateFunc::MaxFloat64 => max_datum::<'a, I, OrderedFloat<f64>>(datums),
2364 AggregateFunc::MaxBool => max_datum::<'a, I, bool>(datums),
2365 AggregateFunc::MaxString => max_string(datums),
2366 AggregateFunc::MaxDate => max_datum::<'a, I, Date>(datums),
2367 AggregateFunc::MaxTimestamp => {
2368 max_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
2369 }
2370 AggregateFunc::MaxTimestampTz => {
2371 max_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2372 }
2373 AggregateFunc::MaxInterval => max_datum::<'a, I, Interval>(datums),
2374 AggregateFunc::MaxTime => max_datum::<'a, I, NaiveTime>(datums),
2375 AggregateFunc::MinNumeric => {
2376 min_datum::<'a, I, OrderedDecimal<numeric::Numeric>>(datums)
2377 }
2378 AggregateFunc::MinInt16 => min_datum::<'a, I, i16>(datums),
2379 AggregateFunc::MinInt32 => min_datum::<'a, I, i32>(datums),
2380 AggregateFunc::MinInt64 => min_datum::<'a, I, i64>(datums),
2381 AggregateFunc::MinUInt16 => min_datum::<'a, I, u16>(datums),
2382 AggregateFunc::MinUInt32 => min_datum::<'a, I, u32>(datums),
2383 AggregateFunc::MinUInt64 => min_datum::<'a, I, u64>(datums),
2384 AggregateFunc::MinMzTimestamp => min_datum::<'a, I, mz_repr::Timestamp>(datums),
2385 AggregateFunc::MinFloat32 => min_datum::<'a, I, OrderedFloat<f32>>(datums),
2386 AggregateFunc::MinFloat64 => min_datum::<'a, I, OrderedFloat<f64>>(datums),
2387 AggregateFunc::MinBool => min_datum::<'a, I, bool>(datums),
2388 AggregateFunc::MinString => min_string(datums),
2389 AggregateFunc::MinDate => min_datum::<'a, I, Date>(datums),
2390 AggregateFunc::MinTimestamp => {
2391 min_datum::<'a, I, CheckedTimestamp<NaiveDateTime>>(datums)
2392 }
2393 AggregateFunc::MinTimestampTz => {
2394 min_datum::<'a, I, CheckedTimestamp<DateTime<Utc>>>(datums)
2395 }
2396 AggregateFunc::MinInterval => min_datum::<'a, I, Interval>(datums),
2397 AggregateFunc::MinTime => min_datum::<'a, I, NaiveTime>(datums),
2398 AggregateFunc::SumInt16 => sum_datum::<'a, I, i16, i64>(datums),
2399 AggregateFunc::SumInt32 => sum_datum::<'a, I, i32, i64>(datums),
2400 AggregateFunc::SumInt64 => sum_datum::<'a, I, i64, i128>(datums),
2401 AggregateFunc::SumUInt16 => sum_datum::<'a, I, u16, u64>(datums),
2402 AggregateFunc::SumUInt32 => sum_datum::<'a, I, u32, u64>(datums),
2403 AggregateFunc::SumUInt64 => sum_datum::<'a, I, u64, u128>(datums),
2404 AggregateFunc::SumFloat32 => sum_datum::<'a, I, f32, f32>(datums),
2405 AggregateFunc::SumFloat64 => sum_datum::<'a, I, f64, f64>(datums),
2406 AggregateFunc::SumNumeric => sum_numeric(datums),
2407 AggregateFunc::Count => count(datums),
2408 AggregateFunc::Any => any(datums),
2409 AggregateFunc::All => all(datums),
2410 AggregateFunc::JsonbAgg { order_by } => jsonb_agg(datums, temp_storage, order_by),
2411 AggregateFunc::MapAgg { order_by, .. } | AggregateFunc::JsonbObjectAgg { order_by } => {
2412 dict_agg(datums, temp_storage, order_by)
2413 }
2414 AggregateFunc::ArrayConcat { order_by } => array_concat(datums, temp_storage, order_by),
2415 AggregateFunc::ListConcat { order_by } => list_concat(datums, temp_storage, order_by),
2416 AggregateFunc::StringAgg { order_by } => string_agg(datums, temp_storage, order_by),
2417 AggregateFunc::RowNumber { order_by } => row_number(datums, temp_storage, order_by),
2418 AggregateFunc::Rank { order_by } => rank(datums, temp_storage, order_by),
2419 AggregateFunc::DenseRank { order_by } => dense_rank(datums, temp_storage, order_by),
2420 AggregateFunc::LagLead {
2421 order_by,
2422 lag_lead: lag_lead_type,
2423 ignore_nulls,
2424 } => lag_lead(datums, temp_storage, order_by, lag_lead_type, ignore_nulls),
2425 AggregateFunc::FirstValue {
2426 order_by,
2427 window_frame,
2428 } => first_value(datums, temp_storage, order_by, window_frame),
2429 AggregateFunc::LastValue {
2430 order_by,
2431 window_frame,
2432 } => last_value(datums, temp_storage, order_by, window_frame),
2433 AggregateFunc::WindowAggregate {
2434 wrapped_aggregate,
2435 order_by,
2436 window_frame,
2437 } => window_aggr::<_, NaiveOneByOneAggr>(
2438 datums,
2439 temp_storage,
2440 wrapped_aggregate,
2441 order_by,
2442 window_frame,
2443 ),
2444 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2445 fused_value_window_func(datums, temp_storage, funcs, order_by)
2446 }
2447 AggregateFunc::FusedWindowAggregate {
2448 wrapped_aggregates,
2449 order_by,
2450 window_frame,
2451 } => fused_window_aggr::<_, NaiveOneByOneAggr>(
2452 datums,
2453 temp_storage,
2454 wrapped_aggregates,
2455 order_by,
2456 window_frame,
2457 ),
2458 AggregateFunc::Dummy => Datum::Dummy,
2459 }
2460 }
2461
2462 pub fn eval_with_fast_window_agg<'a, I, W>(
2466 &self,
2467 datums: I,
2468 temp_storage: &'a RowArena,
2469 ) -> Datum<'a>
2470 where
2471 I: IntoIterator<Item = Datum<'a>>,
2472 W: OneByOneAggr,
2473 {
2474 match self {
2475 AggregateFunc::WindowAggregate {
2476 wrapped_aggregate,
2477 order_by,
2478 window_frame,
2479 } => window_aggr::<_, W>(
2480 datums,
2481 temp_storage,
2482 wrapped_aggregate,
2483 order_by,
2484 window_frame,
2485 ),
2486 AggregateFunc::FusedWindowAggregate {
2487 wrapped_aggregates,
2488 order_by,
2489 window_frame,
2490 } => fused_window_aggr::<_, W>(
2491 datums,
2492 temp_storage,
2493 wrapped_aggregates,
2494 order_by,
2495 window_frame,
2496 ),
2497 _ => self.eval(datums, temp_storage),
2498 }
2499 }
2500
2501 pub fn eval_with_unnest_list<'a, I, W>(
2502 &self,
2503 datums: I,
2504 temp_storage: &'a RowArena,
2505 ) -> impl Iterator<Item = Datum<'a>>
2506 where
2507 I: IntoIterator<Item = Datum<'a>>,
2508 W: OneByOneAggr,
2509 {
2510 assert!(self.can_fuse_with_unnest_list());
2512 match self {
2513 AggregateFunc::RowNumber { order_by } => {
2514 row_number_no_list(datums, temp_storage, order_by).collect_vec()
2515 }
2516 AggregateFunc::Rank { order_by } => {
2517 rank_no_list(datums, temp_storage, order_by).collect_vec()
2518 }
2519 AggregateFunc::DenseRank { order_by } => {
2520 dense_rank_no_list(datums, temp_storage, order_by).collect_vec()
2521 }
2522 AggregateFunc::LagLead {
2523 order_by,
2524 lag_lead: lag_lead_type,
2525 ignore_nulls,
2526 } => lag_lead_no_list(datums, temp_storage, order_by, lag_lead_type, ignore_nulls)
2527 .collect_vec(),
2528 AggregateFunc::FirstValue {
2529 order_by,
2530 window_frame,
2531 } => first_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2532 AggregateFunc::LastValue {
2533 order_by,
2534 window_frame,
2535 } => last_value_no_list(datums, temp_storage, order_by, window_frame).collect_vec(),
2536 AggregateFunc::FusedValueWindowFunc { funcs, order_by } => {
2537 fused_value_window_func_no_list(datums, temp_storage, funcs, order_by).collect_vec()
2538 }
2539 AggregateFunc::WindowAggregate {
2540 wrapped_aggregate,
2541 order_by,
2542 window_frame,
2543 } => window_aggr_no_list::<_, W>(
2544 datums,
2545 temp_storage,
2546 wrapped_aggregate,
2547 order_by,
2548 window_frame,
2549 )
2550 .collect_vec(),
2551 AggregateFunc::FusedWindowAggregate {
2552 wrapped_aggregates,
2553 order_by,
2554 window_frame,
2555 } => fused_window_aggr_no_list::<_, W>(
2556 datums,
2557 temp_storage,
2558 wrapped_aggregates,
2559 order_by,
2560 window_frame,
2561 )
2562 .collect_vec(),
2563 _ => unreachable!("asserted above that `can_fuse_with_unnest_list`"),
2564 }
2565 .into_iter()
2566 }
2567
2568 pub fn default(&self) -> Datum<'static> {
2571 match self {
2572 AggregateFunc::Count => Datum::Int64(0),
2573 AggregateFunc::Any => Datum::False,
2574 AggregateFunc::All => Datum::True,
2575 AggregateFunc::Dummy => Datum::Dummy,
2576 _ => Datum::Null,
2577 }
2578 }
2579
2580 pub fn identity_datum(&self) -> Datum<'static> {
2583 match self {
2584 AggregateFunc::Any => Datum::False,
2585 AggregateFunc::All => Datum::True,
2586 AggregateFunc::Dummy => Datum::Dummy,
2587 AggregateFunc::ArrayConcat { .. } => Datum::empty_array(),
2588 AggregateFunc::ListConcat { .. } => Datum::empty_list(),
2589 AggregateFunc::RowNumber { .. }
2590 | AggregateFunc::Rank { .. }
2591 | AggregateFunc::DenseRank { .. }
2592 | AggregateFunc::LagLead { .. }
2593 | AggregateFunc::FirstValue { .. }
2594 | AggregateFunc::LastValue { .. }
2595 | AggregateFunc::WindowAggregate { .. }
2596 | AggregateFunc::FusedValueWindowFunc { .. }
2597 | AggregateFunc::FusedWindowAggregate { .. } => Datum::empty_list(),
2598 AggregateFunc::MaxNumeric
2599 | AggregateFunc::MaxInt16
2600 | AggregateFunc::MaxInt32
2601 | AggregateFunc::MaxInt64
2602 | AggregateFunc::MaxUInt16
2603 | AggregateFunc::MaxUInt32
2604 | AggregateFunc::MaxUInt64
2605 | AggregateFunc::MaxMzTimestamp
2606 | AggregateFunc::MaxFloat32
2607 | AggregateFunc::MaxFloat64
2608 | AggregateFunc::MaxBool
2609 | AggregateFunc::MaxString
2610 | AggregateFunc::MaxDate
2611 | AggregateFunc::MaxTimestamp
2612 | AggregateFunc::MaxTimestampTz
2613 | AggregateFunc::MaxInterval
2614 | AggregateFunc::MaxTime
2615 | AggregateFunc::MinNumeric
2616 | AggregateFunc::MinInt16
2617 | AggregateFunc::MinInt32
2618 | AggregateFunc::MinInt64
2619 | AggregateFunc::MinUInt16
2620 | AggregateFunc::MinUInt32
2621 | AggregateFunc::MinUInt64
2622 | AggregateFunc::MinMzTimestamp
2623 | AggregateFunc::MinFloat32
2624 | AggregateFunc::MinFloat64
2625 | AggregateFunc::MinBool
2626 | AggregateFunc::MinString
2627 | AggregateFunc::MinDate
2628 | AggregateFunc::MinTimestamp
2629 | AggregateFunc::MinTimestampTz
2630 | AggregateFunc::MinInterval
2631 | AggregateFunc::MinTime
2632 | AggregateFunc::SumInt16
2633 | AggregateFunc::SumInt32
2634 | AggregateFunc::SumInt64
2635 | AggregateFunc::SumUInt16
2636 | AggregateFunc::SumUInt32
2637 | AggregateFunc::SumUInt64
2638 | AggregateFunc::SumFloat32
2639 | AggregateFunc::SumFloat64
2640 | AggregateFunc::SumNumeric
2641 | AggregateFunc::Count
2642 | AggregateFunc::JsonbAgg { .. }
2643 | AggregateFunc::JsonbObjectAgg { .. }
2644 | AggregateFunc::MapAgg { .. }
2645 | AggregateFunc::StringAgg { .. } => Datum::Null,
2646 }
2647 }
2648
2649 pub fn can_fuse_with_unnest_list(&self) -> bool {
2650 match self {
2651 AggregateFunc::RowNumber { .. }
2652 | AggregateFunc::Rank { .. }
2653 | AggregateFunc::DenseRank { .. }
2654 | AggregateFunc::LagLead { .. }
2655 | AggregateFunc::FirstValue { .. }
2656 | AggregateFunc::LastValue { .. }
2657 | AggregateFunc::WindowAggregate { .. }
2658 | AggregateFunc::FusedValueWindowFunc { .. }
2659 | AggregateFunc::FusedWindowAggregate { .. } => true,
2660 AggregateFunc::ArrayConcat { .. }
2661 | AggregateFunc::ListConcat { .. }
2662 | AggregateFunc::Any
2663 | AggregateFunc::All
2664 | AggregateFunc::Dummy
2665 | AggregateFunc::MaxNumeric
2666 | AggregateFunc::MaxInt16
2667 | AggregateFunc::MaxInt32
2668 | AggregateFunc::MaxInt64
2669 | AggregateFunc::MaxUInt16
2670 | AggregateFunc::MaxUInt32
2671 | AggregateFunc::MaxUInt64
2672 | AggregateFunc::MaxMzTimestamp
2673 | AggregateFunc::MaxFloat32
2674 | AggregateFunc::MaxFloat64
2675 | AggregateFunc::MaxBool
2676 | AggregateFunc::MaxString
2677 | AggregateFunc::MaxDate
2678 | AggregateFunc::MaxTimestamp
2679 | AggregateFunc::MaxTimestampTz
2680 | AggregateFunc::MaxInterval
2681 | AggregateFunc::MaxTime
2682 | AggregateFunc::MinNumeric
2683 | AggregateFunc::MinInt16
2684 | AggregateFunc::MinInt32
2685 | AggregateFunc::MinInt64
2686 | AggregateFunc::MinUInt16
2687 | AggregateFunc::MinUInt32
2688 | AggregateFunc::MinUInt64
2689 | AggregateFunc::MinMzTimestamp
2690 | AggregateFunc::MinFloat32
2691 | AggregateFunc::MinFloat64
2692 | AggregateFunc::MinBool
2693 | AggregateFunc::MinString
2694 | AggregateFunc::MinDate
2695 | AggregateFunc::MinTimestamp
2696 | AggregateFunc::MinTimestampTz
2697 | AggregateFunc::MinInterval
2698 | AggregateFunc::MinTime
2699 | AggregateFunc::SumInt16
2700 | AggregateFunc::SumInt32
2701 | AggregateFunc::SumInt64
2702 | AggregateFunc::SumUInt16
2703 | AggregateFunc::SumUInt32
2704 | AggregateFunc::SumUInt64
2705 | AggregateFunc::SumFloat32
2706 | AggregateFunc::SumFloat64
2707 | AggregateFunc::SumNumeric
2708 | AggregateFunc::Count
2709 | AggregateFunc::JsonbAgg { .. }
2710 | AggregateFunc::JsonbObjectAgg { .. }
2711 | AggregateFunc::MapAgg { .. }
2712 | AggregateFunc::StringAgg { .. } => false,
2713 }
2714 }
2715
    /// Computes the output column type of this aggregation, given the column
    /// type of its input.
    ///
    /// For the ordered aggregations and the window functions, `input_type` is
    /// not the type of a single argument but an encoded record, which the arms
    /// below unpack. For window functions, `fields[0]` of that record is itself
    /// a record whose element 0 is the original row and whose element 1 holds
    /// the (encoded) function arguments. The output is a list of records
    /// pairing each result with its `?orig_row?`.
    /// NOTE(review): the encoding is produced by the planner; confirm there.
    ///
    /// Nullability follows the input column, except that `count` is never
    /// null and `string_agg` takes it from the value being aggregated.
    pub fn output_type(&self, input_type: ColumnType) -> ColumnType {
        let scalar_type = match self {
            AggregateFunc::Count => ScalarType::Int64,
            AggregateFunc::Any => ScalarType::Bool,
            AggregateFunc::All => ScalarType::Bool,
            AggregateFunc::JsonbAgg { .. } => ScalarType::Jsonb,
            AggregateFunc::JsonbObjectAgg { .. } => ScalarType::Jsonb,
            // Integer sums widen their result: 16/32-bit inputs sum into 64-bit
            // integers, and 64-bit inputs sum into a scale-0 numeric.
            AggregateFunc::SumInt16 => ScalarType::Int64,
            AggregateFunc::SumInt32 => ScalarType::Int64,
            AggregateFunc::SumInt64 => ScalarType::Numeric {
                max_scale: Some(NumericMaxScale::ZERO),
            },
            AggregateFunc::SumUInt16 => ScalarType::UInt64,
            AggregateFunc::SumUInt32 => ScalarType::UInt64,
            AggregateFunc::SumUInt64 => ScalarType::Numeric {
                max_scale: Some(NumericMaxScale::ZERO),
            },
            AggregateFunc::MapAgg { value_type, .. } => ScalarType::Map {
                value_type: Box::new(value_type.clone()),
                custom_id: None,
            },
            AggregateFunc::ArrayConcat { .. } | AggregateFunc::ListConcat { .. } => {
                match input_type.scalar_type {
                    // The result has the type of the first field of the input record.
                    ScalarType::Record { ref fields, .. } => fields[0].1.scalar_type.clone(),
                    _ => unreachable!(),
                }
            }
            AggregateFunc::StringAgg { .. } => ScalarType::String,
            AggregateFunc::RowNumber { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?row_number?")
            }
            AggregateFunc::Rank { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?rank?")
            }
            AggregateFunc::DenseRank { .. } => {
                AggregateFunc::output_type_ranking_window_funcs(&input_type, "?dense_rank?")
            }
            AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
                let fields = input_type.scalar_type.unwrap_record_element_type();
                // fields[0] = (original row, encoded lag/lead arguments).
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let output_type_inner = Self::lag_lead_output_type_inner_from_encoded_args(fields[0].unwrap_record_element_type()[1]);
                let column_name = Self::lag_lead_result_column_name(lag_lead_type);

                ScalarType::List {
                    element_type: Box::new(ScalarType::Record {
                        fields: [
                            (column_name, output_type_inner),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FirstValue { .. } => {
                let fields = input_type.scalar_type.unwrap_record_element_type();
                // fields[0] = (original row, value argument).
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                // The result column is nullable regardless of the argument's nullability.
                let value_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true);
                ScalarType::List {
                    element_type: Box::new(ScalarType::Record {
                        fields: [
                            (ColumnName::from("?first_value?"), value_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::LastValue { .. } => {
                let fields = input_type.scalar_type.unwrap_record_element_type();
                // fields[0] = (original row, value argument).
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                // The result column is nullable regardless of the argument's nullability.
                let value_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true);
                ScalarType::List {
                    element_type: Box::new(ScalarType::Record {
                        fields: [
                            (ColumnName::from("?last_value?"), value_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::WindowAggregate {
                wrapped_aggregate, ..
            } => {
                let fields = input_type.scalar_type.unwrap_record_element_type();
                // fields[0] = (original row, argument of the wrapped aggregate).
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let arg_type = fields[0].unwrap_record_element_type()[1]
                    .clone()
                    .nullable(true);
                // The per-row result type is whatever the wrapped aggregate produces.
                let wrapped_aggr_out_type = wrapped_aggregate.output_type(arg_type);

                ScalarType::List {
                    element_type: Box::new(ScalarType::Record {
                        fields: [
                            (ColumnName::from("?window_agg?"), wrapped_aggr_out_type),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FusedWindowAggregate {
                wrapped_aggregates, ..
            } => {
                let fields = input_type.scalar_type.unwrap_record_element_type();
                // fields[0] = (original row, record with one argument per wrapped aggregate).
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let args_type = fields[0].unwrap_record_element_type()[1];
                let arg_types = args_type.unwrap_record_element_type();
                // One output field per wrapped aggregate, named after it; `zip_eq`
                // panics if the argument and aggregate counts differ.
                let out_fields = arg_types.iter().zip_eq(wrapped_aggregates).map(|(arg_type, wrapped_agg)| {
                    (
                        ColumnName::from(wrapped_agg.name()),
                        wrapped_agg.output_type((**arg_type).clone().nullable(true)),
                    )
                }).collect_vec();

                ScalarType::List {
                    element_type: Box::new(ScalarType::Record {
                        fields: [
                            (ColumnName::from("?fused_window_agg?"), ScalarType::Record {
                                fields: out_fields.into(),
                                custom_id: None,
                            }.nullable(false)),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            AggregateFunc::FusedValueWindowFunc { funcs, order_by: _ } => {
                let fields = input_type.scalar_type.unwrap_record_element_type();
                // fields[0] = (original row, record with one encoded argument per fused function).
                let original_row_type = fields[0].unwrap_record_element_type()[0]
                    .clone()
                    .nullable(false);
                let encoded_args_type = fields[0].unwrap_record_element_type()[1].unwrap_record_element_type();

                ScalarType::List {
                    element_type: Box::new(ScalarType::Record {
                        fields: [
                            (ColumnName::from("?fused_value_window_func?"), ScalarType::Record {
                                // Each fused function contributes one field, typed as it
                                // would be if it were evaluated on its own.
                                fields: encoded_args_type.into_iter().zip_eq(funcs).map(|(arg_type, func)| {
                                    match func {
                                        AggregateFunc::LagLead { lag_lead: lag_lead_type, .. } => {
                                            (
                                                Self::lag_lead_result_column_name(lag_lead_type),
                                                Self::lag_lead_output_type_inner_from_encoded_args(arg_type)
                                            )
                                        },
                                        AggregateFunc::FirstValue { .. } => {
                                            (
                                                ColumnName::from("?first_value?"),
                                                arg_type.clone().nullable(true),
                                            )
                                        }
                                        AggregateFunc::LastValue { .. } => {
                                            (
                                                ColumnName::from("?last_value?"),
                                                arg_type.clone().nullable(true),
                                            )
                                        }
                                        _ => panic!("FusedValueWindowFunc has an unknown function"),
                                    }
                                }).collect(),
                                custom_id: None,
                            }.nullable(false)),
                            (ColumnName::from("?orig_row?"), original_row_type),
                        ].into(),
                        custom_id: None,
                    }),
                    custom_id: None,
                }
            }
            // These aggregations return the same scalar type as their input.
            AggregateFunc::Dummy
            | AggregateFunc::MaxNumeric
            | AggregateFunc::MaxInt16
            | AggregateFunc::MaxInt32
            | AggregateFunc::MaxInt64
            | AggregateFunc::MaxUInt16
            | AggregateFunc::MaxUInt32
            | AggregateFunc::MaxUInt64
            | AggregateFunc::MaxMzTimestamp
            | AggregateFunc::MaxFloat32
            | AggregateFunc::MaxFloat64
            | AggregateFunc::MaxBool
            | AggregateFunc::MaxString
            | AggregateFunc::MaxDate
            | AggregateFunc::MaxTimestamp
            | AggregateFunc::MaxTimestampTz
            | AggregateFunc::MaxInterval
            | AggregateFunc::MaxTime
            | AggregateFunc::MinNumeric
            | AggregateFunc::MinInt16
            | AggregateFunc::MinInt32
            | AggregateFunc::MinInt64
            | AggregateFunc::MinUInt16
            | AggregateFunc::MinUInt32
            | AggregateFunc::MinUInt64
            | AggregateFunc::MinMzTimestamp
            | AggregateFunc::MinFloat32
            | AggregateFunc::MinFloat64
            | AggregateFunc::MinBool
            | AggregateFunc::MinString
            | AggregateFunc::MinDate
            | AggregateFunc::MinTimestamp
            | AggregateFunc::MinTimestampTz
            | AggregateFunc::MinInterval
            | AggregateFunc::MinTime
            | AggregateFunc::SumFloat32
            | AggregateFunc::SumFloat64
            | AggregateFunc::SumNumeric => input_type.scalar_type.clone(),
        };
        let nullable = match self {
            // `count` always produces a value.
            AggregateFunc::Count => false,
            // Take the nullability of the value being aggregated: the first field of
            // the record nested in the first field of the input record.
            AggregateFunc::StringAgg { .. } => match input_type.scalar_type {
                ScalarType::Record { fields, .. } => match &fields[0].1.scalar_type {
                    ScalarType::Record { fields, .. } => fields[0].1.nullable,
                    _ => unreachable!(),
                },
                _ => unreachable!(),
            },
            _ => input_type.nullable,
        };
        scalar_type.nullable(nullable)
    }
2984
2985 fn output_type_ranking_window_funcs(input_type: &ColumnType, col_name: &str) -> ScalarType {
2987 match input_type.scalar_type {
2988 ScalarType::Record { ref fields, .. } => ScalarType::List {
2989 element_type: Box::new(ScalarType::Record {
2990 fields: [
2991 (
2992 ColumnName::from(col_name),
2993 ScalarType::Int64.nullable(false),
2994 ),
2995 (ColumnName::from("?orig_row?"), {
2996 let inner = match &fields[0].1.scalar_type {
2997 ScalarType::List { element_type, .. } => element_type.clone(),
2998 _ => unreachable!(),
2999 };
3000 inner.nullable(false)
3001 }),
3002 ]
3003 .into(),
3004 custom_id: None,
3005 }),
3006 custom_id: None,
3007 },
3008 _ => unreachable!(),
3009 }
3010 }
3011
3012 fn lag_lead_output_type_inner_from_encoded_args(encoded_args_type: &ScalarType) -> ColumnType {
3016 encoded_args_type.unwrap_record_element_type()[0]
3020 .clone()
3021 .nullable(true)
3022 }
3023
3024 fn lag_lead_result_column_name(lag_lead_type: &LagLeadType) -> ColumnName {
3025 ColumnName::from(match lag_lead_type {
3026 LagLeadType::Lag => "?lag?",
3027 LagLeadType::Lead => "?lead?",
3028 })
3029 }
3030
3031 pub fn propagates_nonnull_constraint(&self) -> bool {
3036 match self {
3037 AggregateFunc::MaxNumeric
3038 | AggregateFunc::MaxInt16
3039 | AggregateFunc::MaxInt32
3040 | AggregateFunc::MaxInt64
3041 | AggregateFunc::MaxUInt16
3042 | AggregateFunc::MaxUInt32
3043 | AggregateFunc::MaxUInt64
3044 | AggregateFunc::MaxMzTimestamp
3045 | AggregateFunc::MaxFloat32
3046 | AggregateFunc::MaxFloat64
3047 | AggregateFunc::MaxBool
3048 | AggregateFunc::MaxString
3049 | AggregateFunc::MaxDate
3050 | AggregateFunc::MaxTimestamp
3051 | AggregateFunc::MaxTimestampTz
3052 | AggregateFunc::MinNumeric
3053 | AggregateFunc::MinInt16
3054 | AggregateFunc::MinInt32
3055 | AggregateFunc::MinInt64
3056 | AggregateFunc::MinUInt16
3057 | AggregateFunc::MinUInt32
3058 | AggregateFunc::MinUInt64
3059 | AggregateFunc::MinMzTimestamp
3060 | AggregateFunc::MinFloat32
3061 | AggregateFunc::MinFloat64
3062 | AggregateFunc::MinBool
3063 | AggregateFunc::MinString
3064 | AggregateFunc::MinDate
3065 | AggregateFunc::MinTimestamp
3066 | AggregateFunc::MinTimestampTz
3067 | AggregateFunc::SumInt16
3068 | AggregateFunc::SumInt32
3069 | AggregateFunc::SumInt64
3070 | AggregateFunc::SumUInt16
3071 | AggregateFunc::SumUInt32
3072 | AggregateFunc::SumUInt64
3073 | AggregateFunc::SumFloat32
3074 | AggregateFunc::SumFloat64
3075 | AggregateFunc::SumNumeric
3076 | AggregateFunc::StringAgg { .. } => true,
3077 AggregateFunc::Count => false,
3079 _ => false,
3080 }
3081 }
3082}
3083
3084fn jsonb_each<'a>(
3085 a: Datum<'a>,
3086 temp_storage: &'a RowArena,
3087 stringify: bool,
3088) -> impl Iterator<Item = (Row, Diff)> + 'a {
3089 let map = match a {
3091 Datum::Map(dict) => dict,
3092 _ => mz_repr::DatumMap::empty(),
3093 };
3094
3095 map.iter().map(move |(k, mut v)| {
3096 if stringify {
3097 v = jsonb_stringify(v, temp_storage);
3098 }
3099 (Row::pack_slice(&[Datum::String(k), v]), Diff::ONE)
3100 })
3101}
3102
3103fn jsonb_object_keys<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3104 let map = match a {
3105 Datum::Map(dict) => dict,
3106 _ => mz_repr::DatumMap::empty(),
3107 };
3108
3109 map.iter()
3110 .map(move |(k, _)| (Row::pack_slice(&[Datum::String(k)]), Diff::ONE))
3111}
3112
3113fn jsonb_array_elements<'a>(
3114 a: Datum<'a>,
3115 temp_storage: &'a RowArena,
3116 stringify: bool,
3117) -> impl Iterator<Item = (Row, Diff)> + 'a {
3118 let list = match a {
3119 Datum::List(list) => list,
3120 _ => mz_repr::DatumList::empty(),
3121 };
3122 list.iter().map(move |mut e| {
3123 if stringify {
3124 e = jsonb_stringify(e, temp_storage);
3125 }
3126 (Row::pack_slice(&[e]), Diff::ONE)
3127 })
3128}
3129
3130fn regexp_extract(a: Datum, r: &AnalyzedRegex) -> Option<(Row, Diff)> {
3131 let r = r.inner();
3132 let a = a.unwrap_str();
3133 let captures = r.captures(a)?;
3134 let datums = captures
3135 .iter()
3136 .skip(1)
3137 .map(|m| Datum::from(m.map(|m| m.as_str())));
3138 Some((Row::pack(datums), Diff::ONE))
3139}
3140
3141fn regexp_matches<'a, 'r: 'a>(
3142 exprs: &[Datum<'a>],
3143) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3144 assert!(exprs.len() == 2 || exprs.len() == 3);
3148 let a = exprs[0].unwrap_str();
3149 let r = exprs[1].unwrap_str();
3150
3151 let (regex, opts) = if exprs.len() == 3 {
3152 let flag = exprs[2].unwrap_str();
3153 let opts = AnalyzedRegexOpts::from_str(flag)?;
3154 (AnalyzedRegex::new(r, opts)?, opts)
3155 } else {
3156 let opts = AnalyzedRegexOpts::default();
3157 (AnalyzedRegex::new(r, opts)?, opts)
3158 };
3159
3160 let regex = regex.inner().clone();
3161
3162 let iter = regex.captures_iter(a).map(move |captures| {
3163 let matches = captures
3164 .iter()
3165 .skip(1)
3167 .map(|m| Datum::from(m.map(|m| m.as_str())))
3168 .collect::<Vec<_>>();
3169
3170 let row = SharedRow::get();
3171 let mut binding = row.borrow_mut();
3172 let mut packer = binding.packer();
3173
3174 let dimension = ArrayDimension {
3175 lower_bound: 1,
3176 length: matches.len(),
3177 };
3178 packer
3179 .try_push_array(&[dimension], matches)
3180 .expect("generated dimensions above");
3181
3182 (binding.clone(), Diff::ONE)
3183 });
3184
3185 let out = iter.collect::<SmallVec<[_; 3]>>();
3190
3191 if opts.global {
3192 Ok(Either::Left(out.into_iter()))
3193 } else {
3194 Ok(Either::Right(out.into_iter().take(1)))
3195 }
3196}
3197
3198fn generate_series<N>(
3199 start: N,
3200 stop: N,
3201 step: N,
3202) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError>
3203where
3204 N: Integer + Signed + CheckedAdd + Clone,
3205 Datum<'static>: From<N>,
3206{
3207 if step == N::zero() {
3208 return Err(EvalError::InvalidParameterValue(
3209 "step size cannot equal zero".into(),
3210 ));
3211 }
3212 Ok(num::range_step_inclusive(start, stop, step)
3213 .map(move |i| (Row::pack_slice(&[Datum::from(i)]), Diff::ONE)))
3214}
3215
/// Iterator over an inclusive series of timestamps. It yields `state`, then
/// advances `state` by `step`, and stops once `state` passes `stop` (in the
/// direction given by `rev`) or once advancing fails.
#[derive(Clone)]
pub struct TimestampRangeStepInclusive<T> {
    /// The next timestamp to yield.
    state: CheckedTimestamp<T>,
    /// Inclusive end of the series.
    stop: CheckedTimestamp<T>,
    /// Amount added after each yielded item: first its months, then its
    /// remaining duration.
    step: Interval,
    /// True for a descending series; iteration continues while `state >= stop`
    /// instead of `state <= stop`.
    rev: bool,
    /// Set once advancing `state` fails (month arithmetic error, overflow, or
    /// an out-of-range result), which ends the series.
    done: bool,
}
3227
3228impl<T: TimestampLike> Iterator for TimestampRangeStepInclusive<T> {
3229 type Item = CheckedTimestamp<T>;
3230
3231 #[inline]
3232 fn next(&mut self) -> Option<CheckedTimestamp<T>> {
3233 if !self.done
3234 && ((self.rev && self.state >= self.stop) || (!self.rev && self.state <= self.stop))
3235 {
3236 let result = self.state.clone();
3237 match add_timestamp_months(self.state.deref(), self.step.months) {
3238 Ok(state) => match state.checked_add_signed(self.step.duration_as_chrono()) {
3239 Some(v) => match CheckedTimestamp::from_timestamplike(v) {
3240 Ok(v) => self.state = v,
3241 Err(_) => self.done = true,
3242 },
3243 None => self.done = true,
3244 },
3245 Err(..) => {
3246 self.done = true;
3247 }
3248 }
3249
3250 Some(result)
3251 } else {
3252 None
3253 }
3254 }
3255}
3256
3257fn generate_series_ts<T: TimestampLike>(
3258 start: CheckedTimestamp<T>,
3259 stop: CheckedTimestamp<T>,
3260 step: Interval,
3261 conv: fn(CheckedTimestamp<T>) -> Datum<'static>,
3262) -> Result<impl Iterator<Item = (Row, Diff)>, EvalError> {
3263 let normalized_step = step.as_microseconds();
3264 if normalized_step == 0 {
3265 return Err(EvalError::InvalidParameterValue(
3266 "step size cannot equal zero".into(),
3267 ));
3268 }
3269 let rev = normalized_step < 0;
3270
3271 let trsi = TimestampRangeStepInclusive {
3272 state: start,
3273 stop,
3274 step,
3275 rev,
3276 done: false,
3277 };
3278
3279 Ok(trsi.map(move |i| (Row::pack_slice(&[conv(i)]), Diff::ONE)))
3280}
3281
3282fn generate_subscripts_array(
3283 a: Datum,
3284 dim: i32,
3285) -> Result<Box<dyn Iterator<Item = (Row, Diff)>>, EvalError> {
3286 if dim <= 0 {
3287 return Ok(Box::new(iter::empty()));
3288 }
3289
3290 match a.unwrap_array().dims().into_iter().nth(
3291 (dim - 1)
3292 .try_into()
3293 .map_err(|_| EvalError::Int32OutOfRange((dim - 1).to_string().into()))?,
3294 ) {
3295 Some(requested_dim) => Ok(Box::new(generate_series::<i32>(
3296 requested_dim.lower_bound.try_into().map_err(|_| {
3297 EvalError::Int32OutOfRange(requested_dim.lower_bound.to_string().into())
3298 })?,
3299 requested_dim
3300 .length
3301 .try_into()
3302 .map_err(|_| EvalError::Int32OutOfRange(requested_dim.length.to_string().into()))?,
3303 1,
3304 )?)),
3305 None => Ok(Box::new(iter::empty())),
3306 }
3307}
3308
3309fn unnest_array<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3310 a.unwrap_array()
3311 .elements()
3312 .iter()
3313 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
3314}
3315
3316fn unnest_list<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3317 a.unwrap_list()
3318 .iter()
3319 .map(move |e| (Row::pack_slice(&[e]), Diff::ONE))
3320}
3321
3322fn unnest_map<'a>(a: Datum<'a>) -> impl Iterator<Item = (Row, Diff)> + 'a {
3323 a.unwrap_map()
3324 .iter()
3325 .map(move |(k, v)| (Row::pack_slice(&[Datum::from(k), v]), Diff::ONE))
3326}
3327
3328impl AggregateFunc {
3329 pub fn name(&self) -> &'static str {
3332 match self {
3333 Self::MaxNumeric => "max",
3334 Self::MaxInt16 => "max",
3335 Self::MaxInt32 => "max",
3336 Self::MaxInt64 => "max",
3337 Self::MaxUInt16 => "max",
3338 Self::MaxUInt32 => "max",
3339 Self::MaxUInt64 => "max",
3340 Self::MaxMzTimestamp => "max",
3341 Self::MaxFloat32 => "max",
3342 Self::MaxFloat64 => "max",
3343 Self::MaxBool => "max",
3344 Self::MaxString => "max",
3345 Self::MaxDate => "max",
3346 Self::MaxTimestamp => "max",
3347 Self::MaxTimestampTz => "max",
3348 Self::MaxInterval => "max",
3349 Self::MaxTime => "max",
3350 Self::MinNumeric => "min",
3351 Self::MinInt16 => "min",
3352 Self::MinInt32 => "min",
3353 Self::MinInt64 => "min",
3354 Self::MinUInt16 => "min",
3355 Self::MinUInt32 => "min",
3356 Self::MinUInt64 => "min",
3357 Self::MinMzTimestamp => "min",
3358 Self::MinFloat32 => "min",
3359 Self::MinFloat64 => "min",
3360 Self::MinBool => "min",
3361 Self::MinString => "min",
3362 Self::MinDate => "min",
3363 Self::MinTimestamp => "min",
3364 Self::MinTimestampTz => "min",
3365 Self::MinInterval => "min",
3366 Self::MinTime => "min",
3367 Self::SumInt16 => "sum",
3368 Self::SumInt32 => "sum",
3369 Self::SumInt64 => "sum",
3370 Self::SumUInt16 => "sum",
3371 Self::SumUInt32 => "sum",
3372 Self::SumUInt64 => "sum",
3373 Self::SumFloat32 => "sum",
3374 Self::SumFloat64 => "sum",
3375 Self::SumNumeric => "sum",
3376 Self::Count => "count",
3377 Self::Any => "any",
3378 Self::All => "all",
3379 Self::JsonbAgg { .. } => "jsonb_agg",
3380 Self::JsonbObjectAgg { .. } => "jsonb_object_agg",
3381 Self::MapAgg { .. } => "map_agg",
3382 Self::ArrayConcat { .. } => "array_agg",
3383 Self::ListConcat { .. } => "list_agg",
3384 Self::StringAgg { .. } => "string_agg",
3385 Self::RowNumber { .. } => "row_number",
3386 Self::Rank { .. } => "rank",
3387 Self::DenseRank { .. } => "dense_rank",
3388 Self::LagLead {
3389 lag_lead: LagLeadType::Lag,
3390 ..
3391 } => "lag",
3392 Self::LagLead {
3393 lag_lead: LagLeadType::Lead,
3394 ..
3395 } => "lead",
3396 Self::FirstValue { .. } => "first_value",
3397 Self::LastValue { .. } => "last_value",
3398 Self::WindowAggregate { .. } => "window_agg",
3399 Self::FusedValueWindowFunc { .. } => "fused_value_window_func",
3400 Self::FusedWindowAggregate { .. } => "fused_window_agg",
3401 Self::Dummy => "dummy",
3402 }
3403 }
3404}
3405
3406impl<'a, M> fmt::Display for HumanizedExpr<'a, AggregateFunc, M>
3407where
3408 M: HumanizerMode,
3409{
3410 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
3411 use AggregateFunc::*;
3412 let name = self.expr.name();
3413 match self.expr {
3414 JsonbAgg { order_by }
3415 | JsonbObjectAgg { order_by }
3416 | MapAgg { order_by, .. }
3417 | ArrayConcat { order_by }
3418 | ListConcat { order_by }
3419 | StringAgg { order_by }
3420 | RowNumber { order_by }
3421 | Rank { order_by }
3422 | DenseRank { order_by } => {
3423 let order_by = order_by.iter().map(|col| self.child(col));
3424 write!(f, "{}[order_by=[{}]]", name, separated(", ", order_by))
3425 }
3426 LagLead {
3427 lag_lead: _,
3428 ignore_nulls,
3429 order_by,
3430 } => {
3431 let order_by = order_by.iter().map(|col| self.child(col));
3432 f.write_str(name)?;
3433 f.write_str("[")?;
3434 if *ignore_nulls {
3435 f.write_str("ignore_nulls=true, ")?;
3436 }
3437 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3438 f.write_str("]")
3439 }
3440 FirstValue {
3441 order_by,
3442 window_frame,
3443 } => {
3444 let order_by = order_by.iter().map(|col| self.child(col));
3445 f.write_str(name)?;
3446 f.write_str("[")?;
3447 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3448 if *window_frame != WindowFrame::default() {
3449 write!(f, " {}", window_frame)?;
3450 }
3451 f.write_str("]")
3452 }
3453 LastValue {
3454 order_by,
3455 window_frame,
3456 } => {
3457 let order_by = order_by.iter().map(|col| self.child(col));
3458 f.write_str(name)?;
3459 f.write_str("[")?;
3460 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3461 if *window_frame != WindowFrame::default() {
3462 write!(f, " {}", window_frame)?;
3463 }
3464 f.write_str("]")
3465 }
3466 WindowAggregate {
3467 wrapped_aggregate,
3468 order_by,
3469 window_frame,
3470 } => {
3471 let order_by = order_by.iter().map(|col| self.child(col));
3472 let wrapped_aggregate = self.child(wrapped_aggregate.deref());
3473 f.write_str(name)?;
3474 f.write_str("[")?;
3475 write!(f, "{} ", wrapped_aggregate)?;
3476 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3477 if *window_frame != WindowFrame::default() {
3478 write!(f, " {}", window_frame)?;
3479 }
3480 f.write_str("]")
3481 }
3482 FusedValueWindowFunc { funcs, order_by } => {
3483 let order_by = order_by.iter().map(|col| self.child(col));
3484 let funcs = separated(", ", funcs.iter().map(|func| self.child(func)));
3485 f.write_str(name)?;
3486 f.write_str("[")?;
3487 write!(f, "{} ", funcs)?;
3488 write!(f, "order_by=[{}]", separated(", ", order_by))?;
3489 f.write_str("]")
3490 }
3491 _ => f.write_str(name),
3492 }
3493 }
3494}
3495
/// Describes one capture group of a regular expression, as recorded by
/// `AnalyzedRegex::new`.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct CaptureGroupDesc {
    /// The group's position in the regex's list of capture names.
    /// `AnalyzedRegex::new` skips position 0, the implicit whole-match group.
    pub index: u32,
    /// The group's name, if the pattern names it.
    pub name: Option<String>,
    /// Whether the string column produced for this group may be NULL.
    pub nullable: bool,
}
3504
3505impl RustType<ProtoCaptureGroupDesc> for CaptureGroupDesc {
3506 fn into_proto(&self) -> ProtoCaptureGroupDesc {
3507 ProtoCaptureGroupDesc {
3508 index: self.index,
3509 name: self.name.clone(),
3510 nullable: self.nullable,
3511 }
3512 }
3513
3514 fn from_proto(proto: ProtoCaptureGroupDesc) -> Result<Self, TryFromProtoError> {
3515 Ok(Self {
3516 index: proto.index,
3517 name: proto.name,
3518 nullable: proto.nullable,
3519 })
3520 }
3521}
3522
/// Flags that modify how a regular expression is compiled and applied.
///
/// Parsed from a flag string by its `FromStr` impl (`i` and `g`).
#[derive(
    Arbitrary,
    Clone,
    Copy,
    Debug,
    Eq,
    PartialEq,
    Ord,
    PartialOrd,
    Serialize,
    Deserialize,
    Hash,
    MzReflect,
    Default,
)]
pub struct AnalyzedRegexOpts {
    /// Set by the `i` flag; passed to `ReprRegex::new` when compiling.
    pub case_insensitive: bool,
    /// Set by the `g` flag.
    pub global: bool,
}
3542
3543impl FromStr for AnalyzedRegexOpts {
3544 type Err = EvalError;
3545
3546 fn from_str(s: &str) -> Result<Self, Self::Err> {
3547 let mut opts = AnalyzedRegexOpts::default();
3548 for c in s.chars() {
3549 match c {
3550 'i' => opts.case_insensitive = true,
3551 'g' => opts.global = true,
3552 _ => return Err(EvalError::InvalidRegexFlag(c)),
3553 }
3554 }
3555 Ok(opts)
3556 }
3557}
3558
3559impl RustType<ProtoAnalyzedRegexOpts> for AnalyzedRegexOpts {
3560 fn into_proto(&self) -> ProtoAnalyzedRegexOpts {
3561 ProtoAnalyzedRegexOpts {
3562 case_insensitive: self.case_insensitive,
3563 global: self.global,
3564 }
3565 }
3566
3567 fn from_proto(proto: ProtoAnalyzedRegexOpts) -> Result<Self, TryFromProtoError> {
3568 Ok(Self {
3569 case_insensitive: proto.case_insensitive,
3570 global: proto.global,
3571 })
3572 }
3573}
3574
/// A compiled regular expression, bundled with descriptors of its capture
/// groups and the options it was compiled with.
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub struct AnalyzedRegex(
    /// The compiled pattern.
    #[proptest(strategy = "mz_repr::adt::regex::any_regex()")] ReprRegex,
    /// Capture group descriptors; `AnalyzedRegex::new` records one per
    /// explicit group (the whole-match group 0 is skipped).
    Vec<CaptureGroupDesc>,
    /// The options the pattern was analyzed with.
    AnalyzedRegexOpts,
);
3583
3584impl RustType<ProtoAnalyzedRegex> for AnalyzedRegex {
3585 fn into_proto(&self) -> ProtoAnalyzedRegex {
3586 ProtoAnalyzedRegex {
3587 regex: Some(self.0.into_proto()),
3588 groups: self.1.into_proto(),
3589 opts: Some(self.2.into_proto()),
3590 }
3591 }
3592
3593 fn from_proto(proto: ProtoAnalyzedRegex) -> Result<Self, TryFromProtoError> {
3594 Ok(AnalyzedRegex(
3595 proto.regex.into_rust_if_some("ProtoAnalyzedRegex::regex")?,
3596 proto.groups.into_rust()?,
3597 proto.opts.into_rust_if_some("ProtoAnalyzedRegex::opts")?,
3598 ))
3599 }
3600}
3601
3602impl AnalyzedRegex {
3603 pub fn new(s: &str, opts: AnalyzedRegexOpts) -> Result<Self, regex::Error> {
3604 let r = ReprRegex::new(s, opts.case_insensitive)?;
3605 #[allow(clippy::as_conversions)]
3607 let descs: Vec<_> = r
3608 .capture_names()
3609 .enumerate()
3610 .skip(1)
3615 .map(|(i, name)| CaptureGroupDesc {
3616 index: i as u32,
3617 name: name.map(String::from),
3618 nullable: true,
3621 })
3622 .collect();
3623 Ok(Self(r, descs, opts))
3624 }
3625 pub fn capture_groups_len(&self) -> usize {
3626 self.1.len()
3627 }
3628 pub fn capture_groups_iter(&self) -> impl Iterator<Item = &CaptureGroupDesc> {
3629 self.1.iter()
3630 }
3631 pub fn inner(&self) -> &Regex {
3632 &(self.0).regex
3633 }
3634 pub fn opts(&self) -> &AnalyzedRegexOpts {
3635 &self.2
3636 }
3637}
3638
3639pub fn csv_extract(a: Datum, n_cols: usize) -> impl Iterator<Item = (Row, Diff)> + '_ {
3640 let bytes = a.unwrap_str().as_bytes();
3641 let mut row = Row::default();
3642 let csv_reader = csv::ReaderBuilder::new()
3643 .has_headers(false)
3644 .from_reader(bytes);
3645 csv_reader.into_records().filter_map(move |res| match res {
3646 Ok(sr) if sr.len() == n_cols => {
3647 row.packer().extend(sr.iter().map(Datum::String));
3648 Some((row.clone(), Diff::ONE))
3649 }
3650 _ => None,
3651 })
3652}
3653
3654pub fn repeat(a: Datum) -> Option<(Row, Diff)> {
3655 let n = a.unwrap_int64();
3656 if n != 0 {
3657 Some((Row::default(), n.into()))
3658 } else {
3659 None
3660 }
3661}
3662
3663fn wrap<'a>(datums: &'a [Datum<'a>], width: usize) -> impl Iterator<Item = (Row, Diff)> + 'a {
3664 datums
3665 .chunks(width)
3666 .map(|chunk| (Row::pack(chunk), Diff::ONE))
3667}
3668
3669fn acl_explode<'a>(
3670 acl_items: Datum<'a>,
3671 temp_storage: &'a RowArena,
3672) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3673 let acl_items = acl_items.unwrap_array();
3674 let mut res = Vec::new();
3675 for acl_item in acl_items.elements().iter() {
3676 if acl_item.is_null() {
3677 return Err(EvalError::AclArrayNullElement);
3678 }
3679 let acl_item = acl_item.unwrap_acl_item();
3680 for privilege in acl_item.acl_mode.explode() {
3681 let row = [
3682 Datum::UInt32(acl_item.grantor.0),
3683 Datum::UInt32(acl_item.grantee.0),
3684 Datum::String(temp_storage.push_string(privilege.to_string())),
3685 Datum::False,
3687 ];
3688 res.push((Row::pack_slice(&row), Diff::ONE));
3689 }
3690 }
3691 Ok(res.into_iter())
3692}
3693
3694fn mz_acl_explode<'a>(
3695 mz_acl_items: Datum<'a>,
3696 temp_storage: &'a RowArena,
3697) -> Result<impl Iterator<Item = (Row, Diff)> + 'a, EvalError> {
3698 let mz_acl_items = mz_acl_items.unwrap_array();
3699 let mut res = Vec::new();
3700 for mz_acl_item in mz_acl_items.elements().iter() {
3701 if mz_acl_item.is_null() {
3702 return Err(EvalError::MzAclArrayNullElement);
3703 }
3704 let mz_acl_item = mz_acl_item.unwrap_mz_acl_item();
3705 for privilege in mz_acl_item.acl_mode.explode() {
3706 let row = [
3707 Datum::String(temp_storage.push_string(mz_acl_item.grantor.to_string())),
3708 Datum::String(temp_storage.push_string(mz_acl_item.grantee.to_string())),
3709 Datum::String(temp_storage.push_string(privilege.to_string())),
3710 Datum::False,
3712 ];
3713 res.push((Row::pack_slice(&row), Diff::ONE));
3714 }
3715 }
3716 Ok(res.into_iter())
3717}
3718
/// A table function: maps one input row to zero or more output rows, each
/// with a multiplicity (see `TableFunc::eval`).
#[derive(
    Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash, MzReflect,
)]
pub enum TableFunc {
    /// `aclexplode`: one `(oid, oid, string, bool)` row per privilege of each
    /// `aclitem`; see `acl_explode`.
    AclExplode,
    /// `mz_aclexplode`: one `(string, string, string, bool)` row per privilege
    /// of each `mz_aclitem`; see `mz_acl_explode`.
    MzAclExplode,
    /// `jsonb_each`: emits two-column rows, a non-null string plus a value.
    JsonbEach {
        /// If true, the value column is a nullable string; otherwise `jsonb`.
        stringify: bool,
    },
    /// `jsonb_object_keys`: emits one non-null string column per row.
    JsonbObjectKeys,
    /// `jsonb_array_elements`: emits one value column per row.
    JsonbArrayElements {
        /// If true, the column is a nullable string; otherwise `jsonb`.
        stringify: bool,
    },
    /// `regexp_extract`: one string column per capture group of the regex.
    RegexpExtract(AnalyzedRegex),
    /// `csv_extract`: parses headerless CSV, emitting only records with exactly
    /// this many fields.
    CsvExtract(usize),
    /// `generate_series` over three `int32` inputs.
    GenerateSeriesInt32,
    /// `generate_series` over three `int64` inputs.
    GenerateSeriesInt64,
    /// `generate_series` over two `timestamp` inputs and an `interval`.
    GenerateSeriesTimestamp,
    /// `generate_series` over two `timestamptz` inputs and an `interval`.
    GenerateSeriesTimestampTz,
    /// Emits an empty row whose multiplicity is the `int64` input.
    Repeat,
    /// Emits the elements of an array input, one nullable column per row.
    UnnestArray {
        /// Element type of the array.
        el_typ: ScalarType,
    },
    /// Emits the elements of a list input, one nullable column per row.
    UnnestList {
        /// Element type of the list.
        el_typ: ScalarType,
    },
    /// Emits one `(string, value)` row per map entry; column 0 is a key.
    UnnestMap {
        /// Type of the map's values.
        value_type: ScalarType,
    },
    /// Regroups all inputs into consecutive rows of `width` columns. Unlike most
    /// variants, NULL inputs do not make it produce an empty result.
    Wrap {
        /// Output column types, one per column of each produced row.
        types: Vec<ColumnType>,
        /// Number of columns per produced row.
        width: usize,
    },
    /// `generate_subscripts`: takes an array and an `int32`, emits `int32`s.
    GenerateSubscriptsArray,
    /// Emits all inputs packed into exactly one row with multiplicity one.
    TabletizedScalar {
        /// Name used when displaying the function.
        name: String,
        /// Output relation type, reported verbatim by `output_type`.
        relation: RelationType,
    },
    /// `regexp_matches`: emits rows with a single non-null string-array column.
    RegexpMatches,
}
3765
3766impl RustType<ProtoTableFunc> for TableFunc {
3767 fn into_proto(&self) -> ProtoTableFunc {
3768 use proto_table_func::{Kind, ProtoWrap};
3769
3770 ProtoTableFunc {
3771 kind: Some(match self {
3772 TableFunc::AclExplode => Kind::AclExplode(()),
3773 TableFunc::MzAclExplode => Kind::MzAclExplode(()),
3774 TableFunc::JsonbEach { stringify } => Kind::JsonbEach(*stringify),
3775 TableFunc::JsonbObjectKeys => Kind::JsonbObjectKeys(()),
3776 TableFunc::JsonbArrayElements { stringify } => Kind::JsonbArrayElements(*stringify),
3777 TableFunc::RegexpExtract(x) => Kind::RegexpExtract(x.into_proto()),
3778 TableFunc::CsvExtract(x) => Kind::CsvExtract(x.into_proto()),
3779 TableFunc::GenerateSeriesInt32 => Kind::GenerateSeriesInt32(()),
3780 TableFunc::GenerateSeriesInt64 => Kind::GenerateSeriesInt64(()),
3781 TableFunc::GenerateSeriesTimestamp => Kind::GenerateSeriesTimestamp(()),
3782 TableFunc::GenerateSeriesTimestampTz => Kind::GenerateSeriesTimestampTz(()),
3783 TableFunc::Repeat => Kind::Repeat(()),
3784 TableFunc::UnnestArray { el_typ } => Kind::UnnestArray(el_typ.into_proto()),
3785 TableFunc::UnnestList { el_typ } => Kind::UnnestList(el_typ.into_proto()),
3786 TableFunc::UnnestMap { value_type } => Kind::UnnestMap(value_type.into_proto()),
3787 TableFunc::Wrap { types, width } => Kind::Wrap(ProtoWrap {
3788 types: types.into_proto(),
3789 width: width.into_proto(),
3790 }),
3791 TableFunc::GenerateSubscriptsArray => Kind::GenerateSubscriptsArray(()),
3792 TableFunc::TabletizedScalar { name, relation } => {
3793 Kind::TabletizedScalar(ProtoTabletizedScalar {
3794 name: name.into_proto(),
3795 relation: Some(relation.into_proto()),
3796 })
3797 }
3798 TableFunc::RegexpMatches => Kind::RegexpMatches(()),
3799 }),
3800 }
3801 }
3802
3803 fn from_proto(proto: ProtoTableFunc) -> Result<Self, TryFromProtoError> {
3804 use proto_table_func::Kind;
3805
3806 let kind = proto
3807 .kind
3808 .ok_or_else(|| TryFromProtoError::missing_field("ProtoTableFunc::Kind"))?;
3809
3810 Ok(match kind {
3811 Kind::AclExplode(()) => TableFunc::AclExplode,
3812 Kind::MzAclExplode(()) => TableFunc::MzAclExplode,
3813 Kind::JsonbEach(stringify) => TableFunc::JsonbEach { stringify },
3814 Kind::JsonbObjectKeys(()) => TableFunc::JsonbObjectKeys,
3815 Kind::JsonbArrayElements(stringify) => TableFunc::JsonbArrayElements { stringify },
3816 Kind::RegexpExtract(x) => TableFunc::RegexpExtract(x.into_rust()?),
3817 Kind::CsvExtract(x) => TableFunc::CsvExtract(x.into_rust()?),
3818 Kind::GenerateSeriesInt32(()) => TableFunc::GenerateSeriesInt32,
3819 Kind::GenerateSeriesInt64(()) => TableFunc::GenerateSeriesInt64,
3820 Kind::GenerateSeriesTimestamp(()) => TableFunc::GenerateSeriesTimestamp,
3821 Kind::GenerateSeriesTimestampTz(()) => TableFunc::GenerateSeriesTimestampTz,
3822 Kind::Repeat(()) => TableFunc::Repeat,
3823 Kind::UnnestArray(x) => TableFunc::UnnestArray {
3824 el_typ: x.into_rust()?,
3825 },
3826 Kind::UnnestList(x) => TableFunc::UnnestList {
3827 el_typ: x.into_rust()?,
3828 },
3829 Kind::UnnestMap(value_type) => TableFunc::UnnestMap {
3830 value_type: value_type.into_rust()?,
3831 },
3832 Kind::Wrap(x) => TableFunc::Wrap {
3833 width: x.width.into_rust()?,
3834 types: x.types.into_rust()?,
3835 },
3836 Kind::GenerateSubscriptsArray(()) => TableFunc::GenerateSubscriptsArray,
3837 Kind::TabletizedScalar(v) => TableFunc::TabletizedScalar {
3838 name: v.name,
3839 relation: v
3840 .relation
3841 .into_rust_if_some("ProtoTabletizedScalar::relation")?,
3842 },
3843 Kind::RegexpMatches(_) => TableFunc::RegexpMatches,
3844 })
3845 }
3846}
3847
3848impl TableFunc {
3849 pub fn eval<'a>(
3850 &'a self,
3851 datums: &'a [Datum<'a>],
3852 temp_storage: &'a RowArena,
3853 ) -> Result<Box<dyn Iterator<Item = (Row, Diff)> + 'a>, EvalError> {
3854 if self.empty_on_null_input() && datums.iter().any(|d| d.is_null()) {
3855 return Ok(Box::new(vec![].into_iter()));
3856 }
3857 match self {
3858 TableFunc::AclExplode => Ok(Box::new(acl_explode(datums[0], temp_storage)?)),
3859 TableFunc::MzAclExplode => Ok(Box::new(mz_acl_explode(datums[0], temp_storage)?)),
3860 TableFunc::JsonbEach { stringify } => {
3861 Ok(Box::new(jsonb_each(datums[0], temp_storage, *stringify)))
3862 }
3863 TableFunc::JsonbObjectKeys => Ok(Box::new(jsonb_object_keys(datums[0]))),
3864 TableFunc::JsonbArrayElements { stringify } => Ok(Box::new(jsonb_array_elements(
3865 datums[0],
3866 temp_storage,
3867 *stringify,
3868 ))),
3869 TableFunc::RegexpExtract(a) => Ok(Box::new(regexp_extract(datums[0], a).into_iter())),
3870 TableFunc::CsvExtract(n_cols) => Ok(Box::new(csv_extract(datums[0], *n_cols))),
3871 TableFunc::GenerateSeriesInt32 => {
3872 let res = generate_series(
3873 datums[0].unwrap_int32(),
3874 datums[1].unwrap_int32(),
3875 datums[2].unwrap_int32(),
3876 )?;
3877 Ok(Box::new(res))
3878 }
3879 TableFunc::GenerateSeriesInt64 => {
3880 let res = generate_series(
3881 datums[0].unwrap_int64(),
3882 datums[1].unwrap_int64(),
3883 datums[2].unwrap_int64(),
3884 )?;
3885 Ok(Box::new(res))
3886 }
3887 TableFunc::GenerateSeriesTimestamp => {
3888 fn pass_through<'a>(d: CheckedTimestamp<NaiveDateTime>) -> Datum<'a> {
3889 Datum::from(d)
3890 }
3891 let res = generate_series_ts(
3892 datums[0].unwrap_timestamp(),
3893 datums[1].unwrap_timestamp(),
3894 datums[2].unwrap_interval(),
3895 pass_through,
3896 )?;
3897 Ok(Box::new(res))
3898 }
3899 TableFunc::GenerateSeriesTimestampTz => {
3900 fn gen_ts_tz<'a>(d: CheckedTimestamp<DateTime<Utc>>) -> Datum<'a> {
3901 Datum::from(d)
3902 }
3903 let res = generate_series_ts(
3904 datums[0].unwrap_timestamptz(),
3905 datums[1].unwrap_timestamptz(),
3906 datums[2].unwrap_interval(),
3907 gen_ts_tz,
3908 )?;
3909 Ok(Box::new(res))
3910 }
3911 TableFunc::GenerateSubscriptsArray => {
3912 generate_subscripts_array(datums[0], datums[1].unwrap_int32())
3913 }
3914 TableFunc::Repeat => Ok(Box::new(repeat(datums[0]).into_iter())),
3915 TableFunc::UnnestArray { .. } => Ok(Box::new(unnest_array(datums[0]))),
3916 TableFunc::UnnestList { .. } => Ok(Box::new(unnest_list(datums[0]))),
3917 TableFunc::UnnestMap { .. } => Ok(Box::new(unnest_map(datums[0]))),
3918 TableFunc::Wrap { width, .. } => Ok(Box::new(wrap(datums, *width))),
3919 TableFunc::TabletizedScalar { .. } => {
3920 let r = Row::pack_slice(datums);
3921 Ok(Box::new(std::iter::once((r, Diff::ONE))))
3922 }
3923 TableFunc::RegexpMatches => Ok(Box::new(regexp_matches(datums)?)),
3924 }
3925 }
3926
3927 pub fn output_type(&self) -> RelationType {
3928 let (column_types, keys) = match self {
3929 TableFunc::AclExplode => {
3930 let column_types = vec![
3931 ScalarType::Oid.nullable(false),
3932 ScalarType::Oid.nullable(false),
3933 ScalarType::String.nullable(false),
3934 ScalarType::Bool.nullable(false),
3935 ];
3936 let keys = vec![];
3937 (column_types, keys)
3938 }
3939 TableFunc::MzAclExplode => {
3940 let column_types = vec![
3941 ScalarType::String.nullable(false),
3942 ScalarType::String.nullable(false),
3943 ScalarType::String.nullable(false),
3944 ScalarType::Bool.nullable(false),
3945 ];
3946 let keys = vec![];
3947 (column_types, keys)
3948 }
3949 TableFunc::JsonbEach { stringify: true } => {
3950 let column_types = vec![
3951 ScalarType::String.nullable(false),
3952 ScalarType::String.nullable(true),
3953 ];
3954 let keys = vec![];
3955 (column_types, keys)
3956 }
3957 TableFunc::JsonbEach { stringify: false } => {
3958 let column_types = vec![
3959 ScalarType::String.nullable(false),
3960 ScalarType::Jsonb.nullable(false),
3961 ];
3962 let keys = vec![];
3963 (column_types, keys)
3964 }
3965 TableFunc::JsonbObjectKeys => {
3966 let column_types = vec![ScalarType::String.nullable(false)];
3967 let keys = vec![];
3968 (column_types, keys)
3969 }
3970 TableFunc::JsonbArrayElements { stringify: true } => {
3971 let column_types = vec![ScalarType::String.nullable(true)];
3972 let keys = vec![];
3973 (column_types, keys)
3974 }
3975 TableFunc::JsonbArrayElements { stringify: false } => {
3976 let column_types = vec![ScalarType::Jsonb.nullable(false)];
3977 let keys = vec![];
3978 (column_types, keys)
3979 }
3980 TableFunc::RegexpExtract(a) => {
3981 let column_types = a
3982 .capture_groups_iter()
3983 .map(|cg| ScalarType::String.nullable(cg.nullable))
3984 .collect();
3985 let keys = vec![];
3986 (column_types, keys)
3987 }
3988 TableFunc::CsvExtract(n_cols) => {
3989 let column_types = iter::repeat(ScalarType::String.nullable(false))
3990 .take(*n_cols)
3991 .collect();
3992 let keys = vec![];
3993 (column_types, keys)
3994 }
3995 TableFunc::GenerateSeriesInt32 => {
3996 let column_types = vec![ScalarType::Int32.nullable(false)];
3997 let keys = vec![vec![0]];
3998 (column_types, keys)
3999 }
4000 TableFunc::GenerateSeriesInt64 => {
4001 let column_types = vec![ScalarType::Int64.nullable(false)];
4002 let keys = vec![vec![0]];
4003 (column_types, keys)
4004 }
4005 TableFunc::GenerateSeriesTimestamp => {
4006 let column_types = vec![ScalarType::Timestamp { precision: None }.nullable(false)];
4007 let keys = vec![vec![0]];
4008 (column_types, keys)
4009 }
4010 TableFunc::GenerateSeriesTimestampTz => {
4011 let column_types =
4012 vec![ScalarType::TimestampTz { precision: None }.nullable(false)];
4013 let keys = vec![vec![0]];
4014 (column_types, keys)
4015 }
4016 TableFunc::GenerateSubscriptsArray => {
4017 let column_types = vec![ScalarType::Int32.nullable(false)];
4018 let keys = vec![vec![0]];
4019 (column_types, keys)
4020 }
4021 TableFunc::Repeat => {
4022 let column_types = vec![];
4023 let keys = vec![];
4024 (column_types, keys)
4025 }
4026 TableFunc::UnnestArray { el_typ } => {
4027 let column_types = vec![el_typ.clone().nullable(true)];
4028 let keys = vec![];
4029 (column_types, keys)
4030 }
4031 TableFunc::UnnestList { el_typ } => {
4032 let column_types = vec![el_typ.clone().nullable(true)];
4033 let keys = vec![];
4034 (column_types, keys)
4035 }
4036 TableFunc::UnnestMap { value_type } => {
4037 let column_types = vec![
4038 ScalarType::String.nullable(false),
4039 value_type.clone().nullable(true),
4040 ];
4041 let keys = vec![vec![0]];
4042 (column_types, keys)
4043 }
4044 TableFunc::Wrap { types, .. } => {
4045 let column_types = types.clone();
4046 let keys = vec![];
4047 (column_types, keys)
4048 }
4049 TableFunc::TabletizedScalar { relation, .. } => {
4050 return relation.clone();
4051 }
4052 TableFunc::RegexpMatches => {
4053 let column_types =
4054 vec![ScalarType::Array(Box::new(ScalarType::String)).nullable(false)];
4055 let keys = vec![];
4056
4057 (column_types, keys)
4058 }
4059 };
4060
4061 if !keys.is_empty() {
4062 RelationType::new(column_types).with_keys(keys)
4063 } else {
4064 RelationType::new(column_types)
4065 }
4066 }
4067
4068 pub fn output_arity(&self) -> usize {
4069 match self {
4070 TableFunc::AclExplode => 4,
4071 TableFunc::MzAclExplode => 4,
4072 TableFunc::JsonbEach { .. } => 2,
4073 TableFunc::JsonbObjectKeys => 1,
4074 TableFunc::JsonbArrayElements { .. } => 1,
4075 TableFunc::RegexpExtract(a) => a.capture_groups_len(),
4076 TableFunc::CsvExtract(n_cols) => *n_cols,
4077 TableFunc::GenerateSeriesInt32 => 1,
4078 TableFunc::GenerateSeriesInt64 => 1,
4079 TableFunc::GenerateSeriesTimestamp => 1,
4080 TableFunc::GenerateSeriesTimestampTz => 1,
4081 TableFunc::GenerateSubscriptsArray => 1,
4082 TableFunc::Repeat => 0,
4083 TableFunc::UnnestArray { .. } => 1,
4084 TableFunc::UnnestList { .. } => 1,
4085 TableFunc::UnnestMap { .. } => 2,
4086 TableFunc::Wrap { width, .. } => *width,
4087 TableFunc::TabletizedScalar { relation, .. } => relation.column_types.len(),
4088 TableFunc::RegexpMatches => 1,
4089 }
4090 }
4091
4092 pub fn empty_on_null_input(&self) -> bool {
4093 match self {
4094 TableFunc::AclExplode
4095 | TableFunc::MzAclExplode
4096 | TableFunc::JsonbEach { .. }
4097 | TableFunc::JsonbObjectKeys
4098 | TableFunc::JsonbArrayElements { .. }
4099 | TableFunc::GenerateSeriesInt32
4100 | TableFunc::GenerateSeriesInt64
4101 | TableFunc::GenerateSeriesTimestamp
4102 | TableFunc::GenerateSeriesTimestampTz
4103 | TableFunc::GenerateSubscriptsArray
4104 | TableFunc::RegexpExtract(_)
4105 | TableFunc::CsvExtract(_)
4106 | TableFunc::Repeat
4107 | TableFunc::UnnestArray { .. }
4108 | TableFunc::UnnestList { .. }
4109 | TableFunc::UnnestMap { .. }
4110 | TableFunc::RegexpMatches => true,
4111 TableFunc::Wrap { .. } => false,
4112 TableFunc::TabletizedScalar { .. } => false,
4113 }
4114 }
4115
4116 pub fn preserves_monotonicity(&self) -> bool {
4118 match self {
4121 TableFunc::AclExplode => false,
4122 TableFunc::MzAclExplode => false,
4123 TableFunc::JsonbEach { .. } => true,
4124 TableFunc::JsonbObjectKeys => true,
4125 TableFunc::JsonbArrayElements { .. } => true,
4126 TableFunc::RegexpExtract(_) => true,
4127 TableFunc::CsvExtract(_) => true,
4128 TableFunc::GenerateSeriesInt32 => true,
4129 TableFunc::GenerateSeriesInt64 => true,
4130 TableFunc::GenerateSeriesTimestamp => true,
4131 TableFunc::GenerateSeriesTimestampTz => true,
4132 TableFunc::GenerateSubscriptsArray => true,
4133 TableFunc::Repeat => false,
4134 TableFunc::UnnestArray { .. } => true,
4135 TableFunc::UnnestList { .. } => true,
4136 TableFunc::UnnestMap { .. } => true,
4137 TableFunc::Wrap { .. } => true,
4138 TableFunc::TabletizedScalar { .. } => true,
4139 TableFunc::RegexpMatches => true,
4140 }
4141 }
4142}
4143
4144impl fmt::Display for TableFunc {
4145 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
4146 match self {
4147 TableFunc::AclExplode => f.write_str("aclexplode"),
4148 TableFunc::MzAclExplode => f.write_str("mz_aclexplode"),
4149 TableFunc::JsonbEach { .. } => f.write_str("jsonb_each"),
4150 TableFunc::JsonbObjectKeys => f.write_str("jsonb_object_keys"),
4151 TableFunc::JsonbArrayElements { .. } => f.write_str("jsonb_array_elements"),
4152 TableFunc::RegexpExtract(a) => write!(f, "regexp_extract({:?}, _)", a.0),
4153 TableFunc::CsvExtract(n_cols) => write!(f, "csv_extract({}, _)", n_cols),
4154 TableFunc::GenerateSeriesInt32 => f.write_str("generate_series"),
4155 TableFunc::GenerateSeriesInt64 => f.write_str("generate_series"),
4156 TableFunc::GenerateSeriesTimestamp => f.write_str("generate_series"),
4157 TableFunc::GenerateSeriesTimestampTz => f.write_str("generate_series"),
4158 TableFunc::GenerateSubscriptsArray => f.write_str("generate_subscripts"),
4159 TableFunc::Repeat => f.write_str("repeat_row"),
4160 TableFunc::UnnestArray { .. } => f.write_str("unnest_array"),
4161 TableFunc::UnnestList { .. } => f.write_str("unnest_list"),
4162 TableFunc::UnnestMap { .. } => f.write_str("unnest_map"),
4163 TableFunc::Wrap { width, .. } => write!(f, "wrap{}", width),
4164 TableFunc::TabletizedScalar { name, .. } => f.write_str(name),
4165 TableFunc::RegexpMatches => write!(f, "regexp_matches(_, _, _)"),
4166 }
4167 }
4168}
4169
#[cfg(test)]
mod tests {
    use super::{AggregateFunc, ProtoAggregateFunc, ProtoTableFunc, TableFunc};
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    // Both function enums must survive a round trip through their protobuf
    // encodings unchanged. Both properties are skipped under Miri.
    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn aggregate_func_protobuf_roundtrip(expect in any::<AggregateFunc>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoAggregateFunc>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)]
        fn table_func_protobuf_roundtrip(expect in any::<TableFunc>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoTableFunc>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}