use arrow::array::Array;
use differential_dataflow::Hashable;
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::marker::PhantomData;
use std::mem;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Instant;
use uuid::Uuid;
use bytes::Bytes;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use futures_util::stream::StreamExt;
use futures_util::{stream, FutureExt};
use mz_dyncfg::Config;
use mz_ore::cast::CastFrom;
use mz_ore::task::{JoinHandle, JoinHandleExt};
use mz_ore::{instrument, soft_panic_or_log};
use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsBuilder};
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::Blob;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::parquet::{CompressionFormat, EncodingConfig};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::{trim_to_budget, truncate_bytes, TruncateBound, TRUNCATE_LEN};
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use mz_timely_util::order::Reverse;
use proptest_derive::Arbitrary;
use semver::Version;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;
use tracing::{debug_span, trace_span, warn, Instrument};
use crate::async_runtime::IsolatedRuntime;
use crate::cfg::MiB;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
use crate::internal::paths::{PartId, PartialBatchKey, WriterKey};
use crate::internal::state::{
BatchPart, HollowBatch, HollowBatchPart, HollowRun, HollowRunRef, ProtoInlineBatchPart,
RunMeta, RunOrder, RunPart, WRITE_DIFFS_SUM,
};
use crate::stats::{
encode_updates, untrimmable_columns, STATS_BUDGET_BYTES, STATS_COLLECTION_ENABLED,
};
use crate::{PersistConfig, ShardId};
include!(concat!(env!("OUT_DIR"), "/mz_persist_client.batch.rs"));
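/// A handle to a batch of updates that has been written to blob storage but
/// which has not yet been appended to a shard.
///
/// The handle must be consumed (appended, turned into a hollow or
/// transmittable batch, or deleted) before being dropped; otherwise the Drop
/// impl warns about dangling blob keys.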
#[derive(Debug)]
pub struct Batch<K, V, T, D> {
pub(crate) batch_delete_enabled: bool,
pub(crate) metrics: Arc<Metrics>,
pub(crate) shard_metrics: Arc<ShardMetrics>,
pub(crate) version: Version,
pub(crate) batch: HollowBatch<T>,
pub(crate) blob: Arc<dyn Blob>,
pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
impl<K, V, T, D> Drop for Batch<K, V, T, D> {
fn drop(&mut self) {
if self.batch.part_count() > 0 {
warn!(
"un-consumed Batch, with {} parts and dangling blob keys: {:?}",
self.batch.part_count(),
self.batch
.parts
.iter()
.map(|x| x.printable_name())
.collect::<Vec<_>>(),
);
}
}
}
impl<K, V, T, D> Batch<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64,
D: Semigroup + Codec64,
{
pub(crate) fn new(
batch_delete_enabled: bool,
metrics: Arc<Metrics>,
blob: Arc<dyn Blob>,
shard_metrics: Arc<ShardMetrics>,
version: Version,
batch: HollowBatch<T>,
) -> Self {
Self {
batch_delete_enabled,
metrics,
shard_metrics,
version,
batch,
blob,
_phantom: PhantomData,
}
}
pub fn shard_id(&self) -> ShardId {
self.shard_metrics.shard_id
}
pub fn upper(&self) -> &Antichain<T> {
self.batch.desc.upper()
}
pub fn lower(&self) -> &Antichain<T> {
self.batch.desc.lower()
}
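/// Marks the blobs that this batch handle points to as consumed, likely
/// because they were appended to a shard.
///
/// Consumers of a blob need to make this explicit, so that we can log
/// warnings in case a batch is not used.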
pub(crate) fn mark_consumed(&mut self) {
self.batch.parts.clear();
}
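/// Deletes the blobs that make up this batch from the given blob store and
/// marks the handle as consumed.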
#[instrument(level = "debug", fields(shard = %self.shard_id()))]
pub async fn delete(mut self) {
self.mark_consumed();
if !self.batch_delete_enabled {
return;
}
let mut deletes = PartDeletes::default();
for part in self.batch.parts.iter() {
deletes.add(part);
}
let () = deletes
.delete(
&*self.blob,
self.shard_id(),
usize::MAX,
&*self.metrics,
&*self.metrics.retries.external.batch_delete,
)
.await;
}
pub fn schemas(&self) -> impl Iterator<Item = SchemaId> + '_ {
self.batch.parts.iter().flat_map(|b| b.schema_id())
}
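/// Turns this [`Batch`] into a [`HollowBatch`], marking the handle as consumed.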
pub fn into_hollow_batch(mut self) -> HollowBatch<T> {
let ret = self.batch.clone();
self.mark_consumed();
ret
}
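/// Turns this [`Batch`] into a [`ProtoBatch`], which can be used to transfer
/// the batch across processes before it is appended to a shard.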
pub fn into_transmittable_batch(mut self) -> ProtoBatch {
let ret = ProtoBatch {
shard_id: self.shard_metrics.shard_id.into_proto(),
version: self.version.to_string(),
batch: Some(self.batch.into_proto()),
};
self.mark_consumed();
ret
}
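/// Writes any inline parts of this batch out to blob storage, so that the
/// batch only references its data by blob key.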
pub(crate) async fn flush_to_blob(
&mut self,
cfg: &BatchBuilderConfig,
batch_metrics: &BatchWriteMetrics,
isolated_runtime: &Arc<IsolatedRuntime>,
write_schemas: &Schemas<K, V>,
) {
let mut parts = Vec::new();
for part in self.batch.parts.drain(..) {
let (updates, ts_rewrite, schema_id) = match part {
RunPart::Single(BatchPart::Inline {
updates,
ts_rewrite,
schema_id,
}) => (updates, ts_rewrite, schema_id),
other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
parts.push(other);
continue;
}
};
let updates = updates
.decode::<T>(&self.metrics.columnar)
.expect("valid inline part");
let key_lower = updates.key_lower().to_vec();
let diffs_sum =
diffs_sum::<D>(updates.updates.records()).expect("inline parts are not empty");
let mut write_schemas = write_schemas.clone();
write_schemas.id = schema_id;
let write_span =
debug_span!("batch::flush_to_blob", shard = %self.shard_metrics.shard_id)
.or_current();
let handle = mz_ore::task::spawn(
|| "batch::flush_to_blob",
BatchParts::write_hollow_part(
cfg.clone(),
cfg.part_write_columnar_data(),
Arc::clone(&self.blob),
Arc::clone(&self.metrics),
Arc::clone(&self.shard_metrics),
batch_metrics.clone(),
Arc::clone(isolated_runtime),
updates,
key_lower,
ts_rewrite,
D::encode(&diffs_sum),
write_schemas,
)
.instrument(write_span),
);
let part = handle.await.expect("part write task failed");
parts.push(RunPart::Single(part));
}
self.batch.parts = parts;
}
}
impl<K, V, T, D> Batch<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + TotalOrder,
D: Semigroup + Codec64,
{
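/// Efficiently rewrites the timestamps in this not-yet-committed batch.
///
/// Timestamps are advanced to `frontier` and the upper is replaced by
/// `new_upper` without rewriting the underlying blobs; the rewrite is
/// recorded as metadata on the batch's parts.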
pub fn rewrite_ts(
&mut self,
frontier: &Antichain<T>,
new_upper: Antichain<T>,
) -> Result<(), InvalidUsage<T>> {
self.batch
.rewrite_ts(frontier, new_upper)
.map_err(InvalidUsage::InvalidRewrite)
}
}
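/// Indicates what work was done in a call to [BatchBuilder::add].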
#[derive(Debug)]
pub enum Added {
Record,
RecordAndParts,
}
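/// A snapshot of the dynamic configs relevant to batch writing, to make it
/// easier to reason about an individual run of the batch builder.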
#[derive(Debug, Clone)]
pub struct BatchBuilderConfig {
writer_key: WriterKey,
pub(crate) blob_target_size: usize,
pub(crate) batch_delete_enabled: bool,
pub(crate) batch_builder_max_outstanding_parts: usize,
pub(crate) batch_columnar_format: BatchColumnarFormat,
pub(crate) batch_columnar_format_percent: usize,
pub(crate) inline_writes_single_max_bytes: usize,
pub(crate) stats_collection_enabled: bool,
pub(crate) stats_budget: usize,
pub(crate) stats_untrimmable_columns: Arc<UntrimmableColumns>,
pub(crate) write_diffs_sum: bool,
pub(crate) encoding_config: EncodingConfig,
pub(crate) expected_order: RunOrder,
pub(crate) structured_key_lower_len: usize,
pub(crate) run_length_limit: usize,
}
pub(crate) const BATCH_DELETE_ENABLED: Config<bool> = Config::new(
"persist_batch_delete_enabled",
false,
"Whether to actually delete blobs when batch delete is called (Materialize).",
);
pub(crate) const BATCH_COLUMNAR_FORMAT: Config<&'static str> = Config::new(
"persist_batch_columnar_format",
BatchColumnarFormat::default().as_str(),
"Columnar format for a batch written to Persist, either 'row', 'both', or 'both_v2' (Materialize).",
);
pub(crate) const BATCH_COLUMNAR_FORMAT_PERCENT: Config<usize> = Config::new(
"persist_batch_columnar_format_percent",
0,
"Percent of parts to write using 'persist_batch_columnar_format', falling back to 'row'.",
);
pub(crate) const ENCODING_ENABLE_DICTIONARY: Config<bool> = Config::new(
"persist_encoding_enable_dictionary",
false,
"A feature flag to enable dictionary encoding for Parquet data (Materialize).",
);
pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new(
"persist_encoding_compression_format",
"none",
"A feature flag to enable compression of Parquet data (Materialize).",
);
pub(crate) const STRUCTURED_ORDER: Config<bool> = Config::new(
"persist_batch_structured_order",
false,
"If enabled, output compaction batches in structured-data order.",
);
pub(crate) const STRUCTURED_ORDER_UNTIL_SHARD: Config<&'static str> = Config::new(
"persist_batch_structured_order_from_shard",
"sz",
"Restrict shards using structured ordering to those shards with formatted ids less than \
the given string. (For example, `s0` will disable it for all shards, `s8` will enable it for \
half of all shards, `s8888` will enable it for slightly more shards, and `sz` will enable it \
for everyone.)",
);
pub(crate) const STRUCTURED_KEY_LOWER_LEN: Config<usize> = Config::new(
"persist_batch_structured_key_lower_len",
0,
"The maximum size in proto bytes of any structured key-lower metadata to preserve. \
(If we're unable to fit the lower in budget, or the budget is zero, no metadata is kept.)",
);
pub(crate) const MAX_RUN_LEN: Config<usize> = Config::new(
"persist_batch_max_run_len",
usize::MAX,
"The maximum length a run can have before it will be spilled as a hollow run \
into the blob store.",
);
pub(crate) const BLOB_TARGET_SIZE: Config<usize> = Config::new(
"persist_blob_target_size",
128 * MiB,
"A target maximum size of persist blob payloads in bytes (Materialize).",
);
pub(crate) const INLINE_WRITES_SINGLE_MAX_BYTES: Config<usize> = Config::new(
"persist_inline_writes_single_max_bytes",
4096,
"The (exclusive) maximum size of a write that persist will inline in metadata.",
);
pub(crate) const INLINE_WRITES_TOTAL_MAX_BYTES: Config<usize> = Config::new(
"persist_inline_writes_total_max_bytes",
1 * MiB,
"\
The (exclusive) maximum total size of inline writes in metadata before \
persist will backpressure them by flushing out to s3.",
);
impl BatchBuilderConfig {
pub fn new(value: &PersistConfig, shard_id: ShardId, expect_consolidated: bool) -> Self {
let writer_key = WriterKey::for_version(&value.build_version);
let batch_columnar_format =
BatchColumnarFormat::from_str(&BATCH_COLUMNAR_FORMAT.get(value));
let batch_columnar_format_percent = BATCH_COLUMNAR_FORMAT_PERCENT.get(value);
let structured_order = STRUCTURED_ORDER.get(value) && {
shard_id.to_string() < STRUCTURED_ORDER_UNTIL_SHARD.get(value)
};
let expected_order = if expect_consolidated {
if structured_order {
RunOrder::Structured
} else {
RunOrder::Codec
}
} else {
RunOrder::Unordered
};
BatchBuilderConfig {
writer_key,
blob_target_size: BLOB_TARGET_SIZE.get(value),
batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
batch_builder_max_outstanding_parts: value
.dynamic
.batch_builder_max_outstanding_parts(),
batch_columnar_format,
batch_columnar_format_percent,
inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
stats_collection_enabled: STATS_COLLECTION_ENABLED.get(value),
stats_budget: STATS_BUDGET_BYTES.get(value),
stats_untrimmable_columns: Arc::new(untrimmable_columns(value)),
write_diffs_sum: WRITE_DIFFS_SUM.get(value),
encoding_config: EncodingConfig {
use_dictionary: ENCODING_ENABLE_DICTIONARY.get(value),
compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
},
expected_order,
structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
}
}
fn part_write_columnar_data(&self) -> bool {
let rand = || usize::cast_from(Uuid::new_v4().hashed()) % 100;
self.batch_columnar_format.is_structured() && self.batch_columnar_format_percent > rand()
}
fn run_meta(&self, order: RunOrder, schema: Option<SchemaId>) -> RunMeta {
RunMeta {
order: Some(order),
schema,
}
}
}
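/// Columns that should never have their stats trimmed away, matched against
/// lowercased column names by exact value, prefix, or suffix.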
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, Arbitrary)]
pub(crate) struct UntrimmableColumns {
pub equals: Vec<Cow<'static, str>>,
pub prefixes: Vec<Cow<'static, str>>,
pub suffixes: Vec<Cow<'static, str>>,
}
impl UntrimmableColumns {
pub(crate) fn should_retain(&self, name: &str) -> bool {
let name_lower = name.to_lowercase();
for s in &self.equals {
if *s == name_lower {
return true;
}
}
for s in &self.prefixes {
if name_lower.starts_with(s.as_ref()) {
return true;
}
}
for s in &self.suffixes {
if name_lower.ends_with(s.as_ref()) {
return true;
}
}
false
}
}
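/// A builder for [Batch]es that allows adding updates piece by piece and then
/// finishing it.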
#[derive(Debug)]
pub struct BatchBuilder<K, V, T, D>
where
K: Codec,
V: Codec,
T: Timestamp + Lattice + Codec64,
{
pub(crate) metrics: Arc<Metrics>,
pub(crate) key_buf: Vec<u8>,
pub(crate) val_buf: Vec<u8>,
pub(crate) builder: BatchBuilderInternal<K, V, T, D>,
}
impl<K, V, T, D> BatchBuilder<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64,
D: Semigroup + Codec64,
{
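/// Finish writing this batch and return a handle to the written batch.
///
/// This fails if any of the updates in this batch are beyond the given
/// `upper`.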
pub async fn finish(
self,
registered_upper: Antichain<T>,
) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
self.builder.finish(registered_upper).await
}
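/// Adds the given update to the batch.
///
/// The update timestamp must be greater or equal to the `lower` that was
/// given when creating this [BatchBuilder].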
pub async fn add(
&mut self,
key: &K,
val: &V,
ts: &T,
diff: &D,
) -> Result<Added, InvalidUsage<T>> {
self.metrics
.codecs
.key
.encode(|| K::encode(key, &mut self.key_buf));
self.metrics
.codecs
.val
.encode(|| V::encode(val, &mut self.val_buf));
validate_schema(
&self.builder.write_schemas,
&self.key_buf,
&self.val_buf,
Some(key),
Some(val),
);
let result = self
.builder
.add(&self.key_buf, &self.val_buf, ts, diff)
.await;
self.key_buf.clear();
self.val_buf.clear();
result
}
}
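/// The internal implementation of [BatchBuilder], which works on
/// already-encoded key and value bytes.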
#[derive(Debug)]
pub(crate) struct BatchBuilderInternal<K, V, T, D>
where
K: Codec,
V: Codec,
T: Timestamp + Lattice + Codec64,
{
lower: Antichain<T>,
inclusive_upper: Antichain<Reverse<T>>,
shard_id: ShardId,
version: Version,
blob: Arc<dyn Blob>,
metrics: Arc<Metrics>,
write_schemas: Schemas<K, V>,
buffer: BatchBuffer,
num_updates: usize,
parts: BatchParts<T>,
since: Antichain<T>,
inline_upper: Antichain<T>,
_phantom: PhantomData<fn(K, V, T, D)>,
}
impl<K, V, T, D> BatchBuilderInternal<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64,
D: Semigroup + Codec64,
{
pub(crate) fn new(
cfg: BatchBuilderConfig,
metrics: Arc<Metrics>,
write_schemas: Schemas<K, V>,
shard_metrics: Arc<ShardMetrics>,
batch_write_metrics: BatchWriteMetrics,
lower: Antichain<T>,
blob: Arc<dyn Blob>,
isolated_runtime: Arc<IsolatedRuntime>,
shard_id: ShardId,
version: Version,
since: Antichain<T>,
inline_upper: Option<Antichain<T>>,
) -> Self {
let parts = BatchParts::new(
cfg.clone(),
Arc::clone(&metrics),
shard_metrics,
shard_id,
lower.clone(),
Arc::clone(&blob),
isolated_runtime,
&batch_write_metrics,
);
Self {
lower,
inclusive_upper: Antichain::new(),
blob,
buffer: BatchBuffer::new(Arc::clone(&metrics), cfg.blob_target_size),
metrics,
write_schemas,
num_updates: 0,
parts,
shard_id,
version,
since,
inline_upper: inline_upper.unwrap_or_else(|| Antichain::new()),
_phantom: PhantomData,
}
}
#[instrument(level = "debug", name = "batch::finish", fields(shard = %self.shard_id))]
pub async fn finish(
mut self,
registered_upper: Antichain<T>,
) -> Result<Batch<K, V, T, D>, InvalidUsage<T>> {
if PartialOrder::less_than(&registered_upper, &self.lower) {
return Err(InvalidUsage::InvalidBounds {
lower: self.lower.clone(),
upper: registered_upper,
});
}
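// When since is not beyond lower, the updates have not been advanced, so the
// registered upper is a strict bound on their timestamps and we can validate
// it. Otherwise we can make no assumptions about the timestamps.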
if PartialOrder::less_equal(&self.since, &self.lower) {
for ts in self.inclusive_upper.iter() {
if registered_upper.less_equal(&ts.0) {
return Err(InvalidUsage::UpdateBeyondUpper {
ts: ts.0.clone(),
expected_upper: registered_upper.clone(),
});
}
}
}
let remainder = self.buffer.drain();
self.flush_part(remainder).await;
let run_meta = self
.parts
.cfg
.run_meta(self.parts.cfg.expected_order, self.write_schemas.id);
let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
let shard_metrics = Arc::clone(&self.parts.shard_metrics);
let expected_order = self.parts.cfg.expected_order;
let parts = self.parts.finish().await;
let desc = Description::new(self.lower, registered_upper, self.since);
let (runs, run_meta) = if parts.is_empty() {
(vec![], vec![])
} else if expected_order == RunOrder::Unordered {
// With no expected ordering, every part is its own run: a run boundary
// before each part after the first, and a copy of the metadata per part.
((1..parts.len()).collect(), vec![run_meta; parts.len()])
} else {
// Otherwise the whole batch is a single ordered run.
(vec![], vec![run_meta])
};
let batch = Batch::new(
batch_delete_enabled,
Arc::clone(&self.metrics),
self.blob,
shard_metrics,
self.version,
HollowBatch::new(desc, parts, self.num_updates, run_meta, runs),
);
Ok(batch)
}
pub async fn add(
&mut self,
key: &[u8],
val: &[u8],
ts: &T,
diff: &D,
) -> Result<Added, InvalidUsage<T>> {
if !self.lower.less_equal(ts) {
return Err(InvalidUsage::UpdateNotBeyondLower {
ts: ts.clone(),
lower: self.lower.clone(),
});
}
self.inclusive_upper.insert(Reverse(ts.clone()));
match self.buffer.push(key, val, ts.clone(), diff.clone()) {
Some(part_to_flush) => {
self.flush_part(part_to_flush).await;
Ok(Added::RecordAndParts)
}
None => Ok(Added::Record),
}
}
pub async fn flush_many(&mut self, updates: BlobTraceUpdates) -> Result<(), InvalidUsage<T>> {
for ts in updates.records().timestamps().iter().flatten() {
let ts = T::decode(ts.to_le_bytes());
if !self.lower.less_equal(&ts) {
return Err(InvalidUsage::UpdateNotBeyondLower {
ts,
lower: self.lower.clone(),
});
}
self.inclusive_upper.insert(Reverse(ts));
}
let previous = self.buffer.drain();
self.flush_part(previous).await;
self.flush_part(updates).await;
Ok(())
}
async fn flush_part(&mut self, columnar: BlobTraceUpdates) {
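// Compute a lower bound on the keys in this part: for codec-ordered runs the
// first key is already the minimum, otherwise take the minimum of the key
// column.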
let key_lower = {
let key_bytes = columnar.records().keys();
if key_bytes.is_empty() {
&[]
} else if self.parts.cfg.expected_order == RunOrder::Codec {
key_bytes.value(0)
} else {
::arrow::compute::min_binary(key_bytes).expect("min of nonempty array")
}
};
let key_lower = truncate_bytes(key_lower, TRUNCATE_LEN, TruncateBound::Lower)
.expect("lower bound always exists");
let num_updates = columnar.len();
if num_updates == 0 {
return;
}
let diffs_sum = diffs_sum::<D>(columnar.records()).expect("part is non empty");
let start = Instant::now();
self.parts
.write(
&self.write_schemas,
key_lower,
columnar,
self.inline_upper.clone(),
self.since.clone(),
diffs_sum,
)
.await;
self.metrics
.compaction
.batch
.step_part_writing
.inc_by(start.elapsed().as_secs_f64());
self.num_updates += num_updates;
}
}
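/// Validates that the given encoded key and value decode and are valid
/// against the given schemas, panicking on mismatch. Only runs when soft
/// assertions are enabled.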
pub(crate) fn validate_schema<K: Codec, V: Codec>(
stats_schemas: &Schemas<K, V>,
key: &[u8],
val: &[u8],
decoded_key: Option<&K>,
decoded_val: Option<&V>,
) {
if !mz_ore::assert::SOFT_ASSERTIONS.load(Ordering::Relaxed) {
return;
}
let key_valid = match decoded_key {
Some(key) => K::validate(key, &stats_schemas.key),
None => {
let key = K::decode(key, &stats_schemas.key).expect("valid encoded key");
K::validate(&key, &stats_schemas.key)
}
};
let () = key_valid
.unwrap_or_else(|err| panic!("constructing batch with mismatched key schema: {}", err));
let val_valid = match decoded_val {
Some(val) => V::validate(val, &stats_schemas.val),
None => {
let val = V::decode(val, &stats_schemas.val).expect("valid encoded val");
V::validate(&val, &stats_schemas.val)
}
};
let () = val_valid
.unwrap_or_else(|err| panic!("constructing batch with mismatched val schema: {}", err));
}
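/// Buffers pending updates in a columnar records builder, handing back a
/// part's worth of updates whenever the buffered bytes reach the blob target
/// size.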
#[derive(Debug)]
struct BatchBuffer {
metrics: Arc<Metrics>,
blob_target_size: usize,
records_builder: ColumnarRecordsBuilder,
}
impl BatchBuffer {
fn new(metrics: Arc<Metrics>, blob_target_size: usize) -> Self {
BatchBuffer {
metrics,
blob_target_size,
records_builder: ColumnarRecordsBuilder::default(),
}
}
fn push<T: Codec64, D: Codec64>(
&mut self,
key: &[u8],
val: &[u8],
ts: T,
diff: D,
) -> Option<BlobTraceUpdates> {
let update = ((key, val), ts.encode(), diff.encode());
assert!(
self.records_builder.push(update),
"single update overflowed an i32"
);
if self.records_builder.total_bytes() >= self.blob_target_size {
Some(self.drain())
} else {
None
}
}
fn drain(&mut self) -> BlobTraceUpdates {
let builder = mem::take(&mut self.records_builder);
let records = builder.finish(&self.metrics.columnar);
assert_eq!(self.records_builder.len(), 0);
BlobTraceUpdates::Row(records)
}
}
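/// The state of a pending part (or spilled run) write: an in-flight task, a
/// placeholder while we block on that task, or the finished value.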
#[derive(Debug)]
enum Pending<T> {
Writing(JoinHandle<T>),
Blocking,
Finished(T),
}
impl<T: Send + 'static> Pending<T> {
fn new(handle: JoinHandle<T>) -> Self {
Self::Writing(handle)
}
fn is_finished(&self) -> bool {
matches!(self, Self::Finished(_))
}
async fn into_result(self) -> T {
match self {
Pending::Writing(h) => h.wait_and_assert_finished().await,
Pending::Blocking => panic!("block_until_ready cancelled?"),
Pending::Finished(t) => t,
}
}
async fn block_until_ready(&mut self) {
let pending = mem::replace(self, Self::Blocking);
let value = pending.into_result().await;
*self = Pending::Finished(value);
}
}
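/// Tracks the in-flight part writes for a batch, applying backpressure when
/// too many are outstanding and spilling overly long runs out to blob storage
/// as [HollowRun]s.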
#[derive(Debug)]
pub(crate) struct BatchParts<T> {
cfg: BatchBuilderConfig,
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
shard_id: ShardId,
lower: Antichain<T>,
blob: Arc<dyn Blob>,
isolated_runtime: Arc<IsolatedRuntime>,
next_index: u64,
writing_runs: Vec<Vec<Pending<RunPart<T>>>>,
batch_metrics: BatchWriteMetrics,
}
impl<T: Timestamp + Codec64> BatchParts<T> {
pub(crate) fn new(
cfg: BatchBuilderConfig,
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
shard_id: ShardId,
lower: Antichain<T>,
blob: Arc<dyn Blob>,
isolated_runtime: Arc<IsolatedRuntime>,
batch_metrics: &BatchWriteMetrics,
) -> Self {
BatchParts {
cfg,
metrics,
shard_metrics,
shard_id,
lower,
blob,
isolated_runtime,
next_index: 0,
writing_runs: vec![vec![]],
batch_metrics: batch_metrics.clone(),
}
}
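/// Writes the given updates as a new part: inline in metadata if the payload
/// is small enough, otherwise as a blob in external storage.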
pub(crate) async fn write<K: Codec, V: Codec, D: Codec64>(
&mut self,
write_schemas: &Schemas<K, V>,
key_lower: Vec<u8>,
mut updates: BlobTraceUpdates,
upper: Antichain<T>,
since: Antichain<T>,
diffs_sum: D,
) {
let desc = Description::new(self.lower.clone(), upper, since);
let batch_metrics = self.batch_metrics.clone();
let index = self.next_index;
self.next_index += 1;
let ts_rewrite = None;
let schema_id = write_schemas.id;
let part_write_columnar_data = self.cfg.part_write_columnar_data();
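// Inline parts written with structured (columnar) data carry both the row and
// the structured encoding, so halve the inline threshold in that case.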
let inline_threshold = if part_write_columnar_data {
self.cfg.inline_writes_single_max_bytes.saturating_div(2)
} else {
self.cfg.inline_writes_single_max_bytes
};
let handle = if updates.goodbytes() < inline_threshold {
let metrics = Arc::clone(&self.metrics);
let write_schemas = write_schemas.clone();
let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
mz_ore::task::spawn(
|| "batch::inline_part",
async move {
let updates = if part_write_columnar_data {
let records = updates.records().clone();
let structured = metrics
.columnar
.arrow()
.measure_part_build(|| {
updates.get_or_make_structured::<K, V>(
write_schemas.key.as_ref(),
write_schemas.val.as_ref(),
)
})
.clone();
BlobTraceUpdates::Both(records, structured)
} else {
let records = updates.records().clone();
BlobTraceUpdates::Row(records)
};
let start = Instant::now();
let updates = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
desc: Some(desc.into_proto()),
index: index.into_proto(),
updates: Some(updates.into_proto()),
});
batch_metrics
.step_inline
.inc_by(start.elapsed().as_secs_f64());
RunPart::Single(BatchPart::Inline {
updates,
ts_rewrite,
schema_id,
})
}
.instrument(span),
)
} else {
let part = BlobTraceBatchPart {
desc,
updates,
index,
};
let write_span =
debug_span!("batch::write_part", shard = %self.shard_metrics.shard_id).or_current();
mz_ore::task::spawn(
|| "batch::write_part",
BatchParts::write_hollow_part(
self.cfg.clone(),
part_write_columnar_data,
Arc::clone(&self.blob),
Arc::clone(&self.metrics),
Arc::clone(&self.shard_metrics),
batch_metrics.clone(),
Arc::clone(&self.isolated_runtime),
part,
key_lower,
ts_rewrite,
D::encode(&diffs_sum),
write_schemas.clone(),
)
.map(RunPart::Single)
.instrument(write_span),
)
};
self.push_part(Pending::new(handle), 0).await;
}
async fn write_hollow_part<K: Codec, V: Codec>(
cfg: BatchBuilderConfig,
part_write_columnar_data: bool,
blob: Arc<dyn Blob>,
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
batch_metrics: BatchWriteMetrics,
isolated_runtime: Arc<IsolatedRuntime>,
mut updates: BlobTraceBatchPart<T>,
key_lower: Vec<u8>,
ts_rewrite: Option<Antichain<T>>,
diffs_sum: [u8; 8],
write_schemas: Schemas<K, V>,
) -> BatchPart<T> {
let partial_key = PartialBatchKey::new(&cfg.writer_key, &PartId::new());
let key = partial_key.complete(&shard_metrics.shard_id);
let goodbytes = updates.updates.records().goodbytes();
let metrics_ = Arc::clone(&metrics);
let schema_id = write_schemas.id;
let (stats, structured_key_lower, (buf, encode_time)) = isolated_runtime
.spawn_named(|| "batch::encode_part", async move {
let stats = 'collect_stats: {
if cfg.stats_collection_enabled || cfg.batch_columnar_format.is_structured() {
let result = metrics_.columnar.arrow().measure_part_build(|| {
encode_updates(&write_schemas, &updates.updates)
});
let Ok((extended_cols, stats)) = result else {
soft_panic_or_log!("failed to encode in columnar format! {:?}", result);
break 'collect_stats None;
};
if let BlobTraceUpdates::Row(record) = &updates.updates {
if let Some(record_ext) = extended_cols {
if part_write_columnar_data {
updates.updates =
BlobTraceUpdates::Both(record.clone(), record_ext);
}
}
}
if cfg.stats_collection_enabled {
let trimmed_start = Instant::now();
let mut trimmed_bytes = 0;
let trimmed_stats = LazyPartStats::encode(&stats, |s| {
trimmed_bytes = trim_to_budget(s, cfg.stats_budget, |s| {
cfg.stats_untrimmable_columns.should_retain(s)
})
});
let trimmed_duration = trimmed_start.elapsed();
Some((trimmed_stats, trimmed_duration, trimmed_bytes))
} else {
None
}
} else {
None
}
};
let structured_key_lower = if cfg.structured_key_lower_len > 0 {
updates.updates.structured().and_then(|ext| {
let min_key = if cfg.expected_order == RunOrder::Structured {
0
} else {
let ord = ArrayOrd::new(ext.key.as_ref());
(0..ext.key.len())
.min_by_key(|i| ord.at(*i))
.expect("non-empty batch")
};
let lower = ArrayBound::new(Arc::clone(&ext.key), min_key)
.to_proto_lower(cfg.structured_key_lower_len);
if lower.is_none() {
batch_metrics.key_lower_too_big.inc()
}
lower.map(|proto| LazyProto::from(&proto))
})
} else {
None
};
let encode_start = Instant::now();
let mut buf = Vec::new();
updates.encode(&mut buf, &metrics_.columnar, &cfg.encoding_config);
drop(updates);
(
stats,
structured_key_lower,
(Bytes::from(buf), encode_start.elapsed()),
)
})
.instrument(debug_span!("batch::encode_part"))
.await
.expect("part encode task failed");
metrics.codecs.batch.encode_count.inc();
metrics
.codecs
.batch
.encode_seconds
.inc_by(encode_time.as_secs_f64());
let start = Instant::now();
let payload_len = buf.len();
let () = retry_external(&metrics.retries.external.batch_set, || async {
shard_metrics.blob_sets.inc();
blob.set(&key, Bytes::clone(&buf)).await
})
.instrument(trace_span!("batch::set", payload_len))
.await;
batch_metrics.seconds.inc_by(start.elapsed().as_secs_f64());
batch_metrics.bytes.inc_by(u64::cast_from(payload_len));
batch_metrics.goodbytes.inc_by(u64::cast_from(goodbytes));
match cfg.expected_order {
RunOrder::Unordered => batch_metrics.unordered.inc(),
RunOrder::Codec => batch_metrics.codec_order.inc(),
RunOrder::Structured => batch_metrics.structured_order.inc(),
}
let stats = stats.map(|(stats, stats_step_timing, trimmed_bytes)| {
batch_metrics
.step_stats
.inc_by(stats_step_timing.as_secs_f64());
if trimmed_bytes > 0 {
metrics.pushdown.parts_stats_trimmed_count.inc();
metrics
.pushdown
.parts_stats_trimmed_bytes
.inc_by(u64::cast_from(trimmed_bytes));
}
stats
});
BatchPart::Hollow(HollowBatchPart {
key: partial_key,
encoded_size_bytes: payload_len,
key_lower,
structured_key_lower,
stats,
ts_rewrite,
diffs_sum: cfg.write_diffs_sum.then_some(diffs_sum),
format: Some(cfg.batch_columnar_format),
schema_id,
})
}
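/// Adds a pending part write at the given depth, blocking on older writes at
/// that depth if too many are outstanding, and spilling the oldest parts of
/// an overly long run to blob storage as a [HollowRun] referenced one level
/// deeper.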
async fn push_part(&mut self, mut part: Pending<RunPart<T>>, mut depth: usize) {
loop {
if depth >= self.writing_runs.len() {
self.writing_runs.resize_with(depth + 1, Vec::new);
}
let parts = &mut self.writing_runs[depth];
parts.push(part);
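// Backpressure: scanning from newest to oldest, skip the allowed number of
// outstanding writes and block on any unfinished writes beyond that window.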
for part in parts
.iter_mut()
.rev()
.skip(self.cfg.batch_builder_max_outstanding_parts)
.take_while(|p| !p.is_finished())
{
self.batch_metrics.write_stalls.inc();
part.block_until_ready().await;
}
if parts.len() <= self.cfg.run_length_limit
|| self.cfg.expected_order == RunOrder::Unordered
{
break;
}
let parts: Vec<_> = parts.drain(0..self.cfg.run_length_limit).collect();
let shard_id = self.shard_id;
let blob = Arc::clone(&self.blob);
let writer_key = self.cfg.writer_key.clone();
let metrics = Arc::clone(&self.metrics);
let handle = mz_ore::task::spawn(
|| "batch::inline_part",
async move {
let parts = stream::iter(parts)
.then(|p| p.into_result())
.collect()
.await;
let run_ref = HollowRunRef::set(
shard_id,
blob.as_ref(),
&writer_key,
HollowRun { parts },
&*metrics,
)
.await;
RunPart::Many(run_ref)
}
.instrument(debug_span!("batch::spill_run")),
);
part = Pending::new(handle);
depth += 1;
}
}
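/// Waits for all pending writes to complete, pushing parts from shallower
/// levels into deeper ones (spilling runs as needed), and returns the parts
/// of the final top-level run.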
#[instrument(level = "debug", name = "batch::finish_upload", fields(shard = %self.shard_id))]
pub(crate) async fn finish(mut self) -> Vec<RunPart<T>> {
for depth in 0.. {
if depth + 1 == self.writing_runs.len() {
break;
}
for part in mem::take(&mut self.writing_runs[depth]) {
self.push_part(part, depth + 1).await;
}
}
let parts = self.writing_runs.pop().expect("at least one level");
assert!(
self.writing_runs.iter().all(|r| r.is_empty()),
"all parts should be in the last run"
);
let mut output = Vec::with_capacity(parts.len());
for part in parts {
output.push(part.into_result().await);
}
output
}
}
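/// Validates that the requested truncation bounds are contained within the
/// batch's own bounds, and that a batch whose timestamps were rewritten is
/// not truncated past its rewrite frontiers.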
pub(crate) fn validate_truncate_batch<T: Timestamp>(
batch: &HollowBatch<T>,
truncate: &Description<T>,
any_batch_rewrite: bool,
) -> Result<(), InvalidUsage<T>> {
if any_batch_rewrite {
if truncate.upper() != batch.desc.upper() {
return Err(InvalidUsage::InvalidRewrite(format!(
"rewritten batch might have data past {:?} up to {:?}",
truncate.upper().elements(),
batch.desc.upper().elements(),
)));
}
for part in batch.parts.iter() {
let part_lower_bound = part.ts_rewrite().unwrap_or(batch.desc.lower());
if !PartialOrder::less_equal(truncate.lower(), part_lower_bound) {
return Err(InvalidUsage::InvalidRewrite(format!(
"rewritten batch might have data below {:?} at {:?}",
truncate.lower().elements(),
part_lower_bound.elements(),
)));
}
}
}
let batch = &batch.desc;
if !PartialOrder::less_equal(batch.lower(), truncate.lower())
|| PartialOrder::less_than(batch.upper(), truncate.upper())
{
return Err(InvalidUsage::InvalidBatchBounds {
batch_lower: batch.lower().clone(),
batch_upper: batch.upper().clone(),
append_lower: truncate.lower().clone(),
append_upper: truncate.upper().clone(),
});
}
Ok(())
}
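/// A collection of blob keys and hollow run references that should be deleted
/// from blob storage when a batch is deleted.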
#[derive(Debug)]
pub(crate) struct PartDeletes<T> {
blob_keys: BTreeSet<PartialBatchKey>,
hollow_runs: BTreeMap<PartialBatchKey, HollowRunRef<T>>,
}
impl<T> Default for PartDeletes<T> {
fn default() -> Self {
Self {
blob_keys: Default::default(),
hollow_runs: Default::default(),
}
}
}
impl<T: Timestamp> PartDeletes<T> {
pub fn add(&mut self, part: &RunPart<T>) -> bool {
match part {
RunPart::Many(r) => self.hollow_runs.insert(r.key.clone(), r.clone()).is_none(),
RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.insert(x.key.clone()),
RunPart::Single(BatchPart::Inline { .. }) => {
// Inline parts live in state, not the blob store; nothing to delete.
true
}
}
}
pub fn contains(&self, part: &RunPart<T>) -> bool {
match part {
RunPart::Many(r) => self.hollow_runs.contains_key(&r.key),
RunPart::Single(BatchPart::Hollow(x)) => self.blob_keys.contains(&x.key),
RunPart::Single(BatchPart::Inline { .. }) => false,
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn len(&self) -> usize {
match self {
Self {
blob_keys,
hollow_runs,
} => blob_keys.len() + hollow_runs.len(),
}
}
pub async fn delete(
mut self,
blob: &dyn Blob,
shard_id: ShardId,
concurrency: usize,
metrics: &Metrics,
delete_metrics: &RetryMetrics,
) where
T: Codec64,
{
loop {
let () = stream::iter(mem::take(&mut self.blob_keys))
.map(|key| {
let key = key.complete(&shard_id);
async move {
retry_external(delete_metrics, || blob.delete(&key)).await;
}
})
.buffer_unordered(concurrency)
.collect()
.await;
let Some((run_key, run_ref)) = self.hollow_runs.pop_first() else {
break;
};
if let Some(run) = run_ref.get(shard_id, blob, metrics).await {
for part in &run.parts {
self.add(part);
}
self.blob_keys.insert(run_key);
};
}
}
}
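/// Returns the total sum of diffs in the given records, or `None` if there
/// were no updates.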
fn diffs_sum<D: Semigroup + Codec64>(updates: &ColumnarRecords) -> Option<D> {
let mut sum = None;
for (_kv, _t, d) in updates.iter() {
let d = D::decode(d);
match &mut sum {
None => sum = Some(d),
Some(x) => x.plus_equals(&d),
}
}
sum
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use mz_dyncfg::ConfigUpdates;
use timely::order::Product;
use crate::cache::PersistClientCache;
use crate::internal::paths::{BlobKey, PartialBlobKey};
use crate::tests::{all_ok, new_test_client};
use crate::PersistLocation;
use super::*;
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn batch_builder_flushing() {
let data = vec![
(("1".to_owned(), "one".to_owned()), 1, 1),
(("2".to_owned(), "two".to_owned()), 2, 1),
(("3".to_owned(), "three".to_owned()), 3, 1),
(("4".to_owned(), "four".to_owned()), 4, 1),
];
let cache = PersistClientCache::new_no_metrics();
cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
cache.cfg.dynamic.set_batch_builder_max_outstanding_parts(2);
cache.cfg.set_config(&MAX_RUN_LEN, 3);
let client = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed");
let (mut write, mut read) = client
.expect_open::<String, String, u64, i64>(ShardId::new())
.await;
let builder = write.builder(Antichain::from_elem(0));
let mut builder = builder.builder;
builder.parts.cfg.expected_order = RunOrder::Codec;
fn assert_writing(
builder: &BatchBuilderInternal<String, String, u64, i64>,
expected_finished: &[&[bool]],
) {
for (depth, (actual, expected)) in builder
.parts
.writing_runs
.iter()
.zip_eq(expected_finished)
.enumerate()
{
let actual: Vec<_> = actual.iter().map(|p| p.is_finished()).collect();
assert_eq!(*expected, actual, "run at depth {depth} should match");
}
}
assert_writing(&builder, &[&[]]);
let ((k, v), t, d) = &data[0];
let key = k.encode_to_vec();
let val = v.encode_to_vec();
builder.add(&key, &val, t, d).await.expect("invalid usage");
assert_writing(&builder, &[&[false]]);
let ((k, v), t, d) = &data[1];
let key = k.encode_to_vec();
let val = v.encode_to_vec();
builder.add(&key, &val, t, d).await.expect("invalid usage");
assert_writing(&builder, &[&[false, false]]);
let ((k, v), t, d) = &data[2];
let key = k.encode_to_vec();
let val = v.encode_to_vec();
builder.add(&key, &val, t, d).await.expect("invalid usage");
assert_writing(&builder, &[&[true, false, false]]);
let ((k, v), t, d) = &data[3];
let key = k.encode_to_vec();
let val = v.encode_to_vec();
builder.add(&key, &val, t, d).await.expect("invalid usage");
assert_writing(&builder, &[&[false], &[false]]);
let batch = builder
.finish(Antichain::from_elem(5))
.await
.expect("invalid usage");
assert_eq!(batch.batch.part_count(), 2);
write
.append_batch(batch, Antichain::from_elem(0), Antichain::from_elem(5))
.await
.expect("invalid usage")
.expect("unexpected upper");
assert_eq!(read.expect_snapshot_and_fetch(4).await, all_ok(&data, 4));
}
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn batch_builder_keys() {
let cache = PersistClientCache::new_no_metrics();
cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
let client = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed");
let shard_id = ShardId::new();
let (mut write, _) = client
.expect_open::<String, String, u64, i64>(shard_id)
.await;
let batch = write
.expect_batch(
&[
(("1".into(), "one".into()), 1, 1),
(("2".into(), "two".into()), 2, 1),
(("3".into(), "three".into()), 3, 1),
],
0,
4,
)
.await;
assert_eq!(batch.batch.part_count(), 3);
for part in &batch.batch.parts {
let part = part.expect_hollow_part();
match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
Ok((shard, PartialBlobKey::Batch(writer, _))) => {
assert_eq!(shard.to_string(), shard_id.to_string());
assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
}
_ => panic!("unparseable blob key"),
}
}
}
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn batch_builder_partial_order() {
let cache = PersistClientCache::new_no_metrics();
cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
cache.cfg.set_config(&STRUCTURED_KEY_LOWER_LEN, 0);
cache.cfg.set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
cache.cfg.set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
let client = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed");
let shard_id = ShardId::new();
let (mut write, _) = client
.expect_open::<String, String, Product<u32, u32>, i64>(shard_id)
.await;
let batch = write
.batch(
&[
(("1".to_owned(), "one".to_owned()), Product::new(0, 10), 1),
(("2".to_owned(), "two".to_owned()), Product::new(10, 0), 1),
],
Antichain::from_elem(Product::new(0, 0)),
Antichain::from_iter([Product::new(0, 11), Product::new(10, 1)]),
)
.await
.expect("invalid usage");
assert_eq!(batch.batch.part_count(), 2);
for part in &batch.batch.parts {
let part = part.expect_hollow_part();
match BlobKey::parse_ids(&part.key.complete(&shard_id)) {
Ok((shard, PartialBlobKey::Batch(writer, _))) => {
assert_eq!(shard.to_string(), shard_id.to_string());
assert_eq!(writer, WriterKey::for_version(&cache.cfg.build_version));
}
_ => panic!("unparseable blob key"),
}
}
}
#[mz_ore::test]
fn untrimmable_columns() {
let untrimmable = UntrimmableColumns {
equals: vec!["abc".into(), "def".into()],
prefixes: vec!["123".into(), "234".into()],
suffixes: vec!["xyz".into()],
};
assert!(untrimmable.should_retain("abc"));
assert!(untrimmable.should_retain("ABC"));
assert!(untrimmable.should_retain("aBc"));
assert!(!untrimmable.should_retain("abcd"));
assert!(untrimmable.should_retain("deF"));
assert!(!untrimmable.should_retain("defg"));
assert!(untrimmable.should_retain("123"));
assert!(untrimmable.should_retain("123-4"));
assert!(untrimmable.should_retain("1234"));
assert!(untrimmable.should_retain("234"));
assert!(!untrimmable.should_retain("345"));
assert!(untrimmable.should_retain("ijk_xyZ"));
assert!(untrimmable.should_retain("ww-XYZ"));
assert!(!untrimmable.should_retain("xya"));
}
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn rewrite_ts_example(dyncfgs: ConfigUpdates) {
let client = new_test_client(&dyncfgs).await;
let (mut write, read) = client
.expect_open::<String, (), u64, i64>(ShardId::new())
.await;
let mut batch = write.builder(Antichain::from_elem(0));
batch.add(&"foo".to_owned(), &(), &0, &1).await.unwrap();
let batch = batch.finish(Antichain::from_elem(1)).await.unwrap();
let batch = batch.into_transmittable_batch();
let mut batch = write.batch_from_transmittable_batch(batch);
batch
.rewrite_ts(&Antichain::from_elem(2), Antichain::from_elem(3))
.unwrap();
write
.expect_compare_and_append_batch(&mut [&mut batch], 0, 3)
.await;
let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
let expected = vec![((Ok("foo".to_owned()), Ok(())), 2, 1)];
assert_eq!(actual, expected);
}
#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn structured_lowers() {
let cache = PersistClientCache::new_no_metrics();
cache.cfg().set_config(&BATCH_COLUMNAR_FORMAT, "both_v2");
cache.cfg().set_config(&BATCH_COLUMNAR_FORMAT_PERCENT, 100);
cache.cfg().set_config(&STRUCTURED_KEY_LOWER_LEN, 1024);
cache.cfg().set_config(&INLINE_WRITES_SINGLE_MAX_BYTES, 0);
cache.cfg().set_config(&INLINE_WRITES_TOTAL_MAX_BYTES, 0);
let client = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed");
let shard_id = ShardId::new();
let (mut write, _) = client
.expect_open::<String, String, u64, i64>(shard_id)
.await;
let batch = write
.expect_batch(
&[
(("1".into(), "one".into()), 1, 1),
(("2".into(), "two".into()), 2, 1),
(("3".into(), "three".into()), 3, 1),
],
0,
4,
)
.await;
assert_eq!(batch.batch.part_count(), 1);
let [part] = batch.batch.parts.as_slice() else {
panic!("expected single part")
};
assert!(part.structured_key_lower().is_some());
}
}