use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::fmt::Write;
use std::sync::Arc;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::ShardId;
use mz_persist_types::Codec64;
use mz_repr::{Diff, GlobalId, TimestampManipulation};
use mz_storage_client::client::{TimestamplessUpdate, Update};
use mz_storage_types::controller::{InvalidUpper, TxnsCodecRow};
use mz_storage_types::sources::SourceData;
use mz_txn_wal::txns::{Tidy, TxnsHandle};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tracing::{debug, info_span, Instrument, Span};
use crate::{PersistEpoch, StorageError};
mod read_only_table_worker;
/// Cloneable handle used to submit table write commands to a background
/// worker task.
///
/// All clones share a single [`PersistTableWriteWorkerInner`] through an
/// [`Arc`]; that inner value sends a `Shutdown` command to the worker when it
/// is dropped.
#[derive(Debug, Clone)]
pub struct PersistTableWriteWorker<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    /// Shared sender half of the worker's command channel.
    inner: Arc<PersistTableWriteWorkerInner<T>>,
}
/// Commands understood by the table write worker tasks.
///
/// Every command is sent over the worker's channel together with a tracing
/// span. Variants that carry a `oneshot::Sender` use it to report completion
/// (or the result) back to the requester.
#[derive(Debug)]
enum PersistTableWriteCmd<T>
where
    T: Timestamp + Lattice + Codec64,
{
    /// Register the given `(id, write handle)` pairs at the given timestamp.
    /// The sender is signalled once registration has finished.
    Register(
        T,
        Vec<(GlobalId, WriteHandle<SourceData, (), T, Diff>)>,
        oneshot::Sender<()>,
    ),
    /// Replace the write handle of an already known table: the table is
    /// forgotten at `forget_ts` and `handle` is registered at `register_ts`.
    Update {
        /// Table whose handle is being replaced.
        table_id: GlobalId,
        /// Timestamp at which the old registration is forgotten.
        forget_ts: T,
        /// Timestamp at which the new handle is registered.
        register_ts: T,
        /// The new write handle for `table_id`.
        handle: WriteHandle<SourceData, (), T, Diff>,
        /// Signalled once the update has been processed.
        tx: oneshot::Sender<()>,
    },
    /// Forget the given tables at `forget_ts`.
    DropHandles {
        /// Timestamp at which the tables are forgotten.
        forget_ts: T,
        /// Tables to drop.
        ids: Vec<GlobalId>,
        /// Signalled once the handles have been dropped.
        tx: oneshot::Sender<()>,
    },
    /// Write `updates` at `write_ts`.
    Append {
        /// Timestamp at which all updates are written.
        write_ts: T,
        /// Timestamp to advance to after the write.
        advance_to: T,
        /// Per-table batches of updates that do not yet carry a timestamp.
        updates: Vec<(GlobalId, Vec<TimestamplessUpdate>)>,
        /// Receives the outcome of the append.
        tx: oneshot::Sender<Result<(), StorageError<T>>>,
    },
    /// Ask the worker to stop processing commands and exit.
    Shutdown,
}
impl<T: Timestamp + Lattice + Codec64> PersistTableWriteCmd<T> {
    /// Returns a static, human-readable name for this command.
    ///
    /// The name is `PersistTableWriteCmd::<Variant>` and is used as the
    /// `otel.name` field of the span created when the command is sent, so it
    /// must match the variant name exactly.
    fn name(&self) -> &'static str {
        match self {
            Self::Register(..) => "PersistTableWriteCmd::Register",
            Self::Update { .. } => "PersistTableWriteCmd::Update",
            // Previously reported as "DropHandle", which did not match the
            // variant name and was inconsistent with every other arm.
            Self::DropHandles { .. } => "PersistTableWriteCmd::DropHandles",
            Self::Append { .. } => "PersistTableWriteCmd::Append",
            Self::Shutdown => "PersistTableWriteCmd::Shutdown",
        }
    }
}
/// Concurrently appends batches of updates through the given write handles.
///
/// `commands` maps a collection id to
/// `(span, updates, expected_upper, new_upper)`. For every entry of
/// `write_handles` that has a matching command, one `compare_and_append` call
/// is issued, instrumented with the command's span. Commands whose id has no
/// entry in `write_handles` are dropped without being executed.
///
/// # Returns
///
/// `Ok(())` when every issued append succeeded. Otherwise `Err` with one
/// `(id, current_upper)` pair per append that was rejected because of an
/// upper mismatch.
///
/// # Panics
///
/// Panics with "cannot append updates" if `compare_and_append` itself returns
/// an error (as opposed to reporting an upper mismatch).
async fn append_work<T2: Timestamp + Lattice + Codec64 + Sync>(
    write_handles: &mut BTreeMap<GlobalId, WriteHandle<SourceData, (), T2, Diff>>,
    mut commands: BTreeMap<
        GlobalId,
        (tracing::Span, Vec<Update<T2>>, Antichain<T2>, Antichain<T2>),
    >,
) -> Result<(), Vec<(GlobalId, Antichain<T2>)>> {
    let pending = FuturesUnordered::new();
    for (id, handle) in write_handles.iter_mut() {
        let Some((span, updates, expected_upper, new_upper)) = commands.remove(id) else {
            continue;
        };
        let id = *id;
        let rows = updates
            .into_iter()
            .map(|u| ((SourceData(Ok(u.row)), ()), u.timestamp, u.diff));
        // Everything is moved into the future exactly once, so no clones are
        // needed for the iterator, the uppers, or the span.
        pending.push(async move {
            handle
                .compare_and_append(rows, expected_upper, new_upper)
                .instrument(span)
                .await
                .expect("cannot append updates")
                .map_err(|upper_mismatch| (id, upper_mismatch.current))
        });
    }

    // Drive all appends to completion, then keep only the failures; callers
    // never see the successful uppers.
    let failed_appends: Vec<_> = pending
        .collect::<Vec<_>>()
        .await
        .into_iter()
        .filter_map(Result::err)
        .collect();

    if failed_appends.is_empty() {
        Ok(())
    } else {
        Err(failed_appends)
    }
}
impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> PersistTableWriteWorker<T> {
    /// Creates a worker handle backed by the read-only-mode table worker.
    ///
    /// Spawns `read_only_table_worker::read_only_mode_table_worker` as a task
    /// named "PersistTableWriteWorker", fed by a fresh unbounded channel.
    pub(crate) fn new_read_only_mode(txns_handle: WriteHandle<SourceData, (), T, Diff>) -> Self {
        let (cmd_tx, cmd_rx) =
            tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
        let worker_task = read_only_table_worker::read_only_mode_table_worker(cmd_rx, txns_handle);
        mz_ore::task::spawn(|| "PersistTableWriteWorker", worker_task);
        let inner = PersistTableWriteWorkerInner::new(cmd_tx);
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Creates a worker handle backed by a [`TxnsTableWorker`].
    ///
    /// The worker starts with no registered tables and an empty `Tidy`, and
    /// runs as a task named "PersistTableWriteWorker" until its command loop
    /// returns.
    pub(crate) fn new_txns(
        txns: TxnsHandle<SourceData, (), T, i64, PersistEpoch, TxnsCodecRow>,
    ) -> Self {
        let (cmd_tx, cmd_rx) =
            tokio::sync::mpsc::unbounded_channel::<(tracing::Span, PersistTableWriteCmd<T>)>();
        let worker_task = async move {
            let mut worker = TxnsTableWorker {
                txns,
                write_handles: BTreeMap::new(),
                tidy: Tidy::default(),
            };
            worker.run(cmd_rx).await
        };
        mz_ore::task::spawn(|| "PersistTableWriteWorker", worker_task);
        let inner = PersistTableWriteWorkerInner::new(cmd_tx);
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Asks the worker to register `ids_handles` at `register_ts`.
    ///
    /// Unlike the other commands, this one is sent with a span created in the
    /// caller's current tracing context rather than a detached root span.
    /// The returned receiver resolves once the worker has processed the
    /// registration.
    pub(crate) fn register(
        &self,
        register_ts: T,
        ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, Diff>)>,
    ) -> tokio::sync::oneshot::Receiver<()> {
        let span = info_span!("PersistTableWriteCmd::Register");
        let (done_tx, done_rx) = oneshot::channel();
        self.inner.send_with_span(
            span,
            PersistTableWriteCmd::Register(register_ts, ids_handles, done_tx),
        );
        done_rx
    }

    /// Asks the worker to swap the write handle of `table_id`: the table is
    /// forgotten at `forget_ts` and `handle` is registered at `register_ts`.
    ///
    /// The returned receiver resolves once the worker has processed the
    /// command.
    // NOTE(review): presumably no caller exists yet, hence the lint
    // suppression — confirm before removing.
    #[allow(dead_code)]
    pub(crate) fn update(
        &self,
        table_id: GlobalId,
        forget_ts: T,
        register_ts: T,
        handle: WriteHandle<SourceData, (), T, Diff>,
    ) -> oneshot::Receiver<()> {
        let (done_tx, done_rx) = oneshot::channel();
        let cmd = PersistTableWriteCmd::Update {
            table_id,
            forget_ts,
            register_ts,
            handle,
            tx: done_tx,
        };
        self.send(cmd);
        done_rx
    }

    /// Asks the worker to write `updates` at `write_ts`.
    ///
    /// When `updates` is empty no command is sent at all; the returned
    /// receiver is resolved immediately with `Ok(())`. Otherwise the receiver
    /// yields whatever result the worker reports for the append.
    pub(crate) fn append(
        &self,
        write_ts: T,
        advance_to: T,
        updates: Vec<(GlobalId, Vec<TimestamplessUpdate>)>,
    ) -> tokio::sync::oneshot::Receiver<Result<(), StorageError<T>>> {
        let (result_tx, result_rx) = oneshot::channel();

        // Nothing to write: answer right away without bothering the worker.
        if updates.is_empty() {
            result_tx
                .send(Ok(()))
                .expect("rx has not been dropped at this point");
            return result_rx;
        }

        let cmd = PersistTableWriteCmd::Append {
            write_ts,
            advance_to,
            updates,
            tx: result_tx,
        };
        self.send(cmd);
        result_rx
    }

    /// Asks the worker to forget the tables in `ids` at `forget_ts`.
    ///
    /// The returned future completes when the worker responds or when the
    /// response channel is closed; either outcome is mapped to `()`.
    pub(crate) fn drop_handles(&self, ids: Vec<GlobalId>, forget_ts: T) -> BoxFuture<'static, ()> {
        let (done_tx, done_rx) = oneshot::channel();
        let cmd = PersistTableWriteCmd::DropHandles {
            forget_ts,
            ids,
            tx: done_tx,
        };
        self.send(cmd);
        Box::pin(async move {
            // A closed channel is treated the same as a successful reply.
            let _ = done_rx.await;
        })
    }

    /// Forwards `cmd` to the shared inner sender, which wraps it in a
    /// detached span.
    fn send(&self, cmd: PersistTableWriteCmd<T>) {
        self.inner.send(cmd);
    }
}
/// State owned by the worker task that applies table commands through a
/// [`TxnsHandle`].
struct TxnsTableWorker<T>
where
    T: Timestamp + Lattice + TotalOrder + Codec64,
{
    /// Handle through which registrations, forgets and commits are performed.
    txns: TxnsHandle<SourceData, (), T, i64, PersistEpoch, TxnsCodecRow>,
    /// Shard id of every currently registered table, keyed by table id.
    write_handles: BTreeMap<GlobalId, ShardId>,
    /// Accumulated `Tidy` work, handed to the next transaction on append.
    tidy: Tidy,
}
impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> TxnsTableWorker<T> {
    /// Runs the command loop.
    ///
    /// Commands are processed strictly one at a time, each inside the span it
    /// arrived with. The loop ends when a `Shutdown` command is received or
    /// when the channel is closed and drained.
    async fn run(
        &mut self,
        mut rx: tokio::sync::mpsc::UnboundedReceiver<(Span, PersistTableWriteCmd<T>)>,
    ) {
        while let Some((span, command)) = rx.recv().await {
            match command {
                PersistTableWriteCmd::Register(register_ts, ids_handles, tx) => {
                    self.register(register_ts, ids_handles)
                        .instrument(span)
                        .await;
                    // The requester may no longer be waiting; ignore that.
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::Update {
                    table_id,
                    forget_ts,
                    register_ts,
                    handle,
                    tx,
                } => {
                    // An update is a forget of the old registration followed
                    // by a registration of the new handle.
                    async {
                        self.drop_handles(vec![table_id], forget_ts).await;
                        self.register(register_ts, vec![(table_id, handle)]).await;
                    }
                    .instrument(span)
                    .await;
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::DropHandles { forget_ts, ids, tx } => {
                    self.drop_handles(ids, forget_ts).instrument(span).await;
                    let _ = tx.send(());
                }
                PersistTableWriteCmd::Append {
                    write_ts,
                    advance_to,
                    updates,
                    tx,
                } => {
                    // `append` reports its result through `tx` itself.
                    self.append(write_ts, advance_to, updates, tx)
                        .instrument(span)
                        .await
                }
                PersistTableWriteCmd::Shutdown => {
                    tracing::info!("PersistTableWriteWorker shutting down via command");
                    return;
                }
            }
        }
        tracing::info!("PersistTableWriteWorker shutting down via input exhaustion");
    }

    /// Records the shard of every given table and registers the write handles
    /// with `txns` at `register_ts`.
    ///
    /// # Panics
    ///
    /// Panics if one of the ids is already registered, or if `txns` rejects
    /// the registration timestamp.
    async fn register(
        &mut self,
        register_ts: T,
        ids_handles: Vec<(GlobalId, WriteHandle<SourceData, (), T, i64>)>,
    ) {
        for (id, write_handle) in ids_handles.iter() {
            debug!(
                "tables register {} {:.9}",
                id,
                write_handle.shard_id().to_string()
            );
            let previous = self.write_handles.insert(*id, write_handle.shard_id());
            if previous.is_some() {
                panic!("already registered a WriteHandle for collection {:?}", id);
            }
        }

        // Keep the ids around for the panic message; the handles themselves
        // are consumed by the registration.
        let new_ids = ids_handles.iter().map(|(id, _)| *id).collect_vec();
        let handles = ids_handles.into_iter().map(|(_, handle)| handle);
        match self.txns.register(register_ts.clone(), handles).await {
            Ok(tidy) => {
                self.tidy.merge(tidy);
            }
            Err(current) => {
                panic!(
                    "cannot register {:?} at {:?} because txns is at {:?}",
                    new_ids, register_ts, current
                );
            }
        }
    }

    /// Removes the given tables from the local map and forgets their shards
    /// in `txns` at `forget_ts`.
    ///
    /// Ids that are not currently registered are skipped; if none of the ids
    /// is registered, `txns` is not contacted at all.
    ///
    /// # Panics
    ///
    /// Panics if `txns` rejects the forget timestamp.
    async fn drop_handles(&mut self, ids: Vec<GlobalId>, forget_ts: T) {
        tracing::info!(?ids, "drop tables");
        let data_ids = ids
            .iter()
            .filter_map(|id| self.write_handles.remove(id))
            .collect::<Vec<_>>();
        if data_ids.is_empty() {
            return;
        }

        // `data_ids` is not needed afterwards, so it is moved rather than
        // cloned.
        match self.txns.forget(forget_ts.clone(), data_ids).await {
            Ok(tidy) => {
                self.tidy.merge(tidy);
            }
            Err(current) => {
                panic!(
                    "cannot forget {:?} at {:?} because txns is at {:?}",
                    ids, forget_ts, current
                );
            }
        }
    }

    /// Writes `updates` in a single transaction committed at `write_ts` and
    /// sends the outcome through `tx`.
    ///
    /// On a successful commit the transaction is applied, the resulting tidy
    /// work is retained, and `txns` is compacted to `write_ts`. On a failed
    /// commit the pending tidy work is taken back from the transaction and
    /// `StorageError::InvalidUppers` is reported.
    ///
    /// # Panics
    ///
    /// Panics if `advance_to` is not `write_ts` stepped forward once, or if
    /// non-empty updates are supplied for a table that is not registered.
    async fn append(
        &mut self,
        write_ts: T,
        advance_to: T,
        updates: Vec<(GlobalId, Vec<TimestamplessUpdate>)>,
        tx: tokio::sync::oneshot::Sender<Result<(), StorageError<T>>>,
    ) {
        debug!(
            "tables append timestamp={:?} advance_to={:?} len={} ids={:?}{}",
            write_ts,
            advance_to,
            updates.iter().map(|(_, batch)| batch.len()).sum::<usize>(),
            updates
                .iter()
                .map(|(id, _)| id.to_string())
                .collect::<BTreeSet<_>>(),
            // One extra line per non-empty batch, showing its first update.
            updates.iter().filter(|(_, batch)| !batch.is_empty()).fold(
                String::new(),
                |mut output, (id, batch)| {
                    let _ = write!(output, "\n {}: {:?}", id, batch.first());
                    output
                }
            )
        );
        assert_eq!(
            advance_to,
            mz_persist_types::StepForward::step_forward(&write_ts)
        );

        let mut txn = self.txns.begin();
        for (id, updates) in updates {
            // Unregistered tables are tolerated only when they have nothing
            // to write.
            let Some(data_id) = self.write_handles.get(&id) else {
                assert!(updates.is_empty(), "{}: {:?}", id, updates);
                continue;
            };
            for update in updates {
                let () = txn
                    .write(data_id, SourceData(Ok(update.row)), (), update.diff)
                    .await;
            }
        }

        // Hand all accumulated tidy work to this transaction.
        txn.tidy(std::mem::take(&mut self.tidy));
        let txn_res = txn.commit_at(&mut self.txns, write_ts.clone()).await;
        let response = match txn_res {
            Ok(apply) => {
                debug!("applying {:?}", apply);
                let tidy = apply.apply(&mut self.txns).await;
                self.tidy.merge(tidy);
                let () = self.txns.compact_to(write_ts).await;
                Ok(())
            }
            Err(current) => {
                // The commit did not happen, so reclaim the tidy work that
                // was handed to the transaction above.
                self.tidy.merge(txn.take_tidy());
                debug!(
                    "unable to commit txn at {:?} current={:?}",
                    write_ts, current
                );
                // NOTE(review): this reports every registered table, not
                // only the ones written in this transaction — confirm that
                // callers expect that.
                Err(StorageError::InvalidUppers(
                    self.write_handles
                        .keys()
                        .copied()
                        .map(|id| InvalidUpper {
                            id,
                            current_upper: Antichain::from_elem(current.clone()),
                        })
                        .collect(),
                ))
            }
        };
        // The requester may no longer be waiting; ignore that.
        let _ = tx.send(response);
    }
}
/// Owner of the sending half of the worker's command channel.
///
/// Kept behind an `Arc` by [`PersistTableWriteWorker`]; its `Drop`
/// implementation sends a `Shutdown` command to the worker.
#[derive(Debug)]
struct PersistTableWriteWorkerInner<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    /// Channel on which `(span, command)` pairs are delivered to the worker.
    tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>,
}
impl<T: Timestamp + Lattice + Codec64 + TimestampManipulation> Drop
    for PersistTableWriteWorkerInner<T>
{
    /// Tells the worker to shut down when this value is dropped.
    ///
    /// The command goes through `send`, so a worker that has already exited
    /// only produces a trace-level log line.
    fn drop(&mut self) {
        let shutdown = PersistTableWriteCmd::Shutdown;
        self.send(shutdown);
    }
}
impl<T> PersistTableWriteWorkerInner<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    /// Wraps the sending half of the worker's command channel.
    fn new(tx: UnboundedSender<(tracing::Span, PersistTableWriteCmd<T>)>) -> Self {
        Self { tx }
    }

    /// Sends `cmd` to the worker inside a freshly created span.
    ///
    /// The span is created with `parent: None` and carries the command's
    /// name in its `otel.name` field. The `OpenTelemetryContext` obtained at
    /// the call site is then attached as the span's parent.
    fn send(&self, cmd: PersistTableWriteCmd<T>) {
        let span =
            info_span!(parent: None, "PersistTableWriteWorkerInner::send", otel.name = cmd.name());
        OpenTelemetryContext::obtain().attach_as_parent_to(&span);
        self.send_with_span(span, cmd)
    }

    /// Sends `cmd` paired with the caller-provided `span`.
    ///
    /// A failed send is logged at trace level and is otherwise ignored.
    fn send_with_span(&self, span: Span, cmd: PersistTableWriteCmd<T>) {
        if let Err(e) = self.tx.send((span, cmd)) {
            tracing::trace!("could not forward command: {:?}", e);
        }
    }
}