
Module snapshot

Renders the table snapshot side of the PostgresSourceConnection ingestion dataflow.

§Snapshot reading

Depending on the resumption LSNs, the table reader decides which tables need to be snapshotted. Each table is partitioned across all workers using PostgreSQL’s ctid (tuple identifier) column, which identifies the physical location of each row. This allows large tables to be snapshotted in parallel across all available workers.

There are a few subtle points about this operation, described in the following sections.

§Consistent LSN point for snapshot transactions

Given that all our ingestion is based on correctly timestamping updates with the LSN at which they happened, it is important that we run the COPY query at a specific LSN point that is relatable to the LSN numbers we receive from the replication stream. Such a point does not necessarily exist for a normal SQL transaction. To achieve this we must force postgres to produce a consistent point and report its LSN, which we do by creating a replication slot as the first statement in a transaction.

This is a temporary dummy slot that is only used to put our snapshot transaction on a consistent LSN point. Unfortunately no lighter-weight method exists for doing this. See this postgres thread for more details.
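
In spirit, the leader session runs something like the following. This is a minimal sketch assuming a tokio-postgres-style Client on a connection opened with replication=database; the slot name, helper, and error handling are illustrative (the real logic lives in export_snapshot):

    use tokio_postgres::{Client, Error, SimpleQueryMessage};

    // Hypothetical helper: extract one column from the first row of a
    // simple_query response.
    fn first_column(messages: &[SimpleQueryMessage], column: &str) -> Option<String> {
        messages.iter().find_map(|m| match m {
            SimpleQueryMessage::Row(row) => row.get(column).map(|v| v.to_owned()),
            _ => None,
        })
    }

    // Sketch of the leader side: pin a transaction to a consistent LSN by
    // creating a temporary dummy slot, then export the snapshot for others.
    async fn export_snapshot_sketch(client: &Client) -> Result<(String, String), Error> {
        // The exported snapshot is only usable from a REPEATABLE READ transaction.
        client
            .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
            .await?;
        // Creating the slot as the first statement of the transaction forces
        // postgres to produce a consistent point and report its LSN.
        let created = client
            .simple_query(
                "CREATE_REPLICATION_SLOT snapshot_dummy TEMPORARY LOGICAL \"pgoutput\" USE_SNAPSHOT",
            )
            .await?;
        let lsn = first_column(&created, "consistent_point").expect("slot reports its LSN");
        // Export the transaction's snapshot so follower sessions can import it.
        let exported = client.simple_query("SELECT pg_export_snapshot();").await?;
        let snapshot_id =
            first_column(&exported, "pg_export_snapshot").expect("snapshot id is returned");
        Ok((snapshot_id, lsn))
    }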

One might wonder why we don’t use the actual, real slot to provide us with the snapshot point, which would automatically be at the correct LSN. The answer is that it’s possible that we crash and restart after having already created the slot but before having finished the snapshot. In that case the restarting process will have lost its opportunity to run queries at the slot’s consistent point, as that opportunity only exists in the ephemeral transaction that created the slot, and that transaction is long gone. Additionally, there are good reasons why we’d like to move the slot creation much earlier, e.g. during purification, in which case the slot will always be pre-created.

§Reusing the consistent point among all workers

Creating replication slots is potentially expensive, so the code is arranged such that all workers cooperate and reuse one consistent snapshot among them. In order to do so we make use of the “exported snapshot” feature of postgres. This feature allows one SQL session to create a string identifier for the snapshot of the transaction it is currently in, which can be used by other sessions to enter the same snapshot.

We accomplish this by picking one worker at random to function as the transaction leader. The transaction leader is responsible for starting a SQL session, creating a temporary replication slot in a transaction, exporting the transaction’s snapshot id, and broadcasting the snapshot information to all other workers via a broadcasted feedback edge.

During this phase the follower workers are simply waiting to hear on the feedback edge, effectively synchronizing with the leader. Once all workers have received the snapshot information they can all start to perform their assigned COPY queries.

The leader and follower steps described above are accomplished by the export_snapshot and use_snapshot functions respectively.
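
The follower half is smaller; a sketch under the same assumptions as above, where snapshot_id is the string received on the feedback edge:

    use tokio_postgres::{Client, Error};

    // Sketch of the follower side: join the transaction snapshot that the
    // leader exported. SET TRANSACTION SNAPSHOT must run before any other
    // query in the transaction.
    async fn use_snapshot_sketch(client: &Client, snapshot_id: &str) -> Result<(), Error> {
        client
            .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
            .await?;
        client
            .simple_query(&format!("SET TRANSACTION SNAPSHOT '{snapshot_id}';"))
            .await?;
        Ok(())
    }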

§Coordinated transaction COMMIT

When follower workers are done with snapshotting they commit their transaction, close their session, and then drop their snapshot feedback capability. When the leader worker is done with snapshotting it drops its snapshot feedback capability and waits until it observes the snapshot input advancing to the empty frontier. This allows the leader to COMMIT its transaction last, which is the transaction that exported the snapshot.

It’s unclear if this is strictly necessary, but having the frontiers made it easy enough that I added the synchronization.
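
The commit ordering can be modeled with a channel standing in for the snapshot feedback edge: holding a sender corresponds to holding the feedback capability, and the channel closing corresponds to the input advancing to the empty frontier. A toy sketch, not the actual dataflow code:

    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        // The channel plays the role of the snapshot feedback edge.
        let (tx, mut rx) = mpsc::channel::<()>(1);

        for worker in 1..4 {
            let tx = tx.clone();
            tokio::spawn(async move {
                // ... the follower finishes its COPY queries ...
                println!("follower {worker}: COMMIT, close session");
                drop(tx); // drop the snapshot feedback capability
            });
        }
        drop(tx); // the leader drops its own capability as well

        // `recv` returns `None` once every sender is dropped, i.e. once the
        // input has advanced to the empty frontier.
        while rx.recv().await.is_some() {}
        // Only now does the leader COMMIT the transaction that exported the
        // snapshot.
        println!("leader: COMMIT");
    }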

§Snapshot rewinding

Ingestion dataflows must produce definite data, including the snapshot. What this means practically is that whenever we deem it necessary to snapshot a table we must do so at the same LSN. However, the method for running a transaction described above doesn’t let us choose the LSN; it could be an LSN in the future, chosen by PostgreSQL while it creates the temporary replication slot.

The definition of differential collections states that a collection at some time t_snapshot is defined to be the accumulation of all updates that happen at t <= t_snapshot, where <= is the partial order. In this case we are faced with the problem of knowing the state of a table at t_snapshot but actually wanting to know the snapshot at t_slot <= t_snapshot.

From the definition we can see that the snapshot at t_slot is related to the snapshot at t_snapshot with the following equations:

 sum(update: t <= t_snapshot) = sum(update: t <= t_slot) + sum(update: t_slot <= t <= t_snapshot)
                                         |
                                         V
 sum(update: t <= t_slot) = sum(update: t <= t_snapshot) - sum(update: t_slot <= t <= t_snapshot)

Therefore, if we manage to recover the sum(update: t_slot <= t <= t_snapshot) term we will be able to “rewind” the snapshot we obtained at t_snapshot to t_slot by emitting all updates that happen between these two points with their diffs negated.

It turns out that this term is exactly what the main replication slot provides us with, so we can rewind snapshots taken at arbitrary points! In order to do this the snapshot dataflow emits rewind requests to the replication reader, informing it that a certain range of updates must be emitted at LSN 0 (by convention) with their diffs negated. These negated diffs are consolidated with the diffs taken at t_snapshot, which were also emitted at LSN 0 (by convention), and we end up with a TVC (time-varying collection) that at LSN 0 contains the snapshot at t_slot.
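
A concrete, hypothetical example of the arithmetic: suppose the slot’s consistent point is LSN 90, the snapshot transaction ran at LSN 100, and row "b" was inserted at LSN 95, i.e. after t_slot but before t_snapshot:

    use std::collections::BTreeMap;

    // (row, lsn, diff) triples, differential-dataflow style.
    type Update = (&'static str, u64, i64);

    fn main() {
        // The COPY at t_snapshot sees both rows; its output is emitted at LSN 0.
        let snapshot: Vec<Update> = vec![("a", 0, 1), ("b", 0, 1)];
        // The rewind request makes the replication reader emit each update in
        // the t_slot..t_snapshot range twice: negated at LSN 0, and as-is at
        // its true LSN.
        let rewinds: Vec<Update> = vec![("b", 0, -1), ("b", 95, 1)];

        // Consolidation cancels "b" at LSN 0, leaving the snapshot at t_slot
        // ("a" only), with "b" correctly appearing at LSN 95.
        let mut acc: BTreeMap<(&str, u64), i64> = BTreeMap::new();
        for (row, lsn, diff) in snapshot.into_iter().chain(rewinds) {
            *acc.entry((row, lsn)).or_insert(0) += diff;
        }
        acc.retain(|_, diff| *diff != 0);
        println!("{acc:?}"); // {("a", 0): 1, ("b", 95): 1}
    }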

§Parallel table snapshotting with ctid ranges

Each table is partitioned across workers using PostgreSQL’s ctid column. The ctid is a tuple identifier of the form (block_number, tuple_index) that represents the physical location of a row on disk. By partitioning the ctid range, each worker can independently fetch a portion of the table.

The partitioning works as follows:

  1. The snapshot leader queries pg_class.relpages to estimate the number of blocks for each table. This is much faster than querying max(ctid) which would require a sequential scan.
  2. The leader broadcasts the block count estimates along with the snapshot transaction ID to all workers, ensuring all workers use consistent estimates for partitioning.
  3. Each worker calculates its assigned block range and fetches rows using a COPY query with a SELECT that filters by ctid >= start AND ctid < end.
  4. The last worker uses an open-ended range (ctid >= start) to capture any rows beyond the estimated block count, which handles cases where the statistics are stale or the table has grown.

This approach efficiently parallelizes large table snapshots while maintaining the benefits of the COPY protocol for bulk data transfer.
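
A sketch of the block-range arithmetic, mirroring what worker_ctid_range computes; the exact split and the names here are illustrative:

    // A worker's share of a table: blocks [start_block, end_block), with
    // `None` meaning unbounded (the last worker's range).
    struct CtidRange {
        start_block: u64,
        end_block: Option<u64>,
    }

    // Split the block count estimated from pg_class.relpages (e.g. via
    // `SELECT relpages FROM pg_class WHERE oid = $1`) evenly across workers.
    fn worker_ctid_range(worker: usize, workers: usize, estimated_blocks: u64) -> CtidRange {
        let per_worker = (estimated_blocks / workers as u64).max(1);
        let start_block = per_worker * worker as u64;
        let end_block = if worker == workers - 1 {
            // The last worker is open-ended so rows past the estimate are
            // still captured.
            None
        } else {
            Some(start_block + per_worker)
        };
        CtidRange { start_block, end_block }
    }

    // Build the ctid filter for the COPY's inner SELECT. A ctid literal is
    // '(block, tuple)'; tuple index 0 sorts before any real tuple in a block.
    fn ctid_predicate(range: &CtidRange) -> String {
        match range.end_block {
            Some(end) => format!(
                "ctid >= '({},0)' AND ctid < '({},0)'",
                range.start_block, end
            ),
            None => format!("ctid >= '({},0)'", range.start_block),
        }
    }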

§PostgreSQL version requirements

Ctid range scans are only efficient on PostgreSQL >= 14 due to TID range scan optimizations introduced in that version. For older PostgreSQL versions, the snapshot falls back to the single-worker-per-table mode where each table is assigned to one worker based on consistent hashing. This is implemented by having the leader broadcast all-zero block counts when PostgreSQL version < 14.
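
A sketch of that dispatch, assuming the all-zero block count is the fallback signal; the modulo here stands in for the consistent hash:

    // Decide how a worker participates in snapshotting one table. A broadcast
    // block count of zero signals (on PostgreSQL < 14) that the whole table
    // belongs to a single worker.
    fn table_belongs_to_worker(worker: usize, workers: usize, table_oid: u32, blocks: u64) -> bool {
        if blocks == 0 {
            // Fallback: one worker owns the whole table.
            table_oid as usize % workers == worker
        } else {
            // Ctid partitioning: every worker owns a slice of the table.
            true
        }
    }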

§Snapshot decoding

Each worker fetches its ctid range directly and decodes the COPY stream locally.

                ╭──────────────────╮
   ┏━━━━━━━━━━━━v━┓                │ exported
   ┃    table     ┃   ╭─────────╮  │ snapshot id
   ┃   readers    ┠─>─┤broadcast├──╯
   ┃  (parallel)  ┃   ╰─────────╯
   ┗━┯━━━━━━━━━━┯━┛
  raw│          │
 COPY│          │
 data│          │
┏━━━━┷━━━━┓     │
┃  COPY   ┃     │
┃ decoder ┃     │
┗━━━━┯━━━━┛     │
     │ snapshot │rewind
     │ updates  │requests
     v          v
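
The per-row decoding that decode_copy_row performs follows PostgreSQL’s text COPY format: one line per row, tab-separated columns, \N for NULL, and backslash escapes for delimiter and control characters. A self-contained sketch of that format (the real function writes into a row rather than returning one):

    // Decode one line of text-format COPY output into `col_len` columns.
    fn decode_copy_row(line: &str, col_len: usize) -> Option<Vec<Option<String>>> {
        let cols: Vec<&str> = line.split('\t').collect();
        if cols.len() != col_len {
            return None; // wrong arity
        }
        let decoded = cols
            .into_iter()
            .map(|col| if col == "\\N" { None } else { Some(unescape(col)) })
            .collect();
        Some(decoded)
    }

    // Undo the backslash escapes the text format uses (subset shown).
    fn unescape(field: &str) -> String {
        let mut out = String::new();
        let mut chars = field.chars();
        while let Some(c) = chars.next() {
            if c != '\\' {
                out.push(c);
                continue;
            }
            match chars.next() {
                Some('t') => out.push('\t'),
                Some('n') => out.push('\n'),
                Some('r') => out.push('\r'),
                Some('\\') => out.push('\\'),
                Some(other) => out.push(other),
                None => {}
            }
        }
        out
    }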

Structs§

CtidRange 🔒
Represents a ctid range that a worker should snapshot. The range is [start_block, end_block) where end_block is optional (None means unbounded).
SnapshotInfo 🔒
Information broadcasted from the snapshot leader to all workers. This includes the transaction snapshot ID, LSN, and estimated block counts for each table.
TableStatistics 🔒

Functions§

collect_table_statistics 🔒
decode_copy_row 🔒
Decodes a row of col_len columns obtained from a text-encoded COPY query into row.
estimate_table_block_counts 🔒
Estimate the number of blocks for each table from pg_class statistics. This is used to partition ctid ranges across workers.
export_snapshot 🔒
Starts a read-only transaction on the SQL session of client at a consistent LSN point by creating a replication slot. Returns a snapshot identifier that can be imported in other SQL sessions and the LSN of the consistent point.
export_snapshot_inner 🔒
render 🔒
Renders the snapshot dataflow. See the module documentation for more information.
report_snapshot_size 🔒
Record the sizes of the tables being snapshotted in PgSnapshotMetrics and emit snapshot statistics for each export.
retrieve_schema_info 🔒
Validates that there are no blocking RLS policies on the tables and retrieves table schemas for the given publication.
set_statement_timeout 🔒
use_snapshot 🔒
Starts a read-only transaction on the SQL session of client at the consistent LSN point of snapshot.
worker_ctid_range 🔒
Calculate the ctid range for a given worker based on estimated block count.