Function mz_persist_txn::operator::txns_progress
pub fn txns_progress<K, V, T, D, P, C, F, G>(
    passthrough: Stream<G, P>,
    name: &str,
    ctx: TxnsContext<T>,
    config_set: &ConfigSet,
    client_fn: impl Fn() -> F,
    txns_id: ShardId,
    data_id: ShardId,
    as_of: T,
    until: Antichain<T>,
    data_key_schema: Arc<K::Schema>,
    data_val_schema: Arc<V::Schema>
) -> (Stream<G, P>, Vec<PressOnDropButton>)
An operator for translating physical data shard frontiers into logical ones.
A data shard in the txns set logically advances its upper each time a txn is
committed, but the upper is not physically advanced unless that data shard
was involved in the txn. This means that a shard_source (or any read)
pointed at a data shard would appear to stall at the time of the most recent
write. We fix this for shard_source by flowing its output through a new
txns_progress dataflow operator, which ensures that the
frontier/capability is advanced as the txns shard progresses, as long as the
shard_source is up to date with the latest committed write to that data
shard.
Example:
- A data shard has most recently been written to at 3.
- The txns shard’s upper is at 6.
- We render a dataflow containing a shard_source with an as_of of 5.
- A txn NOT involving the data shard is committed at 7.
- A txn involving the data shard is committed at 9.
How it works:
- The shard_source operator is rendered. Its single output is hooked up as a disconnected input to txns_progress. The single output of txns_progress is a stream of the same type, which is used by downstream operators. Each txns_progress operator is targeted at one data shard; rendering a shard_source for a second data shard requires a second txns_progress operator.
- The shard_source operator emits data through 3 and advances the frontier.
- The txns_progress operator passes through these writes and frontier advancements unchanged. (Recall that it’s always correct to read a data shard “normally”, it just might stall.) Because the txns_progress operator knows there are no writes in [3,5], it then downgrades its own capability past 5 (to 6). Because the input is disconnected, this means the overall frontier of the output is downgraded to 6.
- The txns_progress operator learns about the write at 7 (the upper is now 8). Because it knows that the data shard was not involved in this, it’s free to downgrade its capability to 8.
- The txns_progress operator learns about the write at 9 (the upper is now 10). It knows that the data shard WAS involved in this, so it forwards on data from its input until the input has progressed to 10, at which point it can itself downgrade to 10.
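The frontier bookkeeping in the walkthrough above can be sketched as a small state machine. This is a simplified illustration, not the operator's actual implementation. The type TxnsProgressSketch and its methods are hypothetical names. Timestamps are assumed to be plain u64 values, an upper is assumed to be the commit timestamp plus one, and the sketch ignores as_of, until, and the data itself. It tracks only when the output frontier may advance.

```rust
/// Hypothetical, simplified model of the frontier logic described above.
/// It is not the real operator: it tracks frontiers only, not data.
#[derive(Debug)]
struct TxnsProgressSketch {
    /// Upper of the txns shard (first timestamp not yet committed).
    txns_upper: u64,
    /// Most recent commit known to involve this data shard, if any.
    last_data_write: Option<u64>,
    /// Frontier of the disconnected input (the shard_source output).
    input_frontier: u64,
    /// Frontier implied by the capability this operator holds.
    output_frontier: u64,
}

impl TxnsProgressSketch {
    fn new(txns_upper: u64, last_data_write: Option<u64>) -> Self {
        Self { txns_upper, last_data_write, input_frontier: 0, output_frontier: 0 }
    }

    /// The input (shard_source) frontier advanced to `frontier`.
    fn input_advanced(&mut self, frontier: u64) -> u64 {
        self.input_frontier = self.input_frontier.max(frontier);
        self.recompute()
    }

    /// A txn was committed at `ts`; the txns shard's upper becomes `ts + 1`.
    fn txn_committed(&mut self, ts: u64, involves_data_shard: bool) -> u64 {
        self.txns_upper = self.txns_upper.max(ts + 1);
        if involves_data_shard {
            self.last_data_write = Some(ts);
        }
        self.recompute()
    }

    /// If the input has passed the latest write to the data shard, follow the
    /// txns shard's upper. Otherwise only mirror the input's own progress.
    /// The output frontier never moves backwards.
    fn recompute(&mut self) -> u64 {
        let caught_up = self
            .last_data_write
            .map_or(true, |w| self.input_frontier > w);
        let candidate = if caught_up { self.txns_upper } else { self.input_frontier };
        self.output_frontier = self.output_frontier.max(candidate);
        self.output_frontier
    }
}

fn main() {
    // Data shard last written at 3; txns shard upper at 6.
    let mut op = TxnsProgressSketch::new(6, Some(3));
    println!("input past 3      -> {}", op.input_advanced(4));
    println!("txn at 7 (other)  -> {}", op.txn_committed(7, false));
    println!("txn at 9 (ours)   -> {}", op.txn_committed(9, true));
    println!("input reaches 10  -> {}", op.input_advanced(10));
}
```

In this model the output frontier waits on the input only while there is a committed write to the data shard that the input has not yet passed. At every other time it follows the txns shard's upper.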