mz_compute/render/
flat_map.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
11use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
12use mz_expr::MfpPlan;
13use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
14use mz_repr::{DatumVec, RowArena, SharedRow};
15use mz_repr::{Diff, Row, Timestamp};
16use mz_timely_util::operator::StreamExt;
17use timely::dataflow::Scope;
18use timely::dataflow::channels::pact::Pipeline;
19use timely::dataflow::channels::pushers::buffer::Session;
20use timely::dataflow::channels::pushers::{Counter, Tee};
21use timely::progress::Antichain;
22
23use crate::render::DataflowError;
24use crate::render::context::{CollectionBundle, Context};
25
26impl<G> Context<G>
27where
28    G: Scope,
29    G::Timestamp: crate::render::RenderTimestamp,
30{
31    /// Applies a `TableFunc` to every row, followed by an `mfp`.
32    pub fn render_flat_map(
33        &self,
34        input_key: Option<Vec<MirScalarExpr>>,
35        input: CollectionBundle<G>,
36        exprs: Vec<MirScalarExpr>,
37        func: TableFunc,
38        mfp: MapFilterProject,
39    ) -> CollectionBundle<G> {
40        let until = self.until.clone();
41        let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
42        let (ok_collection, err_collection) =
43            input.as_specific_collection(input_key.as_deref(), &self.config_set);
44        let stream = ok_collection.inner;
45        let scope = input.scope();
46
47        // Budget to limit the number of rows processed in a single invocation.
48        //
49        // The current implementation can only yield between input batches, but not from within
50        // a batch. A `generate_series` can still cause unavailability if it generates many rows.
51        let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set);
52
53        let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| {
54            let activator = scope.activator_for(info.address);
55            Box::new(move |input, ok_output, err_output| {
56                let mut datums = DatumVec::new();
57                let mut datums_mfp = DatumVec::new();
58
59                // Buffer for extensions to `input_row`.
60                let mut table_func_output = Vec::new();
61
62                let mut budget = budget;
63
64                while let Some((cap, data)) = input.next() {
65                    let mut ok_session = ok_output.session_with_builder(&cap);
66                    let mut err_session = err_output.session_with_builder(&cap);
67
68                    'input: for (input_row, time, diff) in data.drain(..) {
69                        let temp_storage = RowArena::new();
70
71                        // Unpack datums for expression evaluation.
72                        let datums_local = datums.borrow_with(&input_row);
73                        let args = exprs
74                            .iter()
75                            .map(|e| e.eval(&datums_local, &temp_storage))
76                            .collect::<Result<Vec<_>, _>>();
77                        let args = match args {
78                            Ok(args) => args,
79                            Err(e) => {
80                                err_session.give((e.into(), time, diff));
81                                continue 'input;
82                            }
83                        };
84                        let mut extensions = match func.eval(&args, &temp_storage) {
85                            Ok(exts) => exts.fuse(),
86                            Err(e) => {
87                                err_session.give((e.into(), time, diff));
88                                continue 'input;
89                            }
90                        };
91
92                        // Draw additional columns out of the table func evaluation.
93                        while let Some((extension, output_diff)) = extensions.next() {
94                            table_func_output.push((extension, output_diff));
95                            table_func_output.extend((&mut extensions).take(1023));
96                            // We could consolidate `table_func_output`, but it seems unlikely to be productive.
97                            drain_through_mfp(
98                                &input_row,
99                                &time,
100                                &diff,
101                                &mut datums_mfp,
102                                &table_func_output,
103                                &mfp_plan,
104                                &until,
105                                &mut ok_session,
106                                &mut err_session,
107                                &mut budget,
108                            );
109                            table_func_output.clear();
110                        }
111                    }
112                    if budget == 0 {
113                        activator.activate();
114                        break;
115                    }
116                }
117            })
118        });
119
120        use differential_dataflow::AsCollection;
121        let ok_collection = oks.as_collection();
122        let new_err_collection = errs.as_collection();
123        let err_collection = err_collection.concat(&new_err_collection);
124        CollectionBundle::from_collections(ok_collection, err_collection)
125    }
126}
127
128/// Drains a list of extensions to `input_row` through a supplied `MfpPlan` and into output buffers.
129///
130/// The method decodes `input_row`, and should be amortized across non-trivial `extensions`.
131fn drain_through_mfp<T>(
132    input_row: &Row,
133    input_time: &T,
134    input_diff: &Diff,
135    datum_vec: &mut DatumVec,
136    extensions: &[(Row, Diff)],
137    mfp_plan: &MfpPlan,
138    until: &Antichain<Timestamp>,
139    ok_output: &mut Session<
140        T,
141        ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
142        Counter<T, Vec<(Row, T, Diff)>, Tee<T, Vec<(Row, T, Diff)>>>,
143    >,
144    err_output: &mut Session<
145        T,
146        ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
147        Counter<T, Vec<(DataflowError, T, Diff)>, Tee<T, Vec<(DataflowError, T, Diff)>>>,
148    >,
149    budget: &mut usize,
150) where
151    T: crate::render::RenderTimestamp,
152{
153    let temp_storage = RowArena::new();
154    let mut row_builder = SharedRow::get();
155
156    // This is not cheap, and is meant to be amortized across many `extensions`.
157    let mut datums_local = datum_vec.borrow_with(input_row);
158    let datums_len = datums_local.len();
159
160    let event_time = input_time.event_time().clone();
161
162    for (cols, diff) in extensions.iter() {
163        // Arrange `datums_local` to reflect the intended output pre-mfp.
164        datums_local.truncate(datums_len);
165        datums_local.extend(cols.iter());
166
167        let results = mfp_plan.evaluate(
168            &mut datums_local,
169            &temp_storage,
170            event_time,
171            *diff * *input_diff,
172            |time| !until.less_equal(time),
173            &mut row_builder,
174        );
175
176        for result in results {
177            *budget = budget.saturating_sub(1);
178            match result {
179                Ok((row, event_time, diff)) => {
180                    // Copy the whole time, and re-populate event time.
181                    let mut time = input_time.clone();
182                    *time.event_time_mut() = event_time;
183                    ok_output.give((row, time, diff));
184                }
185                Err((err, event_time, diff)) => {
186                    // Copy the whole time, and re-populate event time.
187                    let mut time = input_time.clone();
188                    *time.event_time_mut() = event_time;
189                    err_output.give((err, time, diff));
190                }
191            };
192        }
193    }
194}