1use std::collections::VecDeque;
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
14use mz_expr::MfpPlan;
15use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
16use mz_repr::{DatumVec, RowArena, SharedRow};
17use mz_repr::{Diff, Row, Timestamp};
18use mz_timely_util::operator::StreamExt;
19use timely::dataflow::Scope;
20use timely::dataflow::channels::pact::Pipeline;
21use timely::dataflow::operators::Capability;
22use timely::dataflow::operators::generic::Session;
23use timely::progress::Antichain;
24
25use crate::render::DataflowError;
26use crate::render::context::{CollectionBundle, Context};
27
28impl<G> Context<G>
29where
30 G: Scope,
31 G::Timestamp: crate::render::RenderTimestamp,
32{
33 pub fn render_flat_map(
35 &self,
36 input_key: Option<Vec<MirScalarExpr>>,
37 input: CollectionBundle<G>,
38 exprs: Vec<MirScalarExpr>,
39 func: TableFunc,
40 mfp: MapFilterProject,
41 ) -> CollectionBundle<G> {
42 let until = self.until.clone();
43 let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
44 let (ok_collection, err_collection) =
45 input.as_specific_collection(input_key.as_deref(), &self.config_set);
46 let stream = ok_collection.inner;
47 let scope = input.scope();
48
49 let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set);
54
55 let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| {
56 let activator = scope.activator_for(info.address);
57 let mut queue = VecDeque::new();
58 Box::new(move |input, ok_output, err_output| {
59 let mut datums = DatumVec::new();
60 let mut datums_mfp = DatumVec::new();
61
62 let mut table_func_output = Vec::new();
64
65 let mut budget = budget;
66
67 input.for_each(|cap, data| {
68 queue.push_back((
69 cap.delayed_for_output(cap.time(), 0),
70 cap.retain_for_output(1),
71 std::mem::take(data),
72 ))
73 });
74
75 while let Some((ok_cap, err_cap, data)) = queue.pop_front() {
76 let mut ok_session = ok_output.session_with_builder(&ok_cap);
77 let mut err_session = err_output.session_with_builder(&err_cap);
78
79 'input: for (input_row, time, diff) in data {
80 let temp_storage = RowArena::new();
81
82 let datums_local = datums.borrow_with(&input_row);
84 let args = exprs
85 .iter()
86 .map(|e| e.eval(&datums_local, &temp_storage))
87 .collect::<Result<Vec<_>, _>>();
88 let args = match args {
89 Ok(args) => args,
90 Err(e) => {
91 err_session.give((e.into(), time, diff));
92 continue 'input;
93 }
94 };
95 let mut extensions = match func.eval(&args, &temp_storage) {
96 Ok(exts) => exts.fuse(),
97 Err(e) => {
98 err_session.give((e.into(), time, diff));
99 continue 'input;
100 }
101 };
102
103 while let Some((extension, output_diff)) = extensions.next() {
105 table_func_output.push((extension, output_diff));
106 table_func_output.extend((&mut extensions).take(1023));
107 drain_through_mfp(
109 &input_row,
110 &time,
111 &diff,
112 &mut datums_mfp,
113 &table_func_output,
114 &mfp_plan,
115 &until,
116 &mut ok_session,
117 &mut err_session,
118 &mut budget,
119 );
120 table_func_output.clear();
121 }
122 }
123 if budget == 0 {
124 activator.activate();
125 break;
126 }
127 }
128 })
129 });
130
131 use differential_dataflow::AsCollection;
132 let ok_collection = oks.as_collection();
133 let new_err_collection = errs.as_collection();
134 let err_collection = err_collection.concat(&new_err_collection);
135 CollectionBundle::from_collections(ok_collection, err_collection)
136 }
137}
138
139fn drain_through_mfp<T>(
143 input_row: &Row,
144 input_time: &T,
145 input_diff: &Diff,
146 datum_vec: &mut DatumVec,
147 extensions: &[(Row, Diff)],
148 mfp_plan: &MfpPlan,
149 until: &Antichain<Timestamp>,
150 ok_output: &mut Session<
151 '_,
152 '_,
153 T,
154 ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
155 Capability<T>,
156 >,
157 err_output: &mut Session<
158 '_,
159 '_,
160 T,
161 ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
162 Capability<T>,
163 >,
164 budget: &mut usize,
165) where
166 T: crate::render::RenderTimestamp,
167{
168 let temp_storage = RowArena::new();
169 let mut row_builder = SharedRow::get();
170
171 let mut datums_local = datum_vec.borrow_with(input_row);
173 let datums_len = datums_local.len();
174
175 let event_time = input_time.event_time().clone();
176
177 for (cols, diff) in extensions.iter() {
178 datums_local.truncate(datums_len);
180 datums_local.extend(cols.iter());
181
182 let results = mfp_plan.evaluate(
183 &mut datums_local,
184 &temp_storage,
185 event_time,
186 *diff * *input_diff,
187 |time| !until.less_equal(time),
188 &mut row_builder,
189 );
190
191 for result in results {
192 *budget = budget.saturating_sub(1);
193 match result {
194 Ok((row, event_time, diff)) => {
195 let mut time = input_time.clone();
197 *time.event_time_mut() = event_time;
198 ok_output.give((row, time, diff));
199 }
200 Err((err, event_time, diff)) => {
201 let mut time = input_time.clone();
203 *time.event_time_mut() = event_time;
204 err_output.give((err, time, diff));
205 }
206 };
207 }
208 }
209}