use std::fmt::{self, Debug};
use std::marker::PhantomData;
use bytes::BufMut;
use differential_dataflow::trace::Description;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_persist_types::Codec64;
use prost::Message;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;
use crate::error::Error;
use crate::gen::persist::{
ProtoBatchFormat, ProtoBatchPartInline, ProtoU64Antichain, ProtoU64Description,
};
use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
use crate::indexed::columnar::ColumnarRecords;
use crate::location::Blob;
use crate::metrics::ColumnarMetrics;
/// The metadata necessary to reconstruct one batch of updates in a trace.
///
/// `keys` point at encoded [BlobTraceBatchPart]s in blob storage; see
/// [TraceBatchMeta::validate_data] for the invariants tying the two together.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// Blob keys, one per part of this batch; the part stored at `keys[i]`
    /// must have `index == i`.
    pub keys: Vec<String>,
    /// The encoding format of the parts stored under `keys`.
    pub format: ProtoBatchFormat,
    /// The half-open time interval `[lower, upper)` and since frontier
    /// covered by this batch.
    pub desc: Description<u64>,
    /// NOTE(review): presumably the compaction level of this batch — confirm
    /// against the compaction code, which is not visible here.
    pub level: u64,
    /// Total encoded size in bytes of this batch's parts.
    pub size_bytes: u64,
}
/// One part of a trace batch's updates, stored as a single blob.
#[derive(Clone, Debug)]
pub struct BlobTraceBatchPart<T> {
    /// The time interval and since frontier of the batch this part belongs
    /// to; must match the owning [TraceBatchMeta]'s `desc`.
    pub desc: Description<T>,
    /// This part's position within the owning batch's list of keys.
    pub index: u64,
    /// The updates themselves, in columnar form.
    pub updates: Vec<ColumnarRecords>,
}
impl TraceBatchMeta {
    /// Asserts the documented invariants of `self`, returning an error if any
    /// are violated.
    pub fn validate(&self) -> Result<(), Error> {
        // The batch's lower must be strictly less than its upper; an empty or
        // inverted interval is invalid.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }
        Ok(())
    }

