mz_compute/render/sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to the creation of dataflow sinks.
11
12use std::any::Any;
13use std::collections::{BTreeMap, BTreeSet};
14use std::rc::Rc;
15
16use differential_dataflow::VecCollection;
17use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc};
18use mz_expr::{EvalError, MapFilterProject, permutation_for_arrangement};
19use mz_ore::soft_assert_or_log;
20use mz_ore::str::StrExt;
21use mz_ore::vec::PartialOrdVecExt;
22use mz_repr::{Diff, GlobalId, Row};
23use mz_storage_types::controller::CollectionMetadata;
24use mz_storage_types::errors::DataflowError;
25use mz_timely_util::operator::CollectionExt;
26use mz_timely_util::probe::Handle;
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::Scope;
29use timely::dataflow::scopes::Child;
30use timely::progress::Antichain;
31
32use crate::compute_state::SinkToken;
33use crate::logging::compute::LogDataflowErrors;
34use crate::render::context::Context;
35use crate::render::{RenderTimestamp, StartSignal};
36
37impl<'g, G, T> Context<Child<'g, G, T>>
38where
39    G: Scope<Timestamp = mz_repr::Timestamp>,
40    T: RenderTimestamp,
41{
42    /// Export the sink described by `sink` from the rendering context.
43    pub(crate) fn export_sink(
44        &self,
45        compute_state: &mut crate::compute_state::ComputeState,
46        tokens: &BTreeMap<GlobalId, Rc<dyn Any>>,
47        dependency_ids: BTreeSet<GlobalId>,
48        sink_id: GlobalId,
49        sink: &ComputeSinkDesc<CollectionMetadata>,
50        start_signal: StartSignal,
51        ct_times: Option<VecCollection<G, (), Diff>>,
52        output_probe: &Handle<mz_repr::Timestamp>,
53    ) {
54        soft_assert_or_log!(
55            sink.non_null_assertions.is_strictly_sorted(),
56            "non-null assertions not sorted"
57        );
58
59        // put together tokens that belong to the export
60        let mut needed_tokens = Vec::new();
61        for dep_id in dependency_ids {
62            if let Some(token) = tokens.get(&dep_id) {
63                needed_tokens.push(Rc::clone(token))
64            }
65        }
66
67        // TODO[btv] - We should determine the key and permutation to use during planning,
68        // rather than at runtime.
69        //
70        // This is basically an inlined version of the old `as_collection`.
71        let bundle = self
72            .lookup_id(mz_expr::Id::Global(sink.from))
73            .expect("Sink source collection not loaded");
74        let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection {
75            collection.clone()
76        } else {
77            let (key, _arrangement) = bundle
78                .arranged
79                .iter()
80                .next()
81                .expect("Invariant violated: at least one collection must be present.");
82            let unthinned_arity = sink.from_desc.arity();
83            let (permutation, thinning) = permutation_for_arrangement(key, unthinned_arity);
84            let mut mfp = MapFilterProject::new(unthinned_arity);
85            mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
86            bundle.as_collection_core(
87                mfp,
88                Some((key.clone(), None)),
89                self.until.clone(),
90                &self.config_set,
91            )
92        };
93
94        // Attach logging of dataflow errors.
95        if let Some(logger) = compute_state.compute_logger.clone() {
96            err_collection = err_collection.log_dataflow_errors(logger, sink_id);
97        }
98
99        let mut ok_collection = ok_collection.leave();
100        let mut err_collection = err_collection.leave();
101
102        // Ensure that the frontier does not advance past the expiration time, if set. Otherwise,
103        // we might write down incorrect data.
104        if let Some(&expiration) = self.dataflow_expiration.as_option() {
105            ok_collection = ok_collection
106                .expire_collection_at(&format!("{}_export_sink_oks", self.debug_name), expiration);
107            err_collection = err_collection
108                .expire_collection_at(&format!("{}_export_sink_errs", self.debug_name), expiration);
109        }
110
111        let non_null_assertions = sink.non_null_assertions.clone();
112        let from_desc = sink.from_desc.clone();
113        if !non_null_assertions.is_empty() {
114            let name = format!("NullAssertions({sink_id:?})");
115            type CB<C> = CapacityContainerBuilder<C>;
116            let (oks, null_errs) =
117                ok_collection.map_fallible::<CB<_>, CB<_>, _, _, _>(&name, move |row| {
118                    let mut idx = 0;
119                    let mut iter = row.iter();
120                    for &i in &non_null_assertions {
121                        let skip = i - idx;
122                        let datum = iter.nth(skip).unwrap();
123                        idx += skip + 1;
124                        if datum.is_null() {
125                            return Err(DataflowError::EvalError(Box::new(
126                                EvalError::MustNotBeNull(
127                                    format!("column {}", from_desc.get_name(i).quoted()).into(),
128                                ),
129                            )));
130                        }
131                    }
132                    Ok(row)
133                });
134            ok_collection = oks;
135            err_collection = err_collection.concat(&null_errs);
136        }
137
138        let region_name = match sink.connection {
139            ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id),
140            ComputeSinkConnection::MaterializedView(_) => {
141                format!("MaterializedViewSink({:?})", sink_id)
142            }
143            ComputeSinkConnection::ContinualTask(_) => {
144                format!("ContinualTaskSink({:?})", sink_id)
145            }
146            ComputeSinkConnection::CopyToS3Oneshot(_) => {
147                format!("CopyToS3OneshotSink({:?})", sink_id)
148            }
149        };
150        self.scope
151            .parent
152            .clone()
153            .region_named(&region_name, |inner| {
154                let sink_render = get_sink_render_for::<_>(&sink.connection);
155
156                let sink_token = sink_render.render_sink(
157                    compute_state,
158                    sink,
159                    sink_id,
160                    self.as_of_frontier.clone(),
161                    start_signal,
162                    ok_collection.enter_region(inner),
163                    err_collection.enter_region(inner),
164                    ct_times.map(|x| x.enter_region(inner)),
165                    output_probe,
166                );
167
168                if let Some(sink_token) = sink_token {
169                    needed_tokens.push(sink_token);
170                }
171
172                let collection = compute_state.expect_collection_mut(sink_id);
173                collection.sink_token = Some(SinkToken::new(Box::new(needed_tokens)));
174            });
175    }
176}
177
178/// A type that can be rendered as a dataflow sink.
179pub(crate) trait SinkRender<G>
180where
181    G: Scope<Timestamp = mz_repr::Timestamp>,
182{
183    fn render_sink(
184        &self,
185        compute_state: &mut crate::compute_state::ComputeState,
186        sink: &ComputeSinkDesc<CollectionMetadata>,
187        sink_id: GlobalId,
188        as_of: Antichain<mz_repr::Timestamp>,
189        start_signal: StartSignal,
190        sinked_collection: VecCollection<G, Row, Diff>,
191        err_collection: VecCollection<G, DataflowError, Diff>,
192        // TODO(ct2): Figure out a better way to smuggle this in, potentially by
193        // removing the `SinkRender` trait entirely.
194        ct_times: Option<VecCollection<G, (), Diff>>,
195        output_probe: &Handle<mz_repr::Timestamp>,
196    ) -> Option<Rc<dyn Any>>;
197}
198
199fn get_sink_render_for<G>(
200    connection: &ComputeSinkConnection<CollectionMetadata>,
201) -> Box<dyn SinkRender<G>>
202where
203    G: Scope<Timestamp = mz_repr::Timestamp>,
204{
205    match connection {
206        ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()),
207        ComputeSinkConnection::MaterializedView(connection) => Box::new(connection.clone()),
208        ComputeSinkConnection::ContinualTask(connection) => Box::new(connection.clone()),
209        ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()),
210    }
211}