use std::fmt::{self, Debug};
use std::marker::PhantomData;
use std::sync::Arc;

use arrow::array::{Array, ArrayRef};
use arrow::datatypes::DataType;
use bytes::{BufMut, Bytes};
use differential_dataflow::trace::Description;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_persist_types::columnar::{codec_to_schema2, data_type};
use mz_persist_types::parquet::EncodingConfig;
use mz_persist_types::schema::backward_compatible;
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use proptest::arbitrary::Arbitrary;
use proptest::prelude::*;
use proptest::strategy::{BoxedStrategy, Just};
use prost::Message;
use serde::Serialize;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::error;

use crate::error::Error;
use crate::gen::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
use crate::gen::persist::{
    ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
    ProtoU64Description,
};
use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use crate::location::Blob;
use crate::metrics::ColumnarMetrics;
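/// The columnar format of a batch part's updates.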
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
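    /// Keys and values are stored as opaque, codec-encoded bytes.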
Row,
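    /// Keys and values are stored as codec-encoded bytes _and_ in a
    /// structured Arrow representation. The inner `usize` is a version of
    /// the structured encoding.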
Both(usize),
}
impl BatchColumnarFormat {
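    /// Returns the default format.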
pub const fn default() -> Self {
BatchColumnarFormat::Row
}
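    /// Returns the [`BatchColumnarFormat`] named by `s`, logging and falling
    /// back to the default if the name is unrecognized.
    ///
    /// A round-trip sketch; `as_str` is the inverse for recognized names:
    ///
    /// ```ignore
    /// assert_eq!(BatchColumnarFormat::from_str("both_v2").as_str(), "both_v2");
    /// ```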
pub fn from_str(s: &str) -> Self {
match s {
"row" => BatchColumnarFormat::Row,
"both" => BatchColumnarFormat::Both(0),
"both_v2" => BatchColumnarFormat::Both(2),
x => {
let default = BatchColumnarFormat::default();
soft_panic_or_log!("Invalid batch columnar format: {x}, falling back to {default}");
default
}
}
}
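    /// Returns the string representation of this format.
    ///
    /// Panics on an unknown `Both` version.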
pub const fn as_str(&self) -> &'static str {
match self {
BatchColumnarFormat::Row => "row",
BatchColumnarFormat::Both(0 | 1) => "both",
BatchColumnarFormat::Both(2) => "both_v2",
_ => panic!("unknown batch columnar format"),
}
}
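    /// Returns whether batches written in this format include structured
    /// (Arrow) data.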
pub const fn is_structured(&self) -> bool {
match self {
BatchColumnarFormat::Row => false,
BatchColumnarFormat::Both(0 | 1) => false,
BatchColumnarFormat::Both(_) => true,
}
}
}
impl fmt::Display for BatchColumnarFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl Arbitrary for BatchColumnarFormat {
type Parameters = ();
type Strategy = BoxedStrategy<BatchColumnarFormat>;
fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
proptest::strategy::Union::new(vec![
Just(BatchColumnarFormat::Row).boxed(),
Just(BatchColumnarFormat::Both(0)).boxed(),
Just(BatchColumnarFormat::Both(1)).boxed(),
])
.boxed()
}
}
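/// The metadata necessary to reconstruct a [`BlobTraceBatchPart`].
///
/// Invariants:
/// - The described time interval is non-empty (see `validate`).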
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// The keys to retrieve the batch's data from the blob store.
    pub keys: Vec<String>,
    /// The format of the stored batch data.
    pub format: ProtoBatchFormat,
    /// The half-open time interval `[lower, upper)` this batch contains data
    /// for.
    pub desc: Description<u64>,
    /// The compaction level of the batch.
    pub level: u64,
    /// Size of the encoded batch.
    pub size_bytes: u64,
}
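/// The structure stored as a value in [`Blob`] storage for data keys
/// corresponding to trace data.
///
/// Invariants:
/// - The described time interval is non-empty.
/// - All updates are at or past the lower and, if the batch is uncompacted,
///   strictly before the upper.
/// - All updates have a nonzero diff.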
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// Which updates are included in this batch.
    ///
    /// There may be other parts for the same batch that contain additional
    /// updates within this same time interval.
    pub desc: Description<T>,
    /// Index of this part within the parts that make up the batch.
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}
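/// The set of updates that are part of a [`BlobTraceBatchPart`].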
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Keys and values are stored as codec-encoded bytes.
    Row(ColumnarRecords),
    /// Keys and values are stored as codec-encoded bytes _and_ in a
    /// structured Arrow representation.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
}
impl BlobTraceUpdates {
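    /// The number of updates in this batch part.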
pub fn len(&self) -> usize {
self.records().len()
}
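    /// The codec-encoded keys and values, along with timestamps and diffs.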
pub fn records(&self) -> &ColumnarRecords {
match self {
BlobTraceUpdates::Row(c) => c,
BlobTraceUpdates::Both(c, _structured) => c,
}
}
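    /// The structured Arrow representation of the keys and values, if one
    /// exists.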
pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
match self {
BlobTraceUpdates::Row(_) => None,
BlobTraceUpdates::Both(_, structured) => Some(structured),
}
}
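    /// The number of logical payload bytes, summed across the codec-encoded
    /// and structured columns.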
pub fn goodbytes(&self) -> usize {
self.records().goodbytes() + self.structured().map_or(0, |e| e.goodbytes())
}
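    /// Returns the structured representation of the keys and values,
    /// generating it from the codec-encoded columns and caching it on `self`
    /// if it does not already exist. Cached arrays built against an older
    /// schema are migrated to the Arrow types of the given schemas.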
pub fn get_or_make_structured<K: Codec, V: Codec>(
&mut self,
key_schema: &K::Schema,
val_schema: &V::Schema,
) -> &ColumnarRecordsStructuredExt {
match self {
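            // Convert the codec-encoded columns into the structured
            // representation, cache it on `self`, and return it by recursing
            // into the `Both` arm.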
BlobTraceUpdates::Row(records) => {
let key = codec_to_schema2::<K>(key_schema, records.keys()).expect("valid keys");
let val = codec_to_schema2::<V>(val_schema, records.vals()).expect("valid values");
*self = BlobTraceUpdates::Both(
records.clone(),
ColumnarRecordsStructuredExt { key, val },
);
self.get_or_make_structured::<K, V>(key_schema, val_schema)
}
BlobTraceUpdates::Both(_, structured) => {
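                // The cached arrays may have been generated against an older
                // version of the schema; migrate them to the requested Arrow
                // type if needed.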
let migrate = |array: &mut ArrayRef, to_type: DataType| {
let from_type = array.data_type().clone();
if from_type != to_type {
if let Some(migration) = backward_compatible(&from_type, &to_type) {
*array = migration.migrate(Arc::clone(array));
} else {
error!(
?from_type,
?to_type,
"failed to migrate array type; backwards-incompatible schema migration?"
);
}
}
};
migrate(
&mut structured.key,
data_type::<K>(key_schema).expect("valid key schema"),
);
migrate(
&mut structured.val,
data_type::<V>(val_schema).expect("valid value schema"),
);
structured
}
}
}
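    /// Concatenates the given updates into one [`BlobTraceUpdates`],
    /// generating the structured representation for any input that lacks it.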
pub fn concat<K: Codec, V: Codec>(
mut updates: Vec<BlobTraceUpdates>,
key_schema: &K::Schema,
val_schema: &V::Schema,
metrics: &ColumnarMetrics,
) -> anyhow::Result<BlobTraceUpdates> {
match updates.len() {
0 => return Ok(BlobTraceUpdates::Row(ColumnarRecords::default())),
1 => return Ok(updates.into_iter().into_element()),
_ => {}
}
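        // Concatenate the codec-encoded columns directly, then materialize
        // (if necessary) and concatenate the structured columns.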
let columnar: Vec<_> = updates.iter().map(|u| u.records().clone()).collect();
let records = ColumnarRecords::concat(&columnar, metrics);
let mut keys = Vec::with_capacity(records.len());
let mut vals = Vec::with_capacity(records.len());
for updates in &mut updates {
let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
keys.push(structured.key.as_ref());
vals.push(structured.val.as_ref());
}
let ext = ColumnarRecordsStructuredExt {
key: ::arrow::compute::concat(&keys)?,
val: ::arrow::compute::concat(&vals)?,
};
let out = Self::Both(records, ext);
metrics
.arrow
.concat_bytes
.inc_by(u64::cast_from(out.goodbytes()));
Ok(out)
}
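    /// Converts these updates into the [`ProtoColumnarRecords`] wire
    /// representation.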
pub fn into_proto(&self) -> ProtoColumnarRecords {
let records = self.records();
let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
None => (None, None),
Some((k, v)) => (Some(k), Some(v)),
};
ProtoColumnarRecords {
len: records.len().into_proto(),
key_offsets: records.keys().offsets().to_vec(),
key_data: Bytes::copy_from_slice(records.keys().value_data()),
val_offsets: records.vals().offsets().to_vec(),
val_data: Bytes::copy_from_slice(records.vals().value_data()),
timestamps: records.timestamps().values().to_vec(),
diffs: records.diffs().values().to_vec(),
key_structured: k_struct,
val_structured: v_struct,
}
}
}
impl TraceBatchMeta {
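    /// Asserts the documented invariants of `self`, returning an error if
    /// any are violated.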
pub fn validate(&self) -> Result<(), Error> {
if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
return Err(format!("invalid desc: {:?}", &self.desc).into());
}
Ok(())
}
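    /// Asserts that the batch parts named by `self.keys` exist in `blob` and
    /// are valid: each part must decode, match this batch's description and
    /// its expected index, and contain no zero-diff updates.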
pub async fn validate_data(
&self,
blob: &dyn Blob,
metrics: &ColumnarMetrics,
) -> Result<(), Error> {
let mut batches = vec![];
for (idx, key) in self.keys.iter().enumerate() {
let value = blob
.get(key)
.await?
.ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
let batch = BlobTraceBatchPart::decode(&value, metrics)?;
if batch.desc != self.desc {
return Err(format!(
"invalid trace batch part desc expected {:?} got {:?}",
&self.desc, &batch.desc
)
.into());
}
if batch.index != u64::cast_from(idx) {
return Err(format!(
"invalid index for blob trace batch part at key {} expected {} got {}",
key, idx, batch.index
)
.into());
}
batch.validate()?;
batches.push(batch);
}
for update in batches
.iter()
.flat_map(|batch| batch.updates.records().iter())
{
let ((_key, _val), _ts, diff) = update;
            // A diff of zero encodes identically for any Codec64 diff type,
            // so decode as i64 to match `PrettyRecord` below.
            let diff: i64 = Codec64::decode(diff);
if diff == 0 {
return Err(format!(
"update with 0 diff: {:?}",
PrettyRecord::<u64, i64>(update, PhantomData)
)
.into());
}
}
Ok(())
}
}
impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
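    /// Asserts the documented invariants of `self`, returning an error if
    /// any are violated.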
pub fn validate(&self) -> Result<(), Error> {
if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
return Err(format!("invalid desc: {:?}", &self.desc).into());
}
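        // Compaction can forward updates past the original upper, so the
        // upper-bound check below only applies to uncompacted batches (those
        // whose since is not in advance of their lower).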
let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());
for update in self.updates.records().iter() {
let ((_key, _val), ts, diff) = update;
let ts = T::decode(ts);
let diff: i64 = Codec64::decode(diff);
if !self.desc.lower().less_equal(&ts) {
return Err(format!(
"timestamp {:?} is less than the batch lower: {:?}",
ts, self.desc
)
.into());
}
if uncompacted && self.desc.upper().less_equal(&ts) {
return Err(format!(
"timestamp {:?} is greater than or equal to the batch upper: {:?}",
ts, self.desc
)
.into());
}
if diff == 0 {
return Err(format!(
"update with 0 diff: {:?}",
PrettyRecord::<u64, i64>(update, PhantomData)
)
.into());
}
}
Ok(())
}
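    /// Encodes this batch part into the Parquet format, writing it to `buf`.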
pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
where
B: BufMut + Send,
{
encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
}
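    /// Decodes a batch part from its Parquet encoding.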
pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
decode_trace_parquet(buf.clone(), metrics)
}
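    /// Returns the smallest codec-encoded key in this part, or the empty
    /// slice if it contains no updates.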
pub fn key_lower(&self) -> &[u8] {
self.updates
.records()
.iter()
.map(|((key, _), _, _)| key)
.min()
.unwrap_or(&[])
}
}
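/// A helper that formats a byte slice as a UTF-8 string when possible, and
/// as raw bytes otherwise.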
#[derive(PartialOrd, Ord, PartialEq, Eq)]
struct PrettyBytes<'a>(&'a [u8]);
impl fmt::Debug for PrettyBytes<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match std::str::from_utf8(self.0) {
Ok(x) => fmt::Debug::fmt(x, f),
Err(_) => fmt::Debug::fmt(self.0, f),
}
}
}
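/// A helper that formats a record with its timestamp and diff decoded via
/// [`Codec64`].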
struct PrettyRecord<'a, T, D>(
((&'a [u8], &'a [u8]), [u8; 8], [u8; 8]),
PhantomData<(T, D)>,
);
impl<T, D> fmt::Debug for PrettyRecord<'_, T, D>
where
T: Debug + Codec64,
D: Debug + Codec64,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ((k, v), ts, diff) = &self.0;
fmt::Debug::fmt(
&(
(PrettyBytes(k), PrettyBytes(v)),
T::decode(*ts),
D::decode(*diff),
),
f,
)
}
}
impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
fn from(x: ProtoU64Description) -> Self {
Description::new(
x.lower
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
x.upper
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
x.since
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
)
}
}
impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
fn from(x: ProtoU64Antichain) -> Self {
Antichain::from(
x.elements
.into_iter()
.map(|x| T::decode(u64::to_le_bytes(x)))
.collect::<Vec<_>>(),
)
}
}
impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
fn from(x: &Antichain<T>) -> Self {
ProtoU64Antichain {
elements: x
.elements()
.iter()
.map(|x| u64::from_le_bytes(T::encode(x)))
.collect(),
}
}
}
impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
fn from(x: &Description<T>) -> Self {
ProtoU64Description {
lower: Some(x.lower().into()),
upper: Some(x.upper().into()),
since: Some(x.since().into()),
}
}
}
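/// Encodes the inline metadata for a trace batch part into a base64 string.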
pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
let (format, format_metadata) = match &batch.updates {
BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
BlobTraceUpdates::Both { .. } => {
let metadata = ProtoFormatMetadata::StructuredMigration(2);
(ProtoBatchFormat::ParquetStructured, Some(metadata))
}
};
let inline = ProtoBatchPartInline {
format: format.into(),
desc: Some((&batch.desc).into()),
index: batch.index,
format_metadata,
};
let inline_encoded = inline.encode_to_vec();
base64::encode(inline_encoded)
}
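/// Decodes the inline metadata for a trace batch part from a base64 string.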
pub fn decode_trace_inline_meta(
inline_base64: Option<&String>,
) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
let inline_encoded = base64::decode(inline_base64).map_err(|err| err.to_string())?;
let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
let format = ProtoBatchFormat::try_from(inline.format)
.map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
Ok((format, inline))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use bytes::Bytes;
use crate::error::Error;
use crate::indexed::columnar::ColumnarRecordsBuilder;
use crate::mem::{MemBlob, MemBlobConfig};
use crate::metrics::ColumnarMetrics;
use crate::workload::DataGenerator;
use super::*;
fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
((key.into(), "".into()), ts, 1)
}
fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
Description::new(
Antichain::from_elem(lower),
Antichain::from_elem(upper),
Antichain::from_elem(0),
)
}
fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
TraceBatchMeta {
keys: vec![],
format: ProtoBatchFormat::Unknown,
desc: u64_desc(lower, upper),
level: 1,
size_bytes: 0,
}
}
fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
Description::new(
Antichain::from_elem(lower),
Antichain::from_elem(upper),
Antichain::from_elem(since),
)
}
fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
let mut builder = ColumnarRecordsBuilder::default();
for ((k, v), t, d) in updates {
assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
}
let updates = builder.finish(&ColumnarMetrics::disconnected());
BlobTraceUpdates::Row(updates)
}
#[mz_ore::test]
fn trace_batch_validate() {
let b = BlobTraceBatchPart {
desc: u64_desc(0, 2),
index: 0,
updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc(0, 2),
index: 0,
updates: columnar_records(vec![]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc(2, 0),
index: 0,
updates: columnar_records(vec![]),
};
assert_eq!(
b.validate(),
Err(Error::from(
"invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
))
);
let b = BlobTraceBatchPart {
desc: u64_desc(0, 0),
index: 0,
updates: columnar_records(vec![]),
};
assert_eq!(
b.validate(),
Err(Error::from(
"invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
))
);
let b = BlobTraceBatchPart {
desc: u64_desc(1, 2),
index: 0,
updates: columnar_records(vec![update_with_key(0, "0")]),
};
assert_eq!(b.validate(), Err(Error::from("timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }")));
let b = BlobTraceBatchPart {
desc: u64_desc(1, 2),
index: 0,
updates: columnar_records(vec![update_with_key(2, "0")]),
};
assert_eq!(b.validate(), Err(Error::from("timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }")));
let b = BlobTraceBatchPart {
desc: u64_desc_since(1, 2, 4),
index: 0,
updates: columnar_records(vec![update_with_key(2, "0")]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc_since(1, 2, 4),
index: 0,
updates: columnar_records(vec![update_with_key(4, "0")]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc_since(1, 2, 4),
index: 0,
updates: columnar_records(vec![update_with_key(5, "0")]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc(0, 1),
index: 0,
updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
};
assert_eq!(
b.validate(),
Err(Error::from("update with 0 diff: ((\"0\", \"0\"), 0, 0)"))
);
}
#[mz_ore::test]
fn trace_batch_meta_validate() {
let b = batch_meta(0, 1);
assert_eq!(b.validate(), Ok(()));
let b = batch_meta(0, 0);
assert_eq!(b.validate(),
Err(Error::from(
"invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
)),
);
let b = batch_meta(2, 0);
assert_eq!(b.validate(),
Err(Error::from(
"invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
)),
);
}
async fn expect_set_trace_batch<T: Timestamp + Codec64>(
blob: &dyn Blob,
key: &str,
batch: &BlobTraceBatchPart<T>,
) -> u64 {
let mut val = Vec::new();
let metrics = ColumnarMetrics::disconnected();
let config = EncodingConfig::default();
batch.encode(&mut val, &metrics, &config);
let val = Bytes::from(val);
let val_len = u64::cast_from(val.len());
blob.set(key, val).await.expect("failed to set trace batch");
val_len
}
#[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
let metrics = ColumnarMetrics::disconnected();
let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
let format = ProtoBatchFormat::ParquetKvtd;
let batch_desc = u64_desc_since(0, 3, 0);
let batch0 = BlobTraceBatchPart {
desc: batch_desc.clone(),
index: 0,
updates: columnar_records(vec![
(("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
(("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
]),
};
let batch1 = BlobTraceBatchPart {
desc: batch_desc.clone(),
index: 1,
updates: columnar_records(vec![
(("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
(("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
]),
};
let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
let size_bytes = batch0_size_bytes + batch1_size_bytes;
let batch_meta = TraceBatchMeta {
keys: vec!["b0".into(), "b1".into()],
format,
desc: batch_desc.clone(),
level: 0,
size_bytes,
};
assert_eq!(
batch_meta.validate_data(blob.as_ref(), &metrics).await,
Ok(())
);
let batch_meta = TraceBatchMeta {
keys: vec!["b0".into(), "b1".into()],
format,
desc: u64_desc_since(1, 3, 0),
level: 0,
size_bytes,
};
assert_eq!(batch_meta.validate_data(blob.as_ref(), &metrics).await, Err(Error::from("invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }")));
let batch_meta = TraceBatchMeta {
keys: vec!["b0".into(), "b1".into(), "b2".into()],
format,
desc: batch_desc.clone(),
level: 0,
size_bytes,
};
assert_eq!(
batch_meta.validate_data(blob.as_ref(), &metrics).await,
Err(Error::from("no blob for trace batch at key: b2"))
);
let batch_meta = TraceBatchMeta {
keys: vec!["b1".into(), "b0".into()],
format,
desc: batch_desc,
level: 0,
size_bytes,
};
assert_eq!(
batch_meta.validate_data(blob.as_ref(), &metrics).await,
Err(Error::from(
"invalid index for blob trace batch part at key b1 expected 0 got 1"
))
);
Ok(())
}
#[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn encoded_batch_sizes() {
fn sizes(data: DataGenerator) -> usize {
let metrics = ColumnarMetrics::disconnected();
let config = EncodingConfig::default();
let updates: Vec<_> = data.batches().collect();
let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
let trace = BlobTraceBatchPart {
desc: Description::new(
Antichain::from_elem(0u64),
Antichain::new(),
Antichain::from_elem(0u64),
),
index: 0,
updates,
};
let mut trace_buf = Vec::new();
trace.encode(&mut trace_buf, &metrics, &config);
trace_buf.len()
}
let record_size_bytes = DataGenerator::default().record_size_bytes;
assert_eq!(
format!(
"1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
sizes(DataGenerator::new(1, record_size_bytes, 1)),
sizes(DataGenerator::new(25, record_size_bytes, 25)),
sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
),
"1/1=951 25/1=2702 1000/1=72943 1000/100=72943"
);
}
}