1use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
16use differential_dataflow::trace::TraceReader;
17use differential_dataflow::trace::implementations::ord_neu::{
18 OrdValBatcher, OrdValSpine, RcOrdValBuilder,
19};
20use differential_dataflow::{AsCollection, Hashable, VecCollection};
21use mz_persist_client::operators::shard_source::SnapshotMode;
22use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
23use mz_storage_operators::persist_source;
24use mz_storage_types::controller::CollectionMetadata;
25use mz_storage_types::errors::DataflowError;
26use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
27use mz_timely_util::builder_async::PressOnDropButton;
28use timely::dataflow::operators::Leave;
29use timely::dataflow::{Scope, StreamVec};
30use tracing::warn;
31
32use crate::healthcheck::HealthStatusMessage;
33use crate::storage_state::StorageState;
34
/// The trace produced by arranging a sink's input: values are the input rows,
/// keyed by an optional key `Row` (see `arrange_sink_input`, which builds the
/// `(Option<Row>, Row)` pairs that populate this spine).
pub(crate) type SinkTrace = TraceAgent<OrdValSpine<Option<Row>, Row, Timestamp, Diff>>;
39
/// A stream of the batches emitted by a [`SinkTrace`] arrangement, as handed
/// to the connection-specific sink implementation by `render_sink`.
pub(crate) type SinkBatchStream<'scope> =
    StreamVec<'scope, Timestamp, <SinkTrace as TraceReader>::Batch>;
49
50pub(crate) fn render_sink<'scope>(
54 scope: Scope<'scope, ()>,
55 storage_state: &mut StorageState,
56 sink_id: GlobalId,
57 sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
58) -> (
59 StreamVec<'scope, (), HealthStatusMessage>,
60 Vec<PressOnDropButton>,
61) {
62 let snapshot_mode = if sink.with_snapshot {
63 SnapshotMode::Include
64 } else {
65 SnapshotMode::Exclude
66 };
67
68 let error_handler = storage_state.error_handler("storage_sink", sink_id);
69
70 let name = format!("{sink_id}-sinks");
71 let outer_scope = scope.clone();
72
73 scope.scoped(&name, |scope| {
74 let mut tokens = vec![];
75 let sink_render = get_sink_render_for(&sink.connection);
76
77 let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
78 scope,
79 sink.from,
80 Arc::clone(&storage_state.persist_clients),
81 &storage_state.txns_ctx,
82 sink.from_storage_metadata.clone(),
83 None,
84 Some(sink.as_of.clone()),
85 snapshot_mode,
86 timely::progress::Antichain::new(),
87 None,
88 None,
89 async {},
90 error_handler,
91 );
92 tokens.extend(persist_tokens);
93
94 let batches = arrange_sink_input(&*sink_render, ok_collection.as_collection());
95 let key_is_synthetic = sink_render.get_key_indices().is_none()
96 && sink_render.get_relation_key_indices().is_none();
97
98 let (health, sink_tokens) = sink_render.render_sink(
99 storage_state,
100 sink,
101 sink_id,
102 batches,
103 key_is_synthetic,
104 err_collection.as_collection(),
105 );
106 tokens.extend(sink_tokens);
107 (health.leave(outer_scope), tokens)
108 })
109}
110
111fn arrange_sink_input<'scope>(
124 sink_render: &dyn SinkRender<'scope>,
125 collection: VecCollection<'scope, Timestamp, Row, Diff>,
126) -> SinkBatchStream<'scope> {
127 let key_indices = sink_render
128 .get_key_indices()
129 .or_else(|| sink_render.get_relation_key_indices())
130 .map(|k| k.to_vec());
131
132 let keyed = match key_indices {
133 None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
134 Some(key_indices) => {
135 let mut datum_vec = mz_repr::DatumVec::new();
136 collection.map(move |row| {
137 let key = {
140 let datums = datum_vec.borrow_with(&row);
141 Row::pack(key_indices.iter().map(|&idx| datums[idx].clone()))
142 };
143 (Some(key), row)
144 })
145 }
146 };
147
148 #[allow(clippy::disallowed_methods)]
151 let Arranged {stream, trace: _} = keyed.arrange_named::<OrdValBatcher<_, _, _, _>, RcOrdValBuilder<_, _, _, _>, OrdValSpine<_, _, _, _>>("Arrange Sink");
152 stream
153}
154
/// Tracks and reports primary-key violations in a sink's input: more than one
/// update observed for the same key and timestamp. Warnings are rate-limited
/// (at most one per ten seconds; see `flush`).
pub(crate) struct PkViolationWarner {
    // Id of the sink, included in the emitted warning.
    sink_id: GlobalId,
    // Id of the collection the sink reads from, included in the warning.
    from_id: GlobalId,
    // When the last warning was emitted; used for rate limiting.
    last_warning: Instant,
    // The (key hash, timestamp) run currently being counted, if any. Keys are
    // tracked by hash; a missing key is represented by `u64::MAX` (see
    // `observe`), so distinct keys can in principle collide.
    current: Option<(u64, Timestamp)>,
    // Number of consecutive updates observed for `current`.
    count: usize,
}
173
174impl PkViolationWarner {
175 pub fn new(sink_id: GlobalId, from_id: GlobalId) -> Self {
176 Self {
177 sink_id,
178 from_id,
179 last_warning: Instant::now(),
180 current: None,
181 count: 0,
182 }
183 }
184
185 pub fn observe(&mut self, key: &Option<Row>, time: Timestamp) {
189 let hash = key.as_ref().map(|k| k.hashed()).unwrap_or(u64::MAX);
193 let same = self.current == Some((hash, time));
194 if !same {
195 self.flush();
196 self.current = Some((hash, time));
197 }
198 self.count += 1;
199 }
200
201 pub fn flush(&mut self) {
204 if self.count > 1 {
205 let now = Instant::now();
206 if now.duration_since(self.last_warning) >= Duration::from_secs(10) {
207 self.last_warning = now;
208 warn!(
209 sink_id = ?self.sink_id,
210 from_id = ?self.from_id,
211 "primary key error: expected at most one update per key and timestamp; \
212 this can happen when the configured sink key is not a primary key of \
213 the sinked relation"
214 );
215 }
216 }
217 self.current = None;
218 self.count = 0;
219 }
220}
221
/// Connection-specific rendering behavior for a storage sink; implemented by
/// each `StorageSinkConnection` variant (see `get_sink_render_for`).
pub(crate) trait SinkRender<'scope> {
    /// The indices of the user-configured key columns, if any. Used by
    /// `arrange_sink_input` to extract the key datums from each input row.
    fn get_key_indices(&self) -> Option<&[usize]>;

    /// The indices of the relation's own key columns, if any; used as a
    /// fallback when no user key is configured.
    fn get_relation_key_indices(&self) -> Option<&[usize]>;

    /// Renders the sink's dataflow operators over the arranged input
    /// `batches` and the error collection, returning the sink's health status
    /// stream and the tokens that keep the rendered operators alive.
    /// `key_is_synthetic` is true when the key was derived by hashing rows
    /// rather than taken from key columns.
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        batches: SinkBatchStream<'scope>,
        key_is_synthetic: bool,
        err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    );
}
254
255fn get_sink_render_for<'scope>(connection: &StorageSinkConnection) -> Box<dyn SinkRender<'scope>> {
256 match connection {
257 StorageSinkConnection::Kafka(connection) => Box::new(connection.clone()),
258 StorageSinkConnection::Iceberg(connection) => Box::new(connection.clone()),
259 }
260}