use std::sync::Arc;
use std::time::{Duration, Instant};

use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord_neu::{
    ColValBatcher, ColValBuilder, ColValSpine,
};
use differential_dataflow::{AsCollection, Collection, Hashable};
use mz_interchange::avro::DiffPair;
use mz_interchange::envelopes::combine_at_timestamp;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
use mz_timely_util::builder_async::PressOnDropButton;
use timely::dataflow::operators::Leave;
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use tracing::warn;

use crate::healthcheck::HealthStatusMessage;
use crate::internal_control::InternalStorageCommand;
use crate::storage_state::StorageState;

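/// Renders the dataflow for a single storage sink: reads the sink's input
/// collection back out of persist, zips updates into `DiffPair`s, and hands
/// the result to the connection-specific `SinkRender` implementation.
///
/// Returns a stream of health status updates and the tokens that keep the
/// sink's asynchronous operators alive.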
pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    storage_state: &mut StorageState,
    sink_id: GlobalId,
    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
    let sink_render = get_sink_render_for(&sink.connection);

    let mut tokens = vec![];

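    // Whether the sink should first emit a snapshot of its input as of the
    // sink's `as_of`, or only the updates that happen afterwards.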
    let snapshot_mode = if sink.with_snapshot {
        SnapshotMode::Include
    } else {
        SnapshotMode::Exclude
    };

    let command_tx = storage_state.internal_cmd_tx.clone();

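    // Read the sink's input collection back out of persist. Any error
    // surfaced here asks the storage worker to suspend and restart the sink
    // dataflow.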
    let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
        scope,
        sink.from,
        Arc::clone(&storage_state.persist_clients),
        &storage_state.txns_ctx,
        storage_state.storage_configuration.config_set(),
        sink.from_storage_metadata.clone(),
        None,
        Some(sink.as_of.clone()),
        snapshot_mode,
        timely::progress::Antichain::new(),
        None,
        None,
        async {},
        move |error| {
            Box::pin(async move {
                let error = format!("storage_sink: {error}");
                tracing::info!("{error}");
                command_tx.send(InternalStorageCommand::SuspendAndRestart {
                    id: sink_id,
                    reason: error,
                });
            })
        },
    );
    tokens.extend(persist_tokens);

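    // Combine insertions and retractions that affect the same key at the same
    // timestamp into `DiffPair`s, keyed as the sink requires.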
    let ok_collection =
        zip_into_diff_pairs(sink_id, sink, &*sink_render, ok_collection.as_collection());

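    // Defer to the connection-specific implementation (e.g. Kafka) for the
    // actual sink rendering.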
    let (health, sink_tokens) = sink_render.render_sink(
        storage_state,
        sink,
        sink_id,
        ok_collection,
        err_collection.as_collection(),
    );

    tokens.extend(sink_tokens);

    (health.leave(), tokens)
}

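/// Zips the updates of `collection` into `(key, DiffPair)` pairs so that, for
/// each key and timestamp, a retraction and an insertion are presented to the
/// sink as a single before/after pair.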
fn zip_into_diff_pairs<G>(
    sink_id: GlobalId,
    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
    sink_render: &dyn SinkRender<G>,
    collection: Collection<G, Row, Diff>,
) -> Collection<G, (Option<Row>, DiffPair<Row>), Diff>
where
    G: Scope<Timestamp = Timestamp>,
{
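    // Determine the key to group updates by: the key explicitly configured on
    // the sink if there is one, otherwise the key of the sinked relation,
    // otherwise a synthetic key derived from hashing the whole row.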
    let user_key_indices = sink_render.get_key_indices();
    let relation_key_indices = sink_render.get_relation_key_indices();
    let key_indices = user_key_indices
        .or(relation_key_indices)
        .map(|k| k.to_vec());
    let key_is_synthetic = key_indices.is_none();

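    // Prepend the key to every row, packing the key columns out of the row,
    // or hashing the entire row when the key is synthetic.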
    let collection = match key_indices {
        None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
        Some(key_indices) => {
            let mut datum_vec = mz_repr::DatumVec::new();
            collection.map(move |row| {
                let key = {
                    let datums = datum_vec.borrow_with(&row);
                    Row::pack(key_indices.iter().map(|&idx| datums[idx].clone()))
                };
                (Some(key), row)
            })
        }
    };

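    // Arrange the collection by key and, at each timestamp, combine all
    // updates for a key into `DiffPair`s of retracted and inserted rows.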
    #[allow(clippy::disallowed_methods)]
    let mut collection = combine_at_timestamp(
        collection
            .arrange_named::<ColValBatcher<_, _, _, _>, ColValBuilder<_, _, _, _>, ColValSpine<_, _, _, _>>(
                "Arrange Sink",
            ),
    );

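    // Only a user-configured key is forwarded to the sink; keys derived from
    // the relation or synthesized from the row hash are used for grouping
    // only and are dropped here.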
    if user_key_indices.is_none() {
        collection = collection.map(|(_key, value)| (None, value))
    }

    collection.flat_map({
        let mut last_warning = Instant::now();
        let from_id = sink.from;
        move |(mut k, vs)| {
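            // More than one update for a genuine (non-synthetic) key at a
            // single timestamp means the configured key is not a primary key
            // of the input; warn, rate-limited to once every 10 seconds.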
            if !key_is_synthetic && vs.len() > 1 {
                let now = Instant::now();
                if now.duration_since(last_warning) >= Duration::from_secs(10) {
                    last_warning = now;
                    warn!(
                        ?sink_id,
                        ?from_id,
                        "primary key error: expected at most one update per key and timestamp; \
                         this can happen when the configured sink key is not a primary key of \
                         the sinked relation"
                    )
                }
            }

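            // Emit one `(key, DiffPair)` per combined update, moving the key
            // into the last element and cloning it for the rest.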
            let max_idx = vs.len() - 1;
            vs.into_iter().enumerate().map(move |(idx, dp)| {
                let k = if idx == max_idx { k.take() } else { k.clone() };
                (k, dp)
            })
        }
    })
}

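/// The connection-specific portion of sink rendering, implemented once per
/// sink connection type (currently Kafka).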
pub(crate) trait SinkRender<G>
where
    G: Scope<Timestamp = Timestamp>,
{
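    /// The indices of the columns explicitly configured as the sink's key,
    /// if any.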
    fn get_key_indices(&self) -> Option<&[usize]>;

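    /// The indices of the columns that form a key of the sinked relation, if
    /// it has one.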
    fn get_relation_key_indices(&self) -> Option<&[usize]>;

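    /// Renders the connection-specific sink operators for the given
    /// collections, returning a health status stream and tokens that keep the
    /// sink's operators alive.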
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        sinked_collection: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
        err_collection: Collection<G, DataflowError, Diff>,
    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>);
}

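/// Returns the `SinkRender` implementation for the given sink connection.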
fn get_sink_render_for<G>(connection: &StorageSinkConnection) -> Box<dyn SinkRender<G>>
where
    G: Scope<Timestamp = Timestamp>,
{
    match connection {
        StorageSinkConnection::Kafka(connection) => Box::new(connection.clone()),
    }
}