// mz_storage/render/sinks.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to the creation of dataflow sinks.

use std::sync::Arc;
use std::time::{Duration, Instant};

use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord_neu::{
    OrdValBatcher, OrdValSpine, RcOrdValBuilder,
};
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use mz_interchange::avro::DiffPair;
use mz_interchange::envelopes::combine_at_timestamp;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_timely_util::builder_async::PressOnDropButton;
use timely::dataflow::operators::Leave;
use timely::dataflow::{Scope, StreamVec};
use tracing::warn;

use crate::healthcheck::HealthStatusMessage;
use crate::storage_state::StorageState;
36/// _Renders_ complete _differential_ collections
37/// that represent the sink and its errors as requested
38/// by the original `CREATE SINK` statement.
39pub(crate) fn render_sink<'scope>(
40    scope: Scope<'scope, ()>,
41    storage_state: &mut StorageState,
42    sink_id: GlobalId,
43    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
44) -> (
45    StreamVec<'scope, (), HealthStatusMessage>,
46    Vec<PressOnDropButton>,
47) {
48    let snapshot_mode = if sink.with_snapshot {
49        SnapshotMode::Include
50    } else {
51        SnapshotMode::Exclude
52    };
53
54    let error_handler = storage_state.error_handler("storage_sink", sink_id);
55
56    let name = format!("{sink_id}-sinks");
57    let outer_scope = scope.clone();
58
59    scope.scoped(&name, |scope| {
60        let mut tokens = vec![];
61        let sink_render = get_sink_render_for(&sink.connection);
62
63        let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
64            scope,
65            sink.from,
66            Arc::clone(&storage_state.persist_clients),
67            &storage_state.txns_ctx,
68            sink.from_storage_metadata.clone(),
69            None,
70            Some(sink.as_of.clone()),
71            snapshot_mode,
72            timely::progress::Antichain::new(),
73            None,
74            None,
75            async {},
76            error_handler,
77        );
78        tokens.extend(persist_tokens);
79
80        let ok_collection =
81            zip_into_diff_pairs(sink_id, sink, &*sink_render, ok_collection.as_collection());
82
83        let (health, sink_tokens) = sink_render.render_sink(
84            storage_state,
85            sink,
86            sink_id,
87            ok_collection,
88            err_collection.as_collection(),
89        );
90        tokens.extend(sink_tokens);
91        (health.leave(outer_scope), tokens)
92    })
93}
/// Zip the input to a sink so that updates to the same key at the same
/// timestamp appear as `DiffPair`s.
///
/// The output key is `Some(row)` only when the user explicitly declared a key
/// at `CREATE SINK` time; otherwise the key used for grouping (relation key
/// or synthetic hash) is stripped to `None` before the pairs are emitted.
fn zip_into_diff_pairs<'scope>(
    sink_id: GlobalId,
    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
    sink_render: &dyn SinkRender<'scope>,
    collection: VecCollection<'scope, Timestamp, Row, Diff>,
) -> VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff> {
    // We need to consolidate the collection and group records by their key.
    // We'll first attempt to use the explicitly declared key when the sink was
    // created. If no such key exists, we'll use a key of the sink's underlying
    // relation, if one exists.
    //
    // If no such key exists, we'll generate a synthetic key based on the hash
    // of the row, just for purposes of distributing work among workers. In this
    // case the key offers no uniqueness guarantee.

    let user_key_indices = sink_render.get_key_indices();
    let relation_key_indices = sink_render.get_relation_key_indices();
    // Prefer the user-declared key; fall back to the relation key.
    let key_indices = user_key_indices
        .or(relation_key_indices)
        .map(|k| k.to_vec());
    // Only the hash-based fallback key is "synthetic": it carries no
    // uniqueness guarantee, so primary-key violation warnings are suppressed
    // for it below.
    let key_is_synthetic = key_indices.is_none();

    // Key every row: either by hashing the whole row (synthetic case) or by
    // packing the key columns into a fresh `Row`.
    let collection = match key_indices {
        None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
        Some(key_indices) => {
            // Scratch buffer reused across rows to avoid re-allocating the
            // unpacked datum vector per row.
            let mut datum_vec = mz_repr::DatumVec::new();
            collection.map(move |row| {
                // TODO[perf] (btv) - is there a way to avoid unpacking and
                // repacking every row and cloning the datums? Does it matter?
                let key = {
                    let datums = datum_vec.borrow_with(&row);
                    Row::pack(key_indices.iter().map(|&idx| datums[idx].clone()))
                };
                (Some(key), row)
            })
        }
    };

    // Group messages by key at each timestamp.
    //
    // Allow access to `arrange_named` because we cannot access Mz's wrapper
    // from here. TODO(database-issues#5046): Revisit with cluster unification.
    #[allow(clippy::disallowed_methods)]
    let mut collection =
        combine_at_timestamp(collection.arrange_named::<OrdValBatcher<_,_,_,_>, RcOrdValBuilder<_,_,_,_>, OrdValSpine<_, _, _, _>>("Arrange Sink"));

    // If there is no user-specified key, remove the synthetic key.
    //
    // We don't want the synthetic key to appear in the sink's actual output; we
    // just needed a value to use to distribute work.
    if user_key_indices.is_none() {
        collection = collection.map(|(_key, value)| (None, value))
    }

    // Flatten each `(key, Vec<DiffPair>)` group back into individual
    // `(key, DiffPair)` updates.
    collection.flat_map({
        // Per-worker rate-limit state for the warning below. Note it starts at
        // operator construction time, so a violation in the first ten seconds
        // of the dataflow's life is not warned about.
        let mut last_warning = Instant::now();
        let from_id = sink.from;
        move |(mut k, vs)| {
            // If the key is not synthetic, emit a warning to internal logs if
            // we discover a primary key violation.
            //
            // TODO: put the sink in a user-visible errored state instead of
            // only logging internally. See:
            // https://github.com/MaterializeInc/database-issues/issues/5099.
            if !key_is_synthetic && vs.len() > 1 {
                // We rate limit how often we emit this warning to avoid
                // flooding logs.
                let now = Instant::now();
                if now.duration_since(last_warning) >= Duration::from_secs(10) {
                    last_warning = now;
                    warn!(
                        ?sink_id,
                        ?from_id,
                        "primary key error: expected at most one update per key and timestamp; \
                            this can happen when the configured sink key is not a primary key of \
                            the sinked relation"
                    )
                }
            }

            // NOTE(review): assumes `combine_at_timestamp` never yields an
            // empty `vs` — `vs.len() - 1` would underflow otherwise. Confirm
            // against `mz_interchange::envelopes::combine_at_timestamp`.
            let max_idx = vs.len() - 1;
            vs.into_iter().enumerate().map(move |(idx, dp)| {
                // Move the key out for the final element; clone it for the
                // rest, so exactly one clone per element is saved.
                let k = if idx == max_idx { k.take() } else { k.clone() };
                (k, dp)
            })
        }
    })
}
/// A type that can be rendered as a dataflow sink.
///
/// Implemented by each sink connection type; see [`get_sink_render_for`] for
/// the dispatch from [`StorageSinkConnection`] to an implementation.
pub(crate) trait SinkRender<'scope> {
    /// Gets the indexes of the columns that form the key that the user
    /// specified when creating the sink, if any.
    fn get_key_indices(&self) -> Option<&[usize]>;

    /// Gets the indexes of the columns that form a key of the sink's underlying
    /// relation, if such a key exists.
    fn get_relation_key_indices(&self) -> Option<&[usize]>;

    /// Renders the sink's dataflow.
    ///
    /// `sinked_collection` carries the keyed `DiffPair` updates produced by
    /// `zip_into_diff_pairs`; `err_collection` carries the input's errors.
    /// Returns the sink's health status stream and the tokens that keep its
    /// operators alive.
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        sinked_collection: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
        err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    );
}
210fn get_sink_render_for<'scope>(connection: &StorageSinkConnection) -> Box<dyn SinkRender<'scope>> {
211    match connection {
212        StorageSinkConnection::Kafka(connection) => Box::new(connection.clone()),
213        StorageSinkConnection::Iceberg(connection) => Box::new(connection.clone()),
214    }
215}