1use columnar::Columnar;
11use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
12use mz_expr::MfpPlan;
13use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
14use mz_repr::{DatumVec, RowArena, SharedRow};
15use mz_repr::{Diff, Row, Timestamp};
16use mz_timely_util::operator::StreamExt;
17use timely::dataflow::Scope;
18use timely::dataflow::channels::pact::Pipeline;
19use timely::dataflow::channels::pushers::buffer::Session;
20use timely::dataflow::channels::pushers::{Counter, Tee};
21use timely::progress::Antichain;
22
23use crate::render::DataflowError;
24use crate::render::context::{CollectionBundle, Context};
25
26impl<G> Context<G>
27where
28 G: Scope,
29 G::Timestamp: crate::render::RenderTimestamp,
30 <G::Timestamp as Columnar>::Container: Clone + Send,
31{
32 pub fn render_flat_map(
34 &self,
35 input: CollectionBundle<G>,
36 func: TableFunc,
37 exprs: Vec<MirScalarExpr>,
38 mfp: MapFilterProject,
39 input_key: Option<Vec<MirScalarExpr>>,
40 ) -> CollectionBundle<G> {
41 let until = self.until.clone();
42 let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
43 let (ok_collection, err_collection) =
44 input.as_specific_collection(input_key.as_deref(), &self.config_set);
45 let stream = ok_collection.inner;
46 let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, _| {
47 Box::new(move |input, ok_output, err_output| {
48 let mut datums = DatumVec::new();
49 let mut datums_mfp = DatumVec::new();
50
51 let mut table_func_output = Vec::new();
53
54 input.for_each(|cap, data| {
55 let mut ok_session = ok_output.session_with_builder(&cap);
56 let mut err_session = err_output.session_with_builder(&cap);
57
58 'input: for (input_row, time, diff) in data.drain(..) {
59 let temp_storage = RowArena::new();
60
61 let datums_local = datums.borrow_with(&input_row);
63 let args = exprs
64 .iter()
65 .map(|e| e.eval(&datums_local, &temp_storage))
66 .collect::<Result<Vec<_>, _>>();
67 let args = match args {
68 Ok(args) => args,
69 Err(e) => {
70 err_session.give((e.into(), time, diff));
71 continue 'input;
72 }
73 };
74 let mut extensions = match func.eval(&args, &temp_storage) {
75 Ok(exts) => exts.fuse(),
76 Err(e) => {
77 err_session.give((e.into(), time, diff));
78 continue 'input;
79 }
80 };
81
82 while let Some((extension, output_diff)) = extensions.next() {
84 table_func_output.push((extension, output_diff));
85 table_func_output.extend((&mut extensions).take(1023));
86 drain_through_mfp(
88 &input_row,
89 &time,
90 &diff,
91 &mut datums_mfp,
92 &table_func_output,
93 &mfp_plan,
94 &until,
95 &mut ok_session,
96 &mut err_session,
97 );
98 table_func_output.clear();
99 }
100 }
101 })
102 })
103 });
104
105 use differential_dataflow::AsCollection;
106 let ok_collection = oks.as_collection();
107 let new_err_collection = errs.as_collection();
108 let err_collection = err_collection.concat(&new_err_collection);
109 CollectionBundle::from_collections(ok_collection, err_collection)
110 }
111}
112
113fn drain_through_mfp<T>(
117 input_row: &Row,
118 input_time: &T,
119 input_diff: &Diff,
120 datum_vec: &mut DatumVec,
121 extensions: &[(Row, Diff)],
122 mfp_plan: &MfpPlan,
123 until: &Antichain<Timestamp>,
124 ok_output: &mut Session<
125 T,
126 ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
127 Counter<T, Vec<(Row, T, Diff)>, Tee<T, Vec<(Row, T, Diff)>>>,
128 >,
129 err_output: &mut Session<
130 T,
131 ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
132 Counter<T, Vec<(DataflowError, T, Diff)>, Tee<T, Vec<(DataflowError, T, Diff)>>>,
133 >,
134) where
135 T: crate::render::RenderTimestamp,
136 <T as Columnar>::Container: Clone + Send,
137{
138 let temp_storage = RowArena::new();
139 let binding = SharedRow::get();
140 let mut row_builder = binding.borrow_mut();
141
142 let mut datums_local = datum_vec.borrow_with(input_row);
144 let datums_len = datums_local.len();
145
146 let event_time = input_time.event_time().clone();
147
148 for (cols, diff) in extensions.iter() {
149 datums_local.truncate(datums_len);
151 datums_local.extend(cols.iter());
152
153 let results = mfp_plan.evaluate(
154 &mut datums_local,
155 &temp_storage,
156 event_time,
157 *diff * *input_diff,
158 |time| !until.less_equal(time),
159 &mut row_builder,
160 );
161
162 for result in results {
163 match result {
164 Ok((row, event_time, diff)) => {
165 let mut time = input_time.clone();
167 *time.event_time_mut() = event_time;
168 ok_output.give((row, time, diff));
169 }
170 Err((err, event_time, diff)) => {
171 let mut time = input_time.clone();
173 *time.event_time_mut() = event_time;
174 err_output.give((err, time, diff));
175 }
176 };
177 }
178 }
179}