
Renders the logical replication side of the PostgresSourceConnection ingestion dataflow.

             o
             │rewind
             │requests
         ╭───┴────╮
         │exchange│ (collect all requests to one worker)
         ╰───┬────╯
          ┏━━v━━━━━━━━━━┓
          ┃ replication ┃ (single worker)
          ┃   reader    ┃
          ┗━┯━━━━━━━━┯━━┛
            │raw     │
            │data    │
       ╭────┴─────╮  │
       │distribute│  │ (distribute to all workers)
       ╰────┬─────╯  │
┏━━━━━━━━━━━┷━┓      │
┃ replication ┃      │ (parallel decode)
┃   decoder   ┃      │
┗━━━━━┯━━━━━━━┛      │
      │ replication  │ progress
      │ updates      │ output
      v              v

Progress tracking

In order to avoid causing excessive resource usage in the upstream server, it’s important to track the LSN that we have successfully committed to persist and communicate that back to PostgreSQL. Under normal operation this gauge of progress is provided by the transactions themselves. Since at a given LSN offset there can be only a single message, when a transaction is received and processed we can infer that we have seen every message at or below its commit_lsn, and can therefore advance our progress to commit_lsn + 1.
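The inference above can be sketched as follows (a minimal illustration; the type and function names are hypothetical, not the module’s actual code):

```rust
/// A PostgreSQL log sequence number, totally ordered (illustrative alias).
type Lsn = u64;

/// Since only one replication message can exist at a given LSN, a committed
/// transaction at `commit_lsn` proves we have seen everything at or below it,
/// so the frontier (the lowest LSN we might still receive) becomes
/// `commit_lsn + 1`.
fn advance_frontier(frontier: &mut Lsn, commit_lsn: Lsn) {
    *frontier = (*frontier).max(commit_lsn + 1);
}

fn main() {
    let mut frontier: Lsn = 0;
    advance_frontier(&mut frontier, 42);
    assert_eq!(frontier, 43);
    // A stale or reordered commit never moves the frontier backwards.
    advance_frontier(&mut frontier, 10);
    assert_eq!(frontier, 43);
    println!("frontier = {frontier}");
}
```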

Things are a bit more complicated in the absence of transactions, though, because even though we don’t receive any, the server might very well be generating WAL records. This can happen if a separate logical database is performing writes (which is the case for RDS databases), or, on servers running PostgreSQL version 15 or greater, because the logical replication process includes an optimization that omits empty transactions, which occur when only a subset of the tables is replicated and the writes go to the other ones.

If we fail to detect this situation and don’t send LSN feedback in a timely manner, the server will be forced to keep WAL data around, which can eventually lead to disk space exhaustion.

In the absence of transactions the only pieces of information available in the replication stream are keepalive messages. Keepalive messages are documented [1] to contain the current end of WAL on the server. That number is useless for progress tracking, because there might be pending messages at LSNs between the last received commit_lsn and the current end of WAL.

Fortunately for us, the documentation for PrimaryKeepalive messages is wrong: they actually contain the last sent LSN [2]. Here “sent” doesn’t necessarily mean sent over the wire, but sent to the upstream process that produces the logical stream. Therefore, if we receive a keepalive with a particular LSN we can be certain that there are no other replication messages at earlier LSNs, because they would already have been generated and received. We therefore connect the keepalive messages directly to our capability.
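The combined rule, folding both commits and keepalives into the LSN we can safely confirm back to the server, can be sketched like this (an illustrative model, not the module’s actual code; the names are hypothetical):

```rust
/// A PostgreSQL log sequence number (illustrative alias).
type Lsn = u64;

enum ReplicationEvent {
    /// A transaction committed at this LSN.
    Commit(Lsn),
    /// A primary keepalive. Despite the documentation, the LSN it carries is
    /// the last one handed to the process producing the logical stream, so no
    /// earlier message can still be pending.
    Keepalive { last_sent: Lsn },
}

/// Returns the highest LSN we may report in a standby status update after
/// observing `events`, starting from an already-confirmed LSN.
fn confirmed_lsn(events: &[ReplicationEvent], mut confirmed: Lsn) -> Lsn {
    for event in events {
        let lsn = match event {
            ReplicationEvent::Commit(commit_lsn) => *commit_lsn,
            ReplicationEvent::Keepalive { last_sent } => *last_sent,
        };
        confirmed = confirmed.max(lsn);
    }
    confirmed
}

fn main() {
    // Even with no transactions at all, keepalives alone advance the
    // confirmed LSN, letting the server recycle WAL.
    let quiet = [ReplicationEvent::Keepalive { last_sent: 100 }];
    assert_eq!(confirmed_lsn(&quiet, 0), 100);

    let busy = [
        ReplicationEvent::Commit(5),
        ReplicationEvent::Keepalive { last_sent: 9 },
    ];
    assert_eq!(confirmed_lsn(&busy, 0), 9);
    println!("ok");
}
```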

Structs

Constants

  • How often a proactive standby status update message should be sent to the server.

Statics

  • PG_EPOCH 🔒
    Postgres epoch is 2000-01-01T00:00:00Z
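The replication protocol expresses timestamps (for example in standby status updates) as microseconds since that Postgres epoch, which lies 946,684,800 seconds after the Unix epoch. A small conversion sketch under that assumption (function name hypothetical):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Seconds between the Unix epoch (1970-01-01) and the Postgres epoch
// (2000-01-01): 30 years of 365 days plus 7 leap days, times 86,400.
const PG_EPOCH_OFFSET_SECS: u64 = 946_684_800;

/// Convert a `SystemTime` to microseconds since the Postgres epoch, the unit
/// the replication protocol uses for its timestamps.
fn to_pg_micros(t: SystemTime) -> u64 {
    let since_unix = t
        .duration_since(UNIX_EPOCH)
        .expect("time before the Unix epoch");
    (since_unix - Duration::from_secs(PG_EPOCH_OFFSET_SECS)).as_micros() as u64
}

fn main() {
    let pg_epoch = UNIX_EPOCH + Duration::from_secs(PG_EPOCH_OFFSET_SECS);
    assert_eq!(to_pg_micros(pg_epoch), 0);
    assert_eq!(to_pg_micros(pg_epoch + Duration::from_secs(1)), 1_000_000);
    println!("ok");
}
```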

Functions

  • Ensures the publication exists on the server. It returns an outer transient error in case of connection issues and an inner definite error if the publication is dropped.
  • Ensures the active replication timeline_id matches the one we expect so that we can safely resume replication. It returns an outer transient error in case of connection issues and an inner definite error if the timeline id does not match.
  • Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT message. The BEGIN message must have already been consumed from the stream before calling this function.
  • raw_stream 🔒
    Produces the logical replication stream while taking care of regularly sending standby keepalive messages with the provided uppers stream.
  • render 🔒
    Renders the replication dataflow. See the module documentation for more information.
  • Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can’t be done.