Module mz_timely_util::reclock

Notation

Collections are represented with capital letters (T, S, R), collection traces as bold letters (𝐓, 𝐒, 𝐑), and difference traces as δ𝐓.

Indexing a collection trace 𝐓 to obtain its version at t is written as 𝐓(t). Indexing a collection to obtain the multiplicity of a record x is written as T[x]. These can be combined to obtain the multiplicity of a record x at some version t as 𝐓(t)[x].
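The notation can be made concrete with a small Rust sketch. It assumes, purely for illustration, that records are `&'static str`, multiplicities are `i64`, and the version timestamp is a `u64`. The names `Collection`, `DiffTrace`, `version_at`, and `multiplicity_at` are hypothetical and not part of this module. A collection trace is stored as its difference trace, and 𝐓(t) accumulates every δ𝐓(s) with s ≤ t.

```rust
use std::collections::HashMap;

/// A collection: the multiplicity of each record (T[x]).
type Collection = HashMap<&'static str, i64>;

/// A difference trace over a hypothetical totally ordered u64 time:
/// each entry (s, x, d) contributes d to δ𝐓(s)[x].
type DiffTrace = Vec<(u64, &'static str, i64)>;

/// 𝐓(t): the sum of every difference δ𝐓(s) with s ≤ t.
fn version_at(trace: &DiffTrace, t: u64) -> Collection {
    let mut out = Collection::new();
    for &(s, x, d) in trace {
        if s <= t {
            *out.entry(x).or_insert(0) += d;
        }
    }
    // Records whose multiplicity cancelled out are not part of the version.
    out.retain(|_, m| *m != 0);
    out
}

/// 𝐓(t)[x]: the multiplicity of record x in version t (0 if absent).
fn multiplicity_at(trace: &DiffTrace, t: u64, x: &str) -> i64 {
    version_at(trace, t).get(x).copied().unwrap_or(0)
}
```

For example, with the differences (0, "a", +1) and (2, "a", -1), 𝐓(1)["a"] is 1 and 𝐓(2)["a"] is 0.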

Overview

Reclocking transforms a source collection S that evolves with some timestamp FromTime into a collection T that evolves with some other timestamp IntoTime. At each time t of IntoTime, the reclocked collection contains all updates u ∈ S that are not beyond the FromTime frontier 𝐑(t). The collection R is called the remap collection.

More formally, for an arbitrary time t of IntoTime and an arbitrary record x, the multiplicity 𝐓(t)[x] of the reclocked collection is defined as sum{δ𝐒(s)[x]: !(𝐑(t) ⪯ s)}. Since this holds for every record, we can write the definition of Reclock(𝐒, 𝐑) as:

Reclock(𝐒, 𝐑) ≜ 𝐓 : ∀ t ∈ IntoTime : 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}
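As an illustrative sketch, the definition can be evaluated directly for a single version t. This assumes a hypothetical FromTime of `(u64, u64)` pairs under the product partial order and represents the antichain 𝐑(t) as a slice of such pairs; `less_equal`, `beyond`, and `reclock_version` are illustrative names, not this module's API. `beyond(frontier, s)` encodes 𝐑(t) ⪯ s, i.e. some element of the antichain is less than or equal to s.

```rust
use std::collections::HashMap;

/// Hypothetical FromTime: pairs under the product partial order.
type FromTime = (u64, u64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// 𝐑(t) ⪯ s: s is beyond the antichain if some element of it is ≤ s.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| less_equal(f, s))
}

/// 𝐓(t) = sum{δ𝐒(s) : !(𝐑(t) ⪯ s)}, given the source differences
/// (time, record, diff) and the frontier 𝐑(t) of the version of interest.
fn reclock_version(
    source: &[(FromTime, &'static str, i64)],
    remap_at_t: &[FromTime],
) -> HashMap<&'static str, i64> {
    let mut out = HashMap::new();
    for (s, x, d) in source {
        if !beyond(remap_at_t, s) {
            *out.entry(*x).or_insert(0) += d;
        }
    }
    out.retain(|_, m| *m != 0);
    out
}
```

An empty frontier is beyond nothing, so in this model every source update appears in a version whose remap frontier is empty.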

For the reclocked collection T to have a sensible notion of progress, we require that t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2), where ≤ is the partial order of IntoTime and ⪯ is the partial order of FromTime antichains.
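The following sketch spells out the antichain order and checks the monotonicity requirement on a finite remap trace. It assumes the same hypothetical `(u64, u64)` product-ordered FromTime, models IntoTime as an index into a `Vec` of antichains (anticipating the total order simplification below), and assumes the convention that A ⪯ B when every element of B is greater than or equal to some element of A. The helper names are illustrative only.

```rust
/// Hypothetical FromTime: pairs under the product partial order.
type FromTime = (u64, u64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// A ⪯ B for antichains: every element of B is ≥ some element of A.
fn antichain_le(a: &[FromTime], b: &[FromTime]) -> bool {
    b.iter().all(|y| a.iter().any(|x| less_equal(x, y)))
}

/// Checks t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2), with IntoTime modelled as the index into
/// `remap`. Because the indices are totally ordered and ⪯ is transitive,
/// comparing each version with the next one is sufficient.
fn is_monotone(remap: &[Vec<FromTime>]) -> bool {
    remap.windows(2).all(|w| antichain_le(&w[0], &w[1]))
}
```

Under this convention the empty antichain is the greatest element, so a remap trace may end with an empty frontier.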

Total order simplification

To simplify the implementation, we require that IntoTime be a total order. This limitation could be lifted in the future, but doing so requires further elaboration of the mechanics of reclocking to ensure a correct implementation.

The difference trace

By the definition of difference traces we have:

    δ𝐓(t) = 𝐓(t) - sum{δ𝐓(s): s < t}
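A minimal sketch of this definition, assuming the versions 𝐓(t) are given for consecutive times of a totally ordered IntoTime (modelled as positions in a slice) and that records are `&'static str`. `difference_trace` is a hypothetical helper: it subtracts the running sum of earlier differences from each version.

```rust
use std::collections::HashMap;

type Collection = HashMap<&'static str, i64>;

/// Given 𝐓(t0), 𝐓(t1), ... at consecutive times, returns
/// δ𝐓(t) = 𝐓(t) - sum{δ𝐓(s) : s < t} for each t.
fn difference_trace(versions: &[Collection]) -> Vec<Collection> {
    // Running value of sum{δ𝐓(s) : s < t}.
    let mut accumulated = Collection::new();
    let mut deltas = Vec::with_capacity(versions.len());
    for version in versions {
        let mut delta = version.clone();
        for (x, m) in &accumulated {
            *delta.entry(*x).or_insert(0) -= m;
        }
        delta.retain(|_, d| *d != 0);
        // Fold the new difference into the running sum.
        for (x, d) in &delta {
            *accumulated.entry(*x).or_insert(0) += d;
        }
        accumulated.retain(|_, m| *m != 0);
        deltas.push(delta);
    }
    deltas
}
```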

Due to the total order assumption, we only need to consider two cases.

Case 1: t is the minimum timestamp

In this case sum{δ𝐓(s): s < t} is an empty sum (the empty collection), and so we obtain:

    δ𝐓(min) = 𝐓(min) = sum{δ𝐒(s): !(𝐑(min) ⪯ s)}

Case 2: t is a timestamp with a predecessor prev

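In this case sum{δ𝐓(s): s < t} is equal to 𝐓(prev), because 𝐓(prev) is by definition the sum of all differences up to prev, and no s satisfies prev < s < t since prev is the immediate predecessor of t:

    sum{δ𝐓(s): s < t} = sum{δ𝐓(s): s ≤ prev} + sum{δ𝐓(s): prev < s < t}
                      = 𝐓(prev) + ∅
                      = 𝐓(prev)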

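And therefore, since 𝐑(prev) ⪯ 𝐑(t) implies that every s beyond 𝐑(t) is also beyond 𝐑(prev), the second sum in the subtraction below ranges over a subset of the first, and the difference trace of T is: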

    δ𝐓(t) = 𝐓(t) - 𝐓(prev)
          = sum{δ𝐒(s): !(𝐑(t) ⪯ s)} - sum{δ𝐒(s): !(𝐑(prev) ⪯ s)}
          = sum{δ𝐒(s): (𝐑(prev) ⪯ s) ∧ !(𝐑(t) ⪯ s)}
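Both cases can be combined into one function. This sketch assumes, for illustration, a `(u64, u64)` product-ordered FromTime, a monotone remap trace stored as a `Vec` of antichains indexed by IntoTime (so min is index 0 and prev is t - 1), and source differences given as `(time, record, diff)` triples. `reclocked_delta` and its helpers are hypothetical names.

```rust
use std::collections::HashMap;

/// Hypothetical FromTime: pairs under the product partial order.
type FromTime = (u64, u64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// 𝐑(t) ⪯ s: some element of the antichain is ≤ s.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| less_equal(f, s))
}

/// δ𝐓(t) for IntoTime index t.
/// Case 1 (t = 0): sum{δ𝐒(s) : !(𝐑(min) ⪯ s)}.
/// Case 2 (t > 0): sum{δ𝐒(s) : 𝐑(prev) ⪯ s ∧ !(𝐑(t) ⪯ s)}.
fn reclocked_delta(
    source: &[(FromTime, &'static str, i64)],
    remap: &[Vec<FromTime>],
    t: usize,
) -> HashMap<&'static str, i64> {
    let mut out = HashMap::new();
    for (s, x, d) in source {
        // Short-circuits for t == 0, so `t - 1` never underflows.
        let after_prev = t == 0 || beyond(&remap[t - 1], s);
        if after_prev && !beyond(&remap[t], s) {
            *out.entry(*x).or_insert(0) += d;
        }
    }
    out.retain(|_, m| *m != 0);
    out
}
```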

Unique mapping property

Given the definition above, we can derive that for any source difference δ𝐒(s) there is at most one target timestamp t to which it must be reclocked. The operator implementation can exploit this property: it can safely discard a source update once the matching δ𝐓(t) has been found, making it “stateless” with respect to the source trace. A formal proof of this property is provided below.
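Monotonicity of 𝐑 also suggests how the single target can be located. If s is not beyond 𝐑(t1), then it is not beyond 𝐑(t2) for any t2 ≥ t1, so over a totally ordered remap trace `beyond(𝐑(t), s)` is true for a prefix and false afterwards, and the target is the first t where it fails. The sketch below uses the standard `slice::partition_point` for that binary search; the `(u64, u64)` FromTime, the `Vec`-of-antichains remap trace, and the helper names are assumptions for illustration.

```rust
/// Hypothetical FromTime: pairs under the product partial order.
type FromTime = (u64, u64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// 𝐑(t) ⪯ s: some element of the antichain is ≤ s.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| less_equal(f, s))
}

/// Returns the unique IntoTime index t with δ𝐒(s) ∈ δ𝐓(t), or None if `s`
/// is beyond every frontier in `remap` (its target is not determined yet).
/// Requires `remap` to be monotone so that the predicate is a prefix.
fn target_time(remap: &[Vec<FromTime>], s: &FromTime) -> Option<usize> {
    let t = remap.partition_point(|frontier| beyond(frontier, s));
    if t < remap.len() {
        Some(t)
    } else {
        None
    }
}
```

In this model, once `target_time` returns `Some(t)` the update can be emitted at t and dropped; `None` means the update must be retained until more of the remap trace is known.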

Operational description

The operator follows a run-to-completion model: each time it is scheduled, it completes all outstanding work that can be completed.
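A run-to-completion scheduling step might look like the following sketch. It is a hypothetical model, not this module's operator: `Reclocker` holds the source updates whose target is still unknown plus the remap versions received so far (assumed final once received), and each call to `schedule` emits every update that can be mapped and keeps the rest for a later scheduling. The `(u64, u64)` FromTime and all names are illustrative assumptions.

```rust
/// Hypothetical FromTime: pairs under the product partial order.
type FromTime = (u64, u64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// 𝐑(t) ⪯ s: some element of the antichain is ≤ s.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| less_equal(f, s))
}

/// Hypothetical operator state (not this module's implementation).
struct Reclocker {
    /// Source updates (time, record, diff) whose target is not known yet.
    pending: Vec<(FromTime, &'static str, i64)>,
    /// Remap versions 𝐑(0), 𝐑(1), ... received so far, assumed final.
    remap: Vec<Vec<FromTime>>,
}

impl Reclocker {
    /// One scheduling: emit every pending update whose target IntoTime is
    /// determined by the known remap versions, and keep the rest.
    fn schedule(&mut self) -> Vec<(usize, &'static str, i64)> {
        let mut output = Vec::new();
        let remap = &self.remap;
        self.pending.retain(|(s, x, d)| {
            // First version whose frontier s is not beyond.
            let t = remap.partition_point(|f| beyond(f, s));
            if t < remap.len() {
                output.push((t, *x, *d));
                false // mapped: the source update is no longer needed
            } else {
                true // beyond every known frontier: retry later
            }
        });
        output.sort(); // deterministic order for inspection
        output
    }
}
```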

Unique mapping property proof

This section contains the formal proof of the unique mapping property. The proof uses the structured proof notation created by Leslie Lamport. Readers unfamiliar with structured proofs can read about them at https://lamport.azurewebsites.net/pubs/proof.pdf.

Statement

AtMostOne(X, φ(x)) ≜ ∀ x1, x2 ∈ X : φ(x1) ∧ φ(x2) ⇒ x1 = x2
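For a finite set X, AtMostOne can be checked by brute force exactly as written, comparing every pair. This `at_most_one` helper is an illustrative sketch, not part of the module.

```rust
/// AtMostOne(X, φ) ≜ ∀ x1, x2 ∈ X : φ(x1) ∧ φ(x2) ⇒ x1 = x2,
/// checked pairwise over a finite X, mirroring the definition.
fn at_most_one<X: PartialEq>(xs: &[X], phi: impl Fn(&X) -> bool) -> bool {
    xs.iter().all(|x1| {
        // P ⇒ Q written as ¬P ∨ Q.
        xs.iter().all(|x2| !(phi(x1) && phi(x2)) || x1 == x2)
    })
}
```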

  • THEOREM UniqueMapping ≜
    • ASSUME
      • NEW (FromTime, ⪯) ∈ PartiallyOrderedTimestamps
      • NEW (IntoTime, ≤) ∈ TotallyOrderedTimestamps
      • NEW 𝐒 ∈ SetOfCollectionTraces(FromTime)
      • NEW 𝐑 ∈ SetOfCollectionTraces(IntoTime)
      • ∀ t ∈ IntoTime: 𝐑(t) ∈ SetOfAntichains(FromTime)
      • ∀ t1, t2 ∈ IntoTime: t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)
      • NEW 𝐓 = Reclock(𝐒, 𝐑)
    • PROVE ∀ s ∈ FromTime : AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
Proof
  1. SUFFICES ASSUME ∃ s ∈ FromTime : ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
    • PROVE FALSE
    • Proof: By contradiction.
  2. PICK s ∈ FromTime : ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
    • Proof: Such a time exists by <1>1.
  3. ∃ t1, t2 ∈ IntoTime : t1 ≠ t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
    1. ¬(∀ x1, x2 ∈ IntoTime : (δ𝐒(s) ∈ δ𝐓(x1)) ∧ (δ𝐒(s) ∈ δ𝐓(x2)) ⇒ x1 = x2)
      • Proof: By <1>2 and the definition of AtMostOne.
    2. Q.E.D
      • Proof: By <2>1, the quantifier negation rules, and the propositional identity ¬(P ⇒ Q) ≡ P ∧ ¬Q.
  4. PICK t1, t2 ∈ IntoTime : t1 < t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
    • Proof: By <1>3. Since IntoTime is totally ordered and t1 ≠ t2, we can assume t1 < t2 without loss of generality.
  5. ¬(𝐑(t1) ⪯ s)
    1. CASE t1 = min(IntoTime)
      1. δ𝐓(t1) = sum{δ𝐒(s): !(𝐑(t1) ⪯ s)}
        • Proof: By definition of δ𝐓(min).
      2. δ𝐒(s) ∈ δ𝐓(t1)
        • Proof: By <1>4.
      3. Q.E.D
        • Proof: By <3>1 and <3>2.
    2. CASE t1 > min(IntoTime)
      1. PICK t1_prev = Predecessor(t1)
        • Proof: The predecessor exists because the set {t: t < t1} is non-empty, since it contains at least min(IntoTime).
      2. δ𝐓(t1) = sum{δ𝐒(s): (𝐑(t1_prev) ⪯ s) ∧ !(𝐑(t1) ⪯ s)}
        • Proof: By definition of δ𝐓(t).
      3. δ𝐒(s) ∈ δ𝐓(t1)
        • Proof: By <1>4.
      4. Q.E.D
        • Proof: By <3>2 and <3>3.
    3. Q.E.D
      • Proof: From cases <2>1 and <2>2, which are exhaustive.
  6. PICK t2_prev ∈ IntoTime : t2_prev = Predecessor(t2)
    • Proof: The predecessor exists because, by <1>4, the set {t: t < t2} is non-empty, since it contains at least t1.
  7. t1 ≤ t2_prev
    • Proof: t1 ∈ {t: t < t2} and t2_prev is the maximum element of that set.
  8. 𝐑(t2_prev) ⪯ s
    1. t2 > min(IntoTime)
      • Proof: By <1>4, since min(IntoTime) ≤ t1 < t2.
    2. PICK t2_prev = Predecessor(t2)
      • Proof: The predecessor exists because the set {t: t < t2} is non-empty, since by <2>1 it contains at least min(IntoTime).
    3. δ𝐓(t2) = sum{δ𝐒(s): (𝐑(t2_prev) ⪯ s) ∧ !(𝐑(t2) ⪯ s)}
      • Proof: By <2>1 and the definition of δ𝐓(t).
    4. δ𝐒(s) ∈ δ𝐓(t2)
      • Proof: By <1>4.
    5. Q.E.D
      • Proof: By <2>3 and <2>4.
  9. 𝐑(t1) ⪯ 𝐑(t2_prev)
    • Proof: By <1>7 and the monotonicity hypothesis on 𝐑.
  10. 𝐑(t1) ⪯ s
    • Proof: By <1>8, <1>9, and the transitivity of ⪯.
  11. Q.E.D
    • Proof: By <1>5 and <1>10, which contradict each other.
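The theorem can be sanity-checked by brute force on small inputs. This sketch is illustrative only: it assumes a `(u64, u64)` FromTime under the product order and IntoTime as an index into a `Vec` of antichains, uses the membership condition for δ𝐓(t) derived in the difference-trace section, and counts how many IntoTime values each s in a small grid maps to. The helper names are hypothetical.

```rust
/// Hypothetical FromTime: pairs under the product partial order.
type FromTime = (u64, u64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// 𝐑(t) ⪯ s: some element of the antichain is ≤ s.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| less_equal(f, s))
}

/// δ𝐒(s) ∈ δ𝐓(t), using both cases of the difference-trace derivation.
fn maps_to(remap: &[Vec<FromTime>], s: &FromTime, t: usize) -> bool {
    (t == 0 || beyond(&remap[t - 1], s)) && !beyond(&remap[t], s)
}

/// UniqueMapping checked for every s in the grid 0..n × 0..n.
fn unique_mapping_holds(remap: &[Vec<FromTime>], n: u64) -> bool {
    (0..n).all(|a| {
        (0..n).all(|b| {
            let s = (a, b);
            (0..remap.len()).filter(|&t| maps_to(remap, &s, t)).count() <= 1
        })
    })
}
```

With a non-monotone remap trace such as [[(1, 0)], [(0, 0)], []], the time (0, 0) maps to both index 0 and index 2, which illustrates why the monotonicity hypothesis is needed.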

Structs

  • ChainBatch (private)
    A batch of differential updates that vary over some partial order. This type maintains the data as a set of chains that allows for efficient extraction of batches given a frontier.
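One way such a structure could work is sketched below; it is a hypothetical illustration, not the actual `ChainBatch`. Each chain is sorted so that consecutive times are ordered, and since the set of times not beyond a frontier is downward closed, the updates to extract from each chain form a prefix that a binary search can locate. It assumes a `(u64, u64)` product-ordered FromTime and `&'static str` records; `Chains`, `new`, and `extract` are illustrative names.

```rust
/// Hypothetical FromTime: pairs under the product partial order.
type FromTime = (u64, u64);
type Update = (FromTime, &'static str, i64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Frontier ⪯ s: some element of the antichain is ≤ s.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| less_equal(f, s))
}

/// Hypothetical chain-based batch (not the actual `ChainBatch`).
struct Chains {
    chains: Vec<Vec<Update>>,
}

impl Chains {
    fn new(mut updates: Vec<Update>) -> Self {
        // Lexicographic order is a linear extension of the product order,
        // so greedily appending to a chain whose last time is ≤ the new
        // time keeps every chain sorted under `less_equal`.
        updates.sort_by_key(|u| u.0);
        let mut chains: Vec<Vec<Update>> = Vec::new();
        for u in updates {
            // Chains are never empty, so `last()` always succeeds.
            let pos = chains
                .iter()
                .position(|c| less_equal(&c.last().unwrap().0, &u.0));
            match pos {
                Some(i) => chains[i].push(u),
                None => chains.push(vec![u]),
            }
        }
        Chains { chains }
    }

    /// Removes and returns every update whose time is not beyond `frontier`.
    fn extract(&mut self, frontier: &[FromTime]) -> Vec<Update> {
        let mut out = Vec::new();
        for chain in &mut self.chains {
            let cut = chain.partition_point(|u| !beyond(frontier, &u.0));
            out.extend(chain.drain(..cut));
        }
        self.chains.retain(|c| !c.is_empty());
        out
    }
}
```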

Functions

  • Constructs an operator that reclocks a source collection varying with some time FromTime into the corresponding reclocked collection varying over some time IntoTime using the provided remap collection.