use std::fmt::{self, Debug};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;
use anyhow::anyhow;
use arrow::array::{Array, AsArray, BooleanArray, Int64Array};
use arrow::compute::FilterBuilder;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use itertools::EitherOrBoth;
use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::{soft_panic_no_log, soft_panic_or_log};
use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, Schema2};
use mz_persist_types::stats::PartStats;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use serde::{Deserialize, Serialize};
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;
use tracing::{debug_span, trace_span, Instrument};
use crate::batch::{
proto_fetch_batch_filter, ProtoFetchBatchFilter, ProtoFetchBatchFilterListen, ProtoLease,
ProtoLeasedBatchPart,
};
use crate::cfg::PersistConfig;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
use crate::internal::paths::BlobKey;
use crate::internal::state::{BatchPart, HollowBatchPart};
use crate::project::ProjectionPushdown;
use crate::read::LeasedReaderId;
use crate::schema::{PartMigration, SchemaCache};
use crate::ShardId;
pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
"persist_fetch_semaphore_cost_adjustment",
1.2,
"\
An adjustment multiplied by encoded_size_bytes to approximate an upper \
bound on the size in lgalloc, which includes the decoded version.",
);
pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
"persist_fetch_semaphore_permit_adjustment",
1.0,
"\
A limit on the number of outstanding persist bytes being fetched and \
parsed, expressed as a multiplier of the process's memory limit. This data \
all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
replicas.",
);
pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
"persist_part_decode_format",
PartDecodeFormat::default().as_str(),
"\
Format we'll use to decode a Persist Part, either 'row', \
'row_with_validate', or 'arrow' (Materialize).",
);
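/// Configuration for a [BatchFetcher], backed by dynamically updatable
/// [PersistConfig] values.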
#[derive(Debug, Clone)]
pub(crate) struct BatchFetcherConfig {
pub(crate) part_decode_format: ConfigValHandle<String>,
}
impl BatchFetcherConfig {
pub fn new(value: &PersistConfig) -> Self {
BatchFetcherConfig {
part_decode_format: PART_DECODE_FORMAT.handle(value),
}
}
pub fn part_decode_format(&self) -> PartDecodeFormat {
PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
}
}
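/// Fetches the blob data referenced by [LeasedBatchPart]s for a single shard.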
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
T: Timestamp + Lattice + Codec64,
K: Debug + Codec,
V: Debug + Codec,
D: Semigroup + Codec64 + Send + Sync,
{
pub(crate) cfg: BatchFetcherConfig,
pub(crate) blob: Arc<dyn Blob>,
pub(crate) metrics: Arc<Metrics>,
pub(crate) shard_metrics: Arc<ShardMetrics>,
pub(crate) shard_id: ShardId,
pub(crate) read_schemas: Schemas<K, V>,
pub(crate) schema_cache: SchemaCache<K, V, T, D>,
pub(crate) is_transient: bool,
pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
impl<K, V, T, D> BatchFetcher<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Codec64 + Send + Sync,
{
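/// Converts a [SerdeLeasedBatchPart] back into a [LeasedBatchPart].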
pub fn leased_part_from_exchangeable(&self, x: SerdeLeasedBatchPart) -> LeasedBatchPart<T> {
x.decode(Arc::clone(&self.metrics))
}
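/// Fetches the data referenced by `part`, either from blob storage or (for
/// inline parts) from the part itself, returning an error if the part does
/// not belong to this fetcher's shard.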
pub async fn fetch_leased_part(
&mut self,
part: &LeasedBatchPart<T>,
) -> Result<FetchedBlob<K, V, T, D>, InvalidUsage<T>> {
if part.shard_id != self.shard_id {
let batch_shard = part.shard_id.clone();
return Err(InvalidUsage::BatchNotFromThisShard {
batch_shard,
handle_shard: self.shard_id.clone(),
});
}
let migration = PartMigration::new(
part.part.schema_id(),
self.read_schemas.clone(),
&mut self.schema_cache,
)
.await
.unwrap_or_else(|read_schemas| {
panic!(
"could not decode part {:?} with schema: {:?}",
part.part.schema_id(),
read_schemas
)
});
let (buf, fetch_permit) = match &part.part {
BatchPart::Hollow(x) => {
let fetch_permit = self
.metrics
.semaphore
.acquire_fetch_permits(x.encoded_size_bytes)
.await;
let read_metrics = if self.is_transient {
&self.metrics.read.unindexed
} else {
&self.metrics.read.batch_fetcher
};
let buf = fetch_batch_part_blob(
&part.shard_id,
self.blob.as_ref(),
&self.metrics,
&self.shard_metrics,
read_metrics,
x,
)
.await
.unwrap_or_else(|blob_key| {
panic!("batch fetcher could not fetch batch part: {}", blob_key)
});
let buf = FetchedBlobBuf::Hollow {
buf,
part: x.clone(),
};
(buf, Some(Arc::new(fetch_permit)))
}
BatchPart::Inline {
updates,
ts_rewrite,
..
} => {
let buf = FetchedBlobBuf::Inline {
desc: part.desc.clone(),
updates: updates.clone(),
ts_rewrite: ts_rewrite.clone(),
};
(buf, None)
}
};
let fetched_blob = FetchedBlob {
metrics: Arc::clone(&self.metrics),
read_metrics: self.metrics.read.batch_fetcher.clone(),
buf,
registered_desc: part.desc.clone(),
migration,
filter: part.filter.clone(),
filter_pushdown_audit: part.filter_pushdown_audit,
structured_part_audit: self.cfg.part_decode_format(),
fetch_permit,
_phantom: PhantomData,
};
Ok(fetched_blob)
}
}
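/// A filter applied to the timestamps of fetched updates, based on how the
/// data is being read: as a snapshot, as a listen, or as part of compaction.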
#[derive(Debug, Clone)]
pub(crate) enum FetchBatchFilter<T> {
Snapshot {
as_of: Antichain<T>,
},
Listen {
as_of: Antichain<T>,
lower: Antichain<T>,
},
Compaction {
since: Antichain<T>,
},
}
impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
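/// Returns whether an update at time `t` should be kept, advancing `t` where
/// required (to the `as_of` for snapshots, or to the `since` for compaction).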
pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
match self {
FetchBatchFilter::Snapshot { as_of } => {
if as_of.less_than(t) {
return false;
}
t.advance_by(as_of.borrow());
true
}
FetchBatchFilter::Listen { as_of, lower } => {
if !as_of.less_than(t) {
return false;
}
if !lower.less_equal(t) {
return false;
}
true
}
FetchBatchFilter::Compaction { since } => {
t.advance_by(since.borrow());
true
}
}
}
}
impl<T: Timestamp + Codec64> RustType<ProtoFetchBatchFilter> for FetchBatchFilter<T> {
fn into_proto(&self) -> ProtoFetchBatchFilter {
let kind = match self {
FetchBatchFilter::Snapshot { as_of } => {
proto_fetch_batch_filter::Kind::Snapshot(as_of.into_proto())
}
FetchBatchFilter::Listen { as_of, lower } => {
proto_fetch_batch_filter::Kind::Listen(ProtoFetchBatchFilterListen {
as_of: Some(as_of.into_proto()),
lower: Some(lower.into_proto()),
})
}
FetchBatchFilter::Compaction { .. } => unreachable!("not serialized"),
};
ProtoFetchBatchFilter { kind: Some(kind) }
}
fn from_proto(proto: ProtoFetchBatchFilter) -> Result<Self, TryFromProtoError> {
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoFetchBatchFilter::kind"))?;
match kind {
proto_fetch_batch_filter::Kind::Snapshot(as_of) => Ok(FetchBatchFilter::Snapshot {
as_of: as_of.into_rust()?,
}),
proto_fetch_batch_filter::Kind::Listen(ProtoFetchBatchFilterListen {
as_of,
lower,
}) => Ok(FetchBatchFilter::Listen {
as_of: as_of.into_rust_if_some("ProtoFetchBatchFilterListen::as_of")?,
lower: lower.into_rust_if_some("ProtoFetchBatchFilterListen::lower")?,
}),
}
}
}
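/// Fetches a [LeasedBatchPart]'s data and decodes it into a [FetchedPart],
/// panicking if the blob cannot be fetched or the part's schema cannot be
/// resolved.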
pub(crate) async fn fetch_leased_part<K, V, T, D>(
cfg: &PersistConfig,
part: &LeasedBatchPart<T>,
blob: &dyn Blob,
metrics: Arc<Metrics>,
read_metrics: &ReadMetrics,
shard_metrics: &ShardMetrics,
reader_id: &LeasedReaderId,
read_schemas: Schemas<K, V>,
schema_cache: &mut SchemaCache<K, V, T, D>,
) -> FetchedPart<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Codec64 + Send + Sync,
{
let encoded_part = EncodedPart::fetch(
&part.shard_id,
blob,
&metrics,
shard_metrics,
read_metrics,
&part.desc,
&part.part,
)
.await
.unwrap_or_else(|blob_key| {
panic!("{} could not fetch batch part: {}", reader_id, blob_key)
});
let part_cfg = BatchFetcherConfig::new(cfg);
let migration = PartMigration::new(part.part.schema_id(), read_schemas, schema_cache)
.await
.unwrap_or_else(|read_schemas| {
panic!(
"could not decode part {:?} with schema: {:?}",
part.part.schema_id(),
read_schemas
)
});
FetchedPart::new(
metrics,
encoded_part,
migration,
part.filter.clone(),
part.filter_pushdown_audit,
part_cfg.part_decode_format(),
part.part.stats(),
)
}
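/// Fetches the raw, encoded bytes of a hollow batch part from blob storage,
/// returning the blob key if it is not present.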
pub(crate) async fn fetch_batch_part_blob<T>(
shard_id: &ShardId,
blob: &dyn Blob,
metrics: &Metrics,
shard_metrics: &ShardMetrics,
read_metrics: &ReadMetrics,
part: &HollowBatchPart<T>,
) -> Result<SegmentedBytes, BlobKey> {
let now = Instant::now();
let get_span = debug_span!("fetch_batch::get");
let blob_key = part.key.complete(shard_id);
let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
shard_metrics.blob_gets.inc();
blob.get(&blob_key).await
})
.instrument(get_span.clone())
.await
.ok_or(blob_key)?;
drop(get_span);
read_metrics.part_count.inc();
read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());
Ok(value)
}
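/// Decodes previously fetched blob bytes into an [EncodedPart], panicking if
/// the bytes are not a valid encoded batch.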
pub(crate) fn decode_batch_part_blob<T>(
metrics: &Metrics,
read_metrics: &ReadMetrics,
registered_desc: Description<T>,
part: &HollowBatchPart<T>,
buf: &SegmentedBytes,
) -> EncodedPart<T>
where
T: Timestamp + Lattice + Codec64,
{
trace_span!("fetch_batch::decode").in_scope(|| {
let parsed = metrics
.codecs
.batch
.decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
.map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
.expect("internal error: invalid encoded state");
read_metrics
.part_goodbytes
.inc_by(u64::cast_from(parsed.updates.goodbytes()));
EncodedPart::from_hollow(read_metrics.clone(), registered_desc, part, parsed)
})
}
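/// Fetches and decodes a hollow batch part in one step, combining
/// [fetch_batch_part_blob] and [decode_batch_part_blob].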
pub(crate) async fn fetch_batch_part<T>(
shard_id: &ShardId,
blob: &dyn Blob,
metrics: &Metrics,
shard_metrics: &ShardMetrics,
read_metrics: &ReadMetrics,
registered_desc: &Description<T>,
part: &HollowBatchPart<T>,
) -> Result<EncodedPart<T>, BlobKey>
where
T: Timestamp + Lattice + Codec64,
{
let buf =
fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
let part = decode_batch_part_blob(metrics, read_metrics, registered_desc.clone(), part, &buf);
Ok(part)
}
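/// A cloneable, reference-counted lease token; [Lease::count] reports the
/// number of outstanding clones.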
#[derive(Clone, Debug, Default)]
pub(crate) struct Lease(Arc<()>);
impl Lease {
pub fn count(&self) -> usize {
Arc::strong_count(&self.0)
}
}
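/// A [BatchPart] bundled with the metadata (shard, description, timestamp
/// filter, and lease) needed to fetch and decode its data.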
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
pub(crate) metrics: Arc<Metrics>,
pub(crate) shard_id: ShardId,
pub(crate) reader_id: LeasedReaderId,
pub(crate) filter: FetchBatchFilter<T>,
pub(crate) desc: Description<T>,
pub(crate) part: BatchPart<T>,
pub(crate) leased_seqno: SeqNo,
pub(crate) lease: Option<Lease>,
pub(crate) filter_pushdown_audit: bool,
}
impl<T> LeasedBatchPart<T>
where
T: Timestamp + Codec64,
{
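/// Converts this into a [SerdeLeasedBatchPart], which can be exchanged
/// between timely workers, returning any held [Lease] to the caller.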
pub(crate) fn into_exchangeable_part(mut self) -> (SerdeLeasedBatchPart, Option<Lease>) {
let (proto, _metrics) = self.into_proto();
let lease = self.lease.take();
let part = SerdeLeasedBatchPart {
encoded_size_bytes: self.part.encoded_size_bytes(),
proto: LazyProto::from(&proto),
};
(part, lease)
}
pub fn encoded_size_bytes(&self) -> usize {
self.part.encoded_size_bytes()
}
pub fn request_filter_pushdown_audit(&mut self) {
self.filter_pushdown_audit = true;
}
pub fn stats(&self) -> Option<PartStats> {
self.part.stats().map(|x| x.decode())
}
pub fn maybe_optimize(&mut self, cfg: &ConfigSet, project: &ProjectionPushdown) {
let as_of = match &self.filter {
FetchBatchFilter::Snapshot { as_of } => as_of,
FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
};
let faked_part = project.try_optimize_ignored_data_fetch(
cfg,
&self.metrics,
as_of,
&self.desc,
&self.part,
);
if let Some(faked_part) = faked_part {
self.part = faked_part;
}
}
}
impl<T> Drop for LeasedBatchPart<T> {
fn drop(&mut self) {
self.metrics.lease.dropped_part.inc()
}
}
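/// A fetched batch part whose encoded bytes have been read from blob storage
/// (or copied from an inline part) but not yet decoded.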
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
metrics: Arc<Metrics>,
read_metrics: ReadMetrics,
buf: FetchedBlobBuf<T>,
registered_desc: Description<T>,
migration: PartMigration<K, V>,
filter: FetchBatchFilter<T>,
filter_pushdown_audit: bool,
structured_part_audit: PartDecodeFormat,
fetch_permit: Option<Arc<MetricsPermits>>,
_phantom: PhantomData<fn() -> D>,
}
#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
Hollow {
buf: SegmentedBytes,
part: HollowBatchPart<T>,
},
Inline {
desc: Description<T>,
updates: LazyInlineBatchPart,
ts_rewrite: Option<Antichain<T>>,
},
}
impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
fn clone(&self) -> Self {
Self {
metrics: Arc::clone(&self.metrics),
read_metrics: self.read_metrics.clone(),
buf: self.buf.clone(),
registered_desc: self.registered_desc.clone(),
migration: self.migration.clone(),
filter: self.filter.clone(),
filter_pushdown_audit: self.filter_pushdown_audit.clone(),
fetch_permit: self.fetch_permit.clone(),
structured_part_audit: self.structured_part_audit.clone(),
_phantom: self._phantom.clone(),
}
}
}
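/// A decodable [FetchedPart] paired with the fetch permit that is held while
/// its data remains in memory.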
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
pub part: FetchedPart<K, V, T, D>,
fetch_permit: Option<Arc<MetricsPermits>>,
}
impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
where
K: Codec + Debug,
<K as Codec>::Storage: Debug,
V: Codec + Debug,
<V as Codec>::Storage: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let ShardSourcePart { part, fetch_permit } = self;
f.debug_struct("ShardSourcePart")
.field("part", part)
.field("fetch_permit", fetch_permit)
.finish()
}
}
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
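/// Decodes this blob's bytes into a [FetchedPart], cloning the fetch permit
/// into the returned [ShardSourcePart].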
pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
let (part, stats) = match &self.buf {
FetchedBlobBuf::Hollow { buf, part } => {
let parsed = decode_batch_part_blob(
&self.metrics,
&self.read_metrics,
self.registered_desc.clone(),
part,
buf,
);
(parsed, part.stats.as_ref())
}
FetchedBlobBuf::Inline {
desc,
updates,
ts_rewrite,
} => {
let parsed = EncodedPart::from_inline(
&self.metrics,
self.read_metrics.clone(),
desc.clone(),
updates,
ts_rewrite.as_ref(),
);
(parsed, None)
}
};
let part = FetchedPart::new(
Arc::clone(&self.metrics),
part,
self.migration.clone(),
self.filter.clone(),
self.filter_pushdown_audit,
self.structured_part_audit,
stats,
);
ShardSourcePart {
part,
fetch_permit: self.fetch_permit.clone(),
}
}
pub fn stats(&self) -> Option<PartStats> {
match &self.buf {
FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
FetchedBlobBuf::Inline { .. } => None,
}
}
}
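/// A fetched batch part that has been decoded enough to be iterated over as
/// key/value/time/diff updates.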
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
metrics: Arc<Metrics>,
ts_filter: FetchBatchFilter<T>,
part: EitherOrBoth<
ColumnarRecords,
(
<K::Schema as Schema2<K>>::Decoder,
<V::Schema as Schema2<V>>::Decoder,
),
>,
timestamps: Int64Array,
diffs: Int64Array,
migration: PartMigration<K, V>,
filter_pushdown_audit: Option<LazyPartStats>,
peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
part_cursor: usize,
key_storage: Option<K::Storage>,
val_storage: Option<V::Storage>,
_phantom: PhantomData<fn() -> D>,
}
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
fn new(
metrics: Arc<Metrics>,
part: EncodedPart<T>,
migration: PartMigration<K, V>,
ts_filter: FetchBatchFilter<T>,
filter_pushdown_audit: bool,
part_decode_format: PartDecodeFormat,
stats: Option<&LazyPartStats>,
) -> Self {
let part_len = u64::cast_from(part.part.updates.len());
match &migration {
PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
PartMigration::Codec { .. } => {
metrics.schema.migration_count_codec.inc();
metrics.schema.migration_len_legacy_codec.inc_by(part_len);
}
PartMigration::Either { .. } => {
metrics.schema.migration_count_either.inc();
match part_decode_format {
PartDecodeFormat::Row {
validate_structured: false,
} => metrics.schema.migration_len_either_codec.inc_by(part_len),
PartDecodeFormat::Row {
validate_structured: true,
} => {
metrics.schema.migration_len_either_codec.inc_by(part_len);
metrics.schema.migration_len_either_arrow.inc_by(part_len);
}
PartDecodeFormat::Arrow => {
metrics.schema.migration_len_either_arrow.inc_by(part_len)
}
}
}
}
let filter_pushdown_audit = if filter_pushdown_audit {
stats.cloned()
} else {
None
};
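// Builds structured decoders for the read schema, first migrating the
// columnar data if the part was written with a different schema. Returns
// None if we must fall back to the Codec-encoded data.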
let downcast_structured = |structured: ColumnarRecordsStructuredExt| {
let key_size_before = ArrayOrd::new(&structured.key).goodbytes();
let structured = match &migration {
PartMigration::SameSchema { .. } => structured,
PartMigration::Codec { .. } => {
return None;
}
PartMigration::Either {
write: _,
read: _,
key_migration,
val_migration,
} => {
let start = Instant::now();
let key = key_migration.migrate(structured.key);
let val = val_migration.migrate(structured.val);
metrics
.schema
.migration_migrate_seconds
.inc_by(start.elapsed().as_secs_f64());
ColumnarRecordsStructuredExt { key, val }
}
};
let read_schema = migration.codec_read();
let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);
match &key {
Ok(key_decoder) => {
let key_size_after = key_decoder.goodbytes();
let key_diff = key_size_before.saturating_sub(key_size_after);
metrics
.pushdown
.parts_projection_trimmed_bytes
.inc_by(u64::cast_from(key_diff));
}
Err(e) => {
soft_panic_or_log!("failed to create decoder: {e:#?}");
}
}
Some((key.ok()?, val.ok()?))
};
let updates = part.normalize(&metrics.columnar);
let timestamps = updates.timestamps().clone();
let diffs = updates.diffs().clone();
let part = match updates {
BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
downcast_structured(key_values).expect("valid schemas for structured data"),
),
BlobTraceUpdates::Both(records, ext) => match part_decode_format {
PartDecodeFormat::Row {
validate_structured: false,
} => EitherOrBoth::Left(records),
PartDecodeFormat::Row {
validate_structured: true,
} => match downcast_structured(ext) {
Some(decoders) => EitherOrBoth::Both(records, decoders),
None => EitherOrBoth::Left(records),
},
PartDecodeFormat::Arrow => match downcast_structured(ext) {
Some(decoders) => EitherOrBoth::Right(decoders),
None => EitherOrBoth::Left(records),
},
},
};
FetchedPart {
metrics,
ts_filter,
part,
peek_stash: None,
timestamps,
diffs,
migration,
filter_pushdown_audit,
part_cursor: 0,
key_storage: None,
val_storage: None,
_phantom: PhantomData,
}
}
pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug> {
self.filter_pushdown_audit.clone()
}
}
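/// A batch part whose data has been fetched and decoded into columnar form,
/// but not yet into `K`/`V` values.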
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
metrics: ReadMetrics,
registered_desc: Description<T>,
part: BlobTraceBatchPart<T>,
needs_truncation: bool,
ts_rewrite: Option<Antichain<T>>,
}
impl<K, V, T, D> FetchedPart<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64,
D: Semigroup + Codec64 + Send + Sync,
{
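/// Returns the next update in the part, reusing the allocations in `key` and
/// `val` where possible and consolidating consecutive updates that have equal
/// keys, values, and times. When `result_override` is provided, decoding is
/// skipped and the override is returned in place of the decoded key/value.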
pub fn next_with_storage(
&mut self,
key: &mut Option<K>,
val: &mut Option<V>,
result_override: Option<(K, V)>,
) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
let mut consolidated = self.peek_stash.take();
loop {
let next = if self.part_cursor < self.timestamps.len() {
let next_idx = self.part_cursor;
self.part_cursor += 1;
let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
if !self.ts_filter.filter_ts(&mut t) {
continue;
}
let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
if d.is_zero() {
continue;
}
let kv = if result_override.is_none() {
self.decode_kv(next_idx, key, val)
} else {
// A result override is present, so skip the expensive decode; these
// placeholder values are replaced by the override before returning.
(Err("".to_string()), Err("".to_string()))
};
(kv, t, d)
} else {
break;
};
if let Some((kv, t, d)) = &mut consolidated {
let (kv_next, t_next, d_next) = &next;
if kv == kv_next && t == t_next {
d.plus_equals(d_next);
if d.is_zero() {
consolidated = None;
}
} else {
self.peek_stash = Some(next);
break;
}
} else {
consolidated = Some(next);
}
}
let (kv, t, d) = consolidated?;
if let Some((key, val)) = result_override {
return Some(((Ok(key), Ok(val)), t, d));
}
Some((kv, t, d))
}
fn decode_kv(
&mut self,
index: usize,
key: &mut Option<K>,
val: &mut Option<V>,
) -> (Result<K, String>, Result<V, String>) {
let decoded = self
.part
.as_ref()
.map_left(|codec| {
let ((ck, cv), _, _) = codec.get(index).expect("valid index");
Self::decode_codec(
&*self.metrics,
self.migration.codec_read(),
ck,
cv,
key,
val,
&mut self.key_storage,
&mut self.val_storage,
)
})
.map_right(|(structured_key, structured_val)| {
self.decode_structured(index, structured_key, structured_val, key, val)
});
match decoded {
EitherOrBoth::Both((k, v), (k_s, v_s)) => {
let is_valid = self
.metrics
.columnar
.arrow()
.key()
.report_valid(|| k_s == k);
if !is_valid {
soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
}
let is_valid = self
.metrics
.columnar
.arrow()
.val()
.report_valid(|| v_s == v);
if !is_valid {
soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
}
(k, v)
}
EitherOrBoth::Left(kv) => kv,
EitherOrBoth::Right(kv) => kv,
}
}
fn decode_codec(
metrics: &Metrics,
read_schemas: &Schemas<K, V>,
key_buf: &[u8],
val_buf: &[u8],
key: &mut Option<K>,
val: &mut Option<V>,
key_storage: &mut Option<K::Storage>,
val_storage: &mut Option<V::Storage>,
) -> (Result<K, String>, Result<V, String>) {
let k = metrics.codecs.key.decode(|| match key.take() {
Some(mut key) => {
match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
Ok(()) => Ok(key),
Err(err) => Err(err),
}
}
None => K::decode(key_buf, &read_schemas.key),
});
let v = metrics.codecs.val.decode(|| match val.take() {
Some(mut val) => {
match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
Ok(()) => Ok(val),
Err(err) => Err(err),
}
}
None => V::decode(val_buf, &read_schemas.val),
});
(k, v)
}
fn decode_structured(
&self,
idx: usize,
keys: &<K::Schema as Schema2<K>>::Decoder,
vals: &<V::Schema as Schema2<V>>::Decoder,
key: &mut Option<K>,
val: &mut Option<V>,
) -> (Result<K, String>, Result<V, String>) {
let key = self.metrics.columnar.arrow().key().measure_decoding(|| {
let mut key = key.take().unwrap_or_default();
keys.decode(idx, &mut key);
key
});
let val = self.metrics.columnar.arrow().val().measure_decoding(|| {
let mut val = val.take().unwrap_or_default();
vals.decode(idx, &mut val);
val
});
(Ok(key), Ok(val))
}
}
impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64,
D: Semigroup + Codec64 + Send + Sync,
{
type Item = ((Result<K, String>, Result<V, String>), T, D);
fn next(&mut self) -> Option<Self::Item> {
self.next_with_storage(&mut None, &mut None, None)
}
fn size_hint(&self) -> (usize, Option<usize>) {
let max_len = self.timestamps.len();
(0, Some(max_len))
}
}
impl<T> EncodedPart<T>
where
T: Timestamp + Lattice + Codec64,
{
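/// Fetches a [BatchPart] into an [EncodedPart], reading hollow parts from
/// blob storage and decoding inline parts directly.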
pub async fn fetch(
shard_id: &ShardId,
blob: &dyn Blob,
metrics: &Metrics,
shard_metrics: &ShardMetrics,
read_metrics: &ReadMetrics,
registered_desc: &Description<T>,
part: &BatchPart<T>,
) -> Result<Self, BlobKey> {
match part {
BatchPart::Hollow(x) => {
fetch_batch_part(
shard_id,
blob,
metrics,
shard_metrics,
read_metrics,
registered_desc,
x,
)
.await
}
BatchPart::Inline {
updates,
ts_rewrite,
..
} => Ok(EncodedPart::from_inline(
metrics,
read_metrics.clone(),
registered_desc.clone(),
updates,
ts_rewrite.as_ref(),
)),
}
}
pub(crate) fn from_inline(
metrics: &Metrics,
read_metrics: ReadMetrics,
desc: Description<T>,
x: &LazyInlineBatchPart,
ts_rewrite: Option<&Antichain<T>>,
) -> Self {
let parsed = x.decode(&metrics.columnar).expect("valid inline part");
Self::new(read_metrics, desc, "inline", ts_rewrite, parsed)
}
pub(crate) fn from_hollow(
metrics: ReadMetrics,
registered_desc: Description<T>,
part: &HollowBatchPart<T>,
parsed: BlobTraceBatchPart<T>,
) -> Self {
Self::new(
metrics,
registered_desc,
&part.key.0,
part.ts_rewrite.as_ref(),
parsed,
)
}
pub(crate) fn new(
metrics: ReadMetrics,
registered_desc: Description<T>,
printable_name: &str,
ts_rewrite: Option<&Antichain<T>>,
parsed: BlobTraceBatchPart<T>,
) -> Self {
let inline_desc = &parsed.desc;
let needs_truncation = inline_desc.lower() != registered_desc.lower()
|| inline_desc.upper() != registered_desc.upper();
if needs_truncation {
assert!(
PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
"key={} inline={:?} registered={:?}",
printable_name,
inline_desc,
registered_desc
);
if ts_rewrite.is_none() {
assert!(
PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
"key={} inline={:?} registered={:?}",
printable_name,
inline_desc,
registered_desc
);
}
assert_eq!(
inline_desc.since(),
&Antichain::from_elem(T::minimum()),
"key={} inline={:?} registered={:?}",
printable_name,
inline_desc,
registered_desc
);
} else {
assert_eq!(
inline_desc, &registered_desc,
"key={} inline={:?} registered={:?}",
printable_name, inline_desc, registered_desc
);
}
EncodedPart {
metrics,
registered_desc,
part: parsed,
needs_truncation,
ts_rewrite: ts_rewrite.cloned(),
}
}
pub(crate) fn maybe_unconsolidated(&self) -> bool {
self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
}
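/// Returns this part's updates normalized to its registered description:
/// timestamps are rewritten if a `ts_rewrite` is present, and updates outside
/// the registered bounds are filtered out if truncation is needed.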
pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
let updates = self.part.updates.clone();
if !self.needs_truncation && self.ts_rewrite.is_none() {
return updates;
}
let mut codec = updates
.records()
.map(|r| (r.keys().clone(), r.vals().clone()));
let mut structured = updates.structured().cloned();
let mut timestamps = updates.timestamps().clone();
let mut diffs = updates.diffs().clone();
if let Some(rewrite) = self.ts_rewrite.as_ref() {
timestamps = arrow::compute::unary(&timestamps, |i: i64| {
let mut t = T::decode(i.to_le_bytes());
t.advance_by(rewrite.borrow());
i64::from_le_bytes(T::encode(&t))
});
}
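// Keep only updates whose (possibly rewritten) timestamps fall within the
// registered description's bounds.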
let reallocated = if self.needs_truncation {
let filter = BooleanArray::from_unary(&timestamps, |i| {
let t = T::decode(i.to_le_bytes());
let truncate_t = {
!self.registered_desc.lower().less_equal(&t)
|| self.registered_desc.upper().less_equal(&t)
};
!truncate_t
});
if filter.false_count() == 0 {
false
} else {
let filter = FilterBuilder::new(&filter).optimize().build();
let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
if let Some((keys, vals)) = codec {
codec = Some((
realloc_array(do_filter(&keys).as_binary(), metrics),
realloc_array(do_filter(&vals).as_binary(), metrics),
));
}
if let Some(ext) = structured {
structured = Some(ColumnarRecordsStructuredExt {
key: realloc_any(do_filter(&*ext.key), metrics),
val: realloc_any(do_filter(&*ext.val), metrics),
});
}
timestamps = realloc_array(do_filter(&timestamps).as_primitive(), metrics);
diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
true
}
} else {
false
};
if self.ts_rewrite.is_some() && !reallocated {
timestamps = realloc_array(&timestamps, metrics);
}
if self.ts_rewrite.is_some() {
self.metrics
.ts_rewrite
.inc_by(u64::cast_from(timestamps.len()));
}
match (codec, structured) {
(Some((key, value)), None) => {
BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
}
(Some((key, value)), Some(ext)) => {
BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
}
(None, Some(ext)) => BlobTraceUpdates::Structured {
key_values: ext,
timestamps,
diffs,
},
(None, None) => unreachable!(),
}
}
}
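/// The serde-encodable form of a [LeasedBatchPart], suitable for exchanging
/// between timely workers.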
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SerdeLeasedBatchPart {
encoded_size_bytes: usize,
proto: LazyProto<ProtoLeasedBatchPart>,
}
impl SerdeLeasedBatchPart {
pub fn encoded_size_bytes(&self) -> usize {
self.encoded_size_bytes
}
pub(crate) fn decode<T: Timestamp + Codec64>(
&self,
metrics: Arc<Metrics>,
) -> LeasedBatchPart<T> {
let proto = self.proto.decode().expect("valid leased batch part");
(proto, metrics)
.into_rust()
.expect("valid leased batch part")
}
}
impl<T: Timestamp + Codec64> RustType<(ProtoLeasedBatchPart, Arc<Metrics>)> for LeasedBatchPart<T> {
fn into_proto(&self) -> (ProtoLeasedBatchPart, Arc<Metrics>) {
let proto = ProtoLeasedBatchPart {
shard_id: self.shard_id.into_proto(),
filter: Some(self.filter.into_proto()),
desc: Some(self.desc.into_proto()),
part: Some(self.part.into_proto()),
lease: Some(ProtoLease {
reader_id: self.reader_id.into_proto(),
seqno: Some(self.leased_seqno.into_proto()),
}),
filter_pushdown_audit: self.filter_pushdown_audit,
};
(proto, Arc::clone(&self.metrics))
}
fn from_proto(proto: (ProtoLeasedBatchPart, Arc<Metrics>)) -> Result<Self, TryFromProtoError> {
let (proto, metrics) = proto;
let lease = proto
.lease
.ok_or_else(|| TryFromProtoError::missing_field("ProtoLeasedBatchPart::lease"))?;
Ok(LeasedBatchPart {
metrics,
shard_id: proto.shard_id.into_rust()?,
filter: proto
.filter
.into_rust_if_some("ProtoLeasedBatchPart::filter")?,
desc: proto.desc.into_rust_if_some("ProtoLeasedBatchPart::desc")?,
part: proto.part.into_rust_if_some("ProtoLeasedBatchPart::part")?,
reader_id: lease.reader_id.into_rust()?,
leased_seqno: lease.seqno.into_rust_if_some("ProtoLease::seqno")?,
lease: None,
filter_pushdown_audit: proto.filter_pushdown_audit,
})
}
}
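/// The format to use when decoding a batch part's data: from the
/// Codec-encoded row columns (optionally validating against the structured
/// columns) or directly from the structured arrow columns.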
#[derive(Debug, Copy, Clone)]
pub enum PartDecodeFormat {
Row {
validate_structured: bool,
},
Arrow,
}
impl PartDecodeFormat {
pub const fn default() -> Self {
PartDecodeFormat::Row {
validate_structured: true,
}
}
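/// Parses a [PartDecodeFormat] from a string, falling back to the default
/// and soft-panicking on unrecognized values.
///
/// A minimal sketch of the round-trip with [PartDecodeFormat::as_str],
/// assuming this type is reachable as `mz_persist_client::fetch::PartDecodeFormat`:
///
/// ```ignore
/// use mz_persist_client::fetch::PartDecodeFormat;
///
/// assert_eq!(PartDecodeFormat::from_str("arrow").as_str(), "arrow");
/// assert_eq!(PartDecodeFormat::from_str("row").as_str(), "row");
/// assert_eq!(
///     PartDecodeFormat::from_str("row_with_validate").as_str(),
///     "row_with_validate",
/// );
/// ```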
pub fn from_str(s: &str) -> Self {
match s {
"row" => PartDecodeFormat::Row {
validate_structured: false,
},
"row_with_validate" => PartDecodeFormat::Row {
validate_structured: true,
},
"arrow" => PartDecodeFormat::Arrow,
x => {
let default = PartDecodeFormat::default();
soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
default
}
}
}
pub const fn as_str(&self) -> &'static str {
match self {
PartDecodeFormat::Row {
validate_structured: false,
} => "row",
PartDecodeFormat::Row {
validate_structured: true,
} => "row_with_validate",
PartDecodeFormat::Arrow => "arrow",
}
}
}
impl fmt::Display for PartDecodeFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
#[mz_ore::test]
fn client_exchange_data() {
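// SerdeLeasedBatchPart is exchanged between timely workers (including over
// the network), so enforce that it implements ExchangeData.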
fn is_exchange_data<T: timely::ExchangeData>() {}
is_exchange_data::<SerdeLeasedBatchPart>();
}