use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{anyhow, Context};
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::stream::LocalBoxStream;
use futures::StreamExt;
use mz_ore::vec::VecExt;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::ListenEvent;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::Diagnostics;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::Codec64;
use mz_repr::{Diff, GlobalId, RelationDesc};
use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::{SourceData, SourceTimestamp};
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::progress::Timestamp;
use tokio::sync::watch;
/// A handle to a persist remap shard that implements both `RemapHandleReader`
/// and `RemapHandle`.
///
/// Updates are exposed to callers as `(FromTime, IntoTime, Diff)` triples; on
/// the persist side the `FromTime` is encoded into a `SourceData` row.
pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
/// Stream of `ListenEvent`s read from the shard. `PersistHandle::new` builds
/// it as one snapshot at the `as_of`, followed by the events of a listener
/// started at that same `as_of`.
events: LocalBoxStream<
'static,
ListenEvent<
IntoTime,
(
(Result<SourceData, String>, Result<(), String>),
IntoTime,
Diff,
),
>,
>,
/// Persist write handle. `compare_and_append` writes through it and `upper`
/// returns its upper.
write_handle: WriteHandle<SourceData, (), IntoTime, Diff>,
/// While this holds `true`, `compare_and_append` does not write; it waits
/// until the value flips to `false` or the shard upper moves past the
/// expected upper.
read_only_rx: watch::Receiver<bool>,
/// Decoded updates received from `events` that `next` has not handed out yet.
/// `next` releases an update once a progress event reports an upper that is
/// not less than or equal to the update's `IntoTime`.
pending_batch: Vec<(FromTime, IntoTime, Diff)>,
/// Overwritten with the `new_upper` of a `compare_and_append` call whenever
/// the underlying persist call returns without an invalid-usage error.
/// Supplied by the caller of `new`; presumably read elsewhere to observe the
/// write frontier (the readers are not visible in this file).
shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
}
impl<FromTime: Timestamp, IntoTime: Timestamp + Sync> PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    /// Opens the remap shard described by `metadata` and returns a handle whose
    /// event stream starts at `as_of`.
    ///
    /// `id` is used in the persist diagnostics, the assertion message and the
    /// startup log line; `operator`, `worker_id` and `worker_count` are used
    /// only in the startup log line. `remap_relation_desc` is handed to persist
    /// as the key schema and `remap_collection_id` names the shard in the
    /// persist diagnostics.
    ///
    /// # Errors
    ///
    /// Returns an error if `metadata` carries no remap shard, or if opening the
    /// persist client or the shard fails.
    ///
    /// # Panics
    ///
    /// Panics if the shard's `since` is not less than or equal to `as_of`, or if
    /// `as_of` is neither `[IntoTime::minimum()]` nor strictly less than the
    /// shard's upper. The returned event stream panics when first polled if the
    /// snapshot or the listener cannot be created at `as_of`.
    pub async fn new(
        persist_clients: Arc<PersistClientCache>,
        read_only_rx: watch::Receiver<bool>,
        metadata: CollectionMetadata,
        as_of: Antichain<IntoTime>,
        shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
        id: GlobalId,
        operator: &str,
        worker_id: usize,
        worker_count: usize,
        remap_relation_desc: RelationDesc,
        remap_collection_id: GlobalId,
    ) -> anyhow::Result<Self> {
        use futures::stream;

        let remap_shard = match metadata.remap_shard {
            Some(shard) => shard,
            None => {
                return Err(anyhow!(
                    "cannot create remap PersistHandle for collection without remap shard"
                ));
            }
        };

        let location = metadata.persist_location.clone();
        let persist_client = persist_clients
            .open(location)
            .await
            .context("error creating persist client")?;

        let diagnostics = Diagnostics {
            shard_name: remap_collection_id.to_string(),
            handle_purpose: format!("reclock for {id}"),
        };
        let key_schema = Arc::new(remap_relation_desc);
        let val_schema = Arc::new(UnitSchema);
        let (write_handle, mut read_handle) = persist_client
            .open(remap_shard, key_schema, val_schema, diagnostics, false)
            .await
            .context("error opening persist shard")?;

        let upper = write_handle.upper();
        let since = read_handle.since();

        // Hook for externally configured fail points; a no-op unless one is set.
        fail_point!("invalid_remap_as_of");

        // The requested `as_of` must still be readable ...
        let since_allows_as_of = PartialOrder::less_equal(since, &as_of);
        assert!(
            since_allows_as_of,
            "invalid as_of: as_of({as_of:?}) < since({since:?}), \
             source {id}, \
             remap_shard: {:?}",
            metadata.remap_shard
        );

        // ... and, unless it is the minimum time, strictly before the upper.
        let as_of_is_minimum = as_of.elements() == [IntoTime::minimum()];
        let as_of_before_upper = as_of_is_minimum || PartialOrder::less_than(&as_of, upper);
        assert!(
            as_of_before_upper,
            "invalid as_of: upper({upper:?}) <= as_of({as_of:?})",
        );

        tracing::info!(
            ?since,
            ?as_of,
            ?upper,
            "{operator}({id}) {worker_id}/{worker_count} initializing PersistHandle"
        );

        // Lazily produce the snapshot at `as_of` as a single `Updates` event and
        // then continue with whatever the listener yields from `as_of` onwards.
        let bootstrap = async move {
            let snapshot_updates = read_handle
                .snapshot_and_fetch(as_of.clone())
                .await
                .expect("since <= as_of asserted");
            let snapshot = stream::once(std::future::ready(ListenEvent::Updates(
                snapshot_updates,
            )));

            let listener = read_handle
                .listen(as_of.clone())
                .await
                .expect("since <= as_of asserted");
            let live = stream::unfold(listener, |mut listener| async move {
                let fetched = listener.fetch_next().await;
                Some((stream::iter(fetched), listener))
            })
            .flatten();

            snapshot.chain(live)
        };
        let events = stream::once(bootstrap).flatten().boxed_local();

        Ok(Self {
            events,
            write_handle,
            read_only_rx,
            pending_batch: vec![],
            shared_write_frontier,
        })
    }
}
#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandleReader for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    type FromTime = FromTime;
    type IntoTime = IntoTime;

    /// Waits for the next progress event and returns the updates it releases.
    ///
    /// Update events are decoded and buffered in `pending_batch`. When a
    /// progress event arrives, every buffered update whose `IntoTime` is not
    /// beyond the reported upper is removed from the buffer and returned
    /// together with that upper. Returns `None` once the event stream ends.
    ///
    /// # Panics
    ///
    /// Panics with "invalid row" if an update's key failed to decode or holds
    /// an error instead of a row.
    async fn next(
        &mut self,
    ) -> Option<(
        Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        Antichain<Self::IntoTime>,
    )> {
        loop {
            match self.events.next().await {
                None => return None,
                Some(ListenEvent::Updates(rows)) => {
                    let decoded = rows.into_iter().map(|((key, _), into_ts, diff)| {
                        let row = key.expect("invalid row").0.expect("invalid row");
                        (FromTime::decode_row(&row), into_ts, diff)
                    });
                    self.pending_batch.extend(decoded);
                }
                Some(ListenEvent::Progress(frontier)) => {
                    // Hand out everything the new frontier has moved past.
                    let ready = self
                        .pending_batch
                        .drain_filter_swapping(|(_, ts, _)| !frontier.less_equal(ts))
                        .collect();
                    return Some((ready, frontier));
                }
            }
        }
    }
}
#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandle for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64 + Sync,
{
    /// Appends `updates` to the shard if its upper equals `upper`, advancing it
    /// to `new_upper`.
    ///
    /// If the handle is in read-only mode when called, nothing is written
    /// until read-only mode ends. While waiting, the shard upper is re-fetched
    /// roughly once per second (or whenever the read-only flag signals a
    /// change); as soon as it is strictly greater than `upper` the call gives
    /// up and returns an `UpperMismatch` carrying the fetched upper.
    ///
    /// # Errors
    ///
    /// Returns `UpperMismatch` if the shard upper moved past `upper` while
    /// waiting in read-only mode, or if persist rejects the append for that
    /// reason.
    ///
    /// # Panics
    ///
    /// Panics if persist reports the call as an invalid usage.
    async fn compare_and_append(
        &mut self,
        updates: Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        upper: Antichain<Self::IntoTime>,
        new_upper: Antichain<Self::IntoTime>,
    ) -> Result<(), UpperMismatch<Self::IntoTime>> {
        let mut read_only = *self.read_only_rx.borrow();
        while read_only {
            tracing::trace!(
                ?upper,
                ?new_upper,
                persist_upper = ?self.write_handle.upper(),
                "persist remap handle is in read-only mode, waiting until we come out of it or the shard upper advances");

            // Wake up on a change of the read-only flag or after one second,
            // whichever comes first. The outcome of the wait is irrelevant
            // because the flag is re-read right below.
            let _ =
                tokio::time::timeout(Duration::from_secs(1), self.read_only_rx.changed()).await;

            read_only = *self.read_only_rx.borrow();
            if !read_only {
                tracing::trace!(
                    ?upper,
                    ?new_upper,
                    persist_upper = ?self.write_handle.upper(),
                    "persist remap handle has come out of read-only mode"
                );
                break;
            }

            // Still read-only: check whether another writer made this append moot.
            let current_upper = self.write_handle.fetch_recent_upper().await;
            if PartialOrder::less_than(&upper, current_upper) {
                tracing::trace!(
                    ?upper,
                    ?new_upper,
                    persist_upper = ?current_upper,
                    "someone else advanced the upper, aborting write"
                );
                return Err(UpperMismatch {
                    current: current_upper.clone(),
                    expected: upper,
                });
            }
        }

        // Encode each `FromTime` as the key row; the value is always `()`.
        let row_updates = updates.into_iter().map(|(from_ts, into_ts, diff)| {
            let key = SourceData(Ok(from_ts.encode_row()));
            ((key, ()), into_ts, diff)
        });

        let outcome = self
            .write_handle
            .compare_and_append(row_updates, upper, new_upper.clone())
            .await
            .unwrap_or_else(|invalid_use| panic!("compare_and_append failed: {invalid_use}"));

        // NOTE(review): the shared frontier is set to `new_upper` even when
        // `outcome` is an `UpperMismatch`, while the read-only early return
        // above leaves it untouched — confirm this asymmetry is intended.
        *self.shared_write_frontier.borrow_mut() = new_upper;
        outcome
    }

    /// Returns the upper currently held by the write handle.
    fn upper(&self) -> &Antichain<Self::IntoTime> {
        self.write_handle.upper()
    }
}