use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::{ready, SinkExt, Stream};
use pin_project_lite::pin_project;
use postgres_types::PgLsn;
use tokio_postgres::CopyBothDuplex;
use tokio_postgres::Error;
pub mod protocol;
use crate::protocol::{LogicalReplicationMessage, ReplicationMessage};
// Leading tag byte written at the start of every standby status update
// message built by `ReplicationStream::standby_status_update`.
const STANDBY_STATUS_UPDATE_TAG: u8 = b'r';
// Leading tag byte written at the start of every hot standby feedback
// message built by `ReplicationStream::hot_standby_feedback`.
const HOT_STANDBY_FEEDBACK_TAG: u8 = b'h';
pin_project! {
/// A stream of replication messages backed by a `CopyBothDuplex<Bytes>`.
///
/// Its `Stream` implementation yields every received buffer parsed as a
/// `ReplicationMessage<Bytes>`, and its inherent methods send standby status
/// updates and hot standby feedback over the same duplex.
pub struct ReplicationStream {
// The underlying duplex; marked `#[pin]` so it is reached through the
// pinned projection when polling or sending.
#[pin]
stream: CopyBothDuplex<Bytes>,
}
}
impl ReplicationStream {
pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
Self { stream }
}
pub async fn standby_status_update(
self: Pin<&mut Self>,
write_lsn: PgLsn,
flush_lsn: PgLsn,
apply_lsn: PgLsn,
ts: i64,
reply: u8,
) -> Result<(), Error> {
let mut this = self.project();
let mut buf = BytesMut::new();
buf.put_u8(STANDBY_STATUS_UPDATE_TAG);
buf.put_u64(write_lsn.into());
buf.put_u64(flush_lsn.into());
buf.put_u64(apply_lsn.into());
buf.put_i64(ts);
buf.put_u8(reply);
this.stream.send(buf.freeze()).await
}
pub async fn hot_standby_feedback(
self: Pin<&mut Self>,
timestamp: i64,
global_xmin: u32,
global_xmin_epoch: u32,
catalog_xmin: u32,
catalog_xmin_epoch: u32,
) -> Result<(), Error> {
let mut this = self.project();
let mut buf = BytesMut::new();
buf.put_u8(HOT_STANDBY_FEEDBACK_TAG);
buf.put_i64(timestamp);
buf.put_u32(global_xmin);
buf.put_u32(global_xmin_epoch);
buf.put_u32(catalog_xmin);
buf.put_u32(catalog_xmin_epoch);
this.stream.send(buf.freeze()).await
}
}
impl Stream for ReplicationStream {
type Item = Result<ReplicationMessage<Bytes>, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match ready!(this.stream.poll_next(cx)) {
Some(Ok(buf)) => {
Poll::Ready(Some(ReplicationMessage::parse(&buf).map_err(Error::parse)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
}
}
pin_project! {
/// A wrapper around `ReplicationStream` for logical replication.
///
/// Its `Stream` implementation additionally parses the payload of every
/// `XLogData` message into a `LogicalReplicationMessage`; the feedback
/// methods delegate to the wrapped stream.
pub struct LogicalReplicationStream {
// The wrapped byte-level replication stream; marked `#[pin]` so it is
// reached through the pinned projection.
#[pin]
stream: ReplicationStream,
}
}
impl LogicalReplicationStream {
pub fn new(stream: CopyBothDuplex<Bytes>) -> Self {
Self {
stream: ReplicationStream::new(stream),
}
}
pub async fn standby_status_update(
self: Pin<&mut Self>,
write_lsn: PgLsn,
flush_lsn: PgLsn,
apply_lsn: PgLsn,
ts: i64,
reply: u8,
) -> Result<(), Error> {
let this = self.project();
this.stream
.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, reply)
.await
}
pub async fn hot_standby_feedback(
self: Pin<&mut Self>,
timestamp: i64,
global_xmin: u32,
global_xmin_epoch: u32,
catalog_xmin: u32,
catalog_xmin_epoch: u32,
) -> Result<(), Error> {
let this = self.project();
this.stream
.hot_standby_feedback(
timestamp,
global_xmin,
global_xmin_epoch,
catalog_xmin,
catalog_xmin_epoch,
)
.await
}
}
impl Stream for LogicalReplicationStream {
type Item = Result<ReplicationMessage<LogicalReplicationMessage>, Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match ready!(this.stream.poll_next(cx)) {
Some(Ok(ReplicationMessage::XLogData(body))) => {
let body = body
.map_data(|buf| LogicalReplicationMessage::parse(&buf))
.map_err(Error::parse)?;
Poll::Ready(Some(Ok(ReplicationMessage::XLogData(body))))
}
Some(Ok(ReplicationMessage::PrimaryKeepAlive(body))) => {
Poll::Ready(Some(Ok(ReplicationMessage::PrimaryKeepAlive(body))))
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
}
}