postgres_replication/
lib.rs

1//! Utilities for working with the PostgreSQL replication copy both format.
2
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::{BufMut, Bytes, BytesMut};
7use futures_util::{ready, SinkExt, Stream};
8use pin_project_lite::pin_project;
9use postgres_types::PgLsn;
10use tokio_postgres::CopyBothDuplex;
11use tokio_postgres::Error;
12
13pub mod protocol;
14
15use crate::protocol::{LogicalReplicationMessage, ReplicationMessage};
16
/// Tag byte written as the first byte of a standby status update message.
const STANDBY_STATUS_UPDATE_TAG: u8 = b'r';
/// Tag byte written as the first byte of a hot standby feedback message.
const HOT_STANDBY_FEEDBACK_TAG: u8 = b'h';
19
pin_project! {
    /// A type which deserializes the postgres replication protocol. This type can be used with
    /// both physical and logical replication to get access to the byte content of each replication
    /// message.
    ///
    /// It wraps a `CopyBothDuplex<Bytes>`. As a `Stream` it yields every received buffer parsed
    /// into a `ReplicationMessage<Bytes>`; its `standby_status_update` and
    /// `hot_standby_feedback` methods send feedback messages through the wrapped duplex.
    ///
    /// The replication *must* be explicitly completed via the `finish` method.
    ///
    /// NOTE(review): no `finish` method is defined on this type in this file; confirm how
    /// callers are expected to complete the replication.
    pub struct ReplicationStream {
        // Pinned field: both the `Stream` impl and the feedback senders reach the duplex
        // through the pin projection.
        #[pin]
        stream: CopyBothDuplex<Bytes>,
    }
}
31
32impl ReplicationStream {
33    /// Creates a new ReplicationStream that will wrap the underlying CopyBoth stream
34    pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
35        Self { stream }
36    }
37
38    /// Send standby update to server.
39    pub async fn standby_status_update(
40        self: Pin<&mut Self>,
41        write_lsn: PgLsn,
42        flush_lsn: PgLsn,
43        apply_lsn: PgLsn,
44        ts: i64,
45        reply: u8,
46    ) -> Result<(), Error> {
47        let mut this = self.project();
48
49        let mut buf = BytesMut::new();
50        buf.put_u8(STANDBY_STATUS_UPDATE_TAG);
51        buf.put_u64(write_lsn.into());
52        buf.put_u64(flush_lsn.into());
53        buf.put_u64(apply_lsn.into());
54        buf.put_i64(ts);
55        buf.put_u8(reply);
56
57        this.stream.send(buf.freeze()).await
58    }
59
60    /// Send hot standby feedback message to server.
61    pub async fn hot_standby_feedback(
62        self: Pin<&mut Self>,
63        timestamp: i64,
64        global_xmin: u32,
65        global_xmin_epoch: u32,
66        catalog_xmin: u32,
67        catalog_xmin_epoch: u32,
68    ) -> Result<(), Error> {
69        let mut this = self.project();
70
71        let mut buf = BytesMut::new();
72        buf.put_u8(HOT_STANDBY_FEEDBACK_TAG);
73        buf.put_i64(timestamp);
74        buf.put_u32(global_xmin);
75        buf.put_u32(global_xmin_epoch);
76        buf.put_u32(catalog_xmin);
77        buf.put_u32(catalog_xmin_epoch);
78
79        this.stream.send(buf.freeze()).await
80    }
81}
82
83impl Stream for ReplicationStream {
84    type Item = Result<ReplicationMessage<Bytes>, Error>;
85
86    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        let this = self.project();
88
89        match ready!(this.stream.poll_next(cx)) {
90            Some(Ok(buf)) => {
91                Poll::Ready(Some(ReplicationMessage::parse(&buf).map_err(Error::parse)))
92            }
93            Some(Err(err)) => Poll::Ready(Some(Err(err))),
94            None => Poll::Ready(None),
95        }
96    }
97}
98
pin_project! {
    /// A type which deserializes the postgres logical replication protocol. This type gives access
    /// to a high level representation of the changes in transaction commit order.
    ///
    /// It wraps a `ReplicationStream` and, as a `Stream`, additionally parses the data of each
    /// `XLogData` message into a `LogicalReplicationMessage`.
    ///
    /// The replication *must* be explicitly completed via the `finish` method.
    ///
    /// NOTE(review): no `finish` method is defined on this type in this file; confirm how
    /// callers are expected to complete the replication.
    pub struct LogicalReplicationStream {
        // Pinned field: polling and the feedback senders delegate to the inner stream
        // through the pin projection.
        #[pin]
        stream: ReplicationStream,
    }
}
109
110impl LogicalReplicationStream {
111    /// Creates a new LogicalReplicationStream that will wrap the underlying CopyBoth stream
112    pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
113        Self {
114            stream: ReplicationStream::new(stream),
115        }
116    }
117
118    /// Send standby update to server.
119    pub async fn standby_status_update(
120        self: Pin<&mut Self>,
121        write_lsn: PgLsn,
122        flush_lsn: PgLsn,
123        apply_lsn: PgLsn,
124        ts: i64,
125        reply: u8,
126    ) -> Result<(), Error> {
127        let this = self.project();
128        this.stream
129            .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, reply)
130            .await
131    }
132
133    /// Send hot standby feedback message to server.
134    pub async fn hot_standby_feedback(
135        self: Pin<&mut Self>,
136        timestamp: i64,
137        global_xmin: u32,
138        global_xmin_epoch: u32,
139        catalog_xmin: u32,
140        catalog_xmin_epoch: u32,
141    ) -> Result<(), Error> {
142        let this = self.project();
143        this.stream
144            .hot_standby_feedback(
145                timestamp,
146                global_xmin,
147                global_xmin_epoch,
148                catalog_xmin,
149                catalog_xmin_epoch,
150            )
151            .await
152    }
153}
154
155impl Stream for LogicalReplicationStream {
156    type Item = Result<ReplicationMessage<LogicalReplicationMessage>, Error>;
157
158    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
159        let this = self.project();
160
161        match ready!(this.stream.poll_next(cx)) {
162            Some(Ok(ReplicationMessage::XLogData(body))) => {
163                let body = body
164                    .map_data(|buf| LogicalReplicationMessage::parse(&buf))
165                    .map_err(Error::parse)?;
166                Poll::Ready(Some(Ok(ReplicationMessage::XLogData(body))))
167            }
168            Some(Ok(ReplicationMessage::PrimaryKeepAlive(body))) => {
169                Poll::Ready(Some(Ok(ReplicationMessage::PrimaryKeepAlive(body))))
170            }
171            Some(Err(err)) => Poll::Ready(Some(Err(err))),
172            None => Poll::Ready(None),
173        }
174    }
175}