Module mz_storage::source::postgres::replication
Renders the logical replication side of the PostgresSourceConnection
ingestion dataflow.
             o
             │rewind
             │requests
         ╭───┴────╮
         │exchange│ (collect all requests to one worker)
         ╰───┬────╯
          ┏━━v━━━━━━━━━━┓
          ┃ replication ┃ (single worker)
          ┃   reader    ┃
          ┗━┯━━━━━━━━┯━━┛
            │raw     │
            │data    │
       ╭────┴─────╮  │
       │distribute│  │ (distribute to all workers)
       ╰────┬─────╯  │
┏━━━━━━━━━━━┷━┓      │
┃ replication ┃      │ (parallel decode)
┃   decoder   ┃      │
┗━━━━━┯━━━━━━━┛      │
      │ replication  │ progress
      │ updates      │ output
      v              v
§Progress tracking
In order to avoid causing excessive resource usage in the upstream server it’s important to
track the LSN that we have successfully committed to persist and communicate that back to
PostgreSQL. Under normal operation this gauge of progress is provided by the presence of
transactions themselves. Since at a given LSN offset there can be only a single message, when a
transaction is received and processed we can infer that we have seen all the messages that are
not beyond commit_lsn + 1.
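The inference above can be sketched in a few lines. This is a minimal illustration, not the module's actual code: the `Lsn` newtype and the helpers `upper_after_commit` and `is_complete` are hypothetical names introduced here.

```rust
/// A log sequence number: a byte offset into the upstream WAL.
/// (Hypothetical newtype, introduced only for this sketch.)
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Lsn(pub u64);

/// Once the transaction that committed at `commit_lsn` has been processed,
/// every message at an LSN below `commit_lsn + 1` has been seen, because a
/// given LSN offset holds at most one message.
pub fn upper_after_commit(commit_lsn: Lsn) -> Lsn {
    Lsn(commit_lsn.0 + 1)
}

/// Returns true if all messages at `lsn` are known to have been received,
/// given the exclusive upper bound `upper`.
pub fn is_complete(upper: Lsn, lsn: Lsn) -> bool {
    lsn < upper
}
```

With a transaction committed at LSN 100, the resulting upper bound is 101: LSN 100 is complete, while nothing can yet be said about LSN 101.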
Things are a bit more complicated in the absence of transactions, though: even if we don’t receive any, the server might very well be generating WAL records. This can happen if a separate logical database is performing writes (which is the case for RDS databases). It can also happen on servers running PostgreSQL version 15 or greater, where the logical replication process includes an optimization that omits empty transactions; these occur if you’re only replicating a subset of the tables and there are writes going to the other ones.
If we fail to detect this situation and don’t send LSN feedback in a timely manner, the server will be forced to retain WAL data, which can eventually lead to disk space exhaustion.
In the absence of transactions, the only piece of information available in the replication stream is the keepalive message. Keepalive messages are documented [1] to contain the current end of WAL on the server. That number is useless for progress tracking because there might be pending messages at LSNs between the last received commit_lsn and the current end of WAL.
Fortunately for us, the documentation for PrimaryKeepalive messages is wrong: the message actually contains the last sent LSN [2]. Here “sent” doesn’t necessarily mean sent over the wire, but sent to the upstream process that produces the logical stream. Therefore, if we receive a keepalive with a particular LSN, we can be certain that there are no other replication messages at previous LSNs, because they would have already been generated and received. We therefore connect the keepalive messages directly to our capability.
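A minimal sketch of how commits and keepalives could both drive a single progress frontier, under the assumption described above that a keepalive's LSN marks a point below which no further messages will arrive. The `Event` enum and `ProgressTracker` type are hypothetical and stand in for the real replication messages and timely capability.

```rust
/// Events relevant to progress tracking (hypothetical, simplified types).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    /// A transaction that has been fully received and processed.
    Commit { commit_lsn: u64 },
    /// A keepalive carrying the LSN reported by the server, assumed here to
    /// be the last sent LSN rather than the current end of WAL.
    Keepalive { wal_end: u64 },
}

/// Tracks an exclusive upper bound: all messages at LSNs below `upper` have
/// been seen. The bound never moves backwards.
#[derive(Debug, Default)]
pub struct ProgressTracker {
    upper: u64,
}

impl ProgressTracker {
    /// Folds one event into the frontier and returns the new upper bound.
    pub fn observe(&mut self, event: Event) -> u64 {
        let candidate = match event {
            // A processed transaction accounts for everything up to and
            // including its commit LSN.
            Event::Commit { commit_lsn } => commit_lsn + 1,
            // A keepalive guarantees no messages remain at previous LSNs.
            Event::Keepalive { wal_end } => wal_end,
        };
        // Taking the maximum keeps the frontier monotonic even if a stale
        // keepalive arrives after a later commit.
        self.upper = self.upper.max(candidate);
        self.upper
    }
}
```

Taking the maximum is a design choice of this sketch: it makes the tracker robust to out-of-order or repeated keepalives, at the cost of silently ignoring them.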
Structs§
Statics§
- PG_EPOCH 🔒 Postgres epoch is 2000-01-01T00:00:00Z
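As an illustration of why such an epoch constant is useful: the replication protocol reports timestamps as microseconds since 2000-01-01T00:00:00Z, so they have to be shifted before they can be compared with Unix time. The sketch below uses only the standard library; the names `PG_EPOCH_OFFSET_SECS`, `pg_epoch`, and `pg_micros_to_system_time` are hypothetical, and the module's own definition of PG_EPOCH may differ.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Seconds between the Unix epoch (1970-01-01T00:00:00Z) and the Postgres
/// epoch (2000-01-01T00:00:00Z): 10,957 days of 86,400 seconds each.
const PG_EPOCH_OFFSET_SECS: u64 = 946_684_800;

/// The Postgres epoch expressed as a `SystemTime`.
pub fn pg_epoch() -> SystemTime {
    UNIX_EPOCH + Duration::from_secs(PG_EPOCH_OFFSET_SECS)
}

/// Converts a timestamp given in microseconds since the Postgres epoch
/// into a `SystemTime`. This sketch assumes a non-negative timestamp.
pub fn pg_micros_to_system_time(micros: u64) -> SystemTime {
    pg_epoch() + Duration::from_micros(micros)
}
```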
Functions§
- Ensures the publication exists on the server. It returns an outer transient error in case of connection issues and an inner definite error if the publication is dropped.
- Ensure the active replication timeline_id matches the one we expect such that we can safely resume replication. It returns an outer transient error in case of connection issues and an inner definite error if the timeline id does not match.
- Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT message. The BEGIN message must have already been consumed from the stream before calling this function.
- Produces the logical replication stream while taking care of regularly sending standby keepalive messages with the provided uppers stream.
- render 🔒 Renders the replication dataflow. See the module documentation for more information.
- Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can’t be done.
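The transaction-extraction contract described in the list above (BEGIN already consumed, read until COMMIT) can be sketched over a plain iterator. Everything here is a simplified assumption for illustration: the `Message` and `ExtractError` types and the `collect_transaction` function are hypothetical, and the real function operates on an asynchronous replication stream with richer message types.

```rust
/// A simplified replication message (hypothetical type for this sketch).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Message {
    Begin,
    /// A row change, represented here as an opaque string.
    Insert(String),
    Commit { commit_lsn: u64 },
}

/// Ways in which the stream can violate the BEGIN/COMMIT framing.
#[derive(Debug, PartialEq, Eq)]
pub enum ExtractError {
    /// A second BEGIN arrived before the current transaction committed.
    UnexpectedBegin,
    /// The stream ended before a COMMIT was seen.
    UnexpectedEof,
}

/// Collects the changes of a single transaction. The caller must already
/// have consumed the BEGIN message; this function reads up to and including
/// the matching COMMIT and leaves the rest of the stream untouched.
pub fn collect_transaction<I>(stream: &mut I) -> Result<(Vec<String>, u64), ExtractError>
where
    I: Iterator<Item = Message>,
{
    let mut rows = Vec::new();
    for msg in stream {
        match msg {
            Message::Insert(row) => rows.push(row),
            Message::Commit { commit_lsn } => return Ok((rows, commit_lsn)),
            Message::Begin => return Err(ExtractError::UnexpectedBegin),
        }
    }
    Err(ExtractError::UnexpectedEof)
}
```

Passing the iterator by mutable reference lets the caller keep reading subsequent transactions from the same stream after each call returns.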