// mz_storage/render/sinks.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic related to the creation of dataflow sinks.
use std::sync::Arc;
use std::time::{Duration, Instant};

use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::implementations::ord_neu::{
    OrdValBatcher, OrdValSpine, RcOrdValBuilder,
};
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_timely_util::builder_async::PressOnDropButton;
use timely::dataflow::operators::Leave;
use timely::dataflow::{Scope, StreamVec};
use tracing::warn;

use crate::healthcheck::HealthStatusMessage;
use crate::storage_state::StorageState;

/// The concrete trace type produced internally when arranging a sink's input.
/// The sink never sees this directly — only the batches flowing through it —
/// but it's the anchor for the batch type in [`SinkBatchStream`].
///
/// Keyed by the sink key (`Option<Row>`), with the full row as the value.
pub(crate) type SinkTrace = TraceAgent<OrdValSpine<Option<Row>, Row, Timestamp, Diff>>;
39
/// Stream of arrangement batches handed to [`SinkRender::render_sink`].
///
/// Produced by `arrange_sink_input` below.
///
/// This is `Arranged::stream` with the trace reader dropped: sinks only need
/// batch-level access (no random-access reads via a cursor), so we don't keep
/// a `TraceAgent` alive. Dropping the reader lets the spine's compaction
/// frontiers advance to the empty antichain, so the arrange operator can
/// aggressively compact / release batch state as updates flow through.
pub(crate) type SinkBatchStream<'scope> =
    StreamVec<'scope, Timestamp, <SinkTrace as TraceReader>::Batch>;