    /// Fetches every part of this batch from `blob` and validates that the
    /// parts are consistent with this metadata.
    ///
    /// Specifically: every key must resolve to a blob, each decoded part must
    /// carry the same `desc` as this metadata, each part's `index` must match
    /// its position in `keys`, each part must pass its own `validate`, and no
    /// update may have a diff of 0.
    pub async fn validate_data(
        &self,
        blob: &(dyn Blob + Send + Sync),
        metrics: &ColumnarMetrics,
    ) -> Result<(), Error> {
        let mut batches = vec![];
        for (idx, key) in self.keys.iter().enumerate() {
            let value = blob
                .get(key)
                .await?
                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
            if batch.desc != self.desc {
                return Err(format!(
                    "invalid trace batch part desc expected {:?} got {:?}",
                    &self.desc, &batch.desc
                )
                .into());
            }
            if batch.index != u64::cast_from(idx) {
                return Err(format!(
                    "invalid index for blob trace batch part at key {} expected {} got {}",
                    key, idx, batch.index
                )
                .into());
            }
            batch.validate()?;
            batches.push(batch);
        }
        for update in batches
            .iter()
            .flat_map(|batch| batch.updates.iter().flat_map(|u| u.iter()))
        {
            let ((_key, _val), _ts, diff) = update;
            // Decode the diff as i64 for consistency with
            // BlobTraceBatchPart::validate (and with the PrettyRecord error
            // rendering below). This was previously decoded as u64; the
            // zero-check is unaffected since 8 bytes decode to zero under
            // either interpretation.
            let diff: i64 = Codec64::decode(diff);
            if diff == 0 {
                return Err(format!(
                    "update with 0 diff: {:?}",
                    PrettyRecord::<u64, i64>(update, PhantomData)
                )
                .into());
            }
        }
        Ok(())
    }
}
impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
    /// Asserts the documented invariants of `self`, returning an error if any
    /// are violated.
    pub fn validate(&self) -> Result<(), Error> {
        // The batch's lower must be strictly less than its upper.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }
        // When since <= lower the batch has not been forwarded by compaction,
        // so every timestamp must also fall strictly below the upper.
        let is_uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());
        for record in self.updates.iter().flat_map(|part| part.iter()) {
            let ((_k, _v), ts_bytes, diff_bytes) = record;
            let ts = T::decode(ts_bytes);
            let diff: i64 = Codec64::decode(diff_bytes);
            if !self.desc.lower().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is less than the batch lower: {:?}",
                    ts, self.desc
                )
                .into());
            }
            if is_uncompacted && self.desc.upper().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
                    ts, self.desc
                )
                .into());
            }
            if diff == 0 {
                return Err(format!(
                    "update with 0 diff: {:?}",
                    PrettyRecord::<u64, i64>(record, PhantomData)
                )
                .into());
            }
        }
        Ok(())
    }

    /// Encodes this part as parquet into `buf`.
    ///
    /// # Panics
    ///
    /// Panics if the part cannot be encoded (the batch is expected to have
    /// been validated first).
    pub fn encode<B: BufMut>(&self, buf: &mut B) {
        encode_trace_parquet(&mut buf.writer(), self).expect("batch was invalid");
    }

    /// Decodes a part previously written by [BlobTraceBatchPart::encode].
    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
        // NOTE(review): cloning SegmentedBytes is presumably cheap (shared
        // underlying buffers) — confirm before relying on this in a hot path.
        let mut reader = buf.clone().reader();
        decode_trace_parquet(&mut reader, metrics)
    }

    /// Returns the lexicographically smallest key contained in this part, or
    /// the empty slice when the part has no updates.
    pub fn key_lower(&self) -> &[u8] {
        let smallest = self
            .updates
            .iter()
            .flat_map(|part| part.iter())
            .map(|((key, _val), _ts, _diff)| key)
            .min();
        match smallest {
            Some(key) => key,
            None => &[],
        }
    }
}
/// A wrapper around a byte slice whose `Debug` output renders the bytes as a
/// string when they are valid UTF-8, falling back to the raw bytes otherwise.
#[derive(PartialOrd, Ord, PartialEq, Eq)]
struct PrettyBytes<'a>(&'a [u8]);
impl fmt::Debug for PrettyBytes<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Prefer a human-readable string rendering; fall back to the raw
        // byte-slice Debug output for non-UTF-8 data.
        if let Ok(text) = std::str::from_utf8(self.0) {
            fmt::Debug::fmt(text, f)
        } else {
            fmt::Debug::fmt(self.0, f)
        }
    }
}
/// A wrapper around a raw `((key, val), ts, diff)` record whose `Debug`
/// output decodes the timestamp and diff as `T` and `D` respectively.
struct PrettyRecord<'a, T, D>(
    ((&'a [u8], &'a [u8]), [u8; 8], [u8; 8]),
    // Carries the T/D decode types without storing values of them.
    PhantomData<(T, D)>,
);
impl<T, D> fmt::Debug for PrettyRecord<'_, T, D>
where
    T: Debug + Codec64,
    D: Debug + Codec64,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Decode the raw record into a human-readable tuple and delegate to
        // the tuple's Debug impl.
        let &((key, val), ts_bytes, diff_bytes) = &self.0;
        let pretty = (
            (PrettyBytes(key), PrettyBytes(val)),
            T::decode(ts_bytes),
            D::decode(diff_bytes),
        );
        fmt::Debug::fmt(&pretty, f)
    }
}
impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
fn from(x: ProtoU64Description) -> Self {
Description::new(
x.lower
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
x.upper
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
x.since
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
)
}
}
impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
    fn from(x: ProtoU64Antichain) -> Self {
        // Proto elements are u64s; round-trip each through its little-endian
        // byte encoding to recover a T via Codec64.
        let elements: Vec<T> = x
            .elements
            .into_iter()
            .map(|element| T::decode(element.to_le_bytes()))
            .collect();
        Antichain::from(elements)
    }
}
impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
    fn from(x: &Antichain<T>) -> Self {
        // Encode each frontier element via Codec64 and reinterpret the 8
        // bytes as a little-endian u64 for the proto.
        let elements = x
            .elements()
            .iter()
            .map(|element| u64::from_le_bytes(T::encode(element)))
            .collect();
        ProtoU64Antichain { elements }
    }
}
impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
    fn from(x: &Description<T>) -> Self {
        // All three frontiers are always present when encoding; None only
        // occurs for protos produced elsewhere.
        let lower = Some(x.lower().into());
        let upper = Some(x.upper().into());
        let since = Some(x.since().into());
        ProtoU64Description { lower, upper, since }
    }
}
/// Encodes the inline metadata for a trace batch into a base64 string.
pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(
    batch: &BlobTraceBatchPart<T>,
    format: ProtoBatchFormat,
) -> String {
    let proto = ProtoBatchPartInline {
        format: format.into(),
        desc: Some((&batch.desc).into()),
        index: batch.index,
    };
    base64::encode(proto.encode_to_vec())
}
pub fn decode_trace_inline_meta(
inline_base64: Option<&String>,
) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
let inline_encoded = base64::decode(inline_base64).map_err(|err| err.to_string())?;
let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
let format = ProtoBatchFormat::from_i32(inline.format)
.ok_or_else(|| Error::from(format!("unknown format: {}", inline.format)))?;
Ok((format, inline))
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use bytes::Bytes;
    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;
    use super::*;

    // Builds a single `(key, "")` update at time `ts` with diff 1.
    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
        ((key.into(), "".into()), ts, 1)
    }

    // A `[lower, upper)` description with a since of 0.
    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        )
    }

    // A TraceBatchMeta with no keys and the given time interval.
    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
        TraceBatchMeta {
            keys: vec![],
            format: ProtoBatchFormat::Unknown,
            desc: u64_desc(lower, upper),
            level: 1,
            size_bytes: 0,
        }
    }

    // A `[lower, upper)` description with an explicit since frontier.
    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(since),
        )
    }

    // Encodes `updates` into a single ColumnarRecords.
    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> Vec<ColumnarRecords> {
        let mut builder = ColumnarRecordsBuilder::default();
        for ((k, v), t, d) in updates {
            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
        }
        vec![builder.finish(&ColumnarMetrics::disconnected())]
    }

    #[mz_ore::test]
    fn trace_batch_validate() {
        // Normal case: ordered updates within the batch's time interval.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
        };
        assert_eq!(b.validate(), Ok(()));
        // Empty batches are valid.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(b.validate(), Ok(()));
        // Invalid: lower > upper.
        let b = BlobTraceBatchPart {
            desc: u64_desc(2, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );
        // Invalid: lower == upper (empty interval).
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );
        // Invalid: update timestamp before the batch's lower.
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0")]),
        };
        assert_eq!(b.validate(), Err(Error::from("timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }")));
        // Invalid: update timestamp at/after the batch's upper (uncompacted).
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Err(Error::from("timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }")));
        // With since ahead of the interval (compacted), timestamps at or past
        // the upper are allowed.
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(4, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(5, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));
        // Invalid: an update with a diff of 0.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 1),
            index: 0,
            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from("update with 0 diff: ((\"0\", \"0\"), 0, 0)"))
        );
    }

    #[mz_ore::test]
    fn trace_batch_meta_validate() {
        // Normal case.
        let b = batch_meta(0, 1);
        assert_eq!(b.validate(), Ok(()));
        // Invalid: lower == upper.
        let b = batch_meta(0, 0);
        assert_eq!(b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
        // Invalid: lower > upper.
        let b = batch_meta(2, 0);
        assert_eq!(b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
    }

    // Encodes `batch` and stores it in `blob` under `key`, returning the
    // encoded size in bytes.
    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
        blob: &(dyn Blob + Send + Sync),
        key: &str,
        batch: &BlobTraceBatchPart<T>,
    ) -> u64 {
        let mut val = Vec::new();
        batch.encode(&mut val);
        let val = Bytes::from(val);
        let val_len = u64::cast_from(val.len());
        blob.set(key, val).await.expect("failed to set trace batch");
        val_len
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn trace_batch_meta_validate_data() -> Result<(), Error> {
        let metrics = ColumnarMetrics::disconnected();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let format = ProtoBatchFormat::ParquetKvtd;
        // Two parts sharing the same desc, with indexes 0 and 1.
        let batch_desc = u64_desc_since(0, 3, 0);
        let batch0 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 0,
            updates: columnar_records(vec![
                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch1 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 1,
            updates: columnar_records(vec![
                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
        let size_bytes = batch0_size_bytes + batch1_size_bytes;
        // Normal case: metadata matches the stored parts.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Ok(())
        );
        // Invalid: the metadata's desc disagrees with the parts' desc.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: u64_desc_since(1, 3, 0),
            level: 0,
            size_bytes,
        };
        assert_eq!(batch_meta.validate_data(blob.as_ref(), &metrics).await, Err(Error::from("invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }")));
        // Invalid: a key with no corresponding blob.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into(), "b2".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from("no blob for trace batch at key: b2"))
        );
        // Invalid: keys out of order relative to the parts' indexes.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b1".into(), "b0".into()],
            format,
            desc: batch_desc,
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid index for blob trace batch part at key b1 expected 0 got 1"
            ))
        );
        Ok(())
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn encoded_batch_sizes() {
        // Returns the encoded size of a trace batch over the given data.
        fn sizes(data: DataGenerator) -> usize {
            let trace = BlobTraceBatchPart {
                desc: Description::new(
                    Antichain::from_elem(0u64),
                    Antichain::new(),
                    Antichain::from_elem(0u64),
                ),
                index: 0,
                updates: data.batches().collect(),
            };
            let mut trace_buf = Vec::new();
            trace.encode(&mut trace_buf);
            trace_buf.len()
        }
        // Pins the exact encoded sizes for several record/key distributions;
        // any change to the encoding format will fail this assertion and
        // should be reviewed deliberately.
        let record_size_bytes = DataGenerator::default().record_size_bytes;
        assert_eq!(
            format!(
                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
                sizes(DataGenerator::new(1, record_size_bytes, 1)),
                sizes(DataGenerator::new(25, record_size_bytes, 25)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
            ),
            "1/1=1027 25/1=2778 1000/1=73022 1000/100=113067"
        );
    }
}