1use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
11use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
12use mz_expr::MfpPlan;
13use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
14use mz_repr::{DatumVec, RowArena, SharedRow};
15use mz_repr::{Diff, Row, Timestamp};
16use mz_timely_util::operator::StreamExt;
17use timely::dataflow::Scope;
18use timely::dataflow::channels::pact::Pipeline;
19use timely::dataflow::channels::pushers::buffer::Session;
20use timely::dataflow::channels::pushers::{Counter, Tee};
21use timely::progress::Antichain;
22
23use crate::render::DataflowError;
24use crate::render::context::{CollectionBundle, Context};
25
26impl<G> Context<G>
27where
28 G: Scope,
29 G::Timestamp: crate::render::RenderTimestamp,
30{
31 pub fn render_flat_map(
33 &self,
34 input_key: Option<Vec<MirScalarExpr>>,
35 input: CollectionBundle<G>,
36 exprs: Vec<MirScalarExpr>,
37 func: TableFunc,
38 mfp: MapFilterProject,
39 ) -> CollectionBundle<G> {
40 let until = self.until.clone();
41 let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
42 let (ok_collection, err_collection) =
43 input.as_specific_collection(input_key.as_deref(), &self.config_set);
44 let stream = ok_collection.inner;
45 let scope = input.scope();
46
47 let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set);
52
53 let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| {
54 let activator = scope.activator_for(info.address);
55 Box::new(move |input, ok_output, err_output| {
56 let mut datums = DatumVec::new();
57 let mut datums_mfp = DatumVec::new();
58
59 let mut table_func_output = Vec::new();
61
62 let mut budget = budget;
63
64 while let Some((cap, data)) = input.next() {
65 let mut ok_session = ok_output.session_with_builder(&cap);
66 let mut err_session = err_output.session_with_builder(&cap);
67
68 'input: for (input_row, time, diff) in data.drain(..) {
69 let temp_storage = RowArena::new();
70
71 let datums_local = datums.borrow_with(&input_row);
73 let args = exprs
74 .iter()
75 .map(|e| e.eval(&datums_local, &temp_storage))
76 .collect::<Result<Vec<_>, _>>();
77 let args = match args {
78 Ok(args) => args,
79 Err(e) => {
80 err_session.give((e.into(), time, diff));
81 continue 'input;
82 }
83 };
84 let mut extensions = match func.eval(&args, &temp_storage) {
85 Ok(exts) => exts.fuse(),
86 Err(e) => {
87 err_session.give((e.into(), time, diff));
88 continue 'input;
89 }
90 };
91
92 while let Some((extension, output_diff)) = extensions.next() {
94 table_func_output.push((extension, output_diff));
95 table_func_output.extend((&mut extensions).take(1023));
96 drain_through_mfp(
98 &input_row,
99 &time,
100 &diff,
101 &mut datums_mfp,
102 &table_func_output,
103 &mfp_plan,
104 &until,
105 &mut ok_session,
106 &mut err_session,
107 &mut budget,
108 );
109 table_func_output.clear();
110 }
111 }
112 if budget == 0 {
113 activator.activate();
114 break;
115 }
116 }
117 })
118 });
119
120 use differential_dataflow::AsCollection;
121 let ok_collection = oks.as_collection();
122 let new_err_collection = errs.as_collection();
123 let err_collection = err_collection.concat(&new_err_collection);
124 CollectionBundle::from_collections(ok_collection, err_collection)
125 }
126}
127
128fn drain_through_mfp<T>(
132 input_row: &Row,
133 input_time: &T,
134 input_diff: &Diff,
135 datum_vec: &mut DatumVec,
136 extensions: &[(Row, Diff)],
137 mfp_plan: &MfpPlan,
138 until: &Antichain<Timestamp>,
139 ok_output: &mut Session<
140 T,
141 ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
142 Counter<T, Vec<(Row, T, Diff)>, Tee<T, Vec<(Row, T, Diff)>>>,
143 >,
144 err_output: &mut Session<
145 T,
146 ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
147 Counter<T, Vec<(DataflowError, T, Diff)>, Tee<T, Vec<(DataflowError, T, Diff)>>>,
148 >,
149 budget: &mut usize,
150) where
151 T: crate::render::RenderTimestamp,
152{
153 let temp_storage = RowArena::new();
154 let mut row_builder = SharedRow::get();
155
156 let mut datums_local = datum_vec.borrow_with(input_row);
158 let datums_len = datums_local.len();
159
160 let event_time = input_time.event_time().clone();
161
162 for (cols, diff) in extensions.iter() {
163 datums_local.truncate(datums_len);
165 datums_local.extend(cols.iter());
166
167 let results = mfp_plan.evaluate(
168 &mut datums_local,
169 &temp_storage,
170 event_time,
171 *diff * *input_diff,
172 |time| !until.less_equal(time),
173 &mut row_builder,
174 );
175
176 for result in results {
177 *budget = budget.saturating_sub(1);
178 match result {
179 Ok((row, event_time, diff)) => {
180 let mut time = input_time.clone();
182 *time.event_time_mut() = event_time;
183 ok_output.give((row, time, diff));
184 }
185 Err((err, event_time, diff)) => {
186 let mut time = input_time.clone();
188 *time.event_time_mut() = event_time;
189 err_output.give((err, time, diff));
190 }
191 };
192 }
193 }
194}