Skip to main content

mz_compute/render/
flat_map.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::VecDeque;
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
14use mz_expr::MfpPlan;
15use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
16use mz_repr::{DatumVec, RowArena, SharedRow};
17use mz_repr::{Diff, Row, Timestamp};
18use mz_timely_util::operator::StreamExt;
19use timely::dataflow::Scope;
20use timely::dataflow::channels::pact::Pipeline;
21use timely::dataflow::operators::Capability;
22use timely::dataflow::operators::generic::Session;
23use timely::progress::Antichain;
24
25use crate::render::DataflowError;
26use crate::render::context::{CollectionBundle, Context};
27
28impl<G> Context<G>
29where
30    G: Scope,
31    G::Timestamp: crate::render::RenderTimestamp,
32{
33    /// Applies a `TableFunc` to every row, followed by an `mfp`.
34    pub fn render_flat_map(
35        &self,
36        input_key: Option<Vec<MirScalarExpr>>,
37        input: CollectionBundle<G>,
38        exprs: Vec<MirScalarExpr>,
39        func: TableFunc,
40        mfp: MapFilterProject,
41    ) -> CollectionBundle<G> {
42        let until = self.until.clone();
43        let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
44        let (ok_collection, err_collection) =
45            input.as_specific_collection(input_key.as_deref(), &self.config_set);
46        let stream = ok_collection.inner;
47        let scope = input.scope();
48
49        // Budget to limit the number of rows processed in a single invocation.
50        //
51        // The current implementation can only yield between input batches, but not from within
52        // a batch. A `generate_series` can still cause unavailability if it generates many rows.
53        let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set);
54
55        let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| {
56            let activator = scope.activator_for(info.address);
57            let mut queue = VecDeque::new();
58            Box::new(move |input, ok_output, err_output| {
59                let mut datums = DatumVec::new();
60                let mut datums_mfp = DatumVec::new();
61
62                // Buffer for extensions to `input_row`.
63                let mut table_func_output = Vec::new();
64
65                let mut budget = budget;
66
67                input.for_each(|cap, data| {
68                    queue.push_back((cap.retain(0), cap.retain(1), std::mem::take(data)))
69                });
70
71                while let Some((ok_cap, err_cap, data)) = queue.pop_front() {
72                    let mut ok_session = ok_output.session_with_builder(&ok_cap);
73                    let mut err_session = err_output.session_with_builder(&err_cap);
74
75                    'input: for (input_row, time, diff) in data {
76                        let temp_storage = RowArena::new();
77
78                        // Unpack datums for expression evaluation.
79                        let datums_local = datums.borrow_with(&input_row);
80                        let args = exprs
81                            .iter()
82                            .map(|e| e.eval(&datums_local, &temp_storage))
83                            .collect::<Result<Vec<_>, _>>();
84                        let args = match args {
85                            Ok(args) => args,
86                            Err(e) => {
87                                err_session.give((e.into(), time, diff));
88                                continue 'input;
89                            }
90                        };
91                        let mut extensions = match func.eval(&args, &temp_storage) {
92                            Ok(exts) => exts.fuse(),
93                            Err(e) => {
94                                err_session.give((e.into(), time, diff));
95                                continue 'input;
96                            }
97                        };
98
99                        // Draw additional columns out of the table func evaluation.
100                        while let Some((extension, output_diff)) = extensions.next() {
101                            table_func_output.push((extension, output_diff));
102                            table_func_output.extend((&mut extensions).take(1023));
103                            // We could consolidate `table_func_output`, but it seems unlikely to be productive.
104                            drain_through_mfp(
105                                &input_row,
106                                &time,
107                                &diff,
108                                &mut datums_mfp,
109                                &table_func_output,
110                                &mfp_plan,
111                                &until,
112                                &mut ok_session,
113                                &mut err_session,
114                                &mut budget,
115                            );
116                            table_func_output.clear();
117                        }
118                    }
119                    if budget == 0 {
120                        activator.activate();
121                        break;
122                    }
123                }
124            })
125        });
126
127        use differential_dataflow::AsCollection;
128        let ok_collection = oks.as_collection();
129        let new_err_collection = errs.as_collection();
130        let err_collection = err_collection.concat(new_err_collection);
131        CollectionBundle::from_collections(ok_collection, err_collection)
132    }
133}
134
135/// Drains a list of extensions to `input_row` through a supplied `MfpPlan` and into output buffers.
136///
137/// The method decodes `input_row`, and should be amortized across non-trivial `extensions`.
138fn drain_through_mfp<T>(
139    input_row: &Row,
140    input_time: &T,
141    input_diff: &Diff,
142    datum_vec: &mut DatumVec,
143    extensions: &[(Row, Diff)],
144    mfp_plan: &MfpPlan,
145    until: &Antichain<Timestamp>,
146    ok_output: &mut Session<
147        '_,
148        '_,
149        T,
150        ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
151        Capability<T>,
152    >,
153    err_output: &mut Session<
154        '_,
155        '_,
156        T,
157        ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
158        Capability<T>,
159    >,
160    budget: &mut usize,
161) where
162    T: crate::render::RenderTimestamp,
163{
164    let temp_storage = RowArena::new();
165    let mut row_builder = SharedRow::get();
166
167    // This is not cheap, and is meant to be amortized across many `extensions`.
168    let mut datums_local = datum_vec.borrow_with(input_row);
169    let datums_len = datums_local.len();
170
171    let event_time = input_time.event_time().clone();
172
173    for (cols, diff) in extensions.iter() {
174        // Arrange `datums_local` to reflect the intended output pre-mfp.
175        datums_local.truncate(datums_len);
176        datums_local.extend(cols.iter());
177
178        let results = mfp_plan.evaluate(
179            &mut datums_local,
180            &temp_storage,
181            event_time,
182            *diff * *input_diff,
183            |time| !until.less_equal(time),
184            &mut row_builder,
185        );
186
187        for result in results {
188            *budget = budget.saturating_sub(1);
189            match result {
190                Ok((row, event_time, diff)) => {
191                    // Copy the whole time, and re-populate event time.
192                    let mut time = input_time.clone();
193                    *time.event_time_mut() = event_time;
194                    ok_output.give((row, time, diff));
195                }
196                Err((err, event_time, diff)) => {
197                    // Copy the whole time, and re-populate event time.
198                    let mut time = input_time.clone();
199                    *time.event_time_mut() = event_time;
200                    err_output.give((err, time, diff));
201                }
202            };
203        }
204    }
205}