1use std::collections::VecDeque;
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
14use mz_expr::MfpPlan;
15use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
16use mz_repr::{DatumVec, RowArena, SharedRow};
17use mz_repr::{Diff, Row, Timestamp};
18use mz_timely_util::operator::StreamExt;
19use timely::dataflow::channels::pact::Pipeline;
20use timely::dataflow::operators::Capability;
21use timely::dataflow::operators::generic::Session;
22use timely::progress::Antichain;
23
24use crate::render::DataflowError;
25use crate::render::context::{CollectionBundle, Context};
26
27impl<'scope, T: crate::render::RenderTimestamp> Context<'scope, T> {
28 pub fn render_flat_map(
30 &self,
31 input_key: Option<Vec<MirScalarExpr>>,
32 input: CollectionBundle<'scope, T>,
33 exprs: Vec<MirScalarExpr>,
34 func: TableFunc,
35 mfp: MapFilterProject,
36 ) -> CollectionBundle<'scope, T> {
37 let until = self.until.clone();
38 let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
39 let (ok_collection, err_collection) =
40 input.as_specific_collection(input_key.as_deref(), &self.config_set);
41 let stream = ok_collection.inner;
42 let scope = input.scope();
43
44 let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set);
49
50 let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| {
51 let activator = scope.activator_for(info.address);
52 let mut queue = VecDeque::new();
53 Box::new(move |input, ok_output, err_output| {
54 let mut datums = DatumVec::new();
55 let mut datums_mfp = DatumVec::new();
56
57 let mut table_func_output = Vec::new();
59
60 let mut budget = budget;
61
62 input.for_each(|cap, data| {
63 queue.push_back((cap.retain(0), cap.retain(1), std::mem::take(data)))
64 });
65
66 while let Some((ok_cap, err_cap, data)) = queue.pop_front() {
67 let mut ok_session = ok_output.session_with_builder(&ok_cap);
68 let mut err_session = err_output.session_with_builder(&err_cap);
69
70 'input: for (input_row, time, diff) in data {
71 let temp_storage = RowArena::new();
72
73 let datums_local = datums.borrow_with(&input_row);
75 let args = exprs
76 .iter()
77 .map(|e| e.eval(&datums_local, &temp_storage))
78 .collect::<Result<Vec<_>, _>>();
79 let args = match args {
80 Ok(args) => args,
81 Err(e) => {
82 err_session.give((e.into(), time, diff));
83 continue 'input;
84 }
85 };
86 let mut extensions = match func.eval(&args, &temp_storage) {
87 Ok(exts) => exts.fuse(),
88 Err(e) => {
89 err_session.give((e.into(), time, diff));
90 continue 'input;
91 }
92 };
93
94 while let Some((extension, output_diff)) = extensions.next() {
96 table_func_output.push((extension, output_diff));
97 table_func_output.extend((&mut extensions).take(1023));
98 drain_through_mfp(
100 &input_row,
101 &time,
102 &diff,
103 &mut datums_mfp,
104 &table_func_output,
105 &mfp_plan,
106 &until,
107 &mut ok_session,
108 &mut err_session,
109 &mut budget,
110 );
111 table_func_output.clear();
112 }
113 }
114 if budget == 0 {
115 activator.activate();
116 break;
117 }
118 }
119 })
120 });
121
122 use differential_dataflow::AsCollection;
123 let ok_collection = oks.as_collection();
124 let new_err_collection = errs.as_collection();
125 let err_collection = err_collection.concat(new_err_collection);
126 CollectionBundle::from_collections(ok_collection, err_collection)
127 }
128}
129
130fn drain_through_mfp<T>(
134 input_row: &Row,
135 input_time: &T,
136 input_diff: &Diff,
137 datum_vec: &mut DatumVec,
138 extensions: &[(Row, Diff)],
139 mfp_plan: &MfpPlan,
140 until: &Antichain<Timestamp>,
141 ok_output: &mut Session<
142 '_,
143 '_,
144 T,
145 ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
146 Capability<T>,
147 >,
148 err_output: &mut Session<
149 '_,
150 '_,
151 T,
152 ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
153 Capability<T>,
154 >,
155 budget: &mut usize,
156) where
157 T: crate::render::RenderTimestamp,
158{
159 let temp_storage = RowArena::new();
160 let mut row_builder = SharedRow::get();
161
162 let mut datums_local = datum_vec.borrow_with(input_row);
164 let datums_len = datums_local.len();
165
166 let event_time = input_time.event_time().clone();
167
168 for (cols, diff) in extensions.iter() {
169 datums_local.truncate(datums_len);
171 datums_local.extend(cols.iter());
172
173 let results = mfp_plan.evaluate(
174 &mut datums_local,
175 &temp_storage,
176 event_time,
177 *diff * *input_diff,
178 |time| !until.less_equal(time),
179 &mut row_builder,
180 );
181
182 for result in results {
183 *budget = budget.saturating_sub(1);
184 match result {
185 Ok((row, event_time, diff)) => {
186 let mut time = input_time.clone();
188 *time.event_time_mut() = event_time;
189 ok_output.give((row, time, diff));
190 }
191 Err((err, event_time, diff)) => {
192 let mut time = input_time.clone();
194 *time.event_time_mut() = event_time;
195 err_output.give((err, time, diff));
196 }
197 };
198 }
199 }
200}