mz_compute/render/
sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to the creation of dataflow sinks.
11
12use std::any::Any;
13use std::collections::{BTreeMap, BTreeSet};
14use std::rc::{Rc, Weak};
15
16use columnar::Columnar;
17use differential_dataflow::Collection;
18use mz_compute_types::sinks::{ComputeSinkConnection, ComputeSinkDesc};
19use mz_expr::{EvalError, MapFilterProject, permutation_for_arrangement};
20use mz_ore::soft_assert_or_log;
21use mz_ore::str::StrExt;
22use mz_ore::vec::PartialOrdVecExt;
23use mz_repr::{Diff, GlobalId, Row, Timestamp};
24use mz_storage_types::controller::CollectionMetadata;
25use mz_storage_types::errors::DataflowError;
26use mz_timely_util::operator::CollectionExt;
27use mz_timely_util::probe::Handle;
28use timely::container::CapacityContainerBuilder;
29use timely::dataflow::Scope;
30use timely::dataflow::scopes::Child;
31use timely::progress::Antichain;
32
33use crate::compute_state::SinkToken;
34use crate::logging::compute::LogDataflowErrors;
35use crate::render::context::Context;
36use crate::render::{RenderTimestamp, StartSignal};
37
38impl<'g, G, T> Context<Child<'g, G, T>>
39where
40    G: Scope<Timestamp = mz_repr::Timestamp>,
41    T: RenderTimestamp,
42    <T as Columnar>::Container: Clone + Send,
43    for<'a> <T as Columnar>::Ref<'a>: Ord + Copy,
44{
45    /// Export the sink described by `sink` from the rendering context.
46    pub(crate) fn export_sink(
47        &self,
48        compute_state: &mut crate::compute_state::ComputeState,
49        tokens: &BTreeMap<GlobalId, Rc<dyn std::any::Any>>,
50        dependency_ids: BTreeSet<GlobalId>,
51        sink_id: GlobalId,
52        sink: &ComputeSinkDesc<CollectionMetadata>,
53        start_signal: StartSignal,
54        ct_times: Option<Collection<G, (), Diff>>,
55        output_probe: &Handle<Timestamp>,
56    ) {
57        soft_assert_or_log!(
58            sink.non_null_assertions.is_strictly_sorted(),
59            "non-null assertions not sorted"
60        );
61
62        // put together tokens that belong to the export
63        let mut needed_tokens = Vec::new();
64        for dep_id in dependency_ids {
65            if let Some(token) = tokens.get(&dep_id) {
66                needed_tokens.push(Rc::clone(token))
67            }
68        }
69
70        // TODO[btv] - We should determine the key and permutation to use during planning,
71        // rather than at runtime.
72        //
73        // This is basically an inlined version of the old `as_collection`.
74        let bundle = self
75            .lookup_id(mz_expr::Id::Global(sink.from))
76            .expect("Sink source collection not loaded");
77        let (ok_collection, mut err_collection) = if let Some(collection) = &bundle.collection {
78            collection.clone()
79        } else {
80            let (key, _arrangement) = bundle
81                .arranged
82                .iter()
83                .next()
84                .expect("Invariant violated: at least one collection must be present.");
85            let unthinned_arity = sink.from_desc.arity();
86            let (permutation, thinning) = permutation_for_arrangement(key, unthinned_arity);
87            let mut mfp = MapFilterProject::new(unthinned_arity);
88            mfp.permute_fn(|c| permutation[c], thinning.len() + key.len());
89            bundle.as_collection_core(
90                mfp,
91                Some((key.clone(), None)),
92                self.until.clone(),
93                &self.config_set,
94            )
95        };
96
97        // Attach logging of dataflow errors.
98        if let Some(logger) = compute_state.compute_logger.clone() {
99            err_collection = err_collection.log_dataflow_errors(logger, sink_id);
100        }
101
102        let mut ok_collection = ok_collection.leave();
103        let mut err_collection = err_collection.leave();
104
105        // Ensure that the frontier does not advance past the expiration time, if set. Otherwise,
106        // we might write down incorrect data.
107        if let Some(&expiration) = self.dataflow_expiration.as_option() {
108            let token = Rc::new(());
109            let shutdown_token = Rc::downgrade(&token);
110            ok_collection = ok_collection.expire_collection_at(
111                &format!("{}_export_sink_oks", self.debug_name),
112                expiration,
113                Weak::clone(&shutdown_token),
114            );
115            err_collection = err_collection.expire_collection_at(
116                &format!("{}_export_sink_errs", self.debug_name),
117                expiration,
118                shutdown_token,
119            );
120            needed_tokens.push(token);
121        }
122
123        let non_null_assertions = sink.non_null_assertions.clone();
124        let from_desc = sink.from_desc.clone();
125        if !non_null_assertions.is_empty() {
126            let name = format!("NullAssertions({sink_id:?})");
127            type CB<C> = CapacityContainerBuilder<C>;
128            let (oks, null_errs) =
129                ok_collection.map_fallible::<CB<_>, CB<_>, _, _, _>(&name, move |row| {
130                    let mut idx = 0;
131                    let mut iter = row.iter();
132                    for &i in &non_null_assertions {
133                        let skip = i - idx;
134                        let datum = iter.nth(skip).unwrap();
135                        idx += skip + 1;
136                        if datum.is_null() {
137                            return Err(DataflowError::EvalError(Box::new(
138                                EvalError::MustNotBeNull(
139                                    format!("column {}", from_desc.get_name(i).as_str().quoted())
140                                        .into(),
141                                ),
142                            )));
143                        }
144                    }
145                    Ok(row)
146                });
147            ok_collection = oks;
148            err_collection = err_collection.concat(&null_errs);
149        }
150
151        let region_name = match sink.connection {
152            ComputeSinkConnection::Subscribe(_) => format!("SubscribeSink({:?})", sink_id),
153            ComputeSinkConnection::MaterializedView(_) => {
154                format!("MaterializedViewSink({:?})", sink_id)
155            }
156            ComputeSinkConnection::ContinualTask(_) => {
157                format!("ContinualTaskSink({:?})", sink_id)
158            }
159            ComputeSinkConnection::CopyToS3Oneshot(_) => {
160                format!("CopyToS3OneshotSink({:?})", sink_id)
161            }
162        };
163        self.scope
164            .parent
165            .clone()
166            .region_named(&region_name, |inner| {
167                let sink_render = get_sink_render_for::<_>(&sink.connection);
168
169                let sink_token = sink_render.render_sink(
170                    compute_state,
171                    sink,
172                    sink_id,
173                    self.as_of_frontier.clone(),
174                    start_signal,
175                    ok_collection.enter_region(inner),
176                    err_collection.enter_region(inner),
177                    ct_times.map(|x| x.enter_region(inner)),
178                    output_probe,
179                );
180
181                if let Some(sink_token) = sink_token {
182                    needed_tokens.push(sink_token);
183                }
184
185                let collection = compute_state.expect_collection_mut(sink_id);
186                collection.sink_token = Some(SinkToken::new(Box::new(needed_tokens)));
187            });
188    }
189}
190
191/// A type that can be rendered as a dataflow sink.
192pub(crate) trait SinkRender<G>
193where
194    G: Scope<Timestamp = mz_repr::Timestamp>,
195{
196    fn render_sink(
197        &self,
198        compute_state: &mut crate::compute_state::ComputeState,
199        sink: &ComputeSinkDesc<CollectionMetadata>,
200        sink_id: GlobalId,
201        as_of: Antichain<mz_repr::Timestamp>,
202        start_signal: StartSignal,
203        sinked_collection: Collection<G, Row, Diff>,
204        err_collection: Collection<G, DataflowError, Diff>,
205        // TODO(ct2): Figure out a better way to smuggle this in, potentially by
206        // removing the `SinkRender` trait entirely.
207        ct_times: Option<Collection<G, (), Diff>>,
208        output_probe: &Handle<Timestamp>,
209    ) -> Option<Rc<dyn Any>>;
210}
211
212fn get_sink_render_for<G>(
213    connection: &ComputeSinkConnection<CollectionMetadata>,
214) -> Box<dyn SinkRender<G>>
215where
216    G: Scope<Timestamp = mz_repr::Timestamp>,
217{
218    match connection {
219        ComputeSinkConnection::Subscribe(connection) => Box::new(connection.clone()),
220        ComputeSinkConnection::MaterializedView(connection) => Box::new(connection.clone()),
221        ComputeSinkConnection::ContinualTask(connection) => Box::new(connection.clone()),
222        ComputeSinkConnection::CopyToS3Oneshot(connection) => Box::new(connection.clone()),
223    }
224}