mz_compute/render/
flat_map.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use columnar::Columnar;
11use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
12use mz_expr::MfpPlan;
13use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
14use mz_repr::{DatumVec, RowArena, SharedRow};
15use mz_repr::{Diff, Row, Timestamp};
16use mz_timely_util::operator::StreamExt;
17use timely::dataflow::Scope;
18use timely::dataflow::channels::pact::Pipeline;
19use timely::dataflow::channels::pushers::buffer::Session;
20use timely::dataflow::channels::pushers::{Counter, Tee};
21use timely::progress::Antichain;
22
23use crate::render::DataflowError;
24use crate::render::context::{CollectionBundle, Context};
25
26impl<G> Context<G>
27where
28    G: Scope,
29    G::Timestamp: crate::render::RenderTimestamp,
30    <G::Timestamp as Columnar>::Container: Clone + Send,
31{
32    /// Applies a `TableFunc` to every row, followed by an `mfp`.
33    pub fn render_flat_map(
34        &self,
35        input: CollectionBundle<G>,
36        func: TableFunc,
37        exprs: Vec<MirScalarExpr>,
38        mfp: MapFilterProject,
39        input_key: Option<Vec<MirScalarExpr>>,
40    ) -> CollectionBundle<G> {
41        let until = self.until.clone();
42        let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
43        let (ok_collection, err_collection) =
44            input.as_specific_collection(input_key.as_deref(), &self.config_set);
45        let stream = ok_collection.inner;
46        let (oks, errs) = stream.unary_fallible(Pipeline, "FlatMapStage", move |_, _| {
47            Box::new(move |input, ok_output, err_output| {
48                let mut datums = DatumVec::new();
49                let mut datums_mfp = DatumVec::new();
50
51                // Buffer for extensions to `input_row`.
52                let mut table_func_output = Vec::new();
53
54                input.for_each(|cap, data| {
55                    let mut ok_session = ok_output.session_with_builder(&cap);
56                    let mut err_session = err_output.session_with_builder(&cap);
57
58                    'input: for (input_row, time, diff) in data.drain(..) {
59                        let temp_storage = RowArena::new();
60
61                        // Unpack datums for expression evaluation.
62                        let datums_local = datums.borrow_with(&input_row);
63                        let args = exprs
64                            .iter()
65                            .map(|e| e.eval(&datums_local, &temp_storage))
66                            .collect::<Result<Vec<_>, _>>();
67                        let args = match args {
68                            Ok(args) => args,
69                            Err(e) => {
70                                err_session.give((e.into(), time, diff));
71                                continue 'input;
72                            }
73                        };
74                        let mut extensions = match func.eval(&args, &temp_storage) {
75                            Ok(exts) => exts.fuse(),
76                            Err(e) => {
77                                err_session.give((e.into(), time, diff));
78                                continue 'input;
79                            }
80                        };
81
82                        // Draw additional columns out of the table func evaluation.
83                        while let Some((extension, output_diff)) = extensions.next() {
84                            table_func_output.push((extension, output_diff));
85                            table_func_output.extend((&mut extensions).take(1023));
86                            // We could consolidate `table_func_output`, but it seems unlikely to be productive.
87                            drain_through_mfp(
88                                &input_row,
89                                &time,
90                                &diff,
91                                &mut datums_mfp,
92                                &table_func_output,
93                                &mfp_plan,
94                                &until,
95                                &mut ok_session,
96                                &mut err_session,
97                            );
98                            table_func_output.clear();
99                        }
100                    }
101                })
102            })
103        });
104
105        use differential_dataflow::AsCollection;
106        let ok_collection = oks.as_collection();
107        let new_err_collection = errs.as_collection();
108        let err_collection = err_collection.concat(&new_err_collection);
109        CollectionBundle::from_collections(ok_collection, err_collection)
110    }
111}
112
113/// Drains a list of extensions to `input_row` through a supplied `MfpPlan` and into output buffers.
114///
115/// The method decodes `input_row`, and should be amortized across non-trivial `extensions`.
116fn drain_through_mfp<T>(
117    input_row: &Row,
118    input_time: &T,
119    input_diff: &Diff,
120    datum_vec: &mut DatumVec,
121    extensions: &[(Row, Diff)],
122    mfp_plan: &MfpPlan,
123    until: &Antichain<Timestamp>,
124    ok_output: &mut Session<
125        T,
126        ConsolidatingContainerBuilder<Vec<(Row, T, Diff)>>,
127        Counter<T, Vec<(Row, T, Diff)>, Tee<T, Vec<(Row, T, Diff)>>>,
128    >,
129    err_output: &mut Session<
130        T,
131        ConsolidatingContainerBuilder<Vec<(DataflowError, T, Diff)>>,
132        Counter<T, Vec<(DataflowError, T, Diff)>, Tee<T, Vec<(DataflowError, T, Diff)>>>,
133    >,
134) where
135    T: crate::render::RenderTimestamp,
136    <T as Columnar>::Container: Clone + Send,
137{
138    let temp_storage = RowArena::new();
139    let binding = SharedRow::get();
140    let mut row_builder = binding.borrow_mut();
141
142    // This is not cheap, and is meant to be amortized across many `extensions`.
143    let mut datums_local = datum_vec.borrow_with(input_row);
144    let datums_len = datums_local.len();
145
146    let event_time = input_time.event_time().clone();
147
148    for (cols, diff) in extensions.iter() {
149        // Arrange `datums_local` to reflect the intended output pre-mfp.
150        datums_local.truncate(datums_len);
151        datums_local.extend(cols.iter());
152
153        let results = mfp_plan.evaluate(
154            &mut datums_local,
155            &temp_storage,
156            event_time,
157            *diff * *input_diff,
158            |time| !until.less_equal(time),
159            &mut row_builder,
160        );
161
162        for result in results {
163            match result {
164                Ok((row, event_time, diff)) => {
165                    // Copy the whole time, and re-populate event time.
166                    let mut time = input_time.clone();
167                    *time.event_time_mut() = event_time;
168                    ok_output.give((row, time, diff));
169                }
170                Err((err, event_time, diff)) => {
171                    // Copy the whole time, and re-populate event time.
172                    let mut time = input_time.clone();
173                    *time.event_time_mut() = event_time;
174                    err_output.give((err, time, diff));
175                }
176            };
177        }
178    }
179}