Skip to main content

mz_compute/render/
sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to the creation of dataflow sinks.
11
12use std::any::Any;
13use std::collections::{BTreeMap, BTreeSet};
14use std::rc::Rc;
15
16use differential_dataflow::VecCollection;
17use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc};
18use mz_expr::{EvalError, MapFilterProject, permutation_for_arrangement};
19use mz_ore::soft_assert_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::vec::PartialOrdVecExt;
22use mz_repr::{Diff, GlobalId, Row};
23use mz_storage_types::controller::CollectionMetadata;
24use mz_timely_util::operator::CollectionExt;
25use mz_timely_util::probe::Handle;
26use timely::container::CapacityContainerBuilder;
27use timely::dataflow::Scope;
28use timely::progress::Antichain;
29
30use crate::compute_state::SinkToken;
31use crate::logging::compute::LogDataflowErrors;
32use crate::render::context::Context;
33use crate::render::errors::DataflowErrorSer;
34use crate::render::{RenderTimestamp, StartSignal};
35
36impl<'g, T: RenderTimestamp> Context<'g, T> {
37    /// Export the sink described by `sink` from the rendering context.
38    pub(crate) fn export_sink(
39        &self,
40        compute_state: &mut crate::compute_state::ComputeState,
41        tokens: &BTreeMap<GlobalId, Rc<dyn Any>>,
42        dependency_ids: BTreeSet<GlobalId>,
43        sink_id: GlobalId,
44        sink: &ComputeSinkDesc<CollectionMetadata>,
45        start_signal: StartSignal,
46        output_probe: &Handle<mz_repr::Timestamp>,
47        outer_scope: Scope<'g, mz_repr::Timestamp>,
48    ) {
49        soft_assert_or_log!(
50            sink.non_null_assertions.is_strictly_sorted(),
51            "non-null assertions not sorted"
52        );
53
54        // put together tokens that belong to the export
55        let mut needed_tokens = Vec::new();
56        for dep_id in dependency_ids {
57            if let Some(token) = tokens.get(&dep_id) {
58                needed_tokens.push(Rc::clone(token))
59            }
60        }
61
62        // TODO[btv] - We should determine the key and permutation to use during planning,
63        // rather than at runtime.
64        //
65        // This is basically an inlined version of the old `as_collection`.
66        let bundle = self
67            .lookup_id(mz_expr::Id::Global(sink.from))
68            .expect("Sink source collection not loaded");
69        let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection {
70            collection.clone()
71        } else {
72            let (key, _arrangement) = bundle
73                .arranged
74                .iter()
75                .next()
76                .expect("Invariant violated: at least one collection must be present.");
77            let unthinned_arity = sink.from_desc.arity();
78            let (permutation, thinning) = permutation_for_arrangement(key, unthinned_arity);
79            let mut mfp = MapFilterProject::new(unthinned_arity);
80            mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
81            bundle.as_collection_core(
82                mfp,
83                Some((key.clone(), None)),
84                self.until.clone(),
85                &self.config_set,
86            )
87        };
88
89        // Attach logging of dataflow errors.
90        if let Some(logger) = compute_state.compute_logger.clone() {
91            err_collection = err_collection.log_dataflow_errors(logger, sink_id);
92        }
93
94        let mut ok_collection = ok_collection.leave(outer_scope);
95        let mut err_collection = err_collection.leave(outer_scope);
96
97        // Ensure that the frontier does not advance past the expiration time, if set. Otherwise,
98        // we might write down incorrect data.
99        if let Some(&expiration) = self.dataflow_expiration.as_option() {
100            ok_collection = ok_collection
101                .expire_collection_at(&format!("{}_export_sink_oks", self.debug_name), expiration);
102            err_collection = err_collection
103                .expire_collection_at(&format!("{}_export_sink_errs", self.debug_name), expiration);
104        }
105
106        let non_null_assertions = sink.non_null_assertions.clone();
107        let from_desc = sink.from_desc.clone();
108        if !non_null_assertions.is_empty() {
109            let name = format!("NullAssertions({sink_id:?})");
110            type CB<C> = CapacityContainerBuilder<C>;
111            let (oks, null_errs) =
112                ok_collection.map_fallible::<CB<_>, CB<_>, _, _, _>(&name, move |row| {
113                    let mut idx = 0;
114                    let mut iter = row.iter();
115                    for &i in &non_null_assertions {
116                        let skip = i - idx;
117                        let datum = iter.nth(skip).unwrap();
118                        idx += skip + 1;
119                        if datum.is_null() {
120                            return Err(DataflowErrorSer::from(EvalError::MustNotBeNull(
121                                format!("column {}", from_desc.get_name(i).quoted()).into(),
122                            )));
123                        }
124                    }
125                    Ok(row)
126                });
127            ok_collection = oks;
128            err_collection = err_collection.concat(null_errs);
129        }
130
131        let region_name = match sink.connection {
132            ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id),
133            ComputeSinkConnection::MaterializedView(_) => {
134                format!("MaterializedViewSink({:?})", sink_id)
135            }
136            ComputeSinkConnection::CopyToS3Oneshot(_) => {
137                format!("CopyToS3OneshotSink({:?})", sink_id)
138            }
139        };
140        outer_scope.clone().region_named(&region_name, |inner| {
141            let sink_render = get_sink_render_for(&sink.connection);
142
143            let sink_token = sink_render.render_sink(
144                compute_state,
145                sink,
146                sink_id,
147                self.as_of_frontier.clone(),
148                start_signal,
149                ok_collection.enter_region(inner),
150                err_collection.enter_region(inner),
151                output_probe,
152            );
153
154            if let Some(sink_token) = sink_token {
155                needed_tokens.push(sink_token);
156            }
157
158            let collection = compute_state.expect_collection_mut(sink_id);
159            collection.sink_token = Some(SinkToken::new(Box::new(needed_tokens)));
160        });
161    }
162}
163
/// A type that can be rendered as a dataflow sink.
///
/// `get_sink_render_for` produces a boxed implementor for each `ComputeSinkConnection`
/// variant, and `Context::export_sink` invokes `render_sink` on it.
pub(crate) trait SinkRender<'scope> {
    /// Renders the sink `sink_id`, described by `sink`.
    ///
    /// `sinked_collection` carries the rows and `err_collection` the dataflow errors to be
    /// sunk. `export_sink` passes the context's `as_of_frontier` as `as_of` and forwards
    /// `compute_state`, `start_signal` and `output_probe` from its own arguments.
    ///
    /// Returns an optional token. `export_sink` stores a returned token, together with the
    /// tokens of the sink's dependencies, in the `sink_token` of the `sink_id` collection.
    // NOTE(review): presumably dropping the returned token shuts the sink down — confirm
    // against the implementors.
    fn render_sink(
        &self,
        compute_state: &mut crate::compute_state::ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<mz_repr::Timestamp>,
        start_signal: StartSignal,
        sinked_collection: VecCollection<'scope, mz_repr::Timestamp, Row, Diff>,
        err_collection: VecCollection<'scope, mz_repr::Timestamp, DataflowErrorSer, Diff>,
        output_probe: &Handle<mz_repr::Timestamp>,
    ) -> Option<Rc<dyn Any>>;
}
178
179fn get_sink_render_for<'scope>(
180    connection: &ComputeSinkConnection<CollectionMetadata>,
181) -> Box<dyn SinkRender<'scope>> {
182    match connection {
183        ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()),
184        ComputeSinkConnection::MaterializedView(connection) => Box::new(connection.clone()),
185        ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()),
186    }
187}