Render an operator that persists a source collection.

Implementation

This module defines the persist_sink operator, which writes a collection produced by source rendering into a persist shard.

It attempts to use all workers to write data to persist, and uses single-instance workers to coordinate work. The diagram below gives an overview of how it is shaped. There is more information in the doc comments of the top-level functions of this module.


                                       ,------------.
                                       | source     |
                                       | collection |
                                       +---+--------+
                                       /   |
                                      /    |
                                     /     |
                                    /      |
                                   /       |
                                  /        |
                                 /         |
                                /          |
                               /     ,-+-----------------------.
                              /      | mint_batch_descriptions |
                             /       | one arbitrary worker    |
                            |        +-,--,--------+----+------+
                           ,----------´.-´         |     \
                       _.-´ |       .-´            |      \
                   _.-´     |    .-´               |       \
                .-´  .------+----|-------+---------|--------\-----.
               /    /            |       |         |         \     \
        ,--------------.   ,-----------------.     |     ,-----------------.
        | write_batches|   |  write_batches  |     |     |  write_batches  |
        | worker 0     |   | worker 1        |     |     | worker N        |
        +-----+--------+   +-+---------------+     |     +--+--------------+
               \              \                    |        /
                `-.            `,                  |       /
                   `-._          `-.               |      /
                       `-._         `-.            |     /
                           `---------. `-.         |    /
                                     +`---`---+-------------,
                                     | append_batches       |
                                     | one arbitrary worker |
                                     +------+---------------+
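
The three stages in the diagram can be sketched with plain Rust values. This is only an illustration of the data flow, not the operator code: timestamps are assumed to be plain u64 values, a batch is assumed to be a list of (data, timestamp) updates, and the BatchDescription type and all function signatures are hypothetical stand-ins that merely borrow the stage names.

```rust
// Hypothetical, simplified stand-ins for illustration only.
type Timestamp = u64;
type Update = (String, Timestamp);

/// A half-open interval [lower, upper) of timestamps (assumed shape).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct BatchDescription {
    lower: Timestamp,
    upper: Timestamp,
}

/// Stage 1 (one worker): mint a new description when the frontier advances.
fn mint_batch_description(
    prev_upper: Timestamp,
    frontier: Timestamp,
) -> Option<BatchDescription> {
    if frontier > prev_upper {
        Some(BatchDescription { lower: prev_upper, upper: frontier })
    } else {
        // The frontier did not advance, so there is nothing new to describe.
        None
    }
}

/// Stage 2 (every worker): keep the local updates that fall into the description.
fn write_batch(desc: BatchDescription, local: &[Update]) -> Vec<Update> {
    local
        .iter()
        .filter(|(_, ts)| desc.lower <= *ts && *ts < desc.upper)
        .cloned()
        .collect()
}

/// Stage 3 (one worker): fuse the per-worker batches into a single append.
fn append_batches(shard: &mut Vec<Update>, batches: Vec<Vec<Update>>) {
    for batch in batches {
        shard.extend(batch);
    }
}
```

In this sketch the shard is just a vector; in the real operator the final stage talks to persist, and the middle stage forwards batch handles rather than the updates themselves.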

Similarities with mz_compute::sink::persist_sink

This module has many similarities with the compute version of the same concept, and in fact, is entirely derived from it.

Compute requires that its persist_sink is self-correcting; that is, it corrects what the collection in persist accumulates to if the collection has values changed at previous timestamps. It does this by continually comparing the input stream with the collection as read back from persist.
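
To make the idea of self-correction concrete, here is a minimal sketch that assumes both the input and the persisted collection have already been accumulated into maps from row to multiplicity. The function name corrections and the map representation are hypothetical and chosen for illustration; the compute sink works incrementally on streams of updates rather than on whole maps.

```rust
use std::collections::BTreeMap;

/// Returns the updates that, once appended, would make `persisted`
/// accumulate to `desired` (hypothetical helper, not the compute sink's API).
fn corrections(
    desired: &BTreeMap<String, i64>,
    persisted: &BTreeMap<String, i64>,
) -> BTreeMap<String, i64> {
    // Start from what we want and subtract what is already written.
    let mut diff = desired.clone();
    for (row, count) in persisted {
        *diff.entry(row.clone()).or_insert(0) -= *count;
    }
    // Rows whose multiplicities already agree need no correction.
    diff.retain(|_, d| *d != 0);
    diff
}
```

Computing such a difference requires reading the persisted collection back, which is where the read load discussed in the following paragraphs comes from.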

Source collections, while definite, cannot be reliably reproduced once written down, which means compute’s persist_sink’s self-correction mechanism would need to be skipped on operator startup, and would cause unnecessary read load on persist.

Additionally, persisting sources requires we use bounded amounts of memory, even if a single timestamp represents a huge amount of data. This is not (currently) possible to guarantee while also performing self-correction.

Because of this, we have ripped out the self-correction mechanism, and aggressively simplified the sub-operators. Some, particularly append_batches, could be merged with the compute version, but that requires some amount of onerous refactoring that we have chosen to skip for now.

Structs

  • Manages batches and metrics, including the small-batch optimization in write_batches.
  • Metrics about batches.
  • BatchSet 🔒
    Holds finished batches and the data for the small-batch optimization for append_batches.
  • A batch or data + metrics moved from write_batches to append_batches.

Functions

  • Fuses written batches together and appends them to persist using one compare_and_append call. Writing only happens for batch descriptions where we know that no future batches will arrive, that is, for those batch descriptions that are not beyond the frontier of both the batch_descriptions and batches inputs.
  • Whenever the frontier advances, this mints a new batch description (lower and upper) that writers should use for writing the next set of batches to persist.
  • render 🔒
    Continuously writes the desired_stream into persist. This is done via a multi-stage operator graph:
  • Writes desired_collection to persist, but only for updates that fall into a batch description that we get via batch_descriptions. This forwards a HollowBatch (with additional metadata) for any batch of updates that was written.
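
The frontier condition described for the appending function above can be sketched as follows. This is one reading of "not beyond the frontier", under loud assumptions: timestamps are totally ordered u64 values, each input frontier is a single timestamp, and a description counts as "not beyond" a frontier once that frontier has passed the description's lower bound. The function ready_descriptions and its parameters are hypothetical.

```rust
type Timestamp = u64;

/// Returns the in-flight (lower, upper) descriptions for which no further
/// batches can arrive, given the frontiers of both inputs.
/// Assumption: a description is complete once both frontiers are past `lower`.
fn ready_descriptions(
    in_flight: &[(Timestamp, Timestamp)],
    batch_descriptions_frontier: Timestamp,
    batches_frontier: Timestamp,
) -> Vec<(Timestamp, Timestamp)> {
    // Both inputs must have moved on, so the slower frontier decides.
    let combined = batch_descriptions_frontier.min(batches_frontier);
    in_flight
        .iter()
        .copied()
        .filter(|(lower, _upper)| *lower < combined)
        .collect()
}
```

For example, with in-flight descriptions (0, 5), (5, 10) and (10, 15) and input frontiers 10 and 7, only the first two are considered complete in this sketch; the third has to wait until both frontiers move past 10.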