Module mz_compute::render::continual_task

A continual task presents as something like a TRIGGER: it watches some input and, whenever it changes at time T, executes a SQL txn, writing to some output at the same time T. It can also read anything in Materialize as a reference, most notably including the output itself.

Only reacting to new inputs (and not the full history) makes a CT’s rehydration time independent of the size of the inputs (NB this is not true for references), enabling things like writing UPSERT on top of an append-only shard in SQL (ignore the obvious bug with my upsert impl):

CREATE CONTINUAL TASK upsert (key INT, val INT) ON INPUT append_only AS (
    DELETE FROM upsert WHERE key IN (SELECT key FROM append_only);
    INSERT INTO upsert SELECT key, max(val) FROM append_only GROUP BY key;
)

Unlike a materialized view, the continual task does not update outputs if references later change. This enables things like auditing:

CREATE CONTINUAL TASK audit_log (count INT8) ON INPUT anomalies AS (
    INSERT INTO audit_log SELECT * FROM anomalies;
)

Rough implementation overview:

  • A CT is created and starts at some start_ts, and is optionally later dropped and stopped at some end_ts.
  • A CT takes one or more inputs. These must be persist shards (i.e. TABLE, SOURCE, MV, but not VIEW).
  • A CT has one or more outputs. The outputs are (initially) owned by the task and cannot be written to by other parts of the system.
  • The task is run for each time at which one of the inputs changes, starting at start_ts.
  • It is given the changes in its inputs at time T as diffs.
    • These are presented as two SQL relations with just the inserts/deletes.
    • NB: A full collection for the input can always be recovered by also using the input as a “reference” (see below) and applying the diffs.
  • The task logic is expressed as a SQL transaction that does all reads at T and commits all writes at T.
    • The notable exception to this is self-referential reads of the CT output. See below for how that works.
  • This logic can reference any nameable object in the system, not just the inputs.
    • However, the logic/transaction can mutate only the outputs.
  • Summary of differences between inputs and references (a SQL sketch follows this list):
    • The task receives snapshot + changes for references (like regular dataflow inputs today) but only changes for inputs.
    • The task only produces output in response to changes in the inputs but not in response to changes in the references.
  • Instead of re-evaluating the task logic from scratch for each input time, we maintain the collection representing desired writes to the output(s) as a dataflow.
  • The task dataflow is tied to a CLUSTER and runs on each REPLICA.
    • HA strategy: multi-replica clusters race to commit and the losers throw away the result.
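
To make the input/reference distinction concrete, here is a hedged sketch; the names events, users, and enriched are hypothetical and not from the module. events is the CT’s input, so only its new rows at each time T drive an execution; users is only a reference, so the task sees its full contents at T, but later changes to users never cause new writes to enriched.

CREATE CONTINUAL TASK enriched (user_id INT, name TEXT, val INT) ON INPUT events AS (
    INSERT INTO enriched
        SELECT e.user_id, u.name, e.val
        FROM events e
        JOIN users u ON u.id = e.user_id;
)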

Self-References

Self-references must be handled differently from other reads. When computing the proposed write to some output at T, we can only know the contents of it through T-1 (the exclusive upper is T).

We address this by initially assuming that the output contains no changes at T, then evaluating each of the statements in order, allowing them to see the proposed output changes made by the previous statements. By default, this is stopped after one iteration and proposed output diffs are committed if possible. (We could also add options for iterating to a fixpoint, stop/error after N iters, etc.) Then to compute the changes at T+1, we read in what was actually written to the output at T (maybe some other replica wrote something different) and begin again.
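
For concreteness, here is a hedged sketch of a later statement reading the task’s own output and therefore seeing the proposed writes of the statement before it; the names games and leaderboard are hypothetical and not from the module:

CREATE CONTINUAL TASK leaderboard (player INT, score INT) ON INPUT games AS (
    INSERT INTO leaderboard SELECT player, score FROM games;
    DELETE FROM leaderboard WHERE score < (SELECT max(score) FROM leaderboard) - 100;
)

The DELETE’s subquery reads leaderboard itself, so the max it sees includes both what was committed through T-1 and the rows the preceding INSERT proposed at T.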

The above is very similar to how timely/differential dataflow iteration works, except that our feedback loop goes through persist and the loop timestamp is already mz_repr::Timestamp.

This is implemented as follows:

  • let I = persist_source(self-reference)
  • Transform I such that the contents at T-1 are presented at T (i.e. initially assume T is unchanged from T-1).
  • TODO(ct3): Actually implement the following.
  • In an iteration sub-scope:
    • Bring I into the sub-scope and let proposed = Variable.
    • We need a collection that at (T, 0) is always the contents of I at T, but at (T, 1...) contains the proposed diffs by the CT logic. We can construct it by concatenating I with proposed except that we also need to retract everything in proposed for the next (T+1, 0) (because I is the source of truth for what actually committed).
    • let R = retract_at_next_outer_ts(proposed)
    • let result = logic(concat(I, proposed, R))
    • proposed.set(result)
  • Then we return proposed.leave() for attempted write to persist.

As Ofs and Output Uppers

  • A continual task is first created with an initial as_of I. It is initially rendered at as_of I==A, but as it makes progress it may be rendered at later as_ofs I<A. (A concrete numeric walk-through is sketched after this list.)
  • It is required that the output collection springs into existence at I (i.e. receives the initial contents at I).
    • For a snapshot CT, the full contents of the input at I are run through the CT logic and written at I.
    • For a non-snapshot CT, the collection is defined to be empty at I (i.e. if the input happened to be written exactly at I, we’d ignore it) and then start writing at I+1.
  • As documented in DataflowDescription::as_of, A is the time we render the inputs.
    • An MV with an as_of of A has its inputs rendered at A, and A is also the first time it could write.
    • A CT is the same on the initial render (I==A), but on renders after it has made progress (I<A) the first time that it could potentially write is A+1. This is because a persist_source started with SnapshotMode::Exclude can only start emitting diffs at as_of+1.
    • As a result, we hold back the since on inputs to be strictly less than the upper of the output. (This is only necessary for CTs, but we also do it for MVs to avoid the special case.)
    • For CT “inputs” (which are disallowed from being the output), we render the persist_source with as_of A.
      • When I==A we include the snapshot iff the snapshot option is used.
      • When I<A we always exclude the snapshot. It would be unnecessary, and excluding it is an absolutely critical performance optimization that makes CT rehydration times independent of input size.
    • For CT “references”, we render the persist_source with as_of A and always include the snapshot.
      • There is one subtlety: self-references on the initial render. We need the contents to be available at A-1, so that we can do the step_forward described above to get it at A. However, the collection springs into existence at I, so when I==A, we’re not allowed to read it as_of A-1 (the since of the shard may have advanced past that). We address this by rendering the persist_source as normal at A. On startup, persist_source immediately downgrades its frontier to A (making A-1 readable). Combined with step_forward, this is enough to unblock the CT self-reference. We do, however, have to tweak the suppress_early_progress operator to use A-1 instead of A for this case.
      • On subsequent renders, self-references work as normal.
