1use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use differential_dataflow::operators::arrange::Arrange;
16use differential_dataflow::trace::implementations::ord_neu::{
17 OrdValBatcher, OrdValSpine, RcOrdValBuilder,
18};
19use differential_dataflow::{AsCollection, Hashable, VecCollection};
20use mz_interchange::avro::DiffPair;
21use mz_interchange::envelopes::combine_at_timestamp;
22use mz_persist_client::operators::shard_source::SnapshotMode;
23use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
24use mz_storage_operators::persist_source;
25use mz_storage_types::controller::CollectionMetadata;
26use mz_storage_types::errors::DataflowError;
27use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
28use mz_timely_util::builder_async::PressOnDropButton;
29use timely::dataflow::operators::Leave;
30use timely::dataflow::{Scope, StreamVec};
31use tracing::warn;
32
33use crate::healthcheck::HealthStatusMessage;
34use crate::storage_state::StorageState;
35
36pub(crate) fn render_sink<'scope>(
40 scope: Scope<'scope, ()>,
41 storage_state: &mut StorageState,
42 sink_id: GlobalId,
43 sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
44) -> (
45 StreamVec<'scope, (), HealthStatusMessage>,
46 Vec<PressOnDropButton>,
47) {
48 let snapshot_mode = if sink.with_snapshot {
49 SnapshotMode::Include
50 } else {
51 SnapshotMode::Exclude
52 };
53
54 let error_handler = storage_state.error_handler("storage_sink", sink_id);
55
56 let name = format!("{sink_id}-sinks");
57 let outer_scope = scope.clone();
58
59 scope.scoped(&name, |scope| {
60 let mut tokens = vec![];
61 let sink_render = get_sink_render_for(&sink.connection);
62
63 let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
64 scope,
65 sink.from,
66 Arc::clone(&storage_state.persist_clients),
67 &storage_state.txns_ctx,
68 sink.from_storage_metadata.clone(),
69 None,
70 Some(sink.as_of.clone()),
71 snapshot_mode,
72 timely::progress::Antichain::new(),
73 None,
74 None,
75 async {},
76 error_handler,
77 );
78 tokens.extend(persist_tokens);
79
80 let ok_collection =
81 zip_into_diff_pairs(sink_id, sink, &*sink_render, ok_collection.as_collection());
82
83 let (health, sink_tokens) = sink_render.render_sink(
84 storage_state,
85 sink,
86 sink_id,
87 ok_collection,
88 err_collection.as_collection(),
89 );
90 tokens.extend(sink_tokens);
91 (health.leave(outer_scope), tokens)
92 })
93}
94
95fn zip_into_diff_pairs<'scope>(
98 sink_id: GlobalId,
99 sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
100 sink_render: &dyn SinkRender<'scope>,
101 collection: VecCollection<'scope, Timestamp, Row, Diff>,
102) -> VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff> {
103 let user_key_indices = sink_render.get_key_indices();
113 let relation_key_indices = sink_render.get_relation_key_indices();
114 let key_indices = user_key_indices
115 .or(relation_key_indices)
116 .map(|k| k.to_vec());
117 let key_is_synthetic = key_indices.is_none();
118
119 let collection = match key_indices {
120 None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
121 Some(key_indices) => {
122 let mut datum_vec = mz_repr::DatumVec::new();
123 collection.map(move |row| {
124 let key = {
127 let datums = datum_vec.borrow_with(&row);
128 Row::pack(key_indices.iter().map(|&idx| datums[idx].clone()))
129 };
130 (Some(key), row)
131 })
132 }
133 };
134
135 #[allow(clippy::disallowed_methods)]
140 let mut collection =
141 combine_at_timestamp(collection.arrange_named::<OrdValBatcher<_,_,_,_>, RcOrdValBuilder<_,_,_,_>, OrdValSpine<_, _, _, _>>("Arrange Sink"));
142
143 if user_key_indices.is_none() {
148 collection = collection.map(|(_key, value)| (None, value))
149 }
150
151 collection.flat_map({
152 let mut last_warning = Instant::now();
153 let from_id = sink.from;
154 move |(mut k, vs)| {
155 if !key_is_synthetic && vs.len() > 1 {
162 let now = Instant::now();
165 if now.duration_since(last_warning) >= Duration::from_secs(10) {
166 last_warning = now;
167 warn!(
168 ?sink_id,
169 ?from_id,
170 "primary key error: expected at most one update per key and timestamp; \
171 this can happen when the configured sink key is not a primary key of \
172 the sinked relation"
173 )
174 }
175 }
176
177 let max_idx = vs.len() - 1;
178 vs.into_iter().enumerate().map(move |(idx, dp)| {
179 let k = if idx == max_idx { k.take() } else { k.clone() };
180 (k, dp)
181 })
182 }
183 })
184}
185
/// Interface through which the free function `render_sink` drives a concrete
/// sink connection. `get_sink_render_for` boxes the connection types that
/// implement it.
pub(crate) trait SinkRender<'scope> {
    /// Column indices that `zip_into_diff_pairs` prefers when building the
    /// key for each row.
    ///
    /// When this returns `None`, the key emitted to the sink is `None` and
    /// grouping falls back to `get_relation_key_indices` (or a row hash).
    fn get_key_indices(&self) -> Option<&[usize]>;

    /// Column indices that `zip_into_diff_pairs` uses to group rows when
    /// `get_key_indices` returns `None`.
    ///
    /// A key built from these indices is used for grouping only; it is not
    /// included in the emitted records.
    fn get_relation_key_indices(&self) -> Option<&[usize]>;

    /// Renders the sink-specific part of the dataflow.
    ///
    /// `sinked_collection` carries `(key, DiffPair)` records as produced by
    /// `zip_into_diff_pairs`; `err_collection` carries the errors read
    /// alongside the data. Returns a stream of health status messages and
    /// the tokens the caller should hold on to.
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        sinked_collection: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
        err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    );
}
209
210fn get_sink_render_for<'scope>(connection: &StorageSinkConnection) -> Box<dyn SinkRender<'scope>> {
211 match connection {
212 StorageSinkConnection::Kafka(connection) => Box::new(connection.clone()),
213 StorageSinkConnection::Iceberg(connection) => Box::new(connection.clone()),
214 }
215}