pub struct Arrangement { /* private fields */ }

A persistent, compacting data structure containing indexed (Key, Value, Time, i64) entries.

The data is logically and physically separated into two “buckets”: unsealed and trace. Incoming data is first placed into unsealed, a holding pen that roughly corresponds to the in-memory buffer of a differential dataflow arrangement operator. At some point the arranged collection is sealed, which advances the upper timestamp of the collection and logically (but not physically) moves the data into trace. The trace bucket indexes the data by (key, value, time). At some later point, unsealed_step is called, which physically moves the data from unsealed to trace.
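
The split between the logical seal and the later physical move can be illustrated with a toy in-memory model. This is a minimal sketch under stated assumptions, not the persist implementation: ToyArrangement, seal, drain_sealed and the plain u64 timestamps (standing in for frontiers) are all hypothetical.

```rust
/// Hypothetical toy model of the two buckets; `u64` times stand in for
/// frontiers and none of these names are the real persist API.
#[derive(Debug, Default)]
pub struct ToyArrangement {
    /// (key, value, time, diff) updates not yet physically moved into trace.
    pub unsealed: Vec<(String, String, u64, i64)>,
    /// Updates physically moved into trace, kept sorted by (key, value, time).
    pub trace: Vec<(String, String, u64, i64)>,
    /// Logical frontier: all times < seal_frontier are closed.
    pub seal_frontier: u64,
    /// Physical frontier: all times < trace_ts_upper have been moved.
    pub trace_ts_upper: u64,
}

impl ToyArrangement {
    /// New updates behind the seal frontier are rejected.
    pub fn append(&mut self, key: &str, val: &str, time: u64, diff: i64) -> Result<(), String> {
        if time < self.seal_frontier {
            return Err(format!("time {} is behind seal {}", time, self.seal_frontier));
        }
        self.unsealed.push((key.to_string(), val.to_string(), time, diff));
        Ok(())
    }

    /// Sealing only advances the logical frontier; no data moves.
    pub fn seal(&mut self, ts: u64) {
        self.seal_frontier = ts;
    }

    /// The later physical step: move every update behind the seal into trace.
    pub fn drain_sealed(&mut self) {
        let seal = self.seal_frontier;
        let (sealed, rest): (Vec<_>, Vec<_>) = std::mem::take(&mut self.unsealed)
            .into_iter()
            .partition(|(_, _, t, _)| *t < seal);
        self.unsealed = rest;
        self.trace.extend(sealed);
        self.trace.sort();
        self.trace_ts_upper = seal;
    }
}
```

Between seal and drain_sealed the data is logically in trace but still physically in unsealed, which is exactly the window the real structure has to handle.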

There are two notable differences between a persisted arrangement and a differential in-mem one (besides the obvious durability):

  • Because in-mem operations are so much faster than ones on durable storage, the act of advancing the frontier and moving data into trace, one step in differential, is split into separate steps in persist.
  • The differential arrangement keeps the data arranged for efficient indexed access (hence the name). Persist also keeps the data arranged the same way, but finishing up the plumbing for indexed access is still a TODO.

Further details below.

Unsealed

Unsealed exists to hold data that has been added to the persistent collection but not yet sealed into the trace. We store incoming data as immutable batches of updates, corresponding to non-empty, sorted intervals of crate::location::SeqNos.

As times get sealed and the corresponding updates get moved into the trace, Unsealed can remove those updates, and eventually, entire batches. The approach to removing sealed data optimizes for the common case, for which we assume that:

  • data arrives roughly in order, and
  • unsealed batches contain data for a small range of distinct times.

Every unsealed batch tracks the minimum and maximum update timestamp contained within its list of updates, and we eagerly drop batches that only contain data prior to the sealed frontier whenever possible. In the expected case, this should be sufficient to keep the storage footprint of Unsealed bounded. If either assumption is violated, because updates arrive out of order or because batches contain data at many distinct timestamps, we periodically remove the updates strictly behind the current sealed frontier from a given unsealed batch and replace it with a “trimmed” batch that uses less storage.
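
The two removal paths can be sketched as follows, assuming each batch is summarized by a Blob key, its (time, diff) updates, and its min/max time. UnsealedBatch, evict and trim are hypothetical names, u64 stands in for frontiers, and keys/values are elided. Following the method docs further down, evict returns the keys of dropped batches rather than deleting them, since deletion is only safe once the eviction is durably committed.

```rust
/// Hypothetical unsealed batch: a Blob key, its (time, diff) updates (keys
/// and values elided), and the tracked min/max update time.
#[derive(Clone, Debug, PartialEq)]
pub struct UnsealedBatch {
    pub key: String,
    pub updates: Vec<(u64, i64)>,
    pub ts_min: u64,
    pub ts_max: u64,
}

impl UnsealedBatch {
    pub fn new(key: &str, updates: Vec<(u64, i64)>) -> Self {
        assert!(!updates.is_empty(), "unsealed batches are non-empty");
        let ts_min = updates.iter().map(|(t, _)| *t).min().unwrap();
        let ts_max = updates.iter().map(|(t, _)| *t).max().unwrap();
        UnsealedBatch { key: key.to_string(), updates, ts_min, ts_max }
    }
}

/// Cheap path: drop whole batches whose every update is strictly behind
/// `seal`, using only the tracked max time (no need to read the batch).
/// Returns the keys of dropped batches for deletion after commit.
pub fn evict(batches: &mut Vec<UnsealedBatch>, seal: u64) -> Vec<String> {
    let mut removed = Vec::new();
    batches.retain(|b| {
        if b.ts_max < seal {
            removed.push(b.key.clone());
            false
        } else {
            true
        }
    });
    removed
}

/// Slow path: rewrite a batch that straddles `seal`, keeping only updates at
/// or after it. Returns None if there is nothing to trim or if `evict`
/// would drop the batch outright.
pub fn trim(batch: &UnsealedBatch, seal: u64, new_key: &str) -> Option<UnsealedBatch> {
    if batch.ts_min >= seal || batch.ts_max < seal {
        return None;
    }
    let kept: Vec<(u64, i64)> = batch
        .updates
        .iter()
        .copied()
        .filter(|(t, _)| *t >= seal)
        .collect();
    Some(UnsealedBatch::new(new_key, kept))
}
```

Only trim has to read and rewrite data, which is why the common case is designed to be served by evict alone.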

This approach intentionally does nothing to physically coalesce multiple unsealed batches into a single unsealed batch. Doing so has many potential downsides. For example, physically merging a batch containing updates 5 seconds ahead of the current sealed frontier with another batch containing updates 5 hours ahead of it would hurt only 5 seconds later: at that point the unmerged 5-second batch could simply have been dropped, whereas the merged batch has to be trimmed, which requires an extra read and write. If we end up with significant amounts of data far ahead of the current sealed frontier, we will likely need a different structure that holds batches of updates organized by overlapping ranges of times and physically merges unsealed batches using an approach similar to trace physical compaction.

Trace

An append-only list of immutable batches that describe updates corresponding to sorted, contiguous, non-overlapping ranges of times. The since frontier defines a time before which we can compact the history of updates (and, correspondingly, before which we can no longer answer queries).

We can compact the updates prior to the since frontier physically, by combining batches representing consecutive intervals into one large batch representing the union of those intervals, and logically, by forwarding updates at times before the since frontier to the since frontier.
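
Both kinds of compaction can be sketched in a single function. This is a minimal illustration, assuming updates are (data, time, diff) triples with u64 times in place of frontiers; merge_and_compact is a hypothetical name. It additionally assumes that updates which become identical after forwarding are consolidated by summing their diffs, as in differential dataflow, which the text above does not spell out.

```rust
/// Hypothetical sketch: physically combines consecutive batches, logically
/// forwards times before `since` to `since`, then consolidates.
pub fn merge_and_compact(batches: &[Vec<(String, u64, i64)>], since: u64) -> Vec<(String, u64, i64)> {
    // Physical compaction: concatenate the consecutive batches into one.
    // Logical compaction: forward every time before `since` to `since`.
    let mut merged: Vec<(String, u64, i64)> = batches
        .iter()
        .flatten()
        .map(|(d, t, r)| (d.clone(), (*t).max(since), *r))
        .collect();
    // Sort so identical (data, time) pairs are adjacent.
    merged.sort();
    let mut out: Vec<(String, u64, i64)> = Vec::new();
    for (d, t, r) in merged {
        if let Some(last) = out.last_mut() {
            if last.0 == d && last.1 == t {
                last.2 += r; // consolidate by summing diffs
                continue;
            }
        }
        out.push((d, t, r));
    }
    // Updates whose diffs cancelled out carry no information.
    out.retain(|(_, _, r)| *r != 0);
    out
}
```

An insertion at time 1 and its retraction at time 3 both forward to a since of 5 and cancel, which is how logical compaction shrinks the physical representation.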

We also want to balance the compactness of the representation against the computational effort required to maintain it. Specifically, if we have N batches of data already compacted, we don’t want every additional batch to perform O(N) work (i.e. merge with N batches' worth of data) in order to get compacted. Instead, we would like to keep a geometrically decreasing (when viewed from oldest to most recent) sequence of batches and perform O(N) work every N calls to append. Thankfully, we can achieve all of this with a few simple rules:

  • Batches can be compacted once the since frontier is in advance of all of the data in the batch.
  • All batches are assigned a non-negative compaction level. When a new batch is appended to the trace it is assigned a compaction level of 0.
  • We periodically merge consecutive batches at the same level L representing time intervals [lo, mid) and [mid, hi) into a single batch representing all of the updates in [lo, hi), with level L + 1 if the new batch contains more data than both of its parents, and level L otherwise. Once two batches are merged, they are removed from the trace and replaced with the merged batch.
  • Perform merges for the oldest batches possible first.

NB: this approach assumes that all batches are roughly uniformly sized when they are first appended.
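
The merge rules can be sketched over batch summaries alone. This assumes each batch is summarized by its half-open interval [lo, hi), level and update count, with u64 standing in for frontiers, and reads "since is in advance of all of the data" as hi <= since; TraceBatch, next_merge and apply_merge are hypothetical names.

```rust
/// Hypothetical trace batch summary: the half-open time interval [lo, hi)
/// it covers, its compaction level, and how many updates it holds.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct TraceBatch {
    pub lo: u64,
    pub hi: u64,
    pub level: u32,
    pub len: usize,
}

/// Finds the oldest pair of consecutive batches that share a level and are
/// both entirely behind `since`. Because intervals are consecutive,
/// checking the newer batch's `hi` covers both.
pub fn next_merge(batches: &[TraceBatch], since: u64) -> Option<usize> {
    batches
        .windows(2)
        .position(|w| w[0].level == w[1].level && w[1].hi <= since)
}

/// Replaces batches `i` and `i + 1` with their merge. `merged_len` is the
/// size after logical compaction, which can be smaller than the sum of the
/// parents when updates cancel out.
pub fn apply_merge(batches: &mut Vec<TraceBatch>, i: usize, merged_len: usize) {
    let (a, b) = (batches[i], batches[i + 1]);
    assert_eq!(a.hi, b.lo, "merged intervals must be consecutive");
    assert_eq!(a.level, b.level, "only batches at the same level merge");
    // Level L + 1 only if the merge holds more data than both parents.
    let level = if merged_len > a.len && merged_len > b.len {
        a.level + 1
    } else {
        a.level
    };
    batches[i] = TraceBatch { lo: a.lo, hi: b.hi, level, len: merged_len };
    batches.remove(i + 1);
}
```

With four uniformly sized level-0 batches and no cancellation, repeatedly applying the oldest available merge leaves a single level-2 batch, and at every step the levels stay non-increasing from oldest to newest. If a merge cancels enough data, the result keeps the parents' level instead.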

Invariants

  • New updates less than the seal frontier are never added to unsealed.
  • Unsealed batches have non-overlapping SeqNo ranges.
  • All trace updates are less than the seal frontier.
  • Trace batches are sorted by time and represent a sorted, consecutive, non-overlapping list of time intervals.
  • Individual batches are immutable, and their set of updates, the time interval they describe and their compaction level all remain constant as long as the batch remains in the trace.
  • The compaction levels across the list of batches in a trace are weakly decreasing (non-increasing) when iterating from oldest to most recent time intervals.
  • TODO: Space usage.
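
The structural invariants can be expressed as a checker over simplified metadata. This is a minimal sketch assuming unsealed batches are summarized by their SeqNo range and trace batches by their half-open interval [lo, hi) and level, with u64 in place of frontiers; UnsealedMeta, TraceMeta and check_invariants are hypothetical names. The space-usage TODO is not modeled.

```rust
use std::ops::Range;

/// Hypothetical summaries: an unsealed batch by its SeqNo range, a trace
/// batch by its half-open time interval [lo, hi) and compaction level.
pub struct UnsealedMeta {
    pub seqnos: Range<u64>,
}
pub struct TraceMeta {
    pub lo: u64,
    pub hi: u64,
    pub level: u32,
}

/// Checks the structural invariants listed above.
pub fn check_invariants(unsealed: &[UnsealedMeta], trace: &[TraceMeta], seal: u64) -> Result<(), String> {
    // Unsealed batches have non-overlapping SeqNo ranges.
    let mut ranges: Vec<&Range<u64>> = unsealed.iter().map(|b| &b.seqnos).collect();
    ranges.sort_by_key(|r| r.start);
    for w in ranges.windows(2) {
        if w[0].end > w[1].start {
            return Err(format!("overlapping seqnos {:?} and {:?}", w[0], w[1]));
        }
    }
    for w in trace.windows(2) {
        // Trace intervals are sorted, consecutive and non-overlapping.
        if w[0].hi != w[1].lo {
            return Err(format!("trace gap or overlap at {}..{}", w[0].hi, w[1].lo));
        }
        // Compaction levels are non-increasing from oldest to newest.
        if w[0].level < w[1].level {
            return Err(format!("level increases from {} to {}", w[0].level, w[1].level));
        }
    }
    // All trace updates are less than the seal frontier, i.e. the trace's
    // upper bound does not exceed the seal.
    if let Some(last) = trace.last() {
        if last.hi > seal {
            return Err(format!("trace upper {} exceeds seal {}", last.hi, seal));
        }
    }
    Ok(())
}
```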

Implementations

Returns an Arrangement re-instantiated with the previously serialized state.

Get a new key to write to the Blob store for this arrangement.

Serializes the state of this Arrangement for later re-instantiation.

An open upper bound on the seqnos of contained updates.

Returns a consistent read of all the updates contained in this arrangement.

Writes the given batch to Blob storage and logically adds the contained updates to the unsealed bucket.

Returns a consistent read of the updates contained in the unsealed bucket that match the given filters (in practice, everything not in trace).

Atomically moves all writes in unsealed not in advance of the trace’s seal frontier into the trace and does any necessary resulting eviction work to remove unnecessary batches.

Get the next available drain work from the unsealed bucket, if any exists.

Copies unsealed data matching the specified description into a new trace batch.

Handle an externally completed unsealed drain request.

TODO: Call unsealed_evict at the end of this and return a list of unsealed batches that can now be physically deleted after the drain step is committed to durable storage. This could save us a META write.
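
The drain methods above split the move into a request, a copy that may be performed externally, and a response that is applied afterwards. The following is a minimal sketch of that shape only, assuming a request is the time interval [trace_ts_upper, seal) and updates are (data, time, diff) triples with u64 times; DrainReq, next_drain_req, drain and handle_drain_response are hypothetical names, not the persist signatures.

```rust
/// Hypothetical drain request: copy unsealed updates with times in
/// [lo, hi) into one new trace batch.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct DrainReq {
    pub lo: u64,
    pub hi: u64,
}

/// Drain work exists whenever the seal is ahead of what trace physically holds.
pub fn next_drain_req(trace_ts_upper: u64, seal: u64) -> Option<DrainReq> {
    if trace_ts_upper < seal {
        Some(DrainReq { lo: trace_ts_upper, hi: seal })
    } else {
        None
    }
}

/// The potentially slow copy step, which needs only a read of unsealed:
/// select the matching updates and sort them into a new trace batch.
pub fn drain(unsealed: &[(String, u64, i64)], req: DrainReq) -> Vec<(String, u64, i64)> {
    let mut batch: Vec<(String, u64, i64)> = unsealed
        .iter()
        .filter(|(_, t, _)| req.lo <= *t && *t < req.hi)
        .cloned()
        .collect();
    batch.sort();
    batch
}

/// Applying the completed response: append the batch and advance the
/// physical frontier. Unsealed batches are not deleted here.
pub fn handle_drain_response(
    trace: &mut Vec<(DrainReq, Vec<(String, u64, i64)>)>,
    trace_ts_upper: &mut u64,
    req: DrainReq,
    batch: Vec<(String, u64, i64)>,
) {
    assert_eq!(req.lo, *trace_ts_upper, "responses must be applied in order");
    trace.push((req, batch));
    *trace_ts_upper = req.hi;
}
```

Keeping the copy separate from the state update is what lets the expensive I/O happen without holding up other commands; reclaiming the drained unsealed batches is then a separate eviction step.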

Remove all batches containing only data strictly before the trace’s physical ts frontier.

Returns a list of batches that can safely be deleted after the eviction is committed to durable storage.

Take one step towards shrinking the representation of the unsealed bucket.

Returns true if the trace was modified, false otherwise.

The frontier of times that have been physically moved into trace.

While self.seal tracks the frontier of times that have been logically closed and are eligible to be moved into the trace, self.trace_ts_upper() tracks the frontier of times that have actually been physically moved into the trace. self.seal() is required to manage invariants between commands (e.g. a seal request has to be at a time in advance of prior seal requests), whereas self.trace_ts_upper() is required to manage physical reads and writes to the trace (e.g. to determine which times may be added that are not already present).

Invariant:

  • self.trace_ts_upper() <= self.seal()

A logical upper bound on the times which may currently be added to the trace.

Update the seal frontier to ts.

This function intentionally does not do any checking to see if ts is in advance of the current seal frontier, because we sometimes need to use this to revert a seal update in the event of a storage failure.

Checks whether the given seal would be valid to pass to Self::update_seal.
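
The reason update_seal performs no checks is easiest to see in a validate, apply, persist, revert sequence. This is a minimal sketch: SealState, seal_and_persist and the persist callback are hypothetical, u64 stands in for frontiers, and treating a seal equal to the current one as valid is an assumption.

```rust
/// Hypothetical stand-in for the seal bookkeeping (u64 instead of frontiers).
#[derive(Debug)]
pub struct SealState {
    seal: u64,
}

impl SealState {
    pub fn new(seal: u64) -> Self {
        SealState { seal }
    }

    pub fn seal(&self) -> u64 {
        self.seal
    }

    /// A new seal must be in advance of prior seal requests. Accepting an
    /// equal seal is an assumption of this sketch.
    pub fn validate_seal(&self, ts: u64) -> Result<(), String> {
        if ts < self.seal {
            Err(format!("invalid seal {}: behind current seal {}", ts, self.seal))
        } else {
            Ok(())
        }
    }

    /// Deliberately unchecked, so it can also restore a previous value.
    pub fn update_seal(&mut self, ts: u64) {
        self.seal = ts;
    }
}

/// Validate, apply in memory, then attempt the (hypothetical) durable write;
/// on failure, revert with the same unchecked setter.
pub fn seal_and_persist(
    state: &mut SealState,
    ts: u64,
    persist: impl FnOnce(u64) -> Result<(), String>,
) -> Result<(), String> {
    state.validate_seal(ts)?;
    let prev = state.seal();
    state.update_seal(ts);
    if let Err(e) = persist(ts) {
        // Moving the seal backwards would fail validation, which is why the
        // setter itself performs no checks.
        state.update_seal(prev);
        return Err(e);
    }
    Ok(())
}
```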

A lower bound on the time at which updates may have been logically compacted together.

Checks whether the given since would be valid to pass to Self::allow_compaction.

Update the compaction frontier to since.

This function intentionally does not check whether since is in advance of the current compaction frontier, because we sometimes need to use it to revert a compaction update in the event of a storage failure.

Returns a consistent read of all the updates contained in this trace.

Get the next available compaction work from the trace, if some exists.

Handle an externally completed trace compaction request.

Returns a list of trace batches that can now be physically deleted after the compaction step is committed to durable storage.

Trait Implementations

Formats the value using the given formatter.

Auto Trait Implementations

Blanket Implementations

Gets the TypeId of self.

Immutably borrows from an owned value.

Mutably borrows from an owned value.

Returns the argument unchanged.

Attaches the provided Context to this type, returning a WithContext wrapper.

Attaches the current Context to this type, returning a WithContext wrapper.

Instruments this type with the provided Span, returning an Instrumented wrapper.

Instruments this type with the current Span, returning an Instrumented wrapper.

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Wrap the input message T in a tonic::Request

Should always be Self

The type returned in the event of a conversion error.

Performs the conversion.

The type returned in the event of a conversion error.

Performs the conversion.

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.