49
50/// _Renders_ complete _differential_ collections
51/// that represent the sink and its errors as requested
52/// by the original `CREATE SINK` statement.
53pub(crate) fn render_sink<'scope>(
54    scope: Scope<'scope, ()>,
55    storage_state: &mut StorageState,
56    sink_id: GlobalId,
57    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
58) -> (
59    StreamVec<'scope, (), HealthStatusMessage>,
60    Vec<PressOnDropButton>,
61) {
62    let snapshot_mode = if sink.with_snapshot {
63        SnapshotMode::Include
64    } else {
65        SnapshotMode::Exclude
66    };
67
68    let error_handler = storage_state.error_handler("storage_sink", sink_id);
69
70    let name = format!("{sink_id}-sinks");
71    let outer_scope = scope.clone();
72
73    scope.scoped(&name, |scope| {
74        let mut tokens = vec![];
75        let sink_render = get_sink_render_for(&sink.connection);
76
77        let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
78            scope,
79            sink.from,
80            Arc::clone(&storage_state.persist_clients),
81            &storage_state.txns_ctx,
82            sink.from_storage_metadata.clone(),
83            None,
84            Some(sink.as_of.clone()),
85            snapshot_mode,
86            timely::progress::Antichain::new(),
87            None,
88            None,
89            async {},
90            error_handler,
91        );
92        tokens.extend(persist_tokens);
93
94        let batches = arrange_sink_input(&*sink_render, ok_collection.as_collection());
95        let key_is_synthetic = sink_render.get_key_indices().is_none()
96            && sink_render.get_relation_key_indices().is_none();
97
98        let (health, sink_tokens) = sink_render.render_sink(
99            storage_state,
100            sink,
101            sink_id,
102            batches,
103            key_is_synthetic,
104            err_collection.as_collection(),
105        );
106        tokens.extend(sink_tokens);
107        (health.leave(outer_scope), tokens)
108    })
109}
110
111/// Extract the sink's key column(s) from each row, arrange the resulting
112/// `(Option<Row>, Row)` collection by key, and return just the stream of
113/// batches — dropping the trace reader.
114///
115/// Prefers the user-specified sink key, falling back to any natural key of the
116/// underlying relation. When neither exists, a synthetic per-row hash is used
117/// purely to distribute work across workers — in that case the sink should
118/// treat the key as absent (`key_is_synthetic`).
119///
120/// Partial-moving `arranged.stream` lets the surrounding `Arranged` (and the
121/// `TraceAgent` it holds) drop, releasing the spine's compaction holds so the
122/// arrange operator can compact batch state as it's emitted.
123fn arrange_sink_input<'scope>(
124    sink_render: &dyn SinkRender<'scope>,
125    collection: VecCollection<'scope, Timestamp, Row, Diff>,
126) -> SinkBatchStream<'scope> {
127    let key_indices = sink_render
128        .get_key_indices()
129        .or_else(|| sink_render.get_relation_key_indices())
130        .map(|k| k.to_vec());
131
132    let keyed = match key_indices {
133        None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
134        Some(key_indices) => {
135            let mut datum_vec = mz_repr::DatumVec::new();
136            collection.map(move |row| {
137                // TODO[perf] (btv) - is there a way to avoid unpacking and
138                // repacking every row and cloning the datums? Does it matter?
139                let key = {
140                    let datums = datum_vec.borrow_with(&row);
141                    Row::pack(key_indices.iter().map(|&idx| datums[idx].clone()))
142                };
143                (Some(key), row)
144            })
145        }
146    };
147
148    // Allow access to `arrange_named` because we cannot access Mz's wrapper
149    // from here. TODO(database-issues#5046): Revisit with cluster unification.
150    #[allow(clippy::disallowed_methods)]
151    let Arranged {stream, trace: _} = keyed.arrange_named::<OrdValBatcher<_, _, _, _>, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink");
152    stream
153}
154
/// Rate-limited detector for primary-key uniqueness violations as a sink's
/// cursor walk observes `(key, timestamp)` groups.
///
/// Call [`PkViolationWarner::observe`] once per emitted `DiffPair`. When the
/// current `(key, timestamp)` group changes — or when input batches finish —
/// call [`PkViolationWarner::flush`] so the accumulated count is evaluated.
///
/// Keys are identified by their `Hashable::hashed()` value rather than held
/// by value, so the hot observe path does no `Row` clones. A hash collision
/// can mask a PK violation but this is a purely diagnostic check, so the
/// trade-off is acceptable.
pub(crate) struct PkViolationWarner {
    /// Sink being rendered; included in the emitted warning for context.
    sink_id: GlobalId,
    /// Id of the relation the sink reads from; also included in the warning.
    from_id: GlobalId,
    /// When a warning was last emitted; further warnings are suppressed for
    /// ten seconds afterwards (see `flush`).
    last_warning: Instant,
    /// The `(key hash, timestamp)` group currently being counted, if any.
    current: Option<(u64, Timestamp)>,
    /// Number of `DiffPair`s observed so far for the `current` group.
    count: usize,
}
173
174impl PkViolationWarner {
175    pub fn new(sink_id: GlobalId, from_id: GlobalId) -> Self {
176        Self {
177            sink_id,
178            from_id,
179            last_warning: Instant::now(),
180            current: None,
181            count: 0,
182        }
183    }
184
185    /// Record that a `DiffPair` was observed at `(key, time)`. If this starts
186    /// a new group, the previous group's count is flushed (and warned about
187    /// if the count was > 1).
188    pub fn observe(&mut self, key: &Option<Row>, time: Timestamp) {
189        // `None` keys hash to a distinct sentinel from any `Row::hashed()`;
190        // the exact constant doesn't matter for correctness (it just needs
191        // to be stable).
192        let hash = key.as_ref().map(|k| k.hashed()).unwrap_or(u64::MAX);
193        let same = self.current == Some((hash, time));
194        if !same {
195            self.flush();
196            self.current = Some((hash, time));
197        }
198        self.count += 1;
199    }
200
201    /// Flush the pending `(key, timestamp)` group count. Emits a
202    /// rate-limited warning if more than one `DiffPair` was observed.
203    pub fn flush(&mut self) {
204        if self.count > 1 {
205            let now = Instant::now();
206            if now.duration_since(self.last_warning) >= Duration::from_secs(10) {
207                self.last_warning = now;
208                warn!(
209                    sink_id = ?self.sink_id,
210                    from_id = ?self.from_id,
211                    "primary key error: expected at most one update per key and timestamp; \
212                        this can happen when the configured sink key is not a primary key of \
213                        the sinked relation"
214                );
215            }
216        }
217        self.current = None;
218        self.count = 0;
219    }
220}
221
/// A type that can be rendered as a dataflow sink.
pub(crate) trait SinkRender<'scope> {
    /// Gets the indexes of the columns that form the key that the user
    /// specified when creating the sink, if any.
    fn get_key_indices(&self) -> Option<&[usize]>;

    /// Gets the indexes of the columns that form a key of the sink's underlying
    /// relation, if such a key exists.
    fn get_relation_key_indices(&self) -> Option<&[usize]>;

    /// Renders the sink's dataflow.
    ///
    /// The sink receives a stream of arrangement batches keyed on `Option<Row>`.
    /// The sink is responsible for walking each batch (typically via
    /// [`mz_interchange::envelopes::for_each_diff_pair`]) and handling any
    /// envelope-specific diff-pair construction. When `key_is_synthetic` is
    /// true the arrangement's key is a per-row hash used only for worker
    /// distribution — the sink should treat the key as absent when producing
    /// output.
    ///
    /// Returns the sink's health status stream together with the tokens that
    /// keep the rendered dataflow alive.
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        batches: SinkBatchStream<'scope>,
        key_is_synthetic: bool,
        err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    );
}
254
255fn get_sink_render_for<'scope>(connection: &StorageSinkConnection) -> Box<dyn SinkRender<'scope>> {
256    match connection {
257        StorageSinkConnection::Kafka(connection) => Box::new(connection.clone()),
258        StorageSinkConnection::Iceberg(connection) => Box::new(connection.clone()),
259    }
260}