Module mz_storage::sink::kafka

Code to render the sink dataflow of a KafkaSinkConnection. The dataflow consists of two operators in order to take advantage of all the available workers.

       ┏━━━━━━━━━━━━━━┓
       ┃   persist    ┃
       ┃    source    ┃
       ┗━━━━━━┯━━━━━━━┛
              │ row data, the input to this module
              │
       ┏━━━━━━v━━━━━━┓
       ┃    row      ┃
       ┃   encoder   ┃
       ┗━━━━━━┯━━━━━━┛
              │ encoded data
              │
       ┏━━━━━━v━━━━━━┓
       ┃    kafka    ┃ (single worker)
       ┃    sink     ┃
       ┗━━┯━━━━━━━━┯━┛
  records │        │ uppers
     ╭────v──╮ ╭───v──────╮
     │ data  │ │ progress │  <- records and uppers are produced
     │ topic │ │  topic   │     transactionally to both topics
     ╰───────╯ ╰──────────╯

§Encoding

One part of the dataflow deals with encoding the rows that we read from persist. There isn’t anything surprising here; it is almost just a Collection::map, with the exception of an initialization step that makes sure the schemas are published to the Schema Registry. After that step the operator simply encodes each batch it receives, record by record.
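As a rough illustration of that shape (not the actual operator; `Row` and the `Encoder` trait below are hypothetical stand-ins for the real types, and the real operator works over a differential `Collection`), the encoder publishes schemas once and then maps over records:

```rust
/// Hypothetical stand-ins for the real row and encoder types.
struct Row(Vec<u8>);

trait Encoder {
    /// One-time initialization: publish the key/value schemas to the registry.
    fn publish_schemas(&mut self) -> Result<(), String>;
    /// Encode a single record.
    fn encode(&self, row: &Row) -> Vec<u8>;
}

/// Publish schemas once, then encode every batch record by record.
fn encode_batches<E: Encoder>(
    mut encoder: E,
    batches: &[Vec<Row>],
) -> Result<Vec<Vec<u8>>, String> {
    // Initialization step: make sure the schemas exist before encoding anything.
    encoder.publish_schemas()?;

    // After that, this is almost just a `map` over the incoming records.
    Ok(batches
        .iter()
        .flat_map(|batch| batch.iter().map(|row| encoder.encode(row)))
        .collect())
}
```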

§Sinking

The other part of the dataflow, and what this module mostly deals with, is interacting with the Kafka cluster in order to transactionally commit batches (sets of records associated with a frontier). All the processing happens in a single worker, so all previously encoded records go through an exchange in order to arrive at the chosen worker. We may be able to improve this in the future by having independent workers commit disjoint partitions of the key space, but for now we do the simple thing.
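A minimal sketch of the exchange step using plain timely operators (the real sink is a dedicated operator rather than an `inspect`, and may route on something other than a constant):

```rust
use timely::dataflow::operators::{Exchange, Inspect, ToStream};

fn main() {
    timely::example(|scope| {
        // Stand-in for the stream of already-encoded records.
        let encoded = (0u64..10).to_stream(scope);

        // Route every record to the same worker index so that a single worker
        // owns the transactional Kafka producer.
        encoded
            .exchange(|_record| 0)
            .inspect(|record| println!("would produce {record:?}"));
    });
}
```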

§Retries

All of the retry logic heavy lifting is offloaded to librdkafka since it already implements the required behavior. In particular, we only ever enqueue records to its send queue and eventually call commit_transaction, which ensures that all queued messages are successfully delivered before the transaction is reported as committed.

The only error that is possible while sending is that the queue is full. We purposefully do NOT handle this error and instead configure librdkafka with a very large queue. The reason for this choice is that the only way to handle such an error ourselves would be to queue the record, and there is no good argument that two small queues are better than one big one. If we reach the queue limit we simply error out the entire sink dataflow and start over.
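Sketched against the rdkafka crate, with a made-up topic name, transactional id, and queue size, the enqueue-then-commit flow looks roughly like this:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::error::KafkaError;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};

/// Enqueue a batch of records and commit them as one Kafka transaction.
fn produce_batch(records: &[(String, Vec<u8>)]) -> Result<(), KafkaError> {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("transactional.id", "example-sink")
        // Rely on librdkafka for retries; just make its send queue very large.
        .set("queue.buffering.max.messages", "10000000")
        .create()?;

    producer.init_transactions(Duration::from_secs(30))?;
    producer.begin_transaction()?;

    for (key, payload) in records {
        // `send` only enqueues. The only send-time error we expect is a full
        // queue, and rather than buffering ourselves we fail the batch and let
        // the sink dataflow restart.
        if let Err((err, _record)) =
            producer.send(BaseRecord::to("data-topic").key(key).payload(payload))
        {
            return Err(err);
        }
    }

    // Blocks until every enqueued message has been delivered (librdkafka
    // handles the retries), then commits the transaction.
    producer.commit_transaction(Duration::from_secs(30))?;
    Ok(())
}
```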

§Error handling

Both the encoding operator and the sinking operator can produce a transient error that is wired up with our health monitoring and will trigger a restart of the sink dataflow.

Structs§

  • A header to attach to a Kafka message.
  • A message to produce to Kafka.
  • This is the legacy struct that used to be emitted as part of a transactional produce and contains the largest timestamp within the committed batch. Since it is just a timestamp it cannot encode the fact that a sink has finished, and it deviates from upper frontier semantics. Materialize no longer produces this record, but it’s possible that we encounter it in topics written by older versions. In those cases we convert it into upper semantics by stepping the timestamp forward (see the sketch after this list).
  • This struct is emitted as part of a transactional produce, and contains the upper frontier of the batch committed. It is used to recover the frontier a sink needs to resume at.
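
For illustration, converting such a legacy record into upper semantics amounts to stepping the largest committed timestamp forward by one; `u64` and the function name are stand-ins for the real timestamp type and conversion code:

```rust
use timely::progress::Antichain;

/// Derive the resume upper from a legacy progress record: one tick past the
/// largest committed timestamp.
fn legacy_to_upper(largest_committed: u64) -> Antichain<u64> {
    Antichain::from_elem(largest_committed + 1)
}
```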

Functions§