Module mz_storage::source::postgres::snapshot
Renders the table snapshot side of the PostgresSourceConnection
ingestion dataflow.
§Snapshot reading
Depending on the resumption LSNs the table reader decides which tables need to be snapshotted and performs a simple `COPY` query on them in order to get a snapshot. There are a few subtle points about this operation, described in the following sections.
§Consistent LSN point for snapshot transactions
Given that all our ingestion is based on correctly timestamping updates with the LSN at which they happened, it is important that we run the `COPY` query at a specific LSN point that is relatable to the LSN numbers we receive from the replication stream. Such a point does not necessarily exist for a normal SQL transaction. To achieve this we must force postgres to produce a consistent point and let us know its LSN by creating a replication slot as the first statement in a transaction.
This is a temporary dummy slot that is only used to put our snapshot transaction on a consistent LSN point. Unfortunately no lighter-weight method exists for doing this. See this postgres thread for more details.
One might wonder why we don't use the actual real slot to provide us with the snapshot point, which would automatically be at the correct LSN. The answer is that it's possible that we crash and restart after having already created the slot but before having finished the snapshot. In that case the restarting process will have lost its opportunity to run queries at the slot's consistent point, as that opportunity only exists in the ephemeral transaction that created the slot, and that is long gone. Additionally, there are good reasons why we'd like to move the slot creation much earlier, e.g. during purification, in which case the slot will always be pre-created.
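As a sketch, the statement sequence that pins the snapshot transaction to a consistent point might look as follows. This is illustrative rather than the module's actual code: the slot name is a placeholder, and the exact statement text (including the `USE_SNAPSHOT` option of the replication protocol) is an assumption.

```rust
/// Hypothetical helper returning the ordered statements that pin a snapshot
/// transaction to a consistent LSN point. `temp_slot` is a placeholder name.
fn snapshot_pin_statements(temp_slot: &str) -> Vec<String> {
    vec![
        // The transaction that will hold the snapshot.
        "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ".to_string(),
        // First statement in the transaction: creating a temporary logical
        // replication slot forces postgres to establish a consistent point
        // and report its LSN back to us.
        format!("CREATE_REPLICATION_SLOT {temp_slot} TEMPORARY LOGICAL pgoutput USE_SNAPSHOT"),
    ]
}
```

The key property is the ordering: the slot creation must be the first statement inside the transaction, since that is the only moment postgres ties the transaction's snapshot to a reported LSN.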
§Reusing the consistent point among all workers
Creating replication slots is potentially expensive so the code is arranged such that all workers cooperate and reuse one consistent snapshot among them. In order to do so we make use of the "export transaction" feature of postgres. This feature allows one SQL session to create an identifier (a string) for the transaction it is currently in, which can be used by other sessions to enter the same "snapshot".
We accomplish this by picking one worker at random to function as the transaction leader. The transaction leader is responsible for starting a SQL session, creating a temporary replication slot in a transaction, exporting the transaction id, and broadcasting the transaction information to all other workers via a broadcasted feedback edge.
During this phase the follower workers are simply waiting to hear on the feedback edge, effectively synchronizing with the leader. Once all workers have received the snapshot information they can all start to perform their assigned COPY queries.
The leader and follower steps described above are accomplished by the `export_snapshot` and `use_snapshot` functions respectively.
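The SQL involved on each side can be sketched as below. The function names are placeholders for illustration, not the module's real `export_snapshot`/`use_snapshot` API, but `pg_export_snapshot()` and `SET TRANSACTION SNAPSHOT` are the standard postgres mechanisms for sharing a snapshot between sessions.

```rust
// The leader, inside its slot-creating transaction, obtains a snapshot
// identifier that other sessions can import.
fn export_snapshot_sql() -> &'static str {
    "SELECT pg_export_snapshot()"
}

// Each follower starts its own transaction and, as its first action, imports
// the leader's snapshot by id, entering the same consistent view of the data.
fn use_snapshot_sql(snapshot_id: &str) -> String {
    format!("SET TRANSACTION SNAPSHOT '{snapshot_id}'")
}
```

Importantly, the imported snapshot only remains available while the exporting (leader) transaction is still open, which is what motivates the commit ordering described in the next section.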
§Coordinated transaction COMMIT
When follower workers are done with snapshotting they commit their transaction, close their session, and then drop their snapshot feedback capability. When the leader worker is done with snapshotting it drops its snapshot feedback capability and waits until it observes the snapshot input advancing to the empty frontier. This allows the leader to COMMIT its transaction last, which is the transaction that exported the snapshot.
It’s unclear if this is strictly necessary, but having the frontiers made it easy enough that I added the synchronization.
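A minimal simulation of this commit ordering, with threads standing in for workers and a join standing in for observing the empty feedback frontier (this is not the real timely dataflow code):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Simulates the commit protocol: every follower commits and finishes first;
/// the leader commits the snapshot-exporting transaction only after all
/// followers are done, so that transaction is always the last to COMMIT.
fn run_commit_protocol(followers: usize) -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = (0..followers)
        .map(|i| {
            let log = Arc::clone(&log);
            thread::spawn(move || {
                // Follower i: COPY work done; commit its own transaction and
                // (implicitly, by finishing) drop its feedback capability.
                log.lock().unwrap().push(format!("follower {i}: COMMIT"));
            })
        })
        .collect();
    // Joining stands in for the leader waiting until the snapshot feedback
    // input advances to the empty frontier.
    for h in handles {
        h.join().unwrap();
    }
    log.lock().unwrap().push("leader: COMMIT".to_string());
    Arc::try_unwrap(log).unwrap().into_inner().unwrap()
}
```

Whatever order the followers finish in, the leader's COMMIT is always last, which keeps the exported snapshot alive for as long as any follower might still be using it.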
§Snapshot rewinding
Ingestion dataflows must produce definite data, including the snapshot. What this means practically is that whenever we deem it necessary to snapshot a table we must do so at the same LSN. However, the method for running a transaction described above doesn't let us choose the LSN; it could be an LSN in the future chosen by PostgreSQL while it creates the temporary replication slot.
The definition of differential collections states that a collection at some time `t_snapshot` is defined to be the accumulation of all updates that happen at `t <= t_snapshot`, where `<=` is the partial order. In this case we are faced with the problem of knowing the state of a table at `t_snapshot` but actually wanting to know the snapshot at `t_slot <= t_snapshot`.
From the definition we can see that the snapshot at `t_slot` is related to the snapshot at `t_snapshot` with the following equations:
sum(update: t <= t_snapshot) = sum(update: t <= t_slot) + sum(update: t_slot <= t <= t_snapshot)
                  ⇓
sum(update: t <= t_slot) = sum(update: t <= t_snapshot) - sum(update: t_slot <= t <= t_snapshot)
Therefore, if we manage to recover the `sum(update: t_slot <= t <= t_snapshot)` term we will be able to "rewind" the snapshot we obtained at `t_snapshot` to `t_slot` by emitting all updates that happen between these two points with their diffs negated.
It turns out that this term is exactly what the main replication slot provides us with, and so we can rewind snapshots taken at arbitrary points! In order to do this the snapshot dataflow emits rewind requests to the replication reader which inform it that a certain range of updates must be emitted at LSN 0 (by convention) with their diffs negated. These negated diffs are consolidated with the diffs taken at `t_snapshot` that were also emitted at LSN 0 (by convention) and we end up with a TVC that at LSN 0 contains the snapshot at `t_slot`.
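The rewind arithmetic can be sketched with plain diff accumulation (the row values and diffs here are made up for illustration; the real code works over timely collections, not a map):

```rust
use std::collections::BTreeMap;

/// Consolidates the snapshot taken at t_snapshot with the *negated* diffs of
/// every update in the range (t_slot, t_snapshot], all emitted at LSN 0,
/// yielding the collection as it was at t_slot.
fn rewind(
    snapshot_at_t_snapshot: &[(&str, i64)],
    updates_between: &[(&str, i64)],
) -> BTreeMap<String, i64> {
    let mut acc: BTreeMap<String, i64> = BTreeMap::new();
    for (row, diff) in snapshot_at_t_snapshot {
        *acc.entry(row.to_string()).or_insert(0) += diff;
    }
    for (row, diff) in updates_between {
        // Emit each in-between update with its diff negated.
        *acc.entry(row.to_string()).or_insert(0) -= diff;
    }
    // Consolidation drops rows whose diffs cancel out.
    acc.retain(|_, d| *d != 0);
    acc
}
```

For example, if row `b` was inserted and row `c` deleted between `t_slot` and `t_snapshot`, rewinding a snapshot containing `a` and `b` removes `b` and restores `c`, recovering the state at `t_slot`.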
§Snapshot decoding
The expectation is that tables will most likely be skewed in the number of rows they contain, so while the `COPY` query for any given table runs on a single worker, the decoding of the `COPY` stream is distributed to all workers.
╭──────────────────╮
┏━━━━━━━━━━━━v━┓ │ exported
┃ table ┃ ╭─────────╮ │ snapshot id
┃ reader ┠─>─┤broadcast├──╯
┗━┯━━━━━━━━━━┯━┛ ╰─────────╯
raw│ │
COPY│ │
data│ │
╭────┴─────╮ │
│distribute│ │
╰────┬─────╯ │
┏━━━━┷━━━━┓ │
┃ COPY ┃ │
┃ decoder ┃ │
┗━━━━┯━━━━┛ │
│ snapshot │rewind
│ updates │requests
v v
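The `distribute` step in the diagram can be sketched as routing each raw `COPY` text line to a worker by hash; the routing function below is illustrative, not the module's real exchange logic:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks the worker that will decode a given raw COPY line. Hashing the line
/// spreads decoding work across all workers even though the COPY query for
/// the table runs on a single reader worker.
fn worker_for_line(line: &str, workers: usize) -> usize {
    let mut h = DefaultHasher::new();
    line.hash(&mut h);
    (h.finish() as usize) % workers
}
```

The routing only needs to be deterministic and roughly uniform; which worker decodes which line does not matter for correctness, since decoded updates all carry the same LSN.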
Structs§
Functions§
- Decodes a row of `col_len` columns obtained from a text encoded COPY query into `row`.
- Starts a read-only transaction on the SQL session of `client` at a consistent LSN point by creating a replication slot. Returns a snapshot identifier that can be imported in other SQL sessions and the LSN of the consistent point.
- Record the sizes of the tables being snapshotted in `PgSnapshotMetrics`.
- render 🔒 Renders the snapshot dataflow. See the module documentation for more information.
- Starts a read-only transaction on the SQL session of `client` at the consistent LSN point of `snapshot`.