Function mz_persist_txn::operator::txns_progress

source ·
pub fn txns_progress<K, V, T, D, P, C, F, G>(
    passthrough: Stream<G, P>,
    name: &str,
    ctx: TxnsContext<T>,
    config_set: &ConfigSet,
    client_fn: impl Fn() -> F,
    txns_id: ShardId,
    data_id: ShardId,
    as_of: T,
    until: Antichain<T>,
    data_key_schema: Arc<K::Schema>,
    data_val_schema: Arc<V::Schema>
) -> (Stream<G, P>, Vec<PressOnDropButton>)
where K: Debug + Codec + Send + Sync, V: Debug + Codec + Send + Sync, T: Timestamp + Lattice + TotalOrder + StepForward + Codec64, D: Data + Semigroup + Codec64 + Send + Sync, P: Debug + Data, C: TxnsCodec + 'static, F: Future<Output = PersistClient> + Send + 'static, G: Scope<Timestamp = T>,
Expand description

An operator for translating physical data shard frontiers into logical ones.

A data shard in the txns set logically advances its upper each time a txn is committed, but the upper is not physically advanced unless that data shard was involved in the txn. This means that a shard_source (or any read) pointed at a data shard would appear to stall at the time of the most recent write. We fix this for shard_source by flowing its output through a new txns_progress dataflow operator, which ensures that the frontier/capability is advanced as the txns shard progresses, as long as the shard_source is up to date with the latest committed write to that data shard.

Example:

  • A data shard has most recently been written to at 3.
  • The txns shard’s upper is at 6.
  • We render a dataflow containing a shard_source with an as_of of 5.
  • A txn NOT involving the data shard is committed at 7.
  • A txn involving the data shard is committed at 9.

How it works:

  • The shard_source operator is rendered. Its single output is hooked up as a disconnected input to txns_progress. The txns_progress single output is a stream of the same type, which is used by downstream operators. This txns_progress operator is targeted at one data_shard; rendering a shard_source for a second data shard requires a second txns_progress operator.
  • The shard_source operator emits data through 3 and advances the frontier.
  • The txns_progress operator passes through these writes and frontier advancements unchanged. (Recall that it’s always correct to read a data shard “normally”, it just might stall.) Because the txns_progress operator knows there are no writes in [3,5], it then downgrades its own capability past 5 (to 6). Because the input is disconnected, this means the overall frontier of the output is downgraded to 6.
  • The txns_progress operator learns about the write at 7 (the upper is now 8). Because it knows that the data shard was not involved in this, it’s free to downgrade its capability to 8.
  • The txns_progress operator learns about the write at 9 (the upper is now 10). It knows that the data shard WAS involved in this, so it forwards on data from its input until the input has progressed to 10, at which point it can itself downgrade to 10.