use differential_dataflow::consolidation;
use differential_dataflow::lattice::Lattice;
use mz_persist_client::error::UpperMismatch;
use mz_repr::Diff;
use mz_storage_client::util::remap_handle::RemapHandle;
use timely::order::PartialOrder;
use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};
use timely::progress::Timestamp;
pub mod compat;
/// State needed to keep a remap collection, which relates `FromTime` frontiers to `IntoTime`
/// timestamps, in sync with a [`RemapHandle`] and to append new bindings to it.
///
/// All reads (`next`) and writes (`compare_and_append`) of the remap collection go through the
/// handle; this struct only tracks how far it has read and what the accumulated source frontier
/// is at that point.
#[derive(Debug)]
pub struct ReclockOperator<FromTime, IntoTime, Handle>
where
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice,
    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
{
    /// The `IntoTime` frontier up to which remap updates have been read back from the handle.
    /// Starts at `IntoTime::minimum()` and is replaced by each upper the handle reports.
    upper: Antichain<IntoTime>,
    /// The `FromTime` frontier obtained by accumulating the `(FromTime, diff)` part of every
    /// remap update read so far.
    source_upper: MutableAntichain<FromTime>,
    /// Handle used both to read the remap collection and to append new bindings to it.
    remap_handle: Handle,
}
/// A set of remap updates paired with the remap frontier that had been reached when the set was
/// assembled.
#[derive(Clone, Debug, PartialEq)]
pub struct ReclockBatch<FromTime, IntoTime> {
/// `(from_ts, into_ts, diff)` triples; a diff of `1` adds `from_ts` to the source frontier at
/// `into_ts`, a diff of `-1` retracts it.
pub updates: Vec<(FromTime, IntoTime, Diff)>,
/// The `IntoTime` upper frontier of the remap data at the time the batch was produced.
pub upper: Antichain<IntoTime>,
}
impl<FromTime, IntoTime, Handle> ReclockOperator<FromTime, IntoTime, Handle>
where
FromTime: Timestamp,
IntoTime: Timestamp + Lattice,
Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
{
/// Builds an operator around `remap_handle` and catches it up with whatever the handle already
/// contains.
///
/// Returns the operator together with the batch of updates that were read while catching up to
/// the handle's current upper. When the handle's upper is still `[IntoTime::minimum()]` nothing
/// is read and the returned batch is empty with an upper of `[IntoTime::minimum()]`.
pub async fn new(remap_handle: Handle) -> (Self, ReclockBatch<FromTime, IntoTime>) {
    // Capture the upper before the handle is moved into the operator.
    let existing_upper = remap_handle.upper().clone();

    let mut this = Self {
        upper: Antichain::from_elem(IntoTime::minimum()),
        source_upper: MutableAntichain::new(),
        remap_handle,
    };

    let initial_batch = if existing_upper.elements() == [IntoTime::minimum()] {
        // Nothing has been written yet, so there is nothing to read back.
        ReclockBatch {
            updates: Vec::new(),
            upper: Antichain::from_elem(IntoTime::minimum()),
        }
    } else {
        this.sync(existing_upper.borrow()).await
    };

    (this, initial_batch)
}
/// Reads from the remap handle until this operator's `upper` is no longer strictly less than
/// `target_upper`, folding everything that was read into `source_upper`.
///
/// Returns all updates read during this call together with the resulting `upper`.
///
/// # Panics
///
/// Panics if the handle returns `None` from `next()` before `target_upper` has been reached.
async fn sync(
    &mut self,
    target_upper: AntichainRef<'_, IntoTime>,
) -> ReclockBatch<FromTime, IntoTime> {
    let mut updates: Vec<(FromTime, IntoTime, Diff)> = Vec::new();

    loop {
        let caught_up = !PartialOrder::less_than(&self.upper.borrow(), &target_upper);
        if caught_up {
            break;
        }
        let (fetched, next_upper) = self
            .remap_handle
            .next()
            .await
            .expect("requested data after empty antichain");
        self.upper = next_upper;
        updates.extend(fetched);
    }

    // Only the source timestamp and the diff matter for the accumulated source frontier.
    self.source_upper
        .update_iter(updates.iter().map(|update| (update.0.clone(), update.2)));

    let upper = self.upper.clone();
    ReclockBatch { updates, upper }
}
pub async fn mint(
&mut self,
binding_ts: IntoTime,
mut new_into_upper: Antichain<IntoTime>,
new_from_upper: AntichainRef<'_, FromTime>,
) -> ReclockBatch<FromTime, IntoTime> {
assert!(!new_into_upper.less_equal(&binding_ts));
let mut batch = ReclockBatch {
updates: vec![],
upper: self.upper.clone(),
};
while *self.upper == [IntoTime::minimum()]
|| (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper)
&& PartialOrder::less_than(&self.upper, &new_into_upper))
{
if new_from_upper.is_empty() {
new_into_upper = Antichain::new();
}
let binding_ts = if *self.upper == [IntoTime::minimum()] {
IntoTime::minimum()
} else {
binding_ts.clone()
};
let mut updates = vec![];
for src_ts in self.source_upper.frontier().iter().cloned() {
updates.push((src_ts, binding_ts.clone(), -1));
}
for src_ts in new_from_upper.iter().cloned() {
updates.push((src_ts, binding_ts.clone(), 1));
}
consolidation::consolidate_updates(&mut updates);
let new_batch = match self.append_batch(updates, &new_into_upper).await {
Ok(trace_batch) => trace_batch,
Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await,
};
batch.updates.extend(new_batch.updates);
batch.upper = new_batch.upper;
}
batch
}
/// Tries to append `updates` to the remap collection, expecting the collection's upper to equal
/// this operator's `upper` and advancing it to `new_upper`.
///
/// On success the operator syncs up to `new_upper` and returns the batch that was read back.
///
/// # Errors
///
/// Returns the `UpperMismatch` reported by the handle's `compare_and_append` unchanged; no
/// sync is performed in that case.
async fn append_batch(
    &mut self,
    updates: Vec<(FromTime, IntoTime, Diff)>,
    new_upper: &Antichain<IntoTime>,
) -> Result<ReclockBatch<FromTime, IntoTime>, UpperMismatch<IntoTime>> {
    let expected_upper = self.upper.clone();
    self.remap_handle
        .compare_and_append(updates, expected_upper, new_upper.clone())
        .await?;
    Ok(self.sync(new_upper.borrow()).await)
}
}
#[cfg(test)]
mod tests {
use std::cell::RefCell;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Duration;
use mz_build_info::DUMMY_BUILD_INFO;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{GlobalId, RelationDesc, ScalarType, Timestamp};
use mz_storage_client::util::remap_handle::RemapHandle;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::kafka::{self, RangeBound as RB};
use mz_storage_types::sources::{MzOffset, SourceData};
use mz_timely_util::order::Partitioned;
use timely::progress::Timestamp as _;
use tokio::sync::watch;
use super::*;
/// Reader lease duration configured on the shared test persist cache, and the unit by which
/// `test_since_hold` advances the paused clock.
///
/// NOTE(review): despite the `_MS` suffix this is a `Duration` of 15 minutes, not a number of
/// milliseconds.
static PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(60 * 15);
/// Lazily initialised persist client cache shared by every test in this module.
///
/// Built from the default persist configuration with the reader lease duration set to
/// `PERSIST_READER_LEASE_TIMEOUT_MS`, a fresh metrics registry and a no-op pubsub connection.
static PERSIST_CACHE: LazyLock<Arc<PersistClientCache>> = LazyLock::new(|| {
    let config = PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
    config.set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);

    let registry = MetricsRegistry::new();
    let cache = PersistClientCache::new(config, &registry, |_, _| {
        PubSubClientConnection::noop()
    });
    Arc::new(cache)
});
/// Relation description handed to the remap handle and the leased reader in these tests: a
/// non-nullable `partition` column holding a numeric range and a nullable `offset` column of
/// type `UInt64`.
static PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
    let partition_column = ScalarType::Range {
        element_type: Box::new(ScalarType::Numeric { max_scale: None }),
    }
    .nullable(false);
    let offset_column = ScalarType::UInt64.nullable(true);

    RelationDesc::builder()
        .with_column("partition", partition_column)
        .with_column("offset", offset_column)
        .finish()
});
/// Creates a `ReclockOperator` over an in-memory persist location whose remap shard is `shard`,
/// reading from `as_of`.
///
/// If the operator comes up with an upper of `[Timestamp::minimum()]` (nothing has been minted
/// on the shard yet), an initial binding for the minimum source frontier is minted with a
/// binding time of `0` and a new upper of `1`, and that batch is returned instead of the empty
/// one.
async fn make_test_operator(
    shard: ShardId,
    as_of: Antichain<Timestamp>,
) -> (
    ReclockOperator<
        kafka::KafkaTimestamp,
        Timestamp,
        impl RemapHandle<FromTime = kafka::KafkaTimestamp, IntoTime = Timestamp>,
    >,
    ReclockBatch<kafka::KafkaTimestamp, Timestamp>,
) {
    let persist_location = PersistLocation {
        blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
    };
    let metadata = CollectionMetadata {
        persist_location,
        remap_shard: Some(shard),
        data_shard: ShardId::new(),
        relation_desc: RelationDesc::empty(),
        txns_shard: None,
    };

    // The sender half is bound to a named variable so it stays alive until this function
    // returns.
    let (_read_only_tx, read_only_rx) = watch::channel(false);
    let shared_write_frontier =
        Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));

    let handle = crate::source::reclock::compat::PersistHandle::new(
        Arc::clone(&*PERSIST_CACHE),
        read_only_rx,
        metadata,
        as_of.clone(),
        shared_write_frontier,
        GlobalId::Explain,
        "unittest",
        0,
        1,
        PROGRESS_DESC.clone(),
        GlobalId::Explain,
    )
    .await
    .unwrap();

    let (mut operator, loaded_batch) = ReclockOperator::new(handle).await;
    if *loaded_batch.upper != [Timestamp::minimum()] {
        // The shard already had bindings; hand back what was loaded.
        return (operator, loaded_batch);
    }

    let first_batch = operator
        .mint(
            0.into(),
            Antichain::from_elem(1.into()),
            Antichain::from_elem(Partitioned::minimum()).borrow(),
        )
        .await;
    (operator, first_batch)
}
/// Builds a Kafka source frontier from `(partition id, offset)` pairs.
///
/// Every listed partition contributes a singleton element at its offset; the gaps before,
/// between and after the listed partitions are covered by range elements at offset `0`.
///
/// # Panics
///
/// Panics if the lower bound left over from the previous item is not strictly less than
/// `RB::before(pid)` for the current item.
fn partitioned_frontier<I>(items: I) -> Antichain<kafka::KafkaTimestamp>
where
    I: IntoIterator<Item = (i32, MzOffset)>,
{
    let mut prev = RB::NegInfinity;
    let mut result = Antichain::new();

    for (pid, partition_offset) in items {
        assert!(prev < RB::before(pid));
        // Range covering everything between the previous partition and this one.
        result.insert(Partitioned::new_range(
            prev,
            RB::before(pid),
            MzOffset::from(0),
        ));
        result.insert(Partitioned::new_singleton(
            RB::exact(pid),
            partition_offset,
        ));
        prev = RB::after(pid);
    }

    // Trailing range covering everything after the last listed partition.
    let tail = Partitioned::new_range(prev, RB::PosInfinity, MzOffset::from(0));
    result.insert(tail);
    result
}
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] async fn test_basic_usage() {
let (mut operator, _) =
make_test_operator(ShardId::new(), Antichain::from_elem(0.into())).await;
let source_upper = partitioned_frontier([(0, MzOffset::from(4))]);
let mut batch = operator
.mint(
1000.into(),
Antichain::from_elem(1001.into()),
source_upper.borrow(),
)
.await;
let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
updates: vec![
(
Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
1000.into(),
1,
),
(
Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
1000.into(),
1,
),
(
Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
1000.into(),
-1,
),
(
Partitioned::new_singleton(RB::exact(0), MzOffset::from(4)),
1000.into(),
1,
),
],
upper: Antichain::from_elem(Timestamp::from(1001)),
};
batch.updates.sort();
expected_batch.updates.sort();
assert_eq!(batch, expected_batch);
}
/// Mints bindings at times 1000 and 2000, downgrades the critical since of the remap shard to
/// 1000, and then checks the initial batch of a fresh operator started with an `as_of` of 1000.
///
/// The expected batch contains the frontier for partition 0 at offset 3 as of time 1000 and the
/// move from offset 3 to offset 5 at time 2000, with an upper of 2001.
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn test_compaction() {
    let persist_location = PersistLocation {
        blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
    };
    let remap_shard = ShardId::new();
    let persist_client = PERSIST_CACHE
        .open(persist_location)
        .await
        .expect("error creating persist client");
    let mut remap_read_handle = persist_client
        .open_critical_since::<SourceData, (), Timestamp, Diff, u64>(
            remap_shard,
            PersistClient::CONTROLLER_CRITICAL_SINCE,
            // Label the handle with this test's own name so diagnostics point at the right
            // test (previously copy-pasted as "test_since_hold").
            Diagnostics::from_purpose("test_compaction"),
        )
        .await
        .expect("error opening persist shard");

    let (mut operator, _batch) =
        make_test_operator(remap_shard, Antichain::from_elem(0.into())).await;

    // First binding: partition 0 at offset 3, bound at time 1000.
    let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
    operator
        .mint(
            1000.into(),
            Antichain::from_elem(1001.into()),
            source_upper.borrow(),
        )
        .await;

    // Second binding: partition 0 at offset 5, bound at time 2000.
    let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
    operator
        .mint(
            2000.into(),
            Antichain::from_elem(2001.into()),
            source_upper.borrow(),
        )
        .await;

    // Move the critical since of the remap shard from 0 to 1000.
    remap_read_handle
        .compare_and_downgrade_since(&0, (&0, &Antichain::from_elem(1000.into())))
        .await
        .unwrap();

    // A new operator reading as of 1000 observes everything up to 1000 at time 1000.
    let (_operator, mut initial_batch) =
        make_test_operator(remap_shard, Antichain::from_elem(1000.into())).await;

    let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
        updates: vec![
            (
                Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                1000.into(),
                1,
            ),
            (
                Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                1000.into(),
                1,
            ),
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                1000.into(),
                1,
            ),
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                2000.into(),
                -1,
            ),
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                2000.into(),
                1,
            ),
        ],
        upper: Antichain::from_elem(Timestamp::from(2001)),
    };

    // Compare order-insensitively.
    expected_batch.updates.sort();
    initial_batch.updates.sort();
    assert_eq!(initial_batch, expected_batch);
}
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] async fn test_concurrency() {
let shared_shard = ShardId::new();
let (mut op_a, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
let (mut op_b, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
let mut batch = op_a
.mint(
1000.into(),
Antichain::from_elem(1001.into()),
source_upper.borrow(),
)
.await;
let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
updates: vec![
(
Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
1000.into(),
1,
),
(
Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
1000.into(),
1,
),
(
Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
1000.into(),
-1,
),
(
Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
1000.into(),
1,
),
],
upper: Antichain::from_elem(Timestamp::from(1001)),
};
batch.updates.sort();
expected_batch.updates.sort();
assert_eq!(batch, expected_batch);
let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
let mut batch = op_b
.mint(
11000.into(),
Antichain::from_elem(11001.into()),
source_upper.borrow(),
)
.await;
expected_batch.updates.extend([
(
Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
11000.into(),
-1,
),
(
Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
11000.into(),
1,
),
]);
expected_batch.upper = Antichain::from_elem(Timestamp::from(11001));
batch.updates.sort();
expected_batch.updates.sort();
assert_eq!(batch, expected_batch);
}
// Checks that the since of the remap shard is still 0 after two rounds of minting that are
// separated by clock advances which together exceed the configured reader lease timeout.
// Runs with tokio's clock paused so time only moves when the test advances it.
#[mz_ore::test(tokio::test(start_paused = true))]
#[cfg_attr(miri, ignore)] async fn test_since_hold() {
let binding_shard = ShardId::new();
let (mut operator, _) =
make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;
// Advance by just over half of the lease timeout before the first mint.
tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
let _ = operator
.mint(
1000.into(),
Antichain::from_elem(1001.into()),
source_upper.borrow(),
)
.await;
// Advance by just over half of the lease timeout again; in total more than one full lease
// timeout has now elapsed since the operator was created.
tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
let _ = operator
.mint(
2000.into(),
Antichain::from_elem(2001.into()),
source_upper.borrow(),
)
.await;
// NOTE(review): presumably yields so background work (e.g. lease expiry) gets a chance to
// run before the checks below; confirm against the persist client.
tokio::time::sleep(Duration::from_millis(1)).await;
// A new operator must still be able to start with an `as_of` of 0 on the same shard.
let (_operator, _batch) =
make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;
let persist_location = PersistLocation {
blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
};
let persist_client = PERSIST_CACHE
.open(persist_location)
.await
.expect("error creating persist client");
let read_handle = persist_client
.open_leased_reader::<SourceData, (), Timestamp, Diff>(
binding_shard,
Arc::new(PROGRESS_DESC.clone()),
Arc::new(UnitSchema),
Diagnostics::from_purpose("test_since_hold"),
true,
)
.await
.expect("error opening persist shard");
// Independently confirm through a fresh leased reader that the shard's since is still 0.
assert_eq!(
Antichain::from_elem(0.into()),
read_handle.since().to_owned()
);
}
}