//! A handle to a batch of updates.

use std::collections::VecDeque;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::ops::Range;
use std::sync::Arc;
use std::time::Instant;
use bytes::Bytes;
use differential_dataflow::consolidation::consolidate_updates;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::cast::CastFrom;
use mz_ore::task::JoinHandleExt;
use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsBuilder};
use mz_persist::indexed::encoding::BlobTraceBatchPart;
use mz_persist::location::{Atomicity, Blob};
use mz_persist_types::stats::{trim_to_budget, truncate_bytes, TruncateBound, TRUNCATE_LEN};
use mz_persist_types::{Codec, Codec64};
use mz_proto::{ProtoType, RustType, TryFromProtoError};
use mz_timely_util::order::Reverse;
use proptest_derive::Arbitrary;
use semver::Version;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;
use tokio::task::JoinHandle;
use tracing::{debug_span, error, instrument, trace_span, warn, Instrument};
use crate::async_runtime::IsolatedRuntime;
use crate::cfg::ProtoUntrimmableColumns;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyPartStats, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{HollowBatch, HollowBatchPart};
use crate::stats::PartStats;
use crate::write::WriterId;
use crate::{PersistConfig, ShardId};
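// Pulls in this module's build-script-generated protobuf types, e.g. ProtoBatch.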
include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));
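/// A handle to a batch of updates that has been written to blob storage but
/// which has not yet been appended to a shard.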
#[derive(Debug)]
pub struct Batch<K, V, T, D>
where
T: Timestamp + Lattice + Codec64,
{
pub(crate) shard_id: ShardId,
pub(crate) version: Version,
pub(crate) batch: HollowBatch<T>,
pub(crate) _blob: Arc<dyn Blob + Send + Sync>,
pub(crate) _phantom: PhantomData<(K, V, T, D)>,
}
impl<K, V, T, D> Drop for Batch<K, V, T, D>
where
T: Timestamp + Lattice + Codec64,
{
    fn drop(&mut self) {
        // If a batch is dropped without being consumed (e.g. appended or
        // explicitly deleted), the blobs backing its parts are leaked.
        if !self.batch.parts.is_empty() {
warn!(
"un-consumed Batch, with {} dangling blob keys: {:?}",
self.batch.parts.len(),
self.batch
.parts
.iter()
.map(|x| &x.key.0)
.collect::<Vec<_>>(),
);
}
}
}
impl<K, V, T, D> Batch<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64,
D: Semigroup + Codec64,
{
pub(crate) fn new(
blob: Arc<dyn Blob + Send + Sync>,
shard_id: ShardId,
version: Version,
batch: HollowBatch<T>,
) -> Self {
Self {
shard_id,
version,
batch,
_blob: blob,
_phantom: PhantomData,
}
}
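    /// The shard this batch applies to.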
pub fn shard_id(&self) -> ShardId {
self.shard_id
}
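    /// The upper frontier of this batch's description.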
pub fn upper(&self) -> &Antichain<T> {
self.batch.desc.upper()
}
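    /// The lower frontier of this batch's description.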
pub fn lower(&self) -> &Antichain<T> {
self.batch.desc.lower()
}
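    /// Marks the blobs that this batch handle points to as consumed, e.g.
    /// because ownership of them has been handed off via
    /// [Batch::into_hollow_batch].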
pub(crate) fn mark_consumed(&mut self) {
self.batch.parts.clear();
}
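    /// Deletes this batch from the writer's perspective. Note that this
    /// currently only marks the parts as consumed; the underlying blobs are
    /// not removed from blob storage.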
#[instrument(level = "debug", skip_all, fields(shard = %self.shard_id))]
pub async fn delete(mut self) {
self.batch.parts.clear();
}
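    /// Turns this [Batch] into a [HollowBatch], marking it as consumed;
    /// responsibility for the underlying blobs passes to the caller.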
pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
let ret = self.batch.clone();
self.mark_consumed();
ret
}
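    /// Turns this [Batch] into a [ProtoBatch], which can be serialized and
    /// transmitted to another process, marking this handle as consumed.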
pub fn into_transmittable_batch(mut self) -> ProtoBatch {
let ret = ProtoBatch {
shard_id: self.shard_id.into_proto(),
version: self.version.to_string(),
batch: Some(self.batch.into_proto()),
};
self.mark_consumed();
ret
}
}
/// Indicates what work was done in a call to [BatchBuilder::add].
#[derive(Debug)]
pub enum Added {
    /// A record was inserted into a pending batch part.
    Record,
    /// A record was inserted into a pending batch part, and the part was full
    /// enough that it was handed off to be written to blob storage.
    RecordAndParts,
}
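/// A snapshot of the dynamic configuration that applies for the lifetime of a
/// single batch builder, captured from [PersistConfig] at construction time.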
#[derive(Debug, Clone)]
pub struct BatchBuilderConfig {
writer_key: WriterKey,
pub(crate) blob_target_size: usize,
pub(crate) batch_builder_max_outstanding_parts: usize,
pub(crate) stats_collection_enabled: bool,
pub(crate) stats_budget: usize,
pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
}
impl BatchBuilderConfig {
pub fn new(value: &PersistConfig, _writer_id: &WriterId) -> Self {
let writer_key = WriterKey::for_version(&value.build_version);
BatchBuilderConfig {
writer_key,
blob_target_size: value.dynamic.blob_target_size(),
batch_builder_max_outstanding_parts: value
.dynamic
.batch_builder_max_outstanding_parts(),
stats_collection_enabled: value.dynamic.stats_collection_enabled(),
stats_budget: value.dynamic.stats_budget_bytes(),
stats_untrimmable_columns: Arc::new(value.dynamic.stats_untrimmable_columns()),
}
}
}
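/// A set of columns for which stats are always retained when trimming a
/// part's stats down to the configured budget.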
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
pub struct UntrimmableColumns {
pub equals: Vec<String>,
pub prefixes: Vec<String>,
pub suffixes: Vec<String>,
}
impl UntrimmableColumns {
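    /// Returns whether stats for a column with the given name should always
    /// be retained. The name is lowercased before matching, while the
    /// configured strings are compared verbatim, so they should be lowercase.
    ///
    /// A sketch of the matching rules (values chosen for illustration only):
    ///
    /// ```ignore
    /// let cols = UntrimmableColumns {
    ///     equals: vec!["err".to_string()],
    ///     prefixes: vec!["ts_".to_string()],
    ///     suffixes: vec!["_time".to_string()],
    /// };
    /// assert!(cols.should_retain("ERR")); // case-insensitive equality
    /// assert!(cols.should_retain("ts_insert")); // prefix match
    /// assert!(cols.should_retain("updated_time")); // suffix match
    /// assert!(!cols.should_retain("other"));
    /// ```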
pub(crate) fn should_retain(&self, name: &str) -> bool {
let name_lower = name.to_lowercase();
for s in &self.equals {
if *s == name_lower {
return true;
}
}
for s in &self.prefixes {
if name_lower.starts_with(s) {
return true;
}
}
for s in &self.suffixes {
if name_lower.ends_with(s) {
return true;
}
}
false
}
}
impl RustType<ProtoUntrimmableColumns> for UntrimmableColumns {
fn into_proto(&self) -> ProtoUntrimmableColumns {
ProtoUntrimmableColumns {
equals: self.equals.into_proto(),
prefixes: self.prefixes.into_proto(),
suffixes: self.suffixes.into_proto(),
}
}
    fn from_proto(proto: ProtoUntrimmableColumns) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            equals: proto.equals.into_rust()?,
            prefixes: proto.prefixes.into_rust()?,
            suffixes: proto.suffixes.into_rust()?,
        })
    }
}
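/// A builder for [Batch]es that allows adding updates piece by piece and then
/// finishing it.
///
/// A usage sketch, assuming a `WriteHandle` named `write` opened for
/// `(String, String, u64, i64)`, as in the tests at the bottom of this file:
///
/// ```ignore
/// let mut builder = write.builder(Antichain::from_elem(0u64));
/// builder.add(&"k".to_owned(), &"v".to_owned(), &0, &1).await?;
/// let batch = builder.finish(Antichain::from_elem(1)).await?;
/// ```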
#[derive(Debug)]
pub struct BatchBuilder<K, V, T, D>
where
K: Codec,
V: Codec,
T: Timestamp + Lattice + Codec64,
{
pub(crate) stats_schemas: Schemas<K, V>,
pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
}
impl<K, V, T, D> BatchBuilder<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64,
D: Semigroup + Codec64,
{
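    /// Finish writing this batch and return a handle to the written batch.
    ///
    /// This fails if the requested `registered_upper` is not beyond the
    /// builder's `lower`, or if any update was added at a time beyond
    /// `registered_upper`.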
pub async fn finish(
self,
registered_upper: Antichain<T>,
) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
self.builder
.finish(&self.stats_schemas, registered_upper)
.await
}
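    /// Adds the given update to the batch.
    ///
    /// The update timestamp must be beyond the `lower` with which this
    /// builder was created.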
pub async fn add(
&mut self,
key: &K,
val: &V,
ts: &T,
diff: &D,
) -> Result<Added, InvalidUsage<T>> {
self.builder
.add(&self.stats_schemas, key, val, ts, diff)
.await
}
}
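/// The crate-internal implementation of [BatchBuilder]; the schemas used for
/// stats collection are passed explicitly to `add` and `finish`.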
#[derive(Debug)]
pub(crate) struct BatchBuilderInternal<K, V, T, D>
where
K: Codec,
V: Codec,
T: Timestamp + Lattice + Codec64,
{
lower: Antichain<T>,
inclusive_upper: Antichain<Reverse<T>>,
shard_id: ShardId,
version: Version,
blob: Arc<dyn Blob + Send + Sync>,
metrics: Arc<Metrics>,
_schemas: Schemas<K, V>,
consolidate: bool,
buffer: BatchBuffer<T, D>,
max_kvt_in_run: Option<(Vec<u8>, Vec<u8>, T)>,
runs: Vec<usize>,
parts_written: usize,
num_updates: usize,
parts: BatchParts<T>,
since: Antichain<T>,
inline_upper: Antichain<T>,
_phantom: PhantomData<(K, V, T, D)>,
}
impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64,
D: Semigroup + Codec64,
{
pub(crate) fn new(
cfg: BatchBuilderConfig,
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
schemas: Schemas<K, V>,
batch_write_metrics: BatchWriteMetrics,
lower: Antichain<T>,
blob: Arc<dyn Blob + Send + Sync>,
isolated_runtime: Arc<IsolatedRuntime>,
shard_id: ShardId,
version: Version,
since: Antichain<T>,
inline_upper: Option<Antichain<T>>,
consolidate: bool,
) -> Self {
let parts = BatchParts::new(
cfg.clone(),
Arc::clone(&metrics),
shard_metrics,
shard_id,
lower.clone(),
Arc::clone(&blob),
isolated_runtime,
&batch_write_metrics,
consolidate,
);
Self {
lower,
inclusive_upper: Antichain::new(),
blob,
buffer: BatchBuffer::new(
Arc::clone(&metrics),
batch_write_metrics,
cfg.blob_target_size,
consolidate,
),
metrics,
_schemas: schemas,
consolidate,
max_kvt_in_run: None,
parts_written: 0,
runs: Vec::new(),
num_updates: 0,
parts,
shard_id,
version,
since,
inline_upper: inline_upper.unwrap_or_else(|| Antichain::new()),
_phantom: PhantomData,
}
}
#[instrument(level = "debug", name = "batch::finish", skip_all, fields(shard = %self.shard_id))]
pub async fn finish<StatsK: Codec, StatsV: Codec>(
mut self,
stats_schemas: &Schemas<StatsK, StatsV>,
registered_upper: Antichain<T>,
) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
        if PartialOrder::less_than(&registered_upper, &self.lower) {
return Err(InvalidUsage::InvalidBounds {
lower: self.lower.clone(),
upper: registered_upper,
});
}
        // When `since` is <= `lower`, these updates have not been compacted,
        // so `registered_upper` must be a strict bound on their timestamps.
        // Once compaction has advanced timestamps by `since`, no such check
        // is possible.
        if PartialOrder::less_equal(&self.since, &self.lower) {
for ts in self.inclusive_upper.iter() {
if registered_upper.less_equal(&ts.0) {
return Err(InvalidUsage::UpdateBeyondUpper {
ts: ts.0.clone(),
expected_upper: registered_upper.clone(),
});
}
}
}
let remainder = self.buffer.drain();
self.flush_part(stats_schemas, remainder).await;
let parts = self.parts.finish().await;
let desc = Description::new(self.lower, registered_upper, self.since);
let batch = Batch::new(
self.blob,
self.shard_id.clone(),
self.version,
HollowBatch {
desc,
parts,
len: self.num_updates,
runs: self.runs,
},
);
Ok(batch)
}
pub async fn add<StatsK: Codec, StatsV: Codec>(
&mut self,
stats_schemas: &Schemas<StatsK, StatsV>,
key: &K,
val: &V,
ts: &T,
diff: &D,
) -> Result<Added, InvalidUsage<T>> {
if !self.lower.less_equal(ts) {
return Err(InvalidUsage::UpdateNotBeyondLower {
ts: ts.clone(),
lower: self.lower.clone(),
});
}
self.inclusive_upper.insert(Reverse(ts.clone()));
match self.buffer.push(key, val, ts.clone(), diff.clone()) {
Some(part_to_flush) => {
self.flush_part(stats_schemas, part_to_flush).await;
Ok(Added::RecordAndParts)
}
None => Ok(Added::Record),
}
}
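    /// Hands the given part off to be written to blob storage, tracking run
    /// boundaries along the way. No-op if the part is empty.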
async fn flush_part<StatsK: Codec, StatsV: Codec>(
&mut self,
stats_schemas: &Schemas<StatsK, StatsV>,
columnar: ColumnarRecords,
) {
let num_updates = columnar.len();
if num_updates == 0 {
return;
}
        if self.consolidate {
            // Each part is internally sorted; a "run" is a sequence of parts
            // that are also sorted relative to each other. Compare this
            // part's minimum (k, v, t) against the maximum (k, v, t) written
            // to the current run to decide whether a new run starts here.
            let ((min_part_k, min_part_v), min_part_t, _d) =
                columnar.get(0).expect("num updates is greater than zero");
            let min_part_t = T::decode(min_part_t);
            let ((max_part_k, max_part_v), max_part_t, _d) = columnar
                .get(num_updates.saturating_sub(1))
                .expect("num updates is greater than zero");
            let max_part_t = T::decode(max_part_t);
            if let Some((max_run_k, max_run_v, max_run_t)) = &mut self.max_kvt_in_run {
                if (min_part_k, min_part_v, &min_part_t)
                    < (max_run_k.as_slice(), max_run_v.as_slice(), &*max_run_t)
                {
                    self.runs.push(self.parts_written);
                }
                max_run_k.clear();
                max_run_v.clear();
                max_run_k.extend_from_slice(max_part_k);
                max_run_v.extend_from_slice(max_part_v);
                *max_run_t = max_part_t;
            } else {
                self.max_kvt_in_run =
                    Some((max_part_k.to_vec(), max_part_v.to_vec(), max_part_t));
            }
        } else if self.parts_written > 0 {
            // Without consolidation, each part after the first starts a new
            // run.
            self.runs.push(self.parts_written);
        }
let start = Instant::now();
self.parts
.write(
stats_schemas,
columnar,
self.inline_upper.clone(),
self.since.clone(),
)
.await;
self.metrics
.compaction
.batch
.step_part_writing
.inc_by(start.elapsed().as_secs_f64());
self.parts_written += 1;
self.num_updates += num_updates;
}
}
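/// An in-memory buffer that encodes updates as they arrive and drains itself
/// into a [ColumnarRecords] part once `blob_target_size` is reached.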
#[derive(Debug)]
struct BatchBuffer<T, D> {
metrics: Arc<Metrics>,
batch_write_metrics: BatchWriteMetrics,
blob_target_size: usize,
consolidate: bool,
key_buf: Vec<u8>,
val_buf: Vec<u8>,
current_part: Vec<((Range<usize>, Range<usize>), T, D)>,
current_part_total_bytes: usize,
current_part_key_bytes: usize,
current_part_value_bytes: usize,
}
impl<T, D> BatchBuffer<T, D>
where
T: Ord + Codec64,
D: Semigroup + Codec64,
{
fn new(
metrics: Arc<Metrics>,
batch_write_metrics: BatchWriteMetrics,
blob_target_size: usize,
should_consolidate: bool,
) -> Self {
BatchBuffer {
metrics,
batch_write_metrics,
blob_target_size,
consolidate: should_consolidate,
key_buf: Default::default(),
val_buf: Default::default(),
current_part: Default::default(),
current_part_total_bytes: Default::default(),
current_part_key_bytes: Default::default(),
current_part_value_bytes: Default::default(),
}
}
fn push<K: Codec, V: Codec>(
&mut self,
key: &K,
val: &V,
ts: T,
diff: D,
) -> Option<ColumnarRecords> {
let initial_key_buf_len = self.key_buf.len();
let initial_val_buf_len = self.val_buf.len();
self.metrics
.codecs
.key
.encode(|| K::encode(key, &mut self.key_buf));
self.metrics
.codecs
.val
.encode(|| V::encode(val, &mut self.val_buf));
let k_range = initial_key_buf_len..self.key_buf.len();
let v_range = initial_val_buf_len..self.val_buf.len();
let size = ColumnarRecordsBuilder::columnar_record_size(k_range.len(), v_range.len());
self.current_part_total_bytes += size;
self.current_part_key_bytes += k_range.len();
self.current_part_value_bytes += v_range.len();
self.current_part.push(((k_range, v_range), ts, diff));
if self.current_part_total_bytes >= self.blob_target_size {
Some(self.drain())
} else {
None
}
}
fn drain(&mut self) -> ColumnarRecords {
let mut updates = Vec::with_capacity(self.current_part.len());
for ((k_range, v_range), t, d) in self.current_part.drain(..) {
updates.push(((&self.key_buf[k_range], &self.val_buf[v_range]), t, d));
}
if self.consolidate {
let start = Instant::now();
consolidate_updates(&mut updates);
self.batch_write_metrics
.step_consolidation
.inc_by(start.elapsed().as_secs_f64());
}
if updates.is_empty() {
self.key_buf.clear();
self.val_buf.clear();
return ColumnarRecordsBuilder::default().finish();
}
        let start = Instant::now();
        let mut builder = ColumnarRecordsBuilder::default();
        // NB: `current_part` was drained above, so reserve based on `updates`
        // (which consolidation may also have shrunk).
        builder.reserve_exact(
            updates.len(),
            self.current_part_key_bytes,
            self.current_part_value_bytes,
        );
for ((k, v), t, d) in updates {
assert!(builder.push(((k, v), T::encode(&t), D::encode(&d))));
}
let columnar = builder.finish();
self.batch_write_metrics
.step_columnar_encoding
.inc_by(start.elapsed().as_secs_f64());
self.key_buf.clear();
self.val_buf.clear();
self.current_part_total_bytes = 0;
self.current_part_key_bytes = 0;
self.current_part_value_bytes = 0;
assert_eq!(self.current_part.len(), 0);
columnar
}
}
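/// Writes the parts of a batch to blob storage in the background, allowing up
/// to `batch_builder_max_outstanding_parts` concurrent writes before applying
/// backpressure.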
#[derive(Debug)]
pub(crate) struct BatchParts<T> {
cfg: BatchBuilderConfig,
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
shard_id: ShardId,
lower: Antichain<T>,
blob: Arc<dyn Blob + Send + Sync>,
isolated_runtime: Arc<IsolatedRuntime>,
writing_parts: VecDeque<JoinHandle<HollowBatchPart>>,
finished_parts: Vec<HollowBatchPart>,
batch_metrics: BatchWriteMetrics,
consolidated: bool,
}
impl<T: Timestamp + Codec64> BatchParts<T> {
pub(crate) fn new(
cfg: BatchBuilderConfig,
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
shard_id: ShardId,
lower: Antichain<T>,
blob: Arc<dyn Blob + Send + Sync>,
isolated_runtime: Arc<IsolatedRuntime>,
batch_metrics: &BatchWriteMetrics,
consolidated: bool,
) -> Self {
BatchParts {
cfg,
metrics,
shard_metrics,
shard_id,
lower,
blob,
isolated_runtime,
writing_parts: VecDeque::new(),
finished_parts: Vec::new(),
batch_metrics: batch_metrics.clone(),
consolidated,
}
}
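    /// Spawns a background task to encode and upload `updates` as a single
    /// batch part, blocking first if too many part writes are already in
    /// flight.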
pub(crate) async fn write<K: Codec, V: Codec>(
&mut self,
schemas: &Schemas<K, V>,
updates: ColumnarRecords,
upper: Antichain<T>,
since: Antichain<T>,
) {
let desc = Description::new(self.lower.clone(), upper, since);
let metrics = Arc::clone(&self.metrics);
let shard_metrics = Arc::clone(&self.shard_metrics);
let blob = Arc::clone(&self.blob);
let isolated_runtime = Arc::clone(&self.isolated_runtime);
let batch_metrics = self.batch_metrics.clone();
let partial_key = PartialBatchKey::new(&self.cfg.writer_key, &PartId::new());
let key = partial_key.complete(&self.shard_id);
let index = u64::cast_from(self.finished_parts.len() + self.writing_parts.len());
let stats_collection_enabled = self.cfg.stats_collection_enabled;
let stats_budget = self.cfg.stats_budget;
let schemas = schemas.clone();
let consolidated = self.consolidated;
let untrimmable_columns = Arc::clone(&self.cfg.stats_untrimmable_columns);
let write_span = debug_span!("batch::write_part", shard = %self.shard_id).or_current();
let handle = mz_ore::task::spawn(
|| "batch::write_part",
async move {
let goodbytes = updates.goodbytes();
let key_lower = if consolidated {
updates.get(0).and_then(|((k, _), _, _)| {
truncate_bytes(k, TRUNCATE_LEN, TruncateBound::Lower)
})
} else {
None
};
let batch = BlobTraceBatchPart {
desc,
updates: vec![updates],
index,
};
let (stats, (buf, encode_time)) = isolated_runtime
.spawn_named(|| "batch::encode_part", async move {
let stats = if stats_collection_enabled {
let stats_start = Instant::now();
match PartStats::legacy_part_format(&schemas, &batch.updates) {
Ok(x) => {
let mut trimmed_bytes = 0;
let x = LazyPartStats::encode(&x, |s| {
trimmed_bytes = trim_to_budget(s, stats_budget, |s| {
untrimmable_columns.should_retain(s)
});
});
Some((x, stats_start.elapsed(), trimmed_bytes))
}
Err(err) => {
error!("failed to construct part stats: {}", err);
None
}
}
} else {
None
};
let encode_start = Instant::now();
let mut buf = Vec::new();
batch.encode(&mut buf);
drop(batch);
(stats, (Bytes::from(buf), encode_start.elapsed()))
})
.instrument(debug_span!("batch::encode_part"))
.await
.expect("part encode task failed");
metrics.codecs.batch.encode_count.inc();
metrics
.codecs
.batch
.encode_seconds
.inc_by(encode_time.as_secs_f64());
let start = Instant::now();
let payload_len = buf.len();
let () = retry_external(&metrics.retries.external.batch_set, || async {
shard_metrics.blob_sets.inc();
blob.set(&key, Bytes::clone(&buf), Atomicity::RequireAtomic)
.await
})
.instrument(trace_span!("batch::set", payload_len))
.await;
batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
batch_metrics
.step_stats
.inc_by(stats_step_timing.as_secs_f64());
if trimmed_bytes > 0 {
metrics.pushdown.parts_stats_trimmed_count.inc();
metrics
.pushdown
.parts_stats_trimmed_bytes
.inc_by(u64::cast_from(trimmed_bytes));
}
stats
});
HollowBatchPart {
key: partial_key,
encoded_size_bytes: payload_len,
key_lower: key_lower.unwrap_or_else(Vec::new),
stats,
}
}
.instrument(write_span),
);
self.writing_parts.push_back(handle);
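        // Apply backpressure: if more than the configured number of part
        // writes are outstanding, block on the oldest one before returning.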
while self.writing_parts.len() > self.cfg.batch_builder_max_outstanding_parts {
batch_metrics.write_stalls.inc();
let handle = self
.writing_parts
.pop_front()
.expect("pop failed when len was just > some usize");
let part = handle
.instrument(debug_span!("batch::max_outstanding"))
.wait_and_assert_finished()
.await;
self.finished_parts.push(part);
}
}
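    /// Waits for all outstanding part writes to complete and returns the
    /// parts in the order they were written.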
#[instrument(level = "debug", name = "batch::finish_upload", skip_all, fields(shard = %self.shard_id))]
pub(crate) async fn finish(self) -> Vec<HollowBatchPart> {
let mut parts = self.finished_parts;
for handle in self.writing_parts {
let part = handle.wait_and_assert_finished().await;
parts.push(part);
}
parts
}
}
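/// Validates that a batch with description `batch` may be appended as if it
/// had the (tighter) description `truncate`: `truncate`'s bounds must lie
/// within `batch`'s bounds.
///
/// A sketch with `u64` timestamps (bounds chosen for illustration only):
///
/// ```ignore
/// // A batch covering [0, 10) may be truncated to [2, 8)...
/// let batch = Description::new(
///     Antichain::from_elem(0u64),
///     Antichain::from_elem(10),
///     Antichain::from_elem(0),
/// );
/// let truncate = Description::new(
///     Antichain::from_elem(2u64),
///     Antichain::from_elem(8),
///     Antichain::from_elem(0),
/// );
/// assert!(validate_truncate_batch(&batch, &truncate).is_ok());
/// // ...but not to bounds that extend outside it.
/// let too_wide = Description::new(
///     Antichain::from_elem(0u64),
///     Antichain::from_elem(12),
///     Antichain::from_elem(0),
/// );
/// assert!(validate_truncate_batch(&batch, &too_wide).is_err());
/// ```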
pub(crate) fn validate_truncate_batch<T: Timestamp>(
batch: &Description<T>,
truncate: &Description<T>,
) -> Result<(), InvalidUsage<T>> {
if !PartialOrder::less_equal(batch.lower(), truncate.lower())
|| PartialOrder::less_than(batch.upper(), truncate.upper())
{
return Err(InvalidUsage::InvalidBatchBounds {
batch_lower: batch.lower().clone(),
batch_upper: batch.upper().clone(),
append_lower: truncate.lower().clone(),
append_upper: truncate.upper().clone(),
});
}
Ok(())
}
#[cfg(test)]
mod tests {
use crate::cache::PersistClientCache;
use crate::internal::paths::{BlobKey, PartialBlobKey};
use crate::tests::{all_ok, CodecProduct};
use crate::PersistLocation;
use super::*;
#[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_flushing() {
let data = vec![
(("1".to_owned(), "one".to_owned()), 1, 1),
(("2".to_owned(), "two".to_owned()), 2, 1),
(("3".to_owned(), "three".to_owned()), 3, 1),
];
let cache = PersistClientCache::new_no_metrics();
cache.cfg.dynamic.set_blob_target_size(0);
cache.cfg.dynamic.set_batch_builder_max_outstanding_parts(2);
let client = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed");
let (mut write, mut read) = client
.expect_open::<String, String, u64, i64>(ShardId::new())
.await;
let builder = write.builder(Antichain::from_elem(0));
let schemas = builder.stats_schemas;
let mut builder = builder.builder;
assert_eq!(builder.parts.writing_parts.len(), 0);
assert_eq!(builder.parts.finished_parts.len(), 0);
builder
.add(
&schemas,
&data[0].0 .0,
&data[0].0 .1,
&data[0].1,
&data[0].2,
)
.await
.expect("invalid usage");
assert_eq!(builder.parts.writing_parts.len(), 1);
assert_eq!(builder.parts.finished_parts.len(), 0);
builder
.add(
&schemas,
&data[1].0 .0,
&data[1].0 .1,
&data[1].1,
&data[1].2,
)
.await
.expect("invalid usage");
assert_eq!(builder.parts.writing_parts.len(), 2);
assert_eq!(builder.parts.finished_parts.len(), 0);
builder
.add(
&schemas,
&data[2].0 .0,
&data[2].0 .1,
&data[2].1,
&data[2].2,
)
.await
.expect("invalid usage");
assert_eq!(builder.parts.writing_parts.len(), 2);
assert_eq!(builder.parts.finished_parts.len(), 1);
let batch = builder
.finish(&schemas, Antichain::from_elem(4))
.await
.expect("invalid usage");
assert_eq!(batch.batch.parts.len(), 3);
write
.append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(4))
.await
.expect("invalid usage")
.expect("unexpected upper");
assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
}
#[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_keys() {
let cache = PersistClientCache::new_no_metrics();
cache.cfg.dynamic.set_blob_target_size(0);
let client = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed");
let shard_id = ShardId::new();
let (mut write, _) = client
.expect_open::<String, String, u64, i64>(shard_id)
.await;
let batch = write
.expect_batch(
&[
(("1".into(), "one".into()), 1, 1),
(("2".into(), "two".into()), 2, 1),
(("3".into(), "three".into()), 3, 1),
],
0,
4,
)
.await;
assert_eq!(batch.batch.parts.len(), 3);
for part in &batch.batch.parts {
match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
Ok((shard, PartialBlobKey::Batch(writer, _))) => {
assert_eq!(shard.to_string(), shard_id.to_string());
assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
}
_ => panic!("unparseable blob key"),
}
}
}
#[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn batch_builder_partial_order() {
let cache = PersistClientCache::new_no_metrics();
cache.cfg.dynamic.set_blob_target_size(0);
let client = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed");
let shard_id = ShardId::new();
let (mut write, _) = client
.expect_open::<String, String, CodecProduct, i64>(shard_id)
.await;
let batch = write
.batch(
&[
(
("1".to_owned(), "one".to_owned()),
CodecProduct::new(0, 10),
1,
),
(
("2".to_owned(), "two".to_owned()),
CodecProduct::new(10, 0),
1,
),
],
Antichain::from_elem(CodecProduct::new(0, 0)),
Antichain::from_iter([CodecProduct::new(0, 11), CodecProduct::new(10, 1)]),
)
.await
.expect("invalid usage");
assert_eq!(batch.batch.parts.len(), 2);
for part in &batch.batch.parts {
match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
Ok((shard, PartialBlobKey::Batch(writer, _))) => {
assert_eq!(shard.to_string(), shard_id.to_string());
assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
}
_ => panic!("unparseable blob key"),
}
}
}
#[mz_ore::test]
fn untrimmable_columns() {
let untrimmable = UntrimmableColumns {
equals: vec!["abc".to_string(), "def".to_string()],
prefixes: vec!["123".to_string(), "234".to_string()],
suffixes: vec!["xyz".to_string()],
};
assert!(untrimmable.should_retain("abc"));
assert!(untrimmable.should_retain("ABC"));
assert!(untrimmable.should_retain("aBc"));
assert!(!untrimmable.should_retain("abcd"));
assert!(untrimmable.should_retain("deF"));
assert!(!untrimmable.should_retain("defg"));
assert!(untrimmable.should_retain("123"));
assert!(untrimmable.should_retain("123-4"));
assert!(untrimmable.should_retain("1234"));
assert!(untrimmable.should_retain("234"));
assert!(!untrimmable.should_retain("345"));
assert!(untrimmable.should_retain("ijk_xyZ"));
assert!(untrimmable.should_retain("ww-XYZ"));
assert!(!untrimmable.should_retain("xya"));
}
}