Module mz_compute::render::continual_task
A continual task presents as something like a TRIGGER: it watches some
input and whenever it changes at time T, executes a SQL txn, writing to
some output at the same time T. It can also read anything in materialize
as a reference, most notably including the output.
Only reacting to new inputs (and not the full history) makes a CT’s rehydration time independent of the size of the inputs (NB this is not true for references), enabling things like writing UPSERT on top of an append-only shard in SQL (ignore the obvious bug with my upsert impl):
CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
)
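As a rough illustration of what one firing of this task does, the sketch below replays the two statements over an in-memory map. Everything here is hypothetical (the `apply_upsert_step` name, the `BTreeMap` standing in for the output shard) and it deliberately mirrors the SQL as written, including its use of `max(val)`; it is not how the dataflow is rendered.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of one firing of the `upsert` task at some time `T`.
/// `new_rows` stands in for the rows inserted into `append_only` at `T`;
/// `output` stands in for the contents of `upsert`.
fn apply_upsert_step(output: &mut BTreeMap<i32, i32>, new_rows: &[(i32, i32)]) {
    // SELECT key, max(val) FROM append_only GROUP BY key
    let mut max_per_key: BTreeMap<i32, i32> = BTreeMap::new();
    for &(key, val) in new_rows {
        max_per_key
            .entry(key)
            .and_modify(|v| *v = (*v).max(val))
            .or_insert(val);
    }
    // DELETE FROM upsert WHERE key IN (SELECT key FROM append_only)
    for key in max_per_key.keys() {
        output.remove(key);
    }
    // INSERT INTO upsert ...
    output.extend(max_per_key);
}
```

Only the rows that arrived at `T` are visited, which is the property that keeps the work per firing independent of the full history of `append_only`.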
Unlike a materialized view, the continual task does not update outputs if references later change. This enables things like auditing:
CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS (
INSERT INTO audit_log SELECT * FROM anomalies;
)
Rough implementation overview:
- A CT is created and starts at some start_ts, and is optionally later
  dropped and stopped at some end_ts.
- A CT takes one or more _input_s. These must be persist shards (i.e. TABLE,
  SOURCE, MV, but not VIEW).
- A CT has one or more _output_s. The outputs are (initially) owned by the
  task and cannot be written to by other parts of the system.
- The task is run for each time one of the inputs changes starting at
  start_ts.
- It is given the changes in its inputs at time T as diffs.
  - These are presented as two SQL relations with just the inserts/deletes.
  - NB: A full collection for the input can always be recovered by also
    using the input as a “reference” (see below) and applying the diffs.
- The task logic is expressed as a SQL transaction that does all reads and
  commits all writes at T.
  - The notable exception to this is self-referential reads of the CT
    output. See below for how that works.
- This logic can reference any nameable object in the system, not just the
  inputs.
  - However, the logic/transaction can mutate only the outputs.
- Summary of differences between inputs and references:
  - The task receives snapshot + changes for references (like regular
    dataflow inputs today) but only changes for inputs.
  - The task only produces output in response to changes in the inputs but
    not in response to changes in the references.
- Instead of re-evaluating the task logic from scratch for each input time,
  we maintain the collection representing desired writes to the output(s) as
  a dataflow.
- The task dataflow is tied to a CLUSTER and runs on each REPLICA.
  - HA strategy: multi-replica clusters race to commit and the losers throw
    away the result.
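The difference between inputs and references can be made concrete with a toy event loop. This is a minimal sketch under stated assumptions: `Change`, `run_task`, and the "multiply by the reference" logic are all invented for illustration, and events are assumed to arrive in time order.

```rust
/// Hypothetical change to one of the collections the task can see.
enum Change {
    /// Rows inserted into the task's input.
    Input(Vec<i64>),
    /// A new value for a single-row reference collection.
    Reference(i64),
}

/// Replays `(time, change)` events in order and returns the `(time, row)`
/// pairs written to the output. The task logic here (scale each input row
/// by the current reference value) is a stand-in for arbitrary SQL.
fn run_task(events: &[(u64, Change)]) -> Vec<(u64, i64)> {
    let mut reference = 0;
    let mut output = Vec::new();
    for (time, change) in events {
        match change {
            // A reference change is remembered but produces no output,
            // and never rewrites rows that were already written.
            Change::Reference(value) => reference = *value,
            // An input change fires the task, which reads the reference
            // as of the same time and writes at that time.
            Change::Input(rows) => {
                for row in rows {
                    output.push((*time, row * reference));
                }
            }
        }
    }
    output
}
```

In this model a materialized view would instead have to revise the row written at the earlier time whenever the reference changed later; the continual task leaves it as written.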
§Self-References
Self-references must be handled differently from other reads. When computing
the proposed write to some output at T, we can only know the contents of
it through T-1 (the exclusive upper is T).
We address this by initially assuming that the output contains no changes at
T, then evaluating each of the statements in order, allowing them to see
the proposed output changes made by the previous statements. By default,
this is stopped after one iteration and proposed output diffs are committed
if possible. (We could also add options for iterating to a fixpoint,
stop/error after N iters, etc.) Then to compute the changes at T+1, we
read in what was actually written to the output at T (maybe some other
replica wrote something different) and begin again.
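The single-iteration evaluation described above can be sketched as a fold over the statements. The names (`Row`, `Diffs`, `consolidate`, `propose_writes`) and the representation of a statement as a closure are assumptions made for this example; the real logic is a dataflow, not a loop.

```rust
use std::collections::BTreeMap;

type Row = (i32, i32);
type Diffs = Vec<(Row, i64)>;

/// Applies `diffs` on top of `base`, dropping rows whose count reaches zero.
fn consolidate(base: &BTreeMap<Row, i64>, diffs: &Diffs) -> BTreeMap<Row, i64> {
    let mut out = base.clone();
    for (row, diff) in diffs {
        *out.entry(*row).or_insert(0) += *diff;
    }
    out.retain(|_, count| *count != 0);
    out
}

/// One pass over the statements for time `T`. `committed` is the output as
/// known through `T-1`; each statement sees it plus the diffs proposed by
/// the statements before it.
fn propose_writes(
    committed: &BTreeMap<Row, i64>,
    statements: &[&dyn Fn(&BTreeMap<Row, i64>) -> Diffs],
) -> Diffs {
    let mut proposed = Diffs::new();
    for statement in statements {
        let visible = consolidate(committed, &proposed);
        proposed.extend(statement(&visible));
    }
    proposed
}
```

The returned diffs are only a proposal: for `T+1` the caller would start again from whatever was actually committed at `T`, not from `proposed`.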
The above is very similar to how timely/differential dataflow iteration
works, except that our feedback loop goes through persist and the loop
timestamp is already mz_repr::Timestamp.
This is implemented as follows:
- let I = persist_source(self-reference)
- Transform I such that the contents at T-1 are presented at T (i.e.
  initially assume T is unchanged from T-1).
- TODO(ct3): Actually implement the following.
- In an iteration sub-scope:
  - Bring I into the sub-scope and let proposed = Variable.
  - We need a collection that at (T, 0) is always the contents of I at T,
    but at (T, 1...) contains the proposed diffs by the CT logic. We can
    construct it by concatenating I with proposed except that we also need
    to retract everything in proposed for the next (T+1, 0) (because I is
    the source of truth for what actually committed).
  - let R = retract_at_next_outer_ts(proposed)
  - let result = logic(concat(I, proposed, R))
  - proposed.set(result)
- Then we return proposed.leave() for attempted write to persist.
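The timestamp arithmetic in the steps above can be sketched on plain update triples. This is an illustration only: the `Update` alias, the use of `u64` and `(u64, u64)` for the outer and nested timestamps, and the function bodies are assumptions, and the real versions would be dataflow operators rather than functions over slices.

```rust
/// A differential-style update: (data, time, diff).
type Update<D, T> = (D, T, i64);

/// Presents the contents at `T-1` at `T` by advancing every update one tick.
fn step_forward<D: Clone>(updates: &[Update<D, u64>]) -> Vec<Update<D, u64>> {
    updates
        .iter()
        .map(|(data, time, diff)| (data.clone(), time + 1, *diff))
        .collect()
}

/// For each proposed update at `(T, _)`, emits its negation at `(T+1, 0)`,
/// following the description of `R` in the list above.
fn retract_at_next_outer_ts<D: Clone>(
    proposed: &[Update<D, (u64, u64)>],
) -> Vec<Update<D, (u64, u64)>> {
    proposed
        .iter()
        .map(|(data, (outer, _inner), diff)| (data.clone(), (outer + 1, 0), -diff))
        .collect()
}
```

Here the second component of the nested timestamp plays the role of the iteration counter inside the sub-scope.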
§As Ofs and Output Uppers
- A continual task is first created with an initial as_of I. It is
  initially rendered at as_of I==A but as it makes progress, it may be
  rendered at later as_ofs I<A.
- It is required that the output collection springs into existence at I
  (i.e. receives the initial contents at I).
  - For a snapshot CT, the full contents of the input at I are run through
    the CT logic and written at I.
  - For a non-snapshot CT, the collection is defined to be empty at I
    (i.e. if the input happened to be written exactly at I, we’d ignore
    it) and then start writing at I+1.
- As documented in DataflowDescription::as_of, A is the time we render the
  inputs.
  - An MV with an as_of of A will have its inputs rendered at A, and the
    first time it could write is also A.
  - A CT is the same on the initial render (I==A), but on renders after it
    has made progress (I<A) the first time that it could potentially write
    is A+1. This is because a persist_source started with
    SnapshotMode::Exclude can only start emitting diffs at as_of+1.
  - As a result, we hold back the since on inputs to be strictly less than
    the upper of the output. (This is only necessary for CTs, but we also do
    it for MVs to avoid the special case.)
  - For CT “inputs” (which are disallowed from being the output), we render
    the persist_source with as_of A.
    - When I==A we include the snapshot iff the snapshot option is used.
    - When I<A we always exclude the snapshot. It would be unnecessary and
      this is an absolutely critical performance optimization to make CT
      rehydration times independent of input size.
  - For CT “references”, we render the persist_source with as_of A and
    always include the snapshot.
    - There is one subtlety: self-references on the initial render. We need
      the contents to be available at A-1, so that we can do the
      step_forward described above to get it at A. However, the collection
      springs into existence at I, so when I==A, we’re not allowed to read
      it as_of A-1 (the since of the shard may have advanced past that). We
      address this by rendering the persist_source as normal at A. On
      startup, persist_source immediately downgrades its frontier to A
      (making A-1 readable). Combined with step_forward, this is enough to
      unblock the CT self-reference. We do however have to tweak the
      suppress_early_progress operator to use A-1 instead of A for this
      case.
    - On subsequent renders, self-references work as normal.
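The snapshot and first-write rules above reduce to two small decisions, sketched here as plain functions. `SourceKind`, `include_snapshot`, and `first_possible_write` are hypothetical names for this example and timestamps are simplified to `u64`; the sketch only encodes the rules as stated in the list and does not model the self-reference subtlety.

```rust
/// Hypothetical classification of a collection read by a continual task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SourceKind {
    Input,
    Reference,
}

/// Whether the persist_source rendered at `as_of` (A) includes the snapshot,
/// given the task's initial as_of (I) and whether the snapshot option is set.
fn include_snapshot(
    kind: SourceKind,
    initial_as_of: u64,
    as_of: u64,
    snapshot_option: bool,
) -> bool {
    match kind {
        // References always include the snapshot.
        SourceKind::Reference => true,
        // Inputs include it only on the initial render, and only if asked.
        SourceKind::Input => initial_as_of == as_of && snapshot_option,
    }
}

/// The first time the task could potentially write: A on the initial render
/// (I==A), A+1 on later renders (I<A).
fn first_possible_write(initial_as_of: u64, as_of: u64) -> u64 {
    if initial_as_of == as_of {
        as_of
    } else {
        as_of + 1
    }
}
```

For example, with I = 5, a re-render at A = 9 excludes the input snapshot even when the snapshot option is set, and the earliest possible write is at 10.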
Structs§
Enums§
- An encapsulation of the transformation logic necessary on data coming into a continual task.
Traits§
Functions§
- Writes the given data to the shard, truncating it as necessary.