
Renders the table snapshot side of the PostgresSourceConnection ingestion dataflow.

Snapshot reading

Depending on the resumption LSNs, the table reader decides which tables need to be snapshotted and performs a simple COPY query on each of them to obtain a snapshot. There are a few subtle points about this operation, described in the following sections.

Consistent LSN point for snapshot transactions

Given that all our ingestion is based on correctly timestamping updates with the LSN they happened at, it is important that we run the COPY query at a specific LSN point that can be related to the LSNs we receive from the replication stream. Such a point does not necessarily exist for a normal SQL transaction. To achieve this we must force PostgreSQL to produce a consistent point and tell us its LSN, which we do by creating a replication slot as the first statement in a transaction.

This is a temporary dummy slot that is only used to put our snapshot transaction on a consistent LSN point. Unfortunately no lighter-weight method exists for doing this. See this postgres thread for more details.

One might wonder why we don’t use the real slot to provide the snapshot point, which would automatically be at the correct LSN. The answer is that we may crash and restart after having created the slot but before having finished the snapshot. In that case the restarting process will have lost its opportunity to run queries at the slot’s consistent point, because that opportunity only exists in the ephemeral transaction that created the slot, and that transaction is long gone. Additionally, there are good reasons why we’d like to move the slot creation much earlier, e.g. during purification, in which case the slot will always be pre-created.

Reusing the consistent point among all workers

Creating replication slots is potentially expensive, so the code makes it such that all workers cooperate and reuse one consistent snapshot among them. In order to do so we make use of the “export transaction” feature of postgres. This feature allows one SQL session to create a string identifier for the transaction it is currently in, which other sessions can use to enter the same “snapshot”.

We accomplish this by picking one worker at random to function as the transaction leader. The transaction leader is responsible for starting a SQL session, creating a temporary replication slot in a transaction, exporting the transaction id, and broadcasting the transaction information to all other workers via a broadcast feedback edge.

During this phase the follower workers are simply waiting to hear on the feedback edge, effectively synchronizing with the leader. Once all workers have received the snapshot information they can all start to perform their assigned COPY queries.

The leader and follower steps described above are accomplished by the export_snapshot and use_snapshot functions respectively.
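The statement sequences behind the two roles can be sketched as follows. This is a minimal illustration, not the module's actual implementation: the helper names `leader_statements` and `follower_statements`, the `pgoutput` plugin choice, and the isolation level are assumptions. `CREATE_REPLICATION_SLOT ... USE_SNAPSHOT` is a replication-protocol command, so the leader's session is assumed to be a replication connection.

```rust
/// Hypothetical helper: the statements a leader session would issue, in order.
/// Assumes the session was opened as a replication connection, since
/// CREATE_REPLICATION_SLOT is a replication-protocol command.
fn leader_statements(slot_name: &str) -> Vec<String> {
    vec![
        // The slot must be created as the first statement of the transaction.
        "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;".to_string(),
        // USE_SNAPSHOT pins the current transaction to the slot's consistent point.
        format!(
            "CREATE_REPLICATION_SLOT {} TEMPORARY LOGICAL \"pgoutput\" USE_SNAPSHOT;",
            slot_name
        ),
        // Produces the string identifier that followers import.
        "SELECT pg_export_snapshot();".to_string(),
    ]
}

/// Hypothetical helper: the statements a follower session would issue, in order.
fn follower_statements(snapshot_id: &str) -> Vec<String> {
    // Double any single quote so the identifier stays a valid SQL string literal.
    let escaped = snapshot_id.replace('\'', "''");
    vec![
        "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;".to_string(),
        // Must run before any query in the transaction.
        format!("SET TRANSACTION SNAPSHOT '{}';", escaped),
    ]
}
```

The identifier passed to `follower_statements` is whatever the leader's `pg_export_snapshot()` call returned. It is only valid while the leader's transaction stays open.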

Coordinated transaction COMMIT

When follower workers are done with snapshotting they commit their transaction, close their session, and then drop their snapshot feedback capability. When the leader worker is done with snapshotting it drops its snapshot feedback capability and waits until it observes the snapshot input advancing to the empty frontier. This allows the leader to COMMIT its transaction last, which is the transaction that exported the snapshot.

It’s unclear if this is strictly necessary, but having the frontiers made it easy enough that I added the synchronization.
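The ordering can be illustrated with a small thread-based simulation. It is only an analogy under stated assumptions: each `mpsc::Sender` stands in for a worker's snapshot feedback capability, and the channel disconnecting stands in for the snapshot input reaching the empty frontier. The function name `simulate_commit_order` is hypothetical.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Runs one leader (this thread) and `followers` follower threads, returning
/// the order in which the simulated COMMITs happened.
fn simulate_commit_order(followers: usize) -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    // Each Sender clone plays the role of one feedback capability.
    let (capability, feedback) = mpsc::channel::<()>();

    let mut handles = Vec::new();
    for id in 0..followers {
        let capability = capability.clone();
        let log = Arc::clone(&log);
        handles.push(thread::spawn(move || {
            // ... the follower's COPY queries would run here ...
            log.lock().unwrap().push(format!("follower {} COMMIT", id));
            // The capability is dropped only after the commit.
            drop(capability);
        }));
    }

    // The leader drops its own capability first ...
    drop(capability);
    // ... then waits: recv() fails only once every Sender has been dropped.
    while feedback.recv().is_ok() {}
    log.lock().unwrap().push("leader COMMIT".to_string());

    for handle in handles {
        handle.join().unwrap();
    }
    let order = log.lock().unwrap().clone();
    order
}
```

The followers may commit in any order relative to each other, but the leader's entry is always last because it is only written after every follower has dropped its capability.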

Snapshot rewinding

Ingestion dataflows must produce definite data, including the snapshot. What this means practically is that whenever we deem it necessary to snapshot a table we must do so at the same LSN. However, the method for running a transaction described above doesn’t let us choose the LSN: it could be an LSN in the future, chosen by PostgreSQL while it creates the temporary replication slot.

The definition of differential collections states that a collection at some time t_snapshot is defined to be the accumulation of all updates that happen at t <= t_snapshot, where <= is the partial order. In this case we are faced with the problem of knowing the state of a table at t_snapshot but actually wanting to know the snapshot at t_slot <= t_snapshot.

From the definition we can see that the snapshot at t_slot is related to the snapshot at t_snapshot with the following equations:

 sum(update: t <= t_snapshot) = sum(update: t <= t_slot) + sum(update: t_slot <= t <= t_snapshot)
                                         |
                                         V
 sum(update: t <= t_slot) = sum(update: t <= t_snapshot) - sum(update: t_slot <= t <= t_snapshot)

Therefore, if we manage to recover the sum(update: t_slot <= t <= t_snapshot) term we will be able to “rewind” the snapshot we obtained at t_snapshot to t_slot by emitting all updates that happen between these two points with their diffs negated.

It turns out that this term is exactly what the main replication slot provides us with, so we can rewind the snapshot to arbitrary points! In order to do this the snapshot dataflow emits rewind requests to the replication reader, which inform it that a certain range of updates must be emitted at LSN 0 (by convention) with their diffs negated. These negated diffs are consolidated with the diffs taken at t_snapshot, which were also emitted at LSN 0 (by convention), and we end up with a TVC that at LSN 0 contains the snapshot at t_slot.
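A small worked example of the arithmetic, as a sketch only: rows are modelled as strings with integer diffs, and the function names `consolidate` and `rewind` are hypothetical, not the names used by the dataflow.

```rust
use std::collections::BTreeMap;

/// Sums the diffs per row and drops rows whose diffs cancel out.
fn consolidate(updates: impl IntoIterator<Item = (String, i64)>) -> BTreeMap<String, i64> {
    let mut acc = BTreeMap::new();
    for (row, diff) in updates {
        *acc.entry(row).or_insert(0) += diff;
    }
    acc.retain(|_, diff| *diff != 0);
    acc
}

/// Rewinds a snapshot taken at t_snapshot back to t_slot.
/// `changes` are the updates the replication stream reports between the two
/// points; they are emitted with negated diffs and consolidated with the snapshot.
fn rewind(snapshot: Vec<(String, i64)>, changes: Vec<(String, i64)>) -> BTreeMap<String, i64> {
    let negated = changes.into_iter().map(|(row, diff)| (row, -diff));
    consolidate(snapshot.into_iter().chain(negated))
}
```

For instance, suppose the table held rows `a` and `b` at t_slot, and between t_slot and t_snapshot row `c` was inserted and row `a` deleted. The COPY at t_snapshot yields `b` and `c`. Negating the changes gives `c: -1` and `a: +1`, and consolidation leaves `a` and `b`, which is the table as of t_slot.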

Snapshot decoding

The expectation is that tables will most likely be skewed in the number of rows they contain, so while the COPY query for any given table runs on a single worker, the decoding of the COPY stream is distributed across all workers.

                ╭──────────────────╮
   ┏━━━━━━━━━━━━v━┓                │ exported
   ┃    table     ┃   ╭─────────╮  │ snapshot id
   ┃    reader    ┠─>─┤broadcast├──╯
   ┗━┯━━━━━━━━━━┯━┛   ╰─────────╯
  raw│          │
 COPY│          │
 data│          │
╭────┴─────╮    │
│distribute│    │
╰────┬─────╯    │
┏━━━━┷━━━━┓     │
┃  COPY   ┃     │
┃ decoder ┃     │
┗━━━━┯━━━━┛     │
     │ snapshot │rewind
     │ updates  │requests
     v          v
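The "distribute" step in the diagram can be sketched as below. The text does not say how rows are assigned to workers, so round-robin is purely an assumption for illustration, and the function name `distribute` is hypothetical.

```rust
/// Splits the raw rows read by a single worker into one batch per worker,
/// round-robin, so that decoding cost is spread evenly even when one table
/// is much larger than the others.
fn distribute<T>(rows: Vec<T>, workers: usize) -> Vec<Vec<T>> {
    assert!(workers > 0, "at least one worker is required");
    let mut batches: Vec<Vec<T>> = (0..workers).map(|_| Vec::new()).collect();
    for (index, row) in rows.into_iter().enumerate() {
        batches[index % workers].push(row);
    }
    batches
}
```

Any assignment that ignores which table a row came from would serve the same purpose. The point is only that the reading worker and the decoding worker need not be the same.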

Structs

Functions

  • Decodes a row of col_len columns obtained from a text encoded COPY query into row.
  • Starts a read-only transaction on the SQL session of client at a consistent LSN point by creating a temporary replication slot. Returns a snapshot identifier that can be imported in other SQL sessions and the LSN of the consistent point.
  • Records the sizes of the tables being snapshotted in PgSnapshotMetrics.
  • render 🔒
    Renders the snapshot dataflow. See the module documentation for more information.
  • Starts a read-only transaction on the SQL session of client at the consistent LSN point of snapshot.
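To make the row decoding listed above concrete, here is a minimal sketch of decoding one line of text-format COPY output. It is not the module's decoder: the function name `decode_copy_row`, the `Option<String>` column representation, and the choice to reject octal and hex escapes rather than decode them are all assumptions made to keep the example short.

```rust
/// Decodes one line of text-format COPY output into `col_len` columns.
/// `None` represents SQL NULL, which the text format writes as `\N`.
fn decode_copy_row(line: &str, col_len: usize) -> Result<Vec<Option<String>>, String> {
    let mut row = Vec::with_capacity(col_len);
    // Literal tabs inside values are escaped as `\t`, so a raw tab is always
    // a column delimiter.
    for field in line.split('\t') {
        if field == "\\N" {
            row.push(None);
            continue;
        }
        let mut value = String::with_capacity(field.len());
        let mut chars = field.chars();
        while let Some(c) = chars.next() {
            if c != '\\' {
                value.push(c);
                continue;
            }
            match chars.next() {
                Some('b') => value.push('\u{08}'),
                Some('f') => value.push('\u{0C}'),
                Some('n') => value.push('\n'),
                Some('r') => value.push('\r'),
                Some('t') => value.push('\t'),
                Some('v') => value.push('\u{0B}'),
                // Octal (\123) and hex (\x41) escapes are not handled in this sketch.
                Some(d) if d.is_ascii_digit() || d == 'x' => {
                    return Err(format!("unsupported escape: \\{}", d));
                }
                // Any other escaped character stands for itself, including `\\`.
                Some(other) => value.push(other),
                None => return Err("dangling backslash at end of field".to_string()),
            }
        }
        row.push(Some(value));
    }
    if row.len() != col_len {
        return Err(format!("expected {} columns, got {}", col_len, row.len()));
    }
    Ok(row)
}
```

Checking the column count at the end guards against feeding the decoder a line from a different table than the one `col_len` was computed for.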