Skip to main content

mz_compute/render/
flat_map.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::VecDeque;
11
12use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
13use mz_compute_types::dyncfgs::COMPUTE_FLAT_MAP_FUEL;
14use mz_expr::MfpPlan;
15use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
16use mz_repr::{DatumVec, RowArena, SharedRow};
17use mz_repr::{Diff, Row, Timestamp};
18use mz_timely_util::operator::StreamExt;
19use timely::dataflow::Scope;
20use timely::dataflow::channels::pact::Pipeline;
21use timely::dataflow::operators::Capability;
22use timely::dataflow::operators::generic::Session;
23use timely::progress::Antichain;
24
25use crate::render::DataflowError;
26use crate::render::context::{CollectionBundle, Context};
27
28impl<G> Context<G>
29where
30    G: Scope,
31    G::Timestamp: crate::render::RenderTimestamp,
32{
33    /// Applies a `TableFunc` to every row, followed by an `mfp`.
34    pub fn render_flat_map(
35        &self,
36        input_key: Option<Vec<MirScalarExpr>>,
37        input: CollectionBundle<G>,
38        exprs: Vec<MirScalarExpr>,
39        func: TableFunc,
40        mfp: MapFilterProject,
41    ) -> CollectionBundle<G> {
42        let until = self.until.clone();
43        let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
44        let (ok_collection, err_collection) =
45            input.as_specific_collection(input_key.as_deref(), &self.config_set);
46        let stream = ok_collection.inner;
47        let scope = input.scope();
48
49        // Budget to limit the number of rows processed in a single invocation.
50        //
51        // The current implementation can only yield between input batches, but not from within
52        // a batch. A `generate_series` can still cause unavailability if it generates many rows.
53        let budget = COMPUTE_FLAT_MAP_FUEL.get(&self.config_set);
54
55        let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, info| {
56            let activator = scope.activator_for(info.address);
57            let mut queue = VecDeque::new();
58            Box::new(move |input, ok_output, err_output| {
59                let mut datums = DatumVec::new();
60                let mut datums_mfp = DatumVec::new();
61
62                // Buffer for extensions to `input_row`.
63                let mut table_func_output = Vec::new();
64
65                let mut budget = budget;
66
67                input.for_each(|cap, data| {
68                    queue.push_back((
69                        cap.delayed_for_output(cap.time(), 0),
70                        cap.retain_for_output(1),
71                        std::mem::take(data),
72                    ))
73                });
74
75                while let Some((ok_cap, err_cap, data)) = queue.pop_front() {
76                    let mut ok_session = ok_output.session_with_builder(&ok_cap);
77                    let mut err_session = err_output.session_with_builder(&err_cap);
78
79                    'input: for (input_row, time, diff) in data {
80                        let temp_storage = RowArena::new();
81
82                        // Unpack datums for expression evaluation.
83                        let datums_local = datums.borrow_with(&input_row);
84                        let args = exprs
85                            .iter()
86                            .map(|e| e.eval(&datums_local, &temp_storage))
87                            .collect::<Result<Vec<_>, _>>();
88                        let args = match args {
89                            Ok(args) => args,
90                            Err(e) => {
91                                err_session.give((e.into(), time, diff));
92                                continue 'input;
93                            }
94                        };
95                        let mut extensions = match func.eval(&args, &temp_storage) {
96                            Ok(exts) => exts.fuse(),
97                            Err(e) => {
98                                err_session.give((e.into(), time, diff));
99                                continue 'input;
100                            }
101                        };
102
103                        // Draw additional columns out of the table func evaluation.
104                        while let Some((extension, output_diff)) = extensions.next() {
105                            table_func_output.push((extension, output_diff));
106                            table_func_output.extend((&mut extensions).take(1023));
107                            // We could consolidate `table_func_output`, but it seems unlikely to be productive.
108                            drain_through_mfp(
109                                &input_row,
110                                &time,
111                                &diff,
112                                &mut datums_mfp,
113                                &table_func_output,
114                                &mfp_plan,
115                                &until,
116                                &mut ok_session,
117                                &mut err_session,
118                                &mut budget,
119                            );
120                            table_func_output.clear();
121                        }
122                    }
123                    if budget == 0 {
124                        activator.activate();
125                        break;
126                    }
127                }
128            })
129        });
130
131        use differential_dataflow::AsCollection;
132        let ok_collection = oks.as_collection();
133        let new_err_collection = errs.as_collection();
134        let err_collection = err_collection.concat(&new_err_collection);
135        CollectionBundle::from_collections(ok_collection, err_collection)
136    }
137}
138
139/// Drains a list of extensions to `input_row` through a supplied `MfpPlan` and into output buffers.
140///
141/// The method decodes `input_row`, and should be amortized across non-trivial `extensions`.
142fn drain_through_mfp<T>(
143    input_row: &Row,
144    input_time: &T,
145    input_diff: &Diff,
146    datum_vec: &mut DatumVec,
147    extensions: &[(Row, Diff)],
148    mfp_plan: &MfpPlan,
149    until: &Antichain<Timestamp>,
150    ok_output: &mut Session<
151        '_,
152        '_,
153        T,
154        ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
155        Capability<T>,
156    >,
157    err_output: &mut Session<
158        '_,
159        '_,
160        T,
161        ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
162        Capability<T>,
163    >,
164    budget: &mut usize,
165) where
166    T: crate::render::RenderTimestamp,
167{
168    let temp_storage = RowArena::new();
169    let mut row_builder = SharedRow::get();
170
171    // This is not cheap, and is meant to be amortized across many `extensions`.
172    let mut datums_local = datum_vec.borrow_with(input_row);
173    let datums_len = datums_local.len();
174
175    let event_time = input_time.event_time().clone();
176
177    for (cols, diff) in extensions.iter() {
178        // Arrange `datums_local` to reflect the intended output pre-mfp.
179        datums_local.truncate(datums_len);
180        datums_local.extend(cols.iter());
181
182        let results = mfp_plan.evaluate(
183            &mut datums_local,
184            &temp_storage,
185            event_time,
186            *diff * *input_diff,
187            |time| !until.less_equal(time),
188            &mut row_builder,
189        );
190
191        for result in results {
192            *budget = budget.saturating_sub(1);
193            match result {
194                Ok((row, event_time, diff)) => {
195                    // Copy the whole time, and re-populate event time.
196                    let mut time = input_time.clone();
197                    *time.event_time_mut() = event_time;
198                    ok_output.give((row, time, diff));
199                }
200                Err((err, event_time, diff)) => {
201                    // Copy the whole time, and re-populate event time.
202                    let mut time = input_time.clone();
203                    *time.event_time_mut() = event_time;
204                    err_output.give((err, time, diff));
205                }
206            };
207        }
208    }
209}