Module mz_timely_util::reclock
Notation
Collections are represented with capital letters (T, S, R), collection traces as bold letters (𝐓, 𝐒, 𝐑), and difference traces as δ𝐓.
Indexing a collection trace 𝐓 to obtain its version at t is written as 𝐓(t). Indexing a collection to obtain the multiplicity of a record x is written as T[x]. These can be combined to obtain the multiplicity of a record x at some version t as 𝐓(t)[x].
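As a concrete illustration of the notation, here is a minimal Rust sketch. It assumes, purely for the example, that records are `&'static str`, multiplicities are `i64`, and time is a totally ordered `u64`. A collection is then a `BTreeMap` from record to multiplicity. A difference trace δ𝐓 maps each time to the changes introduced at exactly that time, so 𝐓(t) is the sum of δ𝐓(s) over all s ≤ t. The names `Collection`, `DiffTrace`, `version_at` and `multiplicity` are hypothetical and not part of `mz_timely_util`.

```rust
use std::collections::BTreeMap;

/// A collection T: record -> multiplicity, so `T[x]` is a map lookup.
type Collection = BTreeMap<&'static str, i64>;

/// A difference trace δ𝐓 over a totally ordered time (u64 is an assumption):
/// each time maps to the changes introduced at exactly that time.
type DiffTrace = BTreeMap<u64, Collection>;

/// Adds `delta` into `acc`, dropping records whose multiplicity becomes zero.
fn add_into(acc: &mut Collection, delta: &Collection) {
    for (&x, &m) in delta {
        let entry = acc.entry(x).or_insert(0);
        *entry += m;
        if *entry == 0 {
            acc.remove(x);
        }
    }
}

/// 𝐓(t): the version of the collection at `t`, i.e. the sum of δ𝐓(s) for all s ≤ t.
fn version_at(trace: &DiffTrace, t: u64) -> Collection {
    let mut version = Collection::new();
    for (_, delta) in trace.range(..=t) {
        add_into(&mut version, delta);
    }
    version
}

/// 𝐓(t)[x]: the multiplicity of record `x` at version `t` (zero when absent).
fn multiplicity(trace: &DiffTrace, t: u64, x: &str) -> i64 {
    version_at(trace, t).get(x).copied().unwrap_or(0)
}

fn main() {
    let mut trace = DiffTrace::new();
    trace.insert(0, Collection::from([("a", 1), ("b", 2)]));
    trace.insert(1, Collection::from([("b", -2), ("c", 1)]));
    println!("T(1) = {:?}", version_at(&trace, 1));
    println!("T(0)[b] = {}", multiplicity(&trace, 0, "b"));
}
```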
Overview
Reclocking transforms a source collection S that evolves with some timestamp FromTime into a collection T that evolves with some other timestamp IntoTime. The reclocked collection T contains all updates u ∈ S that are not beyond some FromTime frontier R(t). The collection R is called the remap collection.
More formally, for some arbitrary time t of IntoTime and some arbitrary record x, the reclocked collection T(t)[x] is defined as sum{δ𝐒(s)[x]: !(𝐑(t) ⪯ s)}. Since this holds for any record, we can write the definition of Reclock(𝐒, 𝐑) as:
Reclock(𝐒, 𝐑) ≜ 𝐓: ∀ t ∈ IntoTime : 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}
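The definition can be transcribed almost literally. The sketch below assumes a hypothetical `FromTime` made of `(u64, u64)` pairs under the product partial order, so that incomparable times actually occur. It stores the frontier 𝐑(t) as a slice holding an antichain and reads 𝐑(t) ⪯ s as "some element of 𝐑(t) is less than or equal to s", i.e. s is beyond the frontier. Source differences are `(record, time, diff)` triples. None of these names come from the crate.

```rust
use std::collections::BTreeMap;

/// Hypothetical FromTime: pairs compared with the product partial order.
type FromTime = (u64, u64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// 𝐑(t) ⪯ s: some element of the frontier is ≤ s, i.e. `s` is beyond it.
fn frontier_le_time(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| less_equal(f, s))
}

/// 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}, where `frontier` is 𝐑(t) and `source`
/// lists the differences of 𝐒 as (record, time, diff) triples.
fn reclocked_version(
    source: &[(&'static str, FromTime, i64)],
    frontier: &[FromTime],
) -> BTreeMap<&'static str, i64> {
    let mut version: BTreeMap<&'static str, i64> = BTreeMap::new();
    for &(x, s, diff) in source {
        if !frontier_le_time(frontier, &s) {
            *version.entry(x).or_insert(0) += diff;
        }
    }
    // Records whose updates cancel out are not part of the collection.
    version.retain(|_, m| *m != 0);
    version
}

fn main() {
    let source = [("a", (0, 0), 1), ("b", (1, 0), 1), ("c", (0, 2), 1), ("a", (2, 2), -1)];
    // A monotone remap trace: 𝐑(0), 𝐑(1), 𝐑(2).
    let remap: [Vec<FromTime>; 3] = [vec![(1, 0), (0, 1)], vec![(2, 0), (0, 3)], vec![(3, 3)]];
    for (t, frontier) in remap.iter().enumerate() {
        println!("T({t}) = {:?}", reclocked_version(&source, frontier));
    }
}
```

With the remap trace used in `main`, 𝐓 grows from {a} to {a, b, c} and then loses `a`. The retraction of `a` at (2, 2) only becomes visible once the frontier has passed it.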
In order for the reclocked collection T to have a sensible definition of progress, we require that t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2), where ≤ is the partial order of IntoTime and ⪯ is the partial order of FromTime antichains.
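To make the progress requirement checkable, the sketch below assumes the usual frontier ordering on antichains. Under that ordering, A ⪯ B holds when every element of B is greater than or equal to some element of A, which makes the empty antichain the greatest one. It again uses a hypothetical product-ordered `(u64, u64)` `FromTime` and lists the remap trace in `IntoTime` order, assuming `IntoTime` is totally ordered. Under that assumption, checking adjacent pairs is enough, because ⪯ on antichains is transitive.

```rust
/// Hypothetical FromTime: pairs compared with the product partial order.
type FromTime = (u64, u64);

fn less_equal(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// A ⪯ B on antichains: every element of B is ≥ some element of A.
/// The empty antichain is the greatest element, so any A ⪯ [].
fn antichain_le(a: &[FromTime], b: &[FromTime]) -> bool {
    b.iter().all(|y| a.iter().any(|x| less_equal(x, y)))
}

/// Checks t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2) for `remap[t] = 𝐑(t)` with a totally
/// ordered IntoTime. Adjacent pairs suffice because ⪯ is transitive.
fn is_monotone(remap: &[Vec<FromTime>]) -> bool {
    remap.windows(2).all(|w| antichain_le(&w[0], &w[1]))
}

fn main() {
    let good: Vec<Vec<FromTime>> = vec![vec![(1, 0), (0, 1)], vec![(2, 0), (0, 3)], vec![(3, 3)], vec![]];
    let bad: Vec<Vec<FromTime>> = vec![vec![(2, 0)], vec![(1, 5)]];
    println!("good: {}, bad: {}", is_monotone(&good), is_monotone(&bad));
}
```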
Total order simplification
In order to simplify the implementation, we require that IntoTime is a total order. This limitation can be lifted in the future, but further elaboration on the mechanics of reclocking is required to ensure a correct implementation.
The difference trace
By the definition of difference traces we have:
δ𝐓(t) = T(t) - sum{δ𝐓(s): s < t}
Due to the total order assumption we only need to consider two cases.
Case 1: t is the minimum timestamp
In this case sum{δ𝐓(s): s < t} is empty, and so we obtain:
δ𝐓(min) = T(min) = sum{δ𝐒(s): !(𝐑(min) ⪯ s)}
Case 2: t is a timestamp with a predecessor prev
In this case sum{δ𝐓(s): s < t} is equal to T(prev) because:
sum{δ𝐓(s): s < t} = sum{δ𝐓(s): s ≤ prev} + sum{δ𝐓(s): prev < s < t}
                  = T(prev) + ∅
                  = T(prev)
The second sum is empty because prev is the predecessor of t, so no timestamp s satisfies prev < s < t.
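And therefore the difference trace of T is:
δ𝐓(t) = 𝐓(t) - 𝐓(prev)
      = sum{δ𝐒(s): !(𝐑(t) ⪯ s)} - sum{δ𝐒(s): !(𝐑(prev) ⪯ s)}
      = sum{δ𝐒(s): (𝐑(prev) ⪯ s) ∧ !(𝐑(t) ⪯ s)}
The last step uses the progress requirement. Since prev < t implies 𝐑(prev) ⪯ 𝐑(t), every s that is not beyond 𝐑(prev) is also not beyond 𝐑(t), so the second sum only removes terms that appear in the first.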
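Both cases reduce to a membership test per source update. The update at s contributes to δ𝐓(t) when s is not beyond 𝐑(t) and, unless t is the minimum, s is beyond 𝐑(prev). The sketch below assumes that `IntoTime` is the index into a `remap` vector (so prev is t - 1) and uses a hypothetical product-ordered `(u64, u64)` `FromTime`. It also computes 𝐓(t) - 𝐓(prev) straight from the definition, so that the two can be compared on a monotone remap trace.

```rust
use std::collections::BTreeMap;

type FromTime = (u64, u64); // hypothetical product-ordered FromTime
type Collection = BTreeMap<&'static str, i64>;
type Update = (&'static str, FromTime, i64); // (record, time, diff)

/// 𝐑 ⪯ s: some frontier element is ≤ s under the product order.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| f.0 <= s.0 && f.1 <= s.1)
}

/// Sums the updates whose time satisfies `keep`, dropping zero multiplicities.
fn sum_where(source: &[Update], keep: impl Fn(&FromTime) -> bool) -> Collection {
    let mut out = Collection::new();
    for &(x, s, diff) in source {
        if keep(&s) {
            *out.entry(x).or_insert(0) += diff;
        }
    }
    out.retain(|_, m| *m != 0);
    out
}

/// 𝐓(t) = sum{δ𝐒(s): !(𝐑(t) ⪯ s)}; IntoTime is an index into `remap`.
fn version(source: &[Update], remap: &[Vec<FromTime>], t: usize) -> Collection {
    sum_where(source, |s| !beyond(&remap[t], s))
}

/// δ𝐓(t) via the two cases: Case 1 for t = 0, Case 2 with prev = t - 1.
fn difference(source: &[Update], remap: &[Vec<FromTime>], t: usize) -> Collection {
    sum_where(source, |s| {
        let not_beyond_t = !beyond(&remap[t], s);
        let beyond_prev = t == 0 || beyond(&remap[t - 1], s);
        not_beyond_t && beyond_prev
    })
}

/// a - b, pointwise on multiplicities.
fn minus(a: &Collection, b: &Collection) -> Collection {
    let mut out = a.clone();
    for (&x, &m) in b {
        *out.entry(x).or_insert(0) -= m;
    }
    out.retain(|_, m| *m != 0);
    out
}

fn main() {
    let source: Vec<Update> = vec![("a", (0, 0), 1), ("b", (1, 0), 1), ("c", (0, 2), 1), ("a", (2, 2), -1)];
    let remap: Vec<Vec<FromTime>> = vec![vec![(1, 0), (0, 1)], vec![(2, 0), (0, 3)], vec![(3, 3)]];
    for t in 0..remap.len() {
        println!("δT({t}) = {:?}", difference(&source, &remap, t));
    }
    println!("T(2) - T(1) = {:?}", minus(&version(&source, &remap, 2), &version(&source, &remap, 1)));
}
```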
Unique mapping property
Given the definition above, we can derive that for any source difference δ𝐒(s) there is at most one target timestamp t that it must be reclocked to. The implementation of the operator can exploit this property: it can safely discard a source update once a matching δ𝐓(t) has been found, which makes it "stateless" with respect to the source trace. A formal proof of this property is provided below.
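Here is a sketch of how the property might be exploited, under illustrative assumptions: `IntoTime` is the index into `remap`, and `FromTime` is a product-ordered `(u64, u64)`. Because 𝐑 is monotone, "s is beyond 𝐑(t)" holds for a prefix of the remap trace and fails from some index on. The unique target is therefore the first t at which s is no longer beyond 𝐑(t), which a binary search can find. `reclock_time` is a hypothetical helper, not the crate's API.

```rust
/// Hypothetical FromTime: pairs compared with the product partial order.
type FromTime = (u64, u64);

/// 𝐑(t) ⪯ s: `s` is beyond the frontier.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| f.0 <= s.0 && f.1 <= s.1)
}

/// Returns the unique t with δ𝐒(s) ∈ δ𝐓(t), where `remap[t]` is 𝐑(t), or
/// `None` if no binding known so far covers `s`. Relies on `remap` being
/// monotone, which makes `beyond(remap[t], s)` true on a prefix and false
/// afterwards, exactly the shape `partition_point` requires.
fn reclock_time(remap: &[Vec<FromTime>], s: &FromTime) -> Option<usize> {
    let t = remap.partition_point(|frontier| beyond(frontier, s));
    if t < remap.len() {
        Some(t)
    } else {
        None
    }
}

fn main() {
    let remap: Vec<Vec<FromTime>> = vec![vec![(1, 0), (0, 1)], vec![(2, 0), (0, 3)], vec![(3, 3)]];
    for s in [(0, 0), (1, 0), (2, 2), (3, 3)] {
        println!("{:?} -> {:?}", s, reclock_time(&remap, &s));
    }
}
```

Once `reclock_time` returns `Some(t)`, the update can be emitted at t and forgotten; `None` means it has to wait until the remap trace advances.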
Operational description
The operator follows a run-to-completion model: each time it is scheduled, it completes all outstanding work that can be completed.
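The following is a minimal, hypothetical model of run-to-completion scheduling, not the actual operator. It assumes a product-ordered `(u64, u64)` `FromTime` and identifies `IntoTime` with the index of each remap binding. On every `schedule` call it drains all pending source updates that the bindings received so far can place, and keeps the rest for a later call.

```rust
/// Hypothetical FromTime: pairs compared with the product partial order.
type FromTime = (u64, u64);
/// A source update: (record, time, diff).
type Update = (&'static str, FromTime, i64);

/// 𝐑(t) ⪯ s: `s` is beyond the frontier.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| f.0 <= s.0 && f.1 <= s.1)
}

/// Hypothetical operator state, not the actual mz_timely_util operator.
#[derive(Default)]
struct Reclocker {
    /// Remap bindings received so far: `remap[t]` is 𝐑(t), assumed monotone.
    remap: Vec<Vec<FromTime>>,
    /// Source updates whose target IntoTime is not known yet.
    pending: Vec<Update>,
}

impl Reclocker {
    fn push_source(&mut self, update: Update) {
        self.pending.push(update);
    }

    fn push_remap(&mut self, frontier: Vec<FromTime>) {
        self.remap.push(frontier);
    }

    /// One scheduling pass: emits every pending update that can be reclocked
    /// with the current bindings, as (record, IntoTime, diff), and drops it.
    fn schedule(&mut self) -> Vec<(&'static str, usize, i64)> {
        let mut output = Vec::new();
        let remap = &self.remap;
        self.pending.retain(|&(x, s, diff)| {
            let t = remap.partition_point(|frontier| beyond(frontier, &s));
            if t < remap.len() {
                output.push((x, t, diff));
                false // placed: by the unique mapping property it is never needed again
            } else {
                true // wait for the remap trace to advance
            }
        });
        output
    }
}

fn main() {
    let mut op = Reclocker::default();
    op.push_source(("a", (0, 0), 1));
    op.push_source(("b", (1, 0), 1));
    op.push_remap(vec![(1, 0), (0, 1)]);
    println!("{:?}", op.schedule());
    op.push_remap(vec![(2, 0), (0, 3)]);
    println!("{:?}", op.schedule());
}
```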
Unique mapping property proof
This section contains the formal proof of the unique mapping property. The proof follows the structured proof notation created by Leslie Lamport. Readers unfamiliar with structured proofs can read about them at https://lamport.azurewebsites.net/pubs/proof.pdf.
Statement
AtMostOne(X, φ(x)) ≜ ∀ x1, x2 ∈ X : φ(x1) ∧ φ(x2) ⇒ x1 = x2
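For a finite X, the definition can be written down directly. This Rust function only illustrates the formula; the name `at_most_one` is hypothetical.

```rust
/// AtMostOne(X, φ) ≜ ∀ x1, x2 ∈ X : φ(x1) ∧ φ(x2) ⇒ x1 = x2, for a finite X.
fn at_most_one<T: PartialEq>(xs: &[T], phi: impl Fn(&T) -> bool) -> bool {
    xs.iter().all(|x1| {
        xs.iter().all(|x2| {
            // P ⇒ Q is written as ¬P ∨ Q.
            !(phi(x1) && phi(x2)) || x1 == x2
        })
    })
}

fn main() {
    let times = [1, 2, 3];
    println!("{}", at_most_one(&times, |&t| t == 2));
    println!("{}", at_most_one(&times, |&t| t >= 2));
}
```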
- THEOREM UniqueMapping ≜
  - ASSUME
    - NEW (FromTime, ⪯) ∈ PartiallyOrderedTimestamps
    - NEW (IntoTime, ≤) ∈ TotallyOrderedTimestamps
    - NEW 𝐒 ∈ SetOfCollectionTraces(FromTime)
    - NEW 𝐑 ∈ SetOfCollectionTraces(IntoTime)
    - ∀ t ∈ IntoTime : 𝐑(t) ∈ SetOfAntichains(FromTime)
    - ∀ t1, t2 ∈ IntoTime : t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2)
    - NEW 𝐓 = Reclock(𝐒, 𝐑)
  - PROVE ∀ s ∈ FromTime : AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
Proof
- <1>1. SUFFICES ASSUME ∃ s ∈ FromTime : ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
  - PROVE FALSE
  - Proof: By contradiction.
- <1>2. PICK s ∈ FromTime : ¬AtMostOne(IntoTime, δ𝐒(s) ∈ δ𝐓(x))
  - Proof: Such a time exists by <1>1.
- <1>3. ∃ t1, t2 ∈ IntoTime : t1 ≠ t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
  - <2>1. ¬(∀ x1, x2 ∈ IntoTime : (δ𝐒(s) ∈ δ𝐓(x1)) ∧ (δ𝐒(s) ∈ δ𝐓(x2)) ⇒ x1 = x2)
    - Proof: By <1>2 and the definition of AtMostOne.
  - <2>2. Q.E.D.
    - Proof: By <2>1, quantifier negation rules, and the theorem of propositional logic ¬(P ⇒ Q) ≡ P ∧ ¬Q.
- <1>4. PICK t1, t2 ∈ IntoTime : t1 < t2 ∧ δ𝐒(s) ∈ δ𝐓(t1) ∧ δ𝐒(s) ∈ δ𝐓(t2)
  - Proof: By <1>3. Since IntoTime is totally ordered and t1 ≠ t2, we can assume t1 < t2 without loss of generality.
- <1>5. ¬(𝐑(t1) ⪯ s)
  - <2>1. CASE t1 = min(IntoTime)
    - <3>1. δ𝐓(t1) = sum{δ𝐒(s): !(𝐑(t1) ⪯ s)}
      - Proof: By the definition of δ𝐓(min).
    - <3>2. δ𝐒(s) ∈ δ𝐓(t1)
      - Proof: By <1>4.
    - <3>3. Q.E.D.
      - Proof: By <3>1 and <3>2.
  - <2>2. CASE t1 > min(IntoTime)
    - <3>1. PICK t1_prev = Predecessor(t1)
      - Proof: The predecessor exists because the set {t: t < t1} is non-empty, since it must contain at least min(IntoTime).
    - <3>2. δ𝐓(t1) = sum{δ𝐒(s): (𝐑(t1_prev) ⪯ s) ∧ !(𝐑(t1) ⪯ s)}
      - Proof: By the definition of δ𝐓(t).
    - <3>3. δ𝐒(s) ∈ δ𝐓(t1)
      - Proof: By <1>4.
    - <3>4. Q.E.D.
      - Proof: By <3>2 and <3>3.
  - <2>3. Q.E.D.
    - Proof: From cases <2>1 and <2>2, which are exhaustive.
- <1>6. PICK t2_prev ∈ IntoTime : t2_prev = Predecessor(t2)
  - Proof: The predecessor exists because, by <1>4, the set {t: t < t2} is non-empty, since it must contain at least t1.
- <1>7. t1 ≤ t2_prev
  - Proof: t1 ∈ {t: t < t2} and t2_prev is the maximum element of that set.
- <1>8. 𝐑(t2_prev) ⪯ s
  - <2>1. t2 > min(IntoTime)
    - Proof: By <1>4, since t1 < t2.
  - <2>2. PICK t2_prev = Predecessor(t2)
    - Proof: The predecessor exists because the set {t: t < t2} is non-empty, since it must contain at least min(IntoTime).
  - <2>3. δ𝐓(t2) = sum{δ𝐒(s): (𝐑(t2_prev) ⪯ s) ∧ !(𝐑(t2) ⪯ s)}
    - Proof: By <2>1 and the definition of δ𝐓(t).
  - <2>4. δ𝐒(s) ∈ δ𝐓(t2)
    - Proof: By <1>4.
  - <2>5. Q.E.D.
    - Proof: By <2>3 and <2>4.
- <1>9. 𝐑(t1) ⪯ 𝐑(t2_prev)
  - Proof: By <1>7 and the monotonicity hypothesis on 𝐑.
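- <1>10. 𝐑(t1) ⪯ s
  - Proof: By <1>8, <1>9, and transitivity: some element of 𝐑(t2_prev) is ≤ s, and that element is ≥ some element of 𝐑(t1).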
- <1>11. Q.E.D.
  - Proof: <1>5 and <1>10 contradict each other.
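The theorem can also be sanity-checked by brute force on a small model. The sketch below is a bounded exhaustive check, not a proof, and its model is an assumption:

- `FromTime` is the grid {0, …, n-1}² under the product order.
- Every antichain of the grid is a candidate frontier.
- `IntoTime` is the index into a remap trace of fixed length.

For each source time s, the check counts how many δ𝐓(t) would contain δ𝐒(s) according to the two difference-trace cases. Passing `require_monotone = false` drops the hypothesis t1 ≤ t2 ⇒ 𝐑(t1) ⪯ 𝐑(t2), and the check then fails. This shows that the hypothesis cannot simply be dropped.

```rust
/// Hypothetical FromTime: points of a small grid under the product order.
type FromTime = (u64, u64);

fn le(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// 𝐑 ⪯ s: `s` is beyond the frontier.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| le(f, s))
}

/// A ⪯ B on antichains: every element of B is ≥ some element of A.
fn antichain_le(a: &[FromTime], b: &[FromTime]) -> bool {
    b.iter().all(|y| a.iter().any(|x| le(x, y)))
}

/// Every antichain (including the empty one) of the grid {0..n}².
fn antichains(n: u64) -> Vec<Vec<FromTime>> {
    let points: Vec<FromTime> = (0..n).flat_map(|a| (0..n).map(move |b| (a, b))).collect();
    let mut result = Vec::new();
    for mask in 0u32..(1u32 << points.len()) {
        let set: Vec<FromTime> = points
            .iter()
            .enumerate()
            .filter(|&(i, _)| mask & (1u32 << i) != 0)
            .map(|(_, p)| *p)
            .collect();
        if set.iter().all(|x| set.iter().all(|y| x == y || !le(x, y))) {
            result.push(set);
        }
    }
    result
}

/// Checks that every s lands in at most one δ𝐓(t) for all remap traces of
/// length `len` over the grid (optionally only the monotone ones).
fn unique_mapping_holds(n: u64, len: usize, require_monotone: bool) -> bool {
    let frontiers = antichains(n);
    let mut choice = vec![0usize; len]; // odometer over all remap traces
    loop {
        let remap: Vec<&[FromTime]> = choice.iter().map(|&i| frontiers[i].as_slice()).collect();
        let monotone = remap.windows(2).all(|w| antichain_le(w[0], w[1]));
        if monotone || !require_monotone {
            for s in (0..n).flat_map(|a| (0..n).map(move |b| (a, b))) {
                // δ𝐒(s) ∈ δ𝐓(t): not beyond 𝐑(t) and (t = min or beyond 𝐑(prev)).
                let hits = (0..len)
                    .filter(|&t| !beyond(remap[t], &s) && (t == 0 || beyond(remap[t - 1], &s)))
                    .count();
                if hits > 1 {
                    return false;
                }
            }
        }
        // Advance to the next remap trace, or stop after the last one.
        let mut k = 0;
        loop {
            if k == len {
                return true;
            }
            choice[k] += 1;
            if choice[k] < frontiers.len() {
                break;
            }
            choice[k] = 0;
            k += 1;
        }
    }
}

fn main() {
    println!("monotone traces: {}", unique_mapping_holds(3, 3, true));
    println!("arbitrary traces: {}", unique_mapping_holds(3, 3, false));
}
```

One counterexample in the non-monotone case is the remap trace [(1, 1)], [(0, 0)], [] with s = (0, 0), which lands in both δ𝐓(0) and δ𝐓(2).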
Structs
- ChainBatch: A batch of differential updates that vary over some partial order. This type maintains the data as a set of chains that allows for efficient extraction of batches given a frontier.
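The chain idea can be illustrated with a simplified, hypothetical type. It is not the crate's `ChainBatch`, whose internals are not shown here. The observation it relies on is that "not beyond a frontier" is downward closed: if a ≤ b and b is not beyond F, then a is not beyond F either. So if every chain keeps its times in ascending order, the updates to extract for a frontier always form a prefix of each chain. The `(u64, u64)` product-ordered time and the greedy chain assignment are assumptions of the sketch.

```rust
use std::collections::VecDeque;

/// Hypothetical FromTime: pairs compared with the product partial order.
type FromTime = (u64, u64);

fn le(a: &FromTime, b: &FromTime) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// `s` is beyond the frontier if some frontier element is ≤ `s`.
fn beyond(frontier: &[FromTime], s: &FromTime) -> bool {
    frontier.iter().any(|f| le(f, s))
}

/// Simplified chain-based batch: within each chain, consecutive times are
/// ordered by `le`, so each chain is a totally ordered sequence.
struct ChainedBatch<D> {
    chains: Vec<VecDeque<(FromTime, D)>>,
}

impl<D> ChainedBatch<D> {
    fn new() -> Self {
        ChainedBatch { chains: Vec::new() }
    }

    /// Greedy insertion: append to the first chain whose last time is ≤ `time`,
    /// otherwise start a new chain. Either way every chain stays ascending.
    fn push(&mut self, time: FromTime, data: D) {
        let target = self
            .chains
            .iter_mut()
            .find(|chain| chain.back().map_or(false, |(last, _)| le(last, &time)));
        match target {
            Some(chain) => chain.push_back((time, data)),
            None => self.chains.push(VecDeque::from(vec![(time, data)])),
        }
    }

    /// Removes and returns every update that is not beyond `frontier`.
    /// Because "not beyond" is downward closed, popping a prefix from the
    /// front of each chain is enough.
    fn extract(&mut self, frontier: &[FromTime]) -> Vec<(FromTime, D)> {
        let mut out = Vec::new();
        for chain in &mut self.chains {
            while chain.front().map_or(false, |(t, _)| !beyond(frontier, t)) {
                out.extend(chain.pop_front());
            }
        }
        self.chains.retain(|chain| !chain.is_empty());
        out
    }
}

fn main() {
    let mut batch = ChainedBatch::new();
    batch.push((0, 0), "a");
    batch.push((1, 1), "b");
    batch.push((0, 2), "c");
    batch.push((2, 2), "d");
    println!("{} chains", batch.chains.len());
    println!("{:?}", batch.extract(&[(1, 1), (0, 2)]));
    println!("{:?}", batch.extract(&[(3, 3)]));
}
```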
Functions
- Constructs an operator that reclocks a source collection varying with some time FromTime into the corresponding reclocked collection varying over some time IntoTime, using the provided remap collection.