mz_storage/render/
sinks.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic related to the creation of dataflow sinks.
11
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use differential_dataflow::operators::arrange::Arrange;
16use differential_dataflow::trace::implementations::ord_neu::{
17    ColValBatcher, ColValBuilder, ColValSpine,
18};
19use differential_dataflow::{AsCollection, Collection, Hashable};
20use mz_interchange::avro::DiffPair;
21use mz_interchange::envelopes::combine_at_timestamp;
22use mz_persist_client::operators::shard_source::SnapshotMode;
23use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
24use mz_storage_operators::persist_source;
25use mz_storage_types::controller::CollectionMetadata;
26use mz_storage_types::errors::DataflowError;
27use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
28use mz_timely_util::builder_async::PressOnDropButton;
29use timely::dataflow::operators::Leave;
30use timely::dataflow::scopes::Child;
31use timely::dataflow::{Scope, Stream};
32use tracing::warn;
33
34use crate::healthcheck::HealthStatusMessage;
35use crate::internal_control::InternalStorageCommand;
36use crate::storage_state::StorageState;
37
38/// _Renders_ complete _differential_ [`Collection`]s
39/// that represent the sink and its errors as requested
40/// by the original `CREATE SINK` statement.
41pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
42    scope: &mut Child<'g, G, mz_repr::Timestamp>,
43    storage_state: &mut StorageState,
44    sink_id: GlobalId,
45    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
46) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
47    let sink_render = get_sink_render_for(&sink.connection);
48
49    let mut tokens = vec![];
50
51    let snapshot_mode = if sink.with_snapshot {
52        SnapshotMode::Include
53    } else {
54        SnapshotMode::Exclude
55    };
56
57    let command_tx = storage_state.internal_cmd_tx.clone();
58
59    let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
60        scope,
61        sink.from,
62        Arc::clone(&storage_state.persist_clients),
63        &storage_state.txns_ctx,
64        storage_state.storage_configuration.config_set(),
65        sink.from_storage_metadata.clone(),
66        None,
67        Some(sink.as_of.clone()),
68        snapshot_mode,
69        timely::progress::Antichain::new(),
70        None,
71        None,
72        async {},
73        move |error| {
74            Box::pin(async move {
75                let error = format!("storage_sink: {error}");
76                tracing::info!("{error}");
77                command_tx.send(InternalStorageCommand::SuspendAndRestart {
78                    id: sink_id,
79                    reason: error,
80                });
81            })
82        },
83    );
84    tokens.extend(persist_tokens);
85
86    let ok_collection =
87        zip_into_diff_pairs(sink_id, sink, &*sink_render, ok_collection.as_collection());
88
89    let (health, sink_tokens) = sink_render.render_sink(
90        storage_state,
91        sink,
92        sink_id,
93        ok_collection,
94        err_collection.as_collection(),
95    );
96
97    tokens.extend(sink_tokens);
98
99    (health.leave(), tokens)
100}
101
102/// Zip the input to a sink so that updates to the same key appear as
103/// `DiffPair`s.
104fn zip_into_diff_pairs<G>(
105    sink_id: GlobalId,
106    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
107    sink_render: &dyn SinkRender<G>,
108    collection: Collection<G, Row, Diff>,
109) -> Collection<G, (Option<Row>, DiffPair<Row>), Diff>
110where
111    G: Scope<Timestamp = Timestamp>,
112{
113    // We need to consolidate the collection and group records by their key.
114    // We'll first attempt to use the explicitly declared key when the sink was
115    // created. If no such key exists, we'll use a key of the sink's underlying
116    // relation, if one exists.
117    //
118    // If no such key exists, we'll generate a synthetic key based on the hash
119    // of the row, just for purposes of distributing work among workers. In this
120    // case the key offers no uniqueness guarantee.
121
122    let user_key_indices = sink_render.get_key_indices();
123    let relation_key_indices = sink_render.get_relation_key_indices();
124    let key_indices = user_key_indices
125        .or(relation_key_indices)
126        .map(|k| k.to_vec());
127    let key_is_synthetic = key_indices.is_none();
128
129    let collection = match key_indices {
130        None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
131        Some(key_indices) => {
132            let mut datum_vec = mz_repr::DatumVec::new();
133            collection.map(move |row| {
134                // TODO[perf] (btv) - is there a way to avoid unpacking and
135                // repacking every row and cloning the datums? Does it matter?
136                let key = {
137                    let datums = datum_vec.borrow_with(&row);
138                    Row::pack(key_indices.iter().map(|&idx| datums[idx].clone()))
139                };
140                (Some(key), row)
141            })
142        }
143    };
144
145    // Group messages by key at each timestamp.
146    //
147    // Allow access to `arrange_named` because we cannot access Mz's wrapper
148    // from here. TODO(database-issues#5046): Revisit with cluster unification.
149    #[allow(clippy::disallowed_methods)]
150    let mut collection =
151        combine_at_timestamp(collection.arrange_named::<ColValBatcher<_,_,_,_>, ColValBuilder<_,_,_,_>, ColValSpine<_, _, _, _>>("Arrange Sink"));
152
153    // If there is no user-specified key, remove the synthetic key.
154    //
155    // We don't want the synthetic key to appear in the sink's actual output; we
156    // just needed a value to use to distribute work.
157    if user_key_indices.is_none() {
158        collection = collection.map(|(_key, value)| (None, value))
159    }
160
161    collection.flat_map({
162        let mut last_warning = Instant::now();
163        let from_id = sink.from;
164        move |(mut k, vs)| {
165            // If the key is not synthetic, emit a warning to internal logs if
166            // we discover a primary key violation.
167            //
168            // TODO: put the sink in a user-visible errored state instead of
169            // only logging internally. See:
170            // https://github.com/MaterializeInc/database-issues/issues/5099.
171            if !key_is_synthetic && vs.len() > 1 {
172                // We rate limit how often we emit this warning to avoid
173                // flooding logs.
174                let now = Instant::now();
175                if now.duration_since(last_warning) >= Duration::from_secs(10) {
176                    last_warning = now;
177                    warn!(
178                        ?sink_id,
179                        ?from_id,
180                        "primary key error: expected at most one update per key and timestamp; \
181                            this can happen when the configured sink key is not a primary key of \
182                            the sinked relation"
183                    )
184                }
185            }
186
187            let max_idx = vs.len() - 1;
188            vs.into_iter().enumerate().map(move |(idx, dp)| {
189                let k = if idx == max_idx { k.take() } else { k.clone() };
190                (k, dp)
191            })
192        }
193    })
194}
195
196/// A type that can be rendered as a dataflow sink.
197pub(crate) trait SinkRender<G>
198where
199    G: Scope<Timestamp = Timestamp>,
200{
201    /// Gets the indexes of the columns that form the key that the user
202    /// specified when creating the sink, if any.
203    fn get_key_indices(&self) -> Option<&[usize]>;
204
205    /// Gets the indexes of the columns that form a key of the sink's underlying
206    /// relation, if such a key exists.
207    fn get_relation_key_indices(&self) -> Option<&[usize]>;
208
209    /// Renders the sink's dataflow.
210    fn render_sink(
211        &self,
212        storage_state: &mut StorageState,
213        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
214        sink_id: GlobalId,
215        sinked_collection: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
216        err_collection: Collection<G, DataflowError, Diff>,
217    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>);
218}
219
220fn get_sink_render_for<G>(connection: &StorageSinkConnection) -> Box<dyn SinkRender<G>>
221where
222    G: Scope<Timestamp = Timestamp>,
223{
224    match connection {
225        StorageSinkConnection::Kafka(connection) => Box::new(connection.clone()),
226    }
227}