Struct mz_persist::indexed::arrangement::Arrangement
pub struct Arrangement { /* private fields */ }
A persistent, compacting data structure containing indexed (Key, Value, Time, i64)
entries.
The data is logically and physically separated into two “buckets”:
unsealed and trace. It first enters and is initially placed into
unsealed, which is a holding pen roughly corresponding to the in-memory
buffer of a differential dataflow arrangement operator. At some point, the
arranged collection is sealed, which advances the upper timestamp of the
collection and logically (but not physically) moves the data into trace.
The trace bucket indexes the data by (key, value, time). At some later point, unsealed_step is called, which physically moves the data from unsealed to trace.
There are two notable differences between a persisted arrangement and a differential in-mem one (besides the obvious durability):
- Because in-mem operations are so much faster than ones on durable storage, the act of advancing the frontier and moving data into trace, one step in differential, is split into separate steps in persist.
- The differential arrangement keeps the data arranged for efficient indexed access (hence the name). Persist also keeps the data arranged the same way, but finishing up the plumbing for indexed access is still a TODO.
Further details below.
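To make the lifecycle concrete, here is a deliberately simplified, self-contained sketch of the two-bucket flow using only std types. ToyArrangement and its methods are illustrative analogues of the write/seal/drain steps described above, not this crate's API:

```rust
/// A deliberately simplified, in-memory illustration of the unsealed/trace
/// split; not the crate's actual implementation.
struct ToyArrangement {
    unsealed: Vec<(String, String, u64, i64)>, // (key, val, time, diff)
    trace: Vec<(String, String, u64, i64)>,
    seal: u64,        // logical frontier: times < seal are closed
    trace_upper: u64, // physical frontier: times < trace_upper are in trace
}

impl ToyArrangement {
    fn new() -> Self {
        ToyArrangement { unsealed: Vec::new(), trace: Vec::new(), seal: 0, trace_upper: 0 }
    }

    /// Incoming updates always land in the unsealed bucket.
    fn write(&mut self, key: &str, val: &str, time: u64, diff: i64) {
        assert!(time >= self.seal, "updates before the seal frontier are rejected");
        self.unsealed.push((key.to_string(), val.to_string(), time, diff));
    }

    /// Sealing only advances the logical frontier; no data moves yet.
    fn seal_to(&mut self, ts: u64) {
        assert!(ts >= self.seal);
        self.seal = ts;
    }

    /// Analogue of `unsealed_step`: physically move closed updates into trace.
    fn step(&mut self) {
        let seal = self.seal;
        let (ready, rest): (Vec<_>, Vec<_>) =
            self.unsealed.drain(..).partition(|(_, _, t, _)| *t < seal);
        self.trace.extend(ready);
        self.unsealed = rest;
        self.trace_upper = seal;
    }
}

fn main() {
    let mut a = ToyArrangement::new();
    a.write("k", "v", 3, 1);
    a.seal_to(5); // logically closes times < 5
    a.step();     // physically moves the update into trace
    assert_eq!(a.trace.len(), 1);
    assert!(a.unsealed.is_empty());
}
```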
Unsealed
Unsealed exists to hold data that has been added to the persistent collection but not yet “seal”ed into a trace. We store incoming data as immutable batches of updates, corresponding to non-empty, sorted intervals of crate::location::SeqNos.
As times get sealed and the corresponding updates get moved into the trace, Unsealed can remove those updates, and eventually, entire batches. The approach to removing sealed data optimizes for the common case, for which we assume that:
- data arrives roughly in order,
- unsealed batches contain data for a small range of distinct times.
Every unsealed batch tracks the minimum and maximum update timestamp contained within its list of updates, and we eagerly drop batches that only contain data prior to the sealed frontier whenever possible. In the expected case, this should be sufficient to ensure that Unsealed maintains a bounded storage footprint. If either of the two assumptions is violated, either because updates arrive out of order or because batches contain data at many distinct timestamps, we periodically try to remove the updates strictly behind the current sealed frontier from a given unsealed batch and replace it with a “trimmed” batch that uses less storage.
This approach intentionally does nothing to physically coalesce multiple unsealed batches into a single unsealed batch. Doing so has many potential downsides; for example, physically merging a batch containing updates 5 seconds ahead of the current sealed frontier with another batch containing updates 5 hours ahead of the current sealed frontier would only hurt 5 seconds later, when the previously unmerged batch would have been dropped. Instead, the merged batch has to be trimmed, which requires an extra read and write. If we end up having significant amounts of data far ahead of the current sealed frontier, we will likely need a different structure that can hold batches of updates organized by overlapping ranges of times and physically merge unsealed batches using an approach similar to trace physical compaction.
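For illustration, the eviction/trimming decision for a single unsealed batch reduces to comparing the batch's tracked timestamp bounds against the sealed frontier. The following sketch uses hypothetical names, not this crate's types:

```rust
/// Illustrative metadata for an unsealed batch; mirrors the idea that each
/// batch tracks the min/max update timestamp it contains (hypothetical names,
/// not the crate's actual types).
struct ToyUnsealedBatchMeta {
    ts_min: u64, // minimum update time in the batch (inclusive)
    ts_max: u64, // maximum update time in the batch (inclusive)
}

/// A batch can be dropped outright once every update in it is strictly
/// behind the sealed frontier.
fn can_evict(b: &ToyUnsealedBatchMeta, seal: u64) -> bool {
    b.ts_max < seal
}

/// A batch that straddles the sealed frontier cannot be dropped, but its
/// already-sealed prefix can be removed by rewriting it as a "trimmed" batch.
fn should_trim(b: &ToyUnsealedBatchMeta, seal: u64) -> bool {
    b.ts_min < seal && b.ts_max >= seal
}

fn main() {
    let seal = 10;
    assert!(can_evict(&ToyUnsealedBatchMeta { ts_min: 2, ts_max: 9 }, seal));
    assert!(should_trim(&ToyUnsealedBatchMeta { ts_min: 5, ts_max: 20 }, seal));
    assert!(!can_evict(&ToyUnsealedBatchMeta { ts_min: 12, ts_max: 20 }, seal));
}
```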
Trace
An append-only list of immutable batches that describe updates corresponding to sorted, contiguous, non-overlapping ranges of times. The since frontier defines a time before which we can compact the history of updates (and before which, correspondingly, we can no longer answer queries).
We can compact the updates prior to the since frontier physically, by combining batches representing consecutive intervals into one large batch representing the union of those intervals, and logically, by forwarding updates at times before the since frontier to the since frontier.
We also want to achieve a balance between the compactness of the representation and the computational effort required to maintain the representation. Specifically, if we have N batches of data already compacted, we don’t want every additional batch to perform O(N) work (i.e. merge with N batches worth of data) in order to get compacted. Instead, we would like to keep a geometrically decreasing (when viewed from oldest to most recent) sequence of batches and perform O(N) work every N calls to append. Thankfully, we can achieve all of this with a few simple rules:
- Batches are able to be compacted once the since frontier is in advance of all of the data in the batch.
- All batches are assigned a nonzero compaction level. When a new batch is appended to the trace it is assigned a compaction level of 0.
- We periodically merge consecutive batches at the same level L representing time intervals [lo, mid) and [mid, hi) into a single batch representing all of the updates in [lo, hi), with level L + 1 iff the new batch contains more data than both of its parents, and L otherwise (sketched below). Once two batches are merged they are removed from the trace and replaced with the merged batch.
- Perform merges for the oldest batches possible first.
NB: this approach assumes that all batches are roughly uniformly sized when they are first appended.
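For illustration, the level-assignment rule from the list above can be sketched as follows; the Toy* names and the merged_len parameter are hypothetical, not this crate's types:

```rust
/// Illustrative trace-batch metadata (hypothetical names, not the crate's types).
#[derive(Debug)]
struct ToyTraceBatch {
    lo: u64,    // inclusive lower bound of the time interval
    hi: u64,    // exclusive upper bound of the time interval
    level: u32, // compaction level
    len: usize, // number of updates after consolidation
}

/// Merge two consecutive batches at the same level covering [lo, mid) and
/// [mid, hi). The merged batch gets level L + 1 iff it contains more data
/// than both of its parents, and stays at L otherwise (e.g. when
/// consolidation cancelled most of the updates).
fn merge(a: &ToyTraceBatch, b: &ToyTraceBatch, merged_len: usize) -> ToyTraceBatch {
    assert_eq!(a.level, b.level);
    assert_eq!(a.hi, b.lo);
    let level = if merged_len > a.len && merged_len > b.len { a.level + 1 } else { a.level };
    ToyTraceBatch { lo: a.lo, hi: b.hi, level, len: merged_len }
}

fn main() {
    let a = ToyTraceBatch { lo: 0, hi: 5, level: 0, len: 100 };
    let b = ToyTraceBatch { lo: 5, hi: 10, level: 0, len: 100 };
    // No cancellation: the merged batch is bigger than both parents, so it levels up.
    assert_eq!(merge(&a, &b, 200).level, 1);
    // Heavy cancellation: the merged batch stays at the parents' level.
    assert_eq!(merge(&a, &b, 50).level, 0);
}
```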
Invariants
- New updates less than the seal frontier are never added to unsealed.
- Unsealed batches have non-overlapping SeqNo ranges.
- All trace updates are less than the seal frontier.
- Trace batches are sorted by time and represent a sorted, consecutive, non-overlapping list of time intervals.
- Individual batches are immutable, and their set of updates, the time interval they describe and their compaction level all remain constant as long as the batch remains in the trace.
- The compaction levels across the list of batches in a trace are weakly decreasing (non-increasing) when iterating from oldest to most recent time intervals.
- TODO: Space usage.
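For illustration, two of these invariants (consecutive, non-overlapping time intervals and non-increasing compaction levels) can be checked mechanically; the helper below is a hypothetical sketch over (lo, hi, level) metadata, not part of this crate:

```rust
/// Illustrative check of two trace invariants, over (lo, hi, level) metadata
/// ordered from oldest to most recent (hypothetical helper, not this crate's code).
fn validate_trace(batches: &[(u64, u64, u32)]) -> Result<(), String> {
    for window in batches.windows(2) {
        let (prev, next) = (window[0], window[1]);
        // Time intervals must be sorted, consecutive, and non-overlapping.
        if prev.1 != next.0 {
            return Err(format!(
                "gap or overlap between [{}, {}) and [{}, {})",
                prev.0, prev.1, next.0, next.1
            ));
        }
        // Compaction levels must be non-increasing from oldest to most recent.
        if next.2 > prev.2 {
            return Err(format!("compaction level increased from {} to {}", prev.2, next.2));
        }
    }
    Ok(())
}

fn main() {
    assert!(validate_trace(&[(0, 5, 2), (5, 8, 1), (8, 9, 0)]).is_ok());
    assert!(validate_trace(&[(0, 5, 1), (6, 8, 1)]).is_err()); // gap at [5, 6)
}
```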
Implementations
impl Arrangement
pub fn new(meta: ArrangementMeta) -> Self
Returns an Arrangement re-instantiated with the previously serialized state.
pub fn new_blob_key() -> String
Get a new key to write to the Blob store for this arrangement.
pub fn meta(&self) -> ArrangementMeta
Serializes the state of this Arrangement for later re-instantiation.
pub fn unsealed_seqno_upper(&self) -> SeqNo
An open upper bound on the seqnos of contained updates.
pub fn snapshot<L: BlobRead>(
    &self,
    seqno: SeqNo,
    blob: &BlobCache<L>
) -> Result<ArrangementSnapshot, Error>
Returns a consistent read of all the updates contained in this arrangement.
pub fn unsealed_append<L: Blob>(
    &mut self,
    batch: BlobUnsealedBatch,
    blob: &mut BlobCache<L>
) -> Result<(), Error>
Writes the given batch to Blob storage and logically adds the contained updates to this unsealed.
pub fn unsealed_snapshot(
    &self,
    ts_lower: Antichain<u64>,
    ts_upper: Antichain<u64>
) -> Result<UnsealedSnapshotMeta, Error>
Returns a consistent read of the updates contained in this unsealed matching the given filters (in practice, everything not in Trace).
pub fn unsealed_drain<L: Blob>(
    &mut self,
    blob: &mut BlobCache<L>
) -> Result<(), Error>
Atomically moves all writes in unsealed not in advance of the trace’s seal frontier into the trace and does any necessary resulting eviction work to remove unnecessary batches.
pub fn unsealed_next_drain_req(&self) -> Result<Option<DrainUnsealedReq>, Error>
Get the next available drain work from the unsealed, if some exists.
pub fn drain_unsealed_blocking<B: Blob>(
    blob: &BlobCache<B>,
    req: DrainUnsealedReq
) -> Result<DrainUnsealedRes, Error>
Copies unsealed data matching the specified description into a new trace batch.
pub fn unsealed_handle_drain_response(&mut self, res: DrainUnsealedRes)
Handle an externally completed unsealed drain request.
TODO: Call unsealed_evict at the end of this and return a list of unsealed batches that can now be physically deleted after the drain step is committed to durable storage. This could save us a META write.
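The drain work is deliberately externalized as a request/response pair so that the expensive blob reads and writes can happen off the critical path (e.g. on a background worker). The following is a minimal, self-contained sketch of that shape; the Toy* types are hypothetical and not this crate's DrainUnsealedReq/DrainUnsealedRes:

```rust
/// Hypothetical toy request/response types illustrating how drain work is
/// externalized; not the crate's DrainUnsealedReq/DrainUnsealedRes.
struct ToyDrainReq { lower: u64, upper: u64 }
struct ToyDrainRes { req: ToyDrainReq, new_trace_batch: Vec<(String, u64, i64)> }

struct ToyState { seal: u64, trace_upper: u64, trace: Vec<Vec<(String, u64, i64)>> }

impl ToyState {
    /// Analogue of `unsealed_next_drain_req`: describe outstanding work, if any.
    fn next_drain_req(&self) -> Option<ToyDrainReq> {
        (self.trace_upper < self.seal)
            .then(|| ToyDrainReq { lower: self.trace_upper, upper: self.seal })
    }

    /// Analogue of `unsealed_handle_drain_response`: apply externally completed work.
    fn handle_drain_response(&mut self, res: ToyDrainRes) {
        self.trace.push(res.new_trace_batch);
        self.trace_upper = res.req.upper;
    }
}

/// Analogue of `drain_unsealed_blocking`: expensive copy work that can run on
/// a separate thread because it does not touch the in-memory state.
fn drain_blocking(req: ToyDrainReq, unsealed: &[(String, u64, i64)]) -> ToyDrainRes {
    let batch = unsealed
        .iter()
        .filter(|(_, t, _)| *t >= req.lower && *t < req.upper)
        .cloned()
        .collect();
    ToyDrainRes { req, new_trace_batch: batch }
}

fn main() {
    let unsealed = vec![("k".to_string(), 3, 1)];
    let mut state = ToyState { seal: 5, trace_upper: 0, trace: Vec::new() };
    if let Some(req) = state.next_drain_req() {
        let res = drain_blocking(req, &unsealed); // could run on a worker thread
        state.handle_drain_response(res);
    }
    assert_eq!(state.trace_upper, 5);
}
```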
pub fn unsealed_evict(&mut self) -> Vec<UnsealedBatchMeta>
Remove all batches containing only data strictly before the trace’s physical ts frontier.
Returns a list of batches that can safely be deleted after the eviction is committed to durable storage.
pub fn unsealed_step<L: Blob>(
    &mut self,
    blob: &mut BlobCache<L>
) -> Result<bool, Error>
Take one step towards shrinking the representation of this unsealed.
Returns true if the trace was modified, false otherwise.
pub fn trace_ts_upper(&self) -> Antichain<u64>
The frontier of times that have been physically moved into trace.
While self.seal tracks the frontier of times that have been logically closed and are eligible to be moved into the trace, self.trace_ts_upper() tracks the frontier of times that have actually been physically moved into the trace. self.seal() is required to manage invariants between commands (e.g. a seal request has to be at a time in advance of prior seal requests), whereas self.trace_ts_upper() is required to manage physical reads and writes to the trace (e.g. to determine which times may be added that are not already present).
Invariant:
- self.trace_ts_upper() <= self.seal()
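For illustration, this invariant can be checked with timely's Antichain ordering (assuming, as elsewhere in this crate, that the frontiers are timely Antichains); the concrete values below are hypothetical stand-ins for the two frontiers:

```rust
use timely::progress::Antichain;
use timely::PartialOrder;

fn main() {
    // Hypothetical stand-ins for self.trace_ts_upper() and self.get_seal().
    let trace_ts_upper: Antichain<u64> = Antichain::from_elem(5);
    let seal: Antichain<u64> = Antichain::from_elem(8);
    // The documented invariant: data is physically moved into trace only up to the seal.
    assert!(PartialOrder::less_equal(&trace_ts_upper, &seal));
}
```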
pub fn get_seal(&self) -> Antichain<u64>
A logical upper bound on the times which may currently be added to the trace.
pub fn update_seal(&mut self, ts: u64)
Update the seal frontier to ts.
This function intentionally does not do any checking to see if ts is in advance of the current seal frontier, because we sometimes need to use this to revert a seal update in the event of a storage failure.
pub fn validate_seal(&self, ts: u64) -> Result<(), String>
Checks whether the given seal would be valid to pass to Self::update_seal.
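The split between Self::validate_seal and Self::update_seal supports a validate / durably write / apply pattern, with the unchecked update_seal also serving to roll back on a storage failure. The following is a minimal, self-contained sketch of that shape; ToySeal, seal_to, and durably_write are hypothetical and not part of this crate:

```rust
/// Hypothetical in-memory stand-in for the seal frontier.
struct ToySeal { ts: u64 }

impl ToySeal {
    /// Analogue of `validate_seal`: new seals must not regress prior ones.
    fn validate_seal(&self, ts: u64) -> Result<(), String> {
        if ts >= self.ts {
            Ok(())
        } else {
            Err(format!("seal {} regresses from {}", ts, self.ts))
        }
    }
    /// Analogue of `update_seal`: intentionally unchecked so it can also revert.
    fn update_seal(&mut self, ts: u64) { self.ts = ts; }
}

/// Hypothetical driver: validate, apply in memory, persist, and revert on failure.
fn seal_to(
    state: &mut ToySeal,
    ts: u64,
    durably_write: impl Fn(u64) -> Result<(), String>,
) -> Result<(), String> {
    state.validate_seal(ts)?;
    let prev = state.ts;
    state.update_seal(ts);
    if let Err(e) = durably_write(ts) {
        // Storage failed: revert the in-memory seal with the unchecked setter.
        state.update_seal(prev);
        return Err(e);
    }
    Ok(())
}

fn main() {
    let mut s = ToySeal { ts: 5 };
    assert!(seal_to(&mut s, 10, |_| Ok(())).is_ok());
    assert!(seal_to(&mut s, 12, |_| Err("blob write failed".to_string())).is_err());
    assert_eq!(s.ts, 10); // the failed seal was reverted
}
```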
pub fn since(&self) -> Antichain<u64>
A lower bound on the time at which updates may have been logically compacted together.
pub fn validate_allow_compaction(
    &self,
    since: &Antichain<u64>
) -> Result<(), String>
Checks whether the given since would be valid to pass to Self::allow_compaction.
pub fn allow_compaction(&mut self, since: Antichain<u64>)
Update the compaction frontier to since.
This function intentionally does not do any checking to see if since is in advance of the current compaction frontier, because we sometimes need to use this to revert a compaction update in the event of a storage failure.
pub fn trace_snapshot<B: BlobRead>(&self, blob: &BlobCache<B>) -> TraceSnapshot
Returns a consistent read of all the updates contained in this trace.
pub fn trace_next_compact_req(&self) -> Result<Option<CompactTraceReq>, Error>
Get the next available compaction work from the trace, if some exists.
pub fn trace_handle_compact_response(
    &mut self,
    res: CompactTraceRes
) -> Vec<TraceBatchMeta>
Handle an externally completed trace compaction request.
Returns a list of trace batches that can now be physically deleted after the compaction step is committed to durable storage.
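For illustration, here is a self-contained sketch of the response-handling shape, emphasizing that the replaced batches are only safe to delete from blob storage after the new trace metadata is durably committed; the Toy* types are hypothetical and not this crate's CompactTraceReq/CompactTraceRes:

```rust
/// Hypothetical toy batch metadata; not the crate's TraceBatchMeta.
#[derive(Clone, Debug, PartialEq)]
struct ToyBatch { lo: u64, hi: u64, key: String }

struct ToyTrace { batches: Vec<ToyBatch> }

impl ToyTrace {
    /// Analogue of `trace_handle_compact_response`: swap the two merged inputs
    /// for their merged output and return the replaced batches. The caller must
    /// only delete the returned blobs after the new trace meta is durably written.
    fn handle_compact_response(&mut self, a: ToyBatch, b: ToyBatch, merged: ToyBatch) -> Vec<ToyBatch> {
        self.batches.retain(|x| *x != a && *x != b);
        self.batches.push(merged);
        self.batches.sort_by_key(|x| x.lo);
        vec![a, b]
    }
}

fn main() {
    let a = ToyBatch { lo: 0, hi: 5, key: "blob-a".into() };
    let b = ToyBatch { lo: 5, hi: 10, key: "blob-b".into() };
    let merged = ToyBatch { lo: 0, hi: 10, key: "blob-ab".into() };
    let mut trace = ToyTrace { batches: vec![a.clone(), b.clone()] };
    let deletable = trace.handle_compact_response(a, b, merged);
    // Delete "blob-a"/"blob-b" from blob storage only after META is committed.
    assert_eq!(deletable.len(), 2);
}
```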
Auto Trait Implementations
impl RefUnwindSafe for Arrangement
impl Send for Arrangement
impl Sync for Arrangement
impl Unpin for Arrangement
impl UnwindSafe for Arrangement
Blanket Implementations
impl<T> BorrowMut<T> for T where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request
impl<T> WithSubscriber for T
fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
    S: Into<Dispatch>,
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
fn with_current_subscriber(self) -> WithDispatch<Self>
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.