Module mz_storage::sink::kafka
Code to render the sink dataflow of a KafkaSinkConnection. The dataflow consists
of two operators in order to take advantage of all the available workers.
┏━━━━━━━━━━━━━━┓
┃ persist ┃
┃ source ┃
┗━━━━━━┯━━━━━━━┛
│ row data, the input to this module
│
┏━━━━━━v━━━━━━┓
┃ row ┃
┃ encoder ┃
┗━━━━━━┯━━━━━━┛
│ encoded data
│
┏━━━━━━v━━━━━━┓
┃ kafka ┃ (single worker)
┃ sink ┃
┗━━┯━━━━━━━━┯━┛
records │ │ uppers
╭────v──╮ ╭───v──────╮
│ data │ │ progress │ <- records and uppers are produced
│ topic │ │ topic │ transactionally to both topics
╰───────╯ ╰──────────╯
§Encoding
One part of the dataflow deals with encoding the rows that we read from persist. There isn't
anything surprising here; it is almost just a Collection::map, with the exception of an
initialization step that makes sure the schemas are published to the Schema Registry. After
that step the operator simply encodes each batch it receives, record by record.
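The encoding step can be sketched in plain Rust as follows. This is a minimal model, not the actual implementation: `Encoder`, `ensure_schemas_published`, and `encode_pair` are hypothetical names, records are modeled as pairs of optional strings, and the Schema Registry call is stubbed out.

```rust
// A minimal sketch of the encoding operator: a one-time schema
// publication step, after which encoding is almost just a `map`.
struct Encoder {
    schemas_published: bool,
}

impl Encoder {
    fn new() -> Self {
        Encoder { schemas_published: false }
    }

    // One-time initialization: publish the key/value schemas to the
    // Schema Registry (modeled here as flipping a flag).
    fn ensure_schemas_published(&mut self) {
        if !self.schemas_published {
            // ... talk to the Schema Registry here ...
            self.schemas_published = true;
        }
    }

    // After initialization, encoding each record is a plain map from
    // optional key/value rows to optional encoded byte vectors.
    fn encode_pair(
        &mut self,
        key: Option<&str>,
        value: Option<&str>,
    ) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
        self.ensure_schemas_published();
        (
            key.map(|k| k.as_bytes().to_vec()),
            value.map(|v| v.as_bytes().to_vec()),
        )
    }
}
```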
§Sinking
The other part of the dataflow, and what this module mostly deals with, is interacting with the Kafka cluster in order to transactionally commit batches (sets of records associated with a frontier). All the processing happens in a single worker and so all previously encoded records go through an exchange in order to arrive at the chosen worker. We may be able to improve this in the future by committing disjoint partitions of the key space for independent workers but for now we do the simple thing.
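The routing decision above can be sketched as hashing the sink's id modulo the number of workers, so that every record of one sink deterministically lands on the same worker. `choose_worker` is a hypothetical helper for illustration; the real dataflow achieves this with a timely exchange.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Pick the single worker responsible for a sink: hash the sink id and
// take it modulo the number of workers. All previously encoded records
// are exchanged to this worker before being produced to Kafka.
fn choose_worker(sink_id: &str, num_workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    sink_id.hash(&mut h);
    (h.finish() as usize) % num_workers
}
```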
§Retries
All of the retry logic heavy lifting is offloaded to librdkafka, since it already implements
the required behavior. In particular, we only ever enqueue records to its send queue and
eventually call commit_transaction, which ensures that all queued messages are
successfully delivered before the transaction is reported as committed.
The only error that is possible during sending is that the queue is full. We purposefully do
NOT handle this error and simply configure librdkafka with a very large queue. The reason
for this choice is that the only way to handle such an error ourselves would be to queue the
record again, and there isn't a good argument that two small queues are better than one big
one. If we reach the queue limit we simply error out the entire sink dataflow and start over.
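The queue-full policy can be illustrated with a toy bounded send queue. `SendQueue` and `SinkError` are hypothetical types invented for this sketch; the real queue lives inside librdkafka and is only configured, not implemented, by this module.

```rust
// A toy model of the "fail the whole sink on a full queue" policy.
// Instead of retrying or buffering ourselves, enqueueing into a full
// queue returns an error; the caller errors out the entire sink
// dataflow, which restarts from the last committed progress.
#[derive(Debug, PartialEq)]
enum SinkError {
    QueueFull,
}

struct SendQueue {
    capacity: usize,
    queued: Vec<Vec<u8>>,
}

impl SendQueue {
    fn new(capacity: usize) -> Self {
        SendQueue { capacity, queued: Vec::new() }
    }

    fn enqueue(&mut self, record: Vec<u8>) -> Result<(), SinkError> {
        if self.queued.len() >= self.capacity {
            // We purposefully do NOT handle this error here.
            return Err(SinkError::QueueFull);
        }
        self.queued.push(record);
        Ok(())
    }
}
```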
§Error handling
Both the encoding operator and the sinking operator can produce a transient error that is wired up with our health monitoring and will trigger a restart of the sink dataflow.
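The health wiring can be sketched as a status value that either operator may emit. `HealthStatus` and `should_restart` are hypothetical names for illustration only; they model the fact that a transient error triggers a restart rather than a permanent failure.

```rust
// Toy model of the health-status signal: both the encoding and the
// sinking operator can report a transient error, which the health
// monitoring turns into a restart of the sink dataflow.
#[derive(Debug, PartialEq)]
enum HealthStatus {
    Running,
    // A transient error carries a human-readable reason and triggers
    // a restart of the sink dataflow.
    TransientError(String),
}

fn should_restart(status: &HealthStatus) -> bool {
    matches!(status, HealthStatus::TransientError(_))
}
```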
Structs§
- A header to attach to a Kafka message.
- A message to produce to Kafka.
- This is the legacy struct that used to be emitted as part of a transactional produce and contains the largest timestamp within the batch committed. Since it is just a timestamp it cannot encode the fact that a sink has finished and deviates from upper frontier semantics. Materialize no longer produces this record but it’s possible that we encounter this in topics written by older versions. In those cases we convert it into upper semantics by stepping the timestamp forward.
- This struct is emitted as part of a transactional produce, and contains the upper frontier of the batch committed. It is used to recover the frontier a sink needs to resume at.
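The conversion described for the legacy record amounts to stepping the contained timestamp forward by one, turning "largest timestamp committed" into "smallest timestamp not yet committed". In this sketch `legacy_to_upper` is a hypothetical helper and timestamps are modeled as plain u64 values.

```rust
// Convert a legacy progress record, which holds the largest timestamp
// within a committed batch, into upper-frontier semantics: the upper
// is the legacy timestamp stepped forward by one.
fn legacy_to_upper(max_committed_ts: u64) -> u64 {
    max_committed_ts + 1
}
```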
Functions§
- Listens for statistics updates from librdkafka and updates our Prometheus metrics.
- Determines the latest progress record from the specified topic for the given progress key.
- Encodes a stream of (Option<Row>, Option<Row>) updates using the specified encoder.
- Evaluates a partition by expression on the given row, returning the hash value to use for partition assignment.
- Fetches the partition count for the identified topic.
- Fetches the partition count for the identified topic at the specified interval.
- Sinks a collection of encoded rows to Kafka.