1use std::collections::VecDeque;
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
14use mz_expr::MfpPlan;
15use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
16use mz_repr::{DatumVec, RowArena, SharedRow};
17use mz_repr::{Diff, Row, Timestamp};
18use mz_timely_util::operator::StreamExt;
19use timely::dataflow::Scope;
20use timely::dataflow::channels::pact::Pipeline;
21use timely::dataflow::operators::Capability;
22use timely::dataflow::operators::generic::Session;
23use timely::progress::Antichain;
24
25use crate::render::DataflowError;
26use crate::render::context::{CollectionBundle, Context};
27
28impl<G> Context<G>
29where
30 G: Scope,
31 G::Timestamp: crate::render::RenderTimestamp,
32{
33 pub fn render_flat_map(
35 &self,
36 input_key: Option<Vec<MirScalarExpr>>,
37 input: CollectionBundle<G>,
38 exprs: Vec<MirScalarExpr>,
39 func: TableFunc,
40 mfp: MapFilterProject,
41 ) -> CollectionBundle<G> {
42 let until = self.until.clone();
43 let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
44 let (ok_collection, err_collection) =
45 input.as_specific_collection(input_key.as_deref(), &self.config_set);
46 let stream = ok_collection.inner;
47 let scope = input.scope();
48
49 let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set);
54
55 let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| {
56 let activator = scope.activator_for(info.address);
57 let mut queue = VecDeque::new();
58 Box::new(move |input, ok_output, err_output| {
59 let mut datums = DatumVec::new();
60 let mut datums_mfp = DatumVec::new();
61
62 let mut table_func_output = Vec::new();
64
65 let mut budget = budget;
66
67 input.for_each(|cap, data| {
68 queue.push_back((cap.retain(0), cap.retain(1), std::mem::take(data)))
69 });
70
71 while let Some((ok_cap, err_cap, data)) = queue.pop_front() {
72 let mut ok_session = ok_output.session_with_builder(&ok_cap);
73 let mut err_session = err_output.session_with_builder(&err_cap);
74
75 'input: for (input_row, time, diff) in data {
76 let temp_storage = RowArena::new();
77
78 let datums_local = datums.borrow_with(&input_row);
80 let args = exprs
81 .iter()
82 .map(|e| e.eval(&datums_local, &temp_storage))
83 .collect::<Result<Vec<_>, _>>();
84 let args = match args {
85 Ok(args) => args,
86 Err(e) => {
87 err_session.give((e.into(), time, diff));
88 continue 'input;
89 }
90 };
91 let mut extensions = match func.eval(&args, &temp_storage) {
92 Ok(exts) => exts.fuse(),
93 Err(e) => {
94 err_session.give((e.into(), time, diff));
95 continue 'input;
96 }
97 };
98
99 while let Some((extension, output_diff)) = extensions.next() {
101 table_func_output.push((extension, output_diff));
102 table_func_output.extend((&mut extensions).take(1023));
103 drain_through_mfp(
105 &input_row,
106 &time,
107 &diff,
108 &mut datums_mfp,
109 &table_func_output,
110 &mfp_plan,
111 &until,
112 &mut ok_session,
113 &mut err_session,
114 &mut budget,
115 );
116 table_func_output.clear();
117 }
118 }
119 if budget == 0 {
120 activator.activate();
121 break;
122 }
123 }
124 })
125 });
126
127 use differential_dataflow::AsCollection;
128 let ok_collection = oks.as_collection();
129 let new_err_collection = errs.as_collection();
130 let err_collection = err_collection.concat(new_err_collection);
131 CollectionBundle::from_collections(ok_collection, err_collection)
132 }
133}
134
135fn drain_through_mfp<T>(
139 input_row: &Row,
140 input_time: &T,
141 input_diff: &Diff,
142 datum_vec: &mut DatumVec,
143 extensions: &[(Row, Diff)],
144 mfp_plan: &MfpPlan,
145 until: &Antichain<Timestamp>,
146 ok_output: &mut Session<
147 '_,
148 '_,
149 T,
150 ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
151 Capability<T>,
152 >,
153 err_output: &mut Session<
154 '_,
155 '_,
156 T,
157 ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
158 Capability<T>,
159 >,
160 budget: &mut usize,
161) where
162 T: crate::render::RenderTimestamp,
163{
164 let temp_storage = RowArena::new();
165 let mut row_builder = SharedRow::get();
166
167 let mut datums_local = datum_vec.borrow_with(input_row);
169 let datums_len = datums_local.len();
170
171 let event_time = input_time.event_time().clone();
172
173 for (cols, diff) in extensions.iter() {
174 datums_local.truncate(datums_len);
176 datums_local.extend(cols.iter());
177
178 let results = mfp_plan.evaluate(
179 &mut datums_local,
180 &temp_storage,
181 event_time,
182 *diff * *input_diff,
183 |time| !until.less_equal(time),
184 &mut row_builder,
185 );
186
187 for result in results {
188 *budget = budget.saturating_sub(1);
189 match result {
190 Ok((row, event_time, diff)) => {
191 let mut time = input_time.clone();
193 *time.event_time_mut() = event_time;
194 ok_output.give((row, time, diff));
195 }
196 Err((err, event_time, diff)) => {
197 let mut time = input_time.clone();
199 *time.event_time_mut() = event_time;
200 err_output.give((err, time, diff));
201 }
202 };
203 }
204 }
205}