1use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use differential_dataflow::operators::arrange::Arrange;
16use differential_dataflow::trace::implementations::ord_neu::{
17    ColValBatcher, ColValBuilder, ColValSpine,
18};
19use differential_dataflow::{AsCollection, Collection, Hashable};
20use mz_interchange::avro::DiffPair;
21use mz_interchange::envelopes::combine_at_timestamp;
22use mz_persist_client::operators::shard_source::SnapshotMode;
23use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
24use mz_storage_operators::persist_source;
25use mz_storage_types::controller::CollectionMetadata;
26use mz_storage_types::errors::DataflowError;
27use mz_storage_types::sinks::{StorageSinkConnection, StorageSinkDesc};
28use mz_timely_util::builder_async::PressOnDropButton;
29use timely::dataflow::operators::Leave;
30use timely::dataflow::{Scope, Stream};
31use tracing::warn;
32
33use crate::healthcheck::HealthStatusMessage;
34use crate::storage_state::StorageState;
35
36pub(crate) fn render_sink<G>(
40    scope: &mut G,
41    storage_state: &mut StorageState,
42    sink_id: GlobalId,
43    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
44) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>)
45where
46    G: Scope<Timestamp = ()>,
47{
48    let snapshot_mode = if sink.with_snapshot {
49        SnapshotMode::Include
50    } else {
51        SnapshotMode::Exclude
52    };
53
54    let error_handler = storage_state.error_handler("storage_sink", sink_id);
55
56    let name = format!("{sink_id}-sinks");
57
58    scope.scoped(&name, |scope| {
59        let mut tokens = vec![];
60        let sink_render = get_sink_render_for(&sink.connection);
61
62        let (ok_collection, err_collection, persist_tokens) = persist_source::persist_source(
63            scope,
64            sink.from,
65            Arc::clone(&storage_state.persist_clients),
66            &storage_state.txns_ctx,
67            storage_state.storage_configuration.config_set(),
68            sink.from_storage_metadata.clone(),
69            None,
70            Some(sink.as_of.clone()),
71            snapshot_mode,
72            timely::progress::Antichain::new(),
73            None,
74            None,
75            async {},
76            error_handler,
77        );
78        tokens.extend(persist_tokens);
79
80        let ok_collection =
81            zip_into_diff_pairs(sink_id, sink, &*sink_render, ok_collection.as_collection());
82
83        let (health, sink_tokens) = sink_render.render_sink(
84            storage_state,
85            sink,
86            sink_id,
87            ok_collection,
88            err_collection.as_collection(),
89        );
90        tokens.extend(sink_tokens);
91        (health.leave(), tokens)
92    })
93}
94
95fn zip_into_diff_pairs<G>(
98    sink_id: GlobalId,
99    sink: &StorageSinkDesc<CollectionMetadata, mz_repr::Timestamp>,
100    sink_render: &dyn SinkRender<G>,
101    collection: Collection<G, Row, Diff>,
102) -> Collection<G, (Option<Row>, DiffPair<Row>), Diff>
103where
104    G: Scope<Timestamp = Timestamp>,
105{
106    let user_key_indices = sink_render.get_key_indices();
116    let relation_key_indices = sink_render.get_relation_key_indices();
117    let key_indices = user_key_indices
118        .or(relation_key_indices)
119        .map(|k| k.to_vec());
120    let key_is_synthetic = key_indices.is_none();
121
122    let collection = match key_indices {
123        None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
124        Some(key_indices) => {
125            let mut datum_vec = mz_repr::DatumVec::new();
126            collection.map(move |row| {
127                let key = {
130                    let datums = datum_vec.borrow_with(&row);
131                    Row::pack(key_indices.iter().map(|&idx| datums[idx].clone()))
132                };
133                (Some(key), row)
134            })
135        }
136    };
137
138    #[allow(clippy::disallowed_methods)]
143    let mut collection =
144        combine_at_timestamp(collection.arrange_named::<ColValBatcher<_,_,_,_>, ColValBuilder<_,_,_,_>, ColValSpine<_, _, _, _>>("Arrange Sink"));
145
146    if user_key_indices.is_none() {
151        collection = collection.map(|(_key, value)| (None, value))
152    }
153
154    collection.flat_map({
155        let mut last_warning = Instant::now();
156        let from_id = sink.from;
157        move |(mut k, vs)| {
158            if !key_is_synthetic && vs.len() > 1 {
165                let now = Instant::now();
168                if now.duration_since(last_warning) >= Duration::from_secs(10) {
169                    last_warning = now;
170                    warn!(
171                        ?sink_id,
172                        ?from_id,
173                        "primary key error: expected at most one update per key and timestamp; \
174                            this can happen when the configured sink key is not a primary key of \
175                            the sinked relation"
176                    )
177                }
178            }
179
180            let max_idx = vs.len() - 1;
181            vs.into_iter().enumerate().map(move |(idx, dp)| {
182                let k = if idx == max_idx { k.take() } else { k.clone() };
183                (k, dp)
184            })
185        }
186    })
187}
188
189pub(crate) trait SinkRender<G>
191where
192    G: Scope<Timestamp = Timestamp>,
193{
194    fn get_key_indices(&self) -> Option<&[usize]>;
197
198    fn get_relation_key_indices(&self) -> Option<&[usize]>;
201
202    fn render_sink(
204        &self,
205        storage_state: &mut StorageState,
206        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
207        sink_id: GlobalId,
208        sinked_collection: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
209        err_collection: Collection<G, DataflowError, Diff>,
210    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>);
211}
212
213fn get_sink_render_for<G>(connection: &StorageSinkConnection) -> Box<dyn SinkRender<G>>
214where
215    G: Scope<Timestamp = Timestamp>,
216{
217    match connection {
218        StorageSinkConnection::Kafka(connection) => Box::new(connection.clone()),
219        StorageSinkConnection::Iceberg(_) => unimplemented!("iceberg sinks"),
220    }
221}