postgres_replication/
lib.rs
1use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use bytes::{BufMut, Bytes, BytesMut};
7use futures_util::{ready, SinkExt, Stream};
8use pin_project_lite::pin_project;
9use postgres_types::PgLsn;
10use tokio_postgres::CopyBothDuplex;
11use tokio_postgres::Error;
12
13pub mod protocol;
14
15use crate::protocol::{LogicalReplicationMessage, ReplicationMessage};
16
/// Message-type byte written as the first byte of a standby status update
/// payload (see `ReplicationStream::standby_status_update`).
const STANDBY_STATUS_UPDATE_TAG: u8 = b'r';
/// Message-type byte written as the first byte of a hot standby feedback
/// payload (see `ReplicationStream::hot_standby_feedback`).
const HOT_STANDBY_FEEDBACK_TAG: u8 = b'h';
19
20pin_project! {
21 pub struct ReplicationStream {
27 #[pin]
28 stream: CopyBothDuplex<Bytes>,
29 }
30}
31
32impl ReplicationStream {
33 pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
35 Self { stream }
36 }
37
38 pub async fn standby_status_update(
40 self: Pin<&mut Self>,
41 write_lsn: PgLsn,
42 flush_lsn: PgLsn,
43 apply_lsn: PgLsn,
44 ts: i64,
45 reply: u8,
46 ) -> Result<(), Error> {
47 let mut this = self.project();
48
49 let mut buf = BytesMut::new();
50 buf.put_u8(STANDBY_STATUS_UPDATE_TAG);
51 buf.put_u64(write_lsn.into());
52 buf.put_u64(flush_lsn.into());
53 buf.put_u64(apply_lsn.into());
54 buf.put_i64(ts);
55 buf.put_u8(reply);
56
57 this.stream.send(buf.freeze()).await
58 }
59
60 pub async fn hot_standby_feedback(
62 self: Pin<&mut Self>,
63 timestamp: i64,
64 global_xmin: u32,
65 global_xmin_epoch: u32,
66 catalog_xmin: u32,
67 catalog_xmin_epoch: u32,
68 ) -> Result<(), Error> {
69 let mut this = self.project();
70
71 let mut buf = BytesMut::new();
72 buf.put_u8(HOT_STANDBY_FEEDBACK_TAG);
73 buf.put_i64(timestamp);
74 buf.put_u32(global_xmin);
75 buf.put_u32(global_xmin_epoch);
76 buf.put_u32(catalog_xmin);
77 buf.put_u32(catalog_xmin_epoch);
78
79 this.stream.send(buf.freeze()).await
80 }
81}
82
83impl Stream for ReplicationStream {
84 type Item = Result<ReplicationMessage<Bytes>, Error>;
85
86 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 let this = self.project();
88
89 match ready!(this.stream.poll_next(cx)) {
90 Some(Ok(buf)) => {
91 Poll::Ready(Some(ReplicationMessage::parse(&buf).map_err(Error::parse)))
92 }
93 Some(Err(err)) => Poll::Ready(Some(Err(err))),
94 None => Poll::Ready(None),
95 }
96 }
97}
98
99pin_project! {
100 pub struct LogicalReplicationStream {
105 #[pin]
106 stream: ReplicationStream,
107 }
108}
109
110impl LogicalReplicationStream {
111 pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
113 Self {
114 stream: ReplicationStream::new(stream),
115 }
116 }
117
118 pub async fn standby_status_update(
120 self: Pin<&mut Self>,
121 write_lsn: PgLsn,
122 flush_lsn: PgLsn,
123 apply_lsn: PgLsn,
124 ts: i64,
125 reply: u8,
126 ) -> Result<(), Error> {
127 let this = self.project();
128 this.stream
129 .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, reply)
130 .await
131 }
132
133 pub async fn hot_standby_feedback(
135 self: Pin<&mut Self>,
136 timestamp: i64,
137 global_xmin: u32,
138 global_xmin_epoch: u32,
139 catalog_xmin: u32,
140 catalog_xmin_epoch: u32,
141 ) -> Result<(), Error> {
142 let this = self.project();
143 this.stream
144 .hot_standby_feedback(
145 timestamp,
146 global_xmin,
147 global_xmin_epoch,
148 catalog_xmin,
149 catalog_xmin_epoch,
150 )
151 .await
152 }
153}
154
155impl Stream for LogicalReplicationStream {
156 type Item = Result<ReplicationMessage<LogicalReplicationMessage>, Error>;
157
158 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
159 let this = self.project();
160
161 match ready!(this.stream.poll_next(cx)) {
162 Some(Ok(ReplicationMessage::XLogData(body))) => {
163 let body = body
164 .map_data(|buf| LogicalReplicationMessage::parse(&buf))
165 .map_err(Error::parse)?;
166 Poll::Ready(Some(Ok(ReplicationMessage::XLogData(body))))
167 }
168 Some(Ok(ReplicationMessage::PrimaryKeepAlive(body))) => {
169 Poll::Ready(Some(Ok(ReplicationMessage::PrimaryKeepAlive(body))))
170 }
171 Some(Err(err)) => Poll::Ready(Some(Err(err))),
172 None => Poll::Ready(None),
173 }
174 }
175}