use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::ord_neu::{
ColValBatcher, ColValBuilder, ColValSpine,
};
use differential_dataflow::{AsCollection, Collection, Hashable};
use mz_interchange::avro::DiffPair;
use mz_interchange::envelopes::combine_at_timestamp;
use mz_persist_client::operators::shard_source::SnapshotMode;
use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
use mz_storage_operators::persist_source;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{MetadataFilled, StorageSinkConnection, StorageSinkDesc};
use mz_timely_util::builder_async::PressOnDropButton;
use timely::dataflow::operators::Leave;
use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, Stream};
use tracing::warn;
use crate::healthcheck::HealthStatusMessage;
use crate::internal_control::InternalStorageCommand;
use crate::storage_state::StorageState;
/// Renders the dataflow fragment for the storage sink `sink_id`.
///
/// The collection identified by `sink.from` is read through
/// `persist_source::persist_source`, its updates are grouped into keyed
/// [`DiffPair`]s by `zip_into_diff_pairs`, and the result is handed to the
/// [`SinkRender`] implementation selected for `sink.connection`.
///
/// Returns the sink's health status stream, moved out of the `Child` scope
/// into `G` via `leave`, together with every token produced by the persist
/// source and by the sink implementation (persist tokens first).
pub(crate) fn render_sink<'g, G: Scope<Timestamp = ()>>(
    scope: &mut Child<'g, G, mz_repr::Timestamp>,
    storage_state: &mut StorageState,
    sink_id: GlobalId,
    sink: &StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
    let renderer = get_sink_render_for(&sink.connection);

    // Translate the sink's `with_snapshot` flag into the persist source's
    // snapshot mode.
    let snapshot_mode = if sink.with_snapshot {
        SnapshotMode::Include
    } else {
        SnapshotMode::Exclude
    };

    // Handle on the internal command channel, moved into the error handler
    // below.
    let internal_cmd_tx = Rc::clone(&storage_state.internal_cmd_tx);

    // The trailing positional arguments are passed as empty / `None` / no-op
    // values; see `persist_source` for what each of them configures.
    let (ok_stream, err_stream, persist_tokens) = persist_source::persist_source(
        scope,
        sink.from,
        Arc::clone(&storage_state.persist_clients),
        &storage_state.txns_ctx,
        storage_state.storage_configuration.config_set(),
        sink.from_storage_metadata.clone(),
        Some(sink.as_of.clone()),
        snapshot_mode,
        timely::progress::Antichain::new(),
        None,
        None,
        async {},
        // Error handler: log the error and broadcast a request to suspend
        // and restart this sink.
        move |error| {
            Box::pin(async move {
                let error = format!("storage_sink: {error}");
                tracing::info!("{error}");
                internal_cmd_tx
                    .borrow_mut()
                    .broadcast(InternalStorageCommand::SuspendAndRestart {
                        id: sink_id,
                        reason: error,
                    });
            })
        },
    );

    let keyed_pairs = zip_into_diff_pairs(sink_id, sink, &*renderer, ok_stream.as_collection());
    let errors = err_stream.as_collection();

    let (health, sink_tokens) =
        renderer.render_sink(storage_state, sink, sink_id, keyed_pairs, errors);

    let tokens: Vec<PressOnDropButton> = persist_tokens.into_iter().chain(sink_tokens).collect();

    (health.leave(), tokens)
}
/// Groups the updates of `collection` by sink key and pairs them up as
/// [`DiffPair`]s, producing the `(key, diff pair)` records a sink consumes.
///
/// The grouping key is chosen as follows:
///
/// 1. the column indices returned by `sink_render.get_key_indices()` (bound to
///    `user_key_indices` below), if any;
/// 2. otherwise the indices returned by
///    `sink_render.get_relation_key_indices()`, if any;
/// 3. otherwise a synthetic single-column key holding the hash of the row.
///
/// The key is only kept on the output records in case 1; in the other two
/// cases every output record carries `None` as its key.
///
/// When a non-synthetic key groups more than one diff pair, a
/// `primary key error` warning is logged. The first such violation is always
/// reported; later ones are rate limited to one per ten seconds. `sink_id`
/// and `sink.from` are only used to label that warning.
fn zip_into_diff_pairs<G>(
    sink_id: GlobalId,
    sink: &StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
    sink_render: &dyn SinkRender<G>,
    collection: Collection<G, Row, Diff>,
) -> Collection<G, (Option<Row>, DiffPair<Row>), Diff>
where
    G: Scope<Timestamp = Timestamp>,
{
    // Minimum spacing between two primary key warnings, so that a
    // misconfigured sink does not flood the logs.
    const WARNING_INTERVAL: Duration = Duration::from_secs(10);

    let user_key_indices = sink_render.get_key_indices();
    let relation_key_indices = sink_render.get_relation_key_indices();
    let key_indices = user_key_indices
        .or(relation_key_indices)
        .map(|k| k.to_vec());
    let key_is_synthetic = key_indices.is_none();

    // Attach the grouping key to every row.
    let collection = match key_indices {
        // No key at all: fall back to the hash of the row. Such a key carries
        // no uniqueness guarantee, which is why it is exempt from the primary
        // key check further down.
        None => collection.map(|row| (Some(Row::pack(Some(Datum::UInt64(row.hashed())))), row)),
        Some(key_indices) => {
            // Scratch buffer reused across rows while unpacking datums.
            let mut datum_vec = mz_repr::DatumVec::new();
            collection.map(move |row| {
                let key = {
                    let datums = datum_vec.borrow_with(&row);
                    Row::pack(key_indices.iter().map(|&idx| datums[idx].clone()))
                };
                (Some(key), row)
            })
        }
    };

    // Arrange by key and let `combine_at_timestamp` build, for every key, the
    // list of diff pairs (per timestamp, going by the helper's name).
    //
    // NOTE(review): `arrange_named` is presumably on the project's clippy
    // disallowed-methods list, hence the `allow`; confirm why the sanctioned
    // alternative cannot be used here.
    #[allow(clippy::disallowed_methods)]
    let mut collection =
        combine_at_timestamp(collection.arrange_named::<ColValBatcher<_,_,_,_>, ColValBuilder<_,_,_,_>, ColValSpine<_, _, _, _>>("Arrange Sink"));

    // Without a user-specified key the grouping key (relation key or
    // synthetic hash) must not show up in the sink's output.
    if user_key_indices.is_none() {
        collection = collection.map(|(_key, value)| (None, value))
    }

    collection.flat_map({
        // `None` until the first warning has been emitted. Seeding this with
        // `Instant::now()` would silently drop every violation observed in the
        // first `WARNING_INTERVAL` after rendering.
        let mut last_warning: Option<Instant> = None;
        let from_id = sink.from;
        move |(mut k, vs)| {
            if !key_is_synthetic && vs.len() > 1 {
                let now = Instant::now();
                let should_warn = match last_warning {
                    None => true,
                    Some(last) => now.duration_since(last) >= WARNING_INTERVAL,
                };
                if should_warn {
                    last_warning = Some(now);
                    warn!(
                        ?sink_id,
                        ?from_id,
                        "primary key error: expected at most one update per key and timestamp; \
                         this can happen when the configured sink key is not a primary key of \
                         the sinked relation"
                    )
                }
            }

            // Index of the last diff pair; that one takes ownership of the
            // key while all earlier ones receive a clone. `saturating_sub`
            // keeps an empty group from underflowing (it then yields nothing).
            let max_idx = vs.len().saturating_sub(1);
            vs.into_iter().enumerate().map(move |(idx, dp)| {
                let k = if idx == max_idx { k.take() } else { k.clone() };
                (k, dp)
            })
        }
    })
}
/// Connection-specific rendering of a storage sink.
///
/// `get_sink_render_for` returns a boxed implementation for a sink
/// connection; the free function `render_sink` and `zip_into_diff_pairs`
/// then drive it.
pub(crate) trait SinkRender<G>
where
G: Scope<Timestamp = Timestamp>,
{
/// Column indices of the sink's key, if it has one.
///
/// `zip_into_diff_pairs` binds the result to `user_key_indices` and, when it
/// is `Some`, keeps the key built from these columns on every output record.
/// NOTE(review): presumably this is the key the user configured on the sink;
/// confirm against the implementors.
fn get_key_indices(&self) -> Option<&[usize]>;
/// Column indices of a key of the sinked relation, if any (going by the
/// method name; confirm against the implementors).
///
/// `zip_into_diff_pairs` falls back to these indices for grouping when
/// `get_key_indices` returns `None`.
fn get_relation_key_indices(&self) -> Option<&[usize]>;
/// Renders the connection-specific part of the sink.
///
/// `sinked_collection` carries `(optional key, diff pair)` records and
/// `err_collection` carries the dataflow errors of the sinked collection.
/// Returns a stream of health status messages and the tokens associated
/// with the rendered operators.
fn render_sink(
&self,
storage_state: &mut StorageState,
sink: &StorageSinkDesc<MetadataFilled, Timestamp>,
sink_id: GlobalId,
sinked_collection: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
err_collection: Collection<G, DataflowError, Diff>,
) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>);
}
/// Picks the [`SinkRender`] implementation for `connection`.
///
/// The inner connection is cloned into the returned box, so the result does
/// not borrow from `connection`.
fn get_sink_render_for<G>(connection: &StorageSinkConnection) -> Box<dyn SinkRender<G>>
where
    G: Scope<Timestamp = Timestamp>,
{
    // There is no wildcard arm, so adding a variant to `StorageSinkConnection`
    // fails to compile here until it is handled.
    match connection {
        StorageSinkConnection::Kafka(kafka) => {
            let renderer = kafka.clone();
            Box::new(renderer)
        }
    }
}