use std::io::{self, Read};
use std::{cmp, str};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::Bytes;
use memchr::memchr;
use postgres_protocol::{Lsn, Oid};
/// Leading tag byte that `ReplicationMessage::parse` maps to `ReplicationMessage::XLogData`.
pub const XLOG_DATA_TAG: u8 = b'w';
/// Leading tag byte that `ReplicationMessage::parse` maps to `ReplicationMessage::PrimaryKeepAlive`.
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';
// Leading tag bytes of the logical replication messages, matched on the first
// byte in `LogicalReplicationMessage::parse`.
const BEGIN_TAG: u8 = b'B';
const COMMIT_TAG: u8 = b'C';
const ORIGIN_TAG: u8 = b'O';
const RELATION_TAG: u8 = b'R';
const TYPE_TAG: u8 = b'Y';
const INSERT_TAG: u8 = b'I';
const UPDATE_TAG: u8 = b'U';
const DELETE_TAG: u8 = b'D';
const TRUNCATE_TAG: u8 = b'T';
// Tag bytes that introduce a tuple inside insert/update/delete messages.
// `TUPLE_OLD_TAG` has the same value as `ORIGIN_TAG`; the two are matched at
// different positions in a message, so they do not conflict.
const TUPLE_NEW_TAG: u8 = b'N';
const TUPLE_KEY_TAG: u8 = b'K';
const TUPLE_OLD_TAG: u8 = b'O';
// Tag bytes that introduce a single column value inside a tuple
// (see `TupleData::parse`).
const TUPLE_DATA_NULL_TAG: u8 = b'n';
const TUPLE_DATA_TOAST_TAG: u8 = b'u';
const TUPLE_DATA_TEXT_TAG: u8 = b't';
// Replica identity bytes carried by a relation message; each maps to the
// `ReplicaIdentity` variant of the same name.
const REPLICA_IDENTITY_DEFAULT_TAG: u8 = b'd';
const REPLICA_IDENTITY_NOTHING_TAG: u8 = b'n';
const REPLICA_IDENTITY_FULL_TAG: u8 = b'f';
const REPLICA_IDENTITY_INDEX_TAG: u8 = b'i';
/// A top-level replication message, generic over the payload type `D`
/// carried by the `XLogData` variant.
///
/// Marked `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[non_exhaustive]
#[derive(Debug)]
pub enum ReplicationMessage<D> {
/// A message tagged with `XLOG_DATA_TAG`: WAL positions, a timestamp and a payload.
XLogData(XLogDataBody<D>),
/// A message tagged with `PRIMARY_KEEPALIVE_TAG`.
PrimaryKeepAlive(PrimaryKeepAliveBody),
}
impl ReplicationMessage<Bytes> {
    /// Decodes one replication message from `buf`.
    ///
    /// The first byte selects the message kind:
    ///
    /// * `XLOG_DATA_TAG` — two big-endian `u64` WAL positions (start, end), a
    ///   big-endian `i64` timestamp, and every remaining byte as the payload.
    /// * `PRIMARY_KEEPALIVE_TAG` — a big-endian `u64` WAL end, a big-endian
    ///   `i64` timestamp and a single reply byte.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` for any other tag, and propagates the reader's
    /// error when `buf` ends before all fixed-size fields were read.
    #[inline]
    pub fn parse(buf: &Bytes) -> io::Result<Self> {
        // Cloning `Bytes` only bumps a reference count; the payload below is
        // a slice of the same underlying storage.
        let mut reader = Buffer {
            bytes: buf.clone(),
            idx: 0,
        };

        match reader.read_u8()? {
            // Struct fields are evaluated in the order written, which is the
            // wire order of the message.
            XLOG_DATA_TAG => Ok(Self::XLogData(XLogDataBody {
                wal_start: reader.read_u64::<BigEndian>()?,
                wal_end: reader.read_u64::<BigEndian>()?,
                timestamp: reader.read_i64::<BigEndian>()?,
                data: reader.read_all(),
            })),
            PRIMARY_KEEPALIVE_TAG => Ok(Self::PrimaryKeepAlive(PrimaryKeepAliveBody {
                wal_end: reader.read_u64::<BigEndian>()?,
                timestamp: reader.read_i64::<BigEndian>()?,
                reply: reader.read_u8()?,
            })),
            unknown => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("unknown replication message tag `{}`", unknown),
            )),
        }
    }
}
/// Body of an XLogData replication message, generic over the payload type `D`.
#[derive(Debug)]
pub struct XLogDataBody<D> {
    /// First `u64` of the message body (named as the WAL start position).
    wal_start: u64,
    /// Second `u64` of the message body (named as the WAL end position).
    wal_end: u64,
    /// Raw `i64` timestamp as sent by the server.
    // NOTE(review): the PostgreSQL protocol docs describe this as microseconds
    // since 2000-01-01 — confirm before converting.
    timestamp: i64,
    /// The payload carried by the message.
    data: D,
}

impl<D> XLogDataBody<D> {
    /// Returns the WAL start position recorded in the message.
    #[inline]
    pub fn wal_start(&self) -> u64 {
        self.wal_start
    }

    /// Returns the WAL end position recorded in the message.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        self.wal_end
    }

    /// Returns the raw timestamp recorded in the message.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Borrows the payload.
    #[inline]
    pub fn data(&self) -> &D {
        &self.data
    }

    /// Consumes the body and returns the payload, dropping the header fields.
    #[inline]
    pub fn into_data(self) -> D {
        self.data
    }

    /// Converts the payload with `f`, keeping the header fields unchanged.
    ///
    /// `f` is invoked exactly once, so it is only required to be `FnOnce`;
    /// this lets callers pass closures that move captured state into the
    /// result. Every closure that satisfied the previous `Fn` bound still
    /// satisfies this one.
    ///
    /// # Errors
    ///
    /// Returns whatever error `f` returns; the body is consumed either way.
    pub fn map_data<F, D2, E>(self, f: F) -> Result<XLogDataBody<D2>, E>
    where
        F: FnOnce(D) -> Result<D2, E>,
    {
        let data = f(self.data)?;
        Ok(XLogDataBody {
            wal_start: self.wal_start,
            wal_end: self.wal_end,
            timestamp: self.timestamp,
            data,
        })
    }
}
/// Body of a primary keepalive replication message.
#[derive(Debug)]
pub struct PrimaryKeepAliveBody {
    /// The `u64` WAL position that follows the tag byte.
    wal_end: u64,
    /// Raw `i64` timestamp as sent by the server.
    timestamp: i64,
    /// Raw reply byte, stored unvalidated.
    // NOTE(review): the PostgreSQL protocol docs say 1 requests an immediate
    // reply from the client — confirm before interpreting.
    reply: u8,
}

impl PrimaryKeepAliveBody {
    /// Returns the WAL end position recorded in the message.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        let Self { wal_end, .. } = self;
        *wal_end
    }

    /// Returns the raw timestamp recorded in the message.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        let Self { timestamp, .. } = self;
        *timestamp
    }

    /// Returns the raw reply byte recorded in the message.
    #[inline]
    pub fn reply(&self) -> u8 {
        let Self { reply, .. } = self;
        *reply
    }
}
/// A decoded logical replication message; each variant corresponds to one
/// leading tag byte recognised by `LogicalReplicationMessage::parse`.
///
/// Marked `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[non_exhaustive]
#[derive(Debug)]
pub enum LogicalReplicationMessage {
/// Parsed from a message tagged `BEGIN_TAG`.
Begin(BeginBody),
/// Parsed from a message tagged `COMMIT_TAG`.
Commit(CommitBody),
/// Parsed from a message tagged `ORIGIN_TAG`.
Origin(OriginBody),
/// Parsed from a message tagged `RELATION_TAG`.
Relation(RelationBody),
/// Parsed from a message tagged `TYPE_TAG`.
Type(TypeBody),
/// Parsed from a message tagged `INSERT_TAG`.
Insert(InsertBody),
/// Parsed from a message tagged `UPDATE_TAG`.
Update(UpdateBody),
/// Parsed from a message tagged `DELETE_TAG`.
Delete(DeleteBody),
/// Parsed from a message tagged `TRUNCATE_TAG`.
Truncate(TruncateBody),
}
impl LogicalReplicationMessage {
    /// Decodes one logical replication message from `buf`.
    ///
    /// The first byte selects the message kind; the remaining fields are read
    /// in wire order as big-endian integers and NUL-terminated strings.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` for an unknown message, tuple or replica identity
    ///   tag, and for a negative column or relation count.
    /// * The reader's error (typically `UnexpectedEof`) when `buf` ends
    ///   before the message is complete.
    ///
    /// Element counts come from the wire and are therefore untrusted: a
    /// negative count is rejected instead of being cast to `usize`, and
    /// vector capacities are capped by the number of bytes still unread, so a
    /// bogus count cannot trigger a capacity-overflow panic or a huge
    /// up-front allocation.
    pub fn parse(buf: &Bytes) -> io::Result<Self> {
        let mut buf = Buffer {
            bytes: buf.clone(),
            idx: 0,
        };

        let tag = buf.read_u8()?;

        let logical_replication_message = match tag {
            BEGIN_TAG => Self::Begin(BeginBody {
                final_lsn: buf.read_u64::<BigEndian>()?,
                timestamp: buf.read_i64::<BigEndian>()?,
                xid: buf.read_u32::<BigEndian>()?,
            }),
            COMMIT_TAG => Self::Commit(CommitBody {
                flags: buf.read_i8()?,
                commit_lsn: buf.read_u64::<BigEndian>()?,
                end_lsn: buf.read_u64::<BigEndian>()?,
                timestamp: buf.read_i64::<BigEndian>()?,
            }),
            ORIGIN_TAG => Self::Origin(OriginBody {
                commit_lsn: buf.read_u64::<BigEndian>()?,
                name: buf.read_cstr()?,
            }),
            RELATION_TAG => {
                let rel_id = buf.read_u32::<BigEndian>()?;
                let namespace = buf.read_cstr()?;
                let name = buf.read_cstr()?;
                let replica_identity = match buf.read_u8()? {
                    REPLICA_IDENTITY_DEFAULT_TAG => ReplicaIdentity::Default,
                    REPLICA_IDENTITY_NOTHING_TAG => ReplicaIdentity::Nothing,
                    REPLICA_IDENTITY_FULL_TAG => ReplicaIdentity::Full,
                    REPLICA_IDENTITY_INDEX_TAG => ReplicaIdentity::Index,
                    tag => {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            format!("unknown replica identity tag `{}`", tag),
                        ));
                    }
                };
                let column_len = buf.read_i16::<BigEndian>()?;
                if column_len < 0 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        format!("invalid relation column count `{}`", column_len),
                    ));
                }

                // Every column occupies at least one byte, so the unread byte
                // count is a safe upper bound for the pre-allocation.
                let capacity = cmp::min(column_len as usize, buf.slice().len());
                let mut columns = Vec::with_capacity(capacity);
                for _ in 0..column_len {
                    columns.push(Column::parse(&mut buf)?);
                }

                Self::Relation(RelationBody {
                    rel_id,
                    namespace,
                    name,
                    replica_identity,
                    columns,
                })
            }
            TYPE_TAG => Self::Type(TypeBody {
                id: buf.read_u32::<BigEndian>()?,
                namespace: buf.read_cstr()?,
                name: buf.read_cstr()?,
            }),
            INSERT_TAG => {
                let rel_id = buf.read_u32::<BigEndian>()?;
                let tag = buf.read_u8()?;

                // An insert carries exactly one tuple, introduced by the
                // "new tuple" tag.
                let tuple = match tag {
                    TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
                    tag => {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            format!("unexpected tuple tag `{}`", tag),
                        ));
                    }
                };

                Self::Insert(InsertBody { rel_id, tuple })
            }
            UPDATE_TAG => {
                let rel_id = buf.read_u32::<BigEndian>()?;
                let tag = buf.read_u8()?;

                let mut key_tuple = None;
                let mut old_tuple = None;

                // An update is either just the new tuple, or an old/key tuple
                // followed by the (mandatory) new tuple.
                let new_tuple = match tag {
                    TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
                    TUPLE_OLD_TAG | TUPLE_KEY_TAG => {
                        if tag == TUPLE_OLD_TAG {
                            old_tuple = Some(Tuple::parse(&mut buf)?);
                        } else {
                            key_tuple = Some(Tuple::parse(&mut buf)?);
                        }

                        match buf.read_u8()? {
                            TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
                            tag => {
                                return Err(io::Error::new(
                                    io::ErrorKind::InvalidInput,
                                    format!("unexpected tuple tag `{}`", tag),
                                ));
                            }
                        }
                    }
                    tag => {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            format!("unknown tuple tag `{}`", tag),
                        ));
                    }
                };

                Self::Update(UpdateBody {
                    rel_id,
                    key_tuple,
                    old_tuple,
                    new_tuple,
                })
            }
            DELETE_TAG => {
                let rel_id = buf.read_u32::<BigEndian>()?;
                let tag = buf.read_u8()?;

                let mut key_tuple = None;
                let mut old_tuple = None;

                // A delete carries exactly one tuple: the old row or its key.
                match tag {
                    TUPLE_OLD_TAG => old_tuple = Some(Tuple::parse(&mut buf)?),
                    TUPLE_KEY_TAG => key_tuple = Some(Tuple::parse(&mut buf)?),
                    tag => {
                        return Err(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            format!("unknown tuple tag `{}`", tag),
                        ));
                    }
                }

                Self::Delete(DeleteBody {
                    rel_id,
                    key_tuple,
                    old_tuple,
                })
            }
            TRUNCATE_TAG => {
                let relation_len = buf.read_i32::<BigEndian>()?;
                let options = buf.read_i8()?;
                if relation_len < 0 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        format!("invalid truncate relation count `{}`", relation_len),
                    ));
                }

                // Each relation id is four bytes on the wire; never reserve
                // room for more ids than the unread bytes could hold.
                let capacity = cmp::min(relation_len as usize, buf.slice().len() / 4);
                let mut rel_ids = Vec::with_capacity(capacity);
                for _ in 0..relation_len {
                    rel_ids.push(buf.read_u32::<BigEndian>()?);
                }

                Self::Truncate(TruncateBody { options, rel_ids })
            }
            tag => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("unknown replication message tag `{}`", tag),
                ));
            }
        };

        Ok(logical_replication_message)
    }
}
#[derive(Debug)]
pub struct Tuple(Vec<TupleData>);
impl Tuple {
#[inline]
pub fn tuple_data(&self) -> &[TupleData] {
&self.0
}
}
impl Tuple {
fn parse(buf: &mut Buffer) -> io::Result<Self> {
let col_len = buf.read_i16::<BigEndian>()?;
let mut tuple = Vec::with_capacity(col_len as usize);
for _ in 0..col_len {
tuple.push(TupleData::parse(buf)?);
}
Ok(Tuple(tuple))
}
}
/// Description of a single column as carried by a relation message.
#[derive(Debug)]
pub struct Column {
    /// Raw flags byte of the column.
    flags: i8,
    /// Column name bytes, without the terminating NUL.
    name: Bytes,
    /// Raw type identifier of the column.
    type_id: i32,
    /// Raw type modifier of the column.
    type_modifier: i32,
}

impl Column {
    /// Reads one column description from `buf`: a flags byte, a
    /// NUL-terminated name, then the type id and type modifier as big-endian
    /// `i32`s.
    fn parse(buf: &mut Buffer) -> io::Result<Self> {
        let flags = buf.read_i8()?;
        let name = buf.read_cstr()?;
        let type_id = buf.read_i32::<BigEndian>()?;
        let type_modifier = buf.read_i32::<BigEndian>()?;

        Ok(Column {
            flags,
            name,
            type_id,
            type_modifier,
        })
    }

    /// Returns the raw flags byte.
    #[inline]
    pub fn flags(&self) -> i8 {
        self.flags
    }

    /// Returns the column name.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn name(&self) -> io::Result<&str> {
        get_str(&self.name[..])
    }

    /// Returns the raw type identifier.
    #[inline]
    pub fn type_id(&self) -> i32 {
        self.type_id
    }

    /// Returns the raw type modifier.
    #[inline]
    pub fn type_modifier(&self) -> i32 {
        self.type_modifier
    }
}
/// A single column value inside a `Tuple`.
#[derive(Debug)]
pub enum TupleData {
    /// Value tagged `TUPLE_DATA_NULL_TAG`; no payload follows.
    Null,
    /// Value tagged `TUPLE_DATA_TOAST_TAG`; no payload follows.
    UnchangedToast,
    /// Value tagged `TUPLE_DATA_TEXT_TAG`, holding the length-prefixed bytes.
    Text(Bytes),
}

impl TupleData {
    /// Reads one column value from `buf`: a tag byte and, for text values, a
    /// big-endian `i32` length followed by that many bytes.
    ///
    /// # Errors
    ///
    /// * `InvalidInput` for an unknown tag or a negative length.
    /// * `UnexpectedEof` when the declared length exceeds the unread bytes.
    ///
    /// The length comes from the wire and is validated *before* allocating:
    /// a negative value cast to `usize` would panic with a capacity overflow,
    /// and an oversized one would allocate up to 2 GiB only for the read to
    /// fail afterwards.
    fn parse(buf: &mut Buffer) -> io::Result<Self> {
        let type_tag = buf.read_u8()?;

        let tuple = match type_tag {
            TUPLE_DATA_NULL_TAG => TupleData::Null,
            TUPLE_DATA_TOAST_TAG => TupleData::UnchangedToast,
            TUPLE_DATA_TEXT_TAG => {
                let len = buf.read_i32::<BigEndian>()?;
                if len < 0 {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidInput,
                        format!("invalid tuple data length `{}`", len),
                    ));
                }
                let len = len as usize;
                if len > buf.slice().len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "unexpected EOF",
                    ));
                }

                let mut data = vec![0; len];
                buf.read_exact(&mut data)?;
                TupleData::Text(data.into())
            }
            tag => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("unknown replication message tag `{}`", tag),
                ));
            }
        };

        Ok(tuple)
    }
}
/// Body of a logical replication `Begin` message.
#[derive(Debug)]
pub struct BeginBody {
    /// The `u64` LSN that follows the tag byte.
    final_lsn: u64,
    /// Raw `i64` timestamp as sent by the server.
    timestamp: i64,
    /// Transaction id, read as a big-endian `u32`.
    xid: u32,
}

impl BeginBody {
    /// Returns the final LSN recorded in the message.
    #[inline]
    pub fn final_lsn(&self) -> Lsn {
        let Self { final_lsn, .. } = self;
        *final_lsn
    }

    /// Returns the raw timestamp recorded in the message.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        let Self { timestamp, .. } = self;
        *timestamp
    }

    /// Returns the transaction id recorded in the message.
    #[inline]
    pub fn xid(&self) -> u32 {
        let Self { xid, .. } = self;
        *xid
    }
}
/// Body of a logical replication `Commit` message.
#[derive(Debug)]
pub struct CommitBody {
    /// Raw flags byte, stored unvalidated.
    flags: i8,
    /// First `u64` LSN after the flags byte.
    commit_lsn: u64,
    /// Second `u64` LSN after the flags byte.
    end_lsn: u64,
    /// Raw `i64` timestamp as sent by the server.
    timestamp: i64,
}

impl CommitBody {
    /// Returns the raw flags byte.
    #[inline]
    pub fn flags(&self) -> i8 {
        let Self { flags, .. } = self;
        *flags
    }

    /// Returns the commit LSN recorded in the message.
    #[inline]
    pub fn commit_lsn(&self) -> Lsn {
        let Self { commit_lsn, .. } = self;
        *commit_lsn
    }

    /// Returns the end LSN recorded in the message.
    #[inline]
    pub fn end_lsn(&self) -> Lsn {
        let Self { end_lsn, .. } = self;
        *end_lsn
    }

    /// Returns the raw timestamp recorded in the message.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        let Self { timestamp, .. } = self;
        *timestamp
    }
}
/// Body of a logical replication `Origin` message.
#[derive(Debug)]
pub struct OriginBody {
    /// The `u64` LSN that follows the tag byte.
    commit_lsn: u64,
    /// Origin name bytes, without the terminating NUL.
    name: Bytes,
}

impl OriginBody {
    /// Returns the commit LSN recorded in the message.
    #[inline]
    pub fn commit_lsn(&self) -> Lsn {
        let Self { commit_lsn, .. } = self;
        *commit_lsn
    }

    /// Returns the origin name.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn name(&self) -> io::Result<&str> {
        get_str(&self.name[..])
    }
}
/// Replica identity setting of a relation, decoded from the single byte that
/// follows the relation name in a relation message.
#[derive(Debug)]
pub enum ReplicaIdentity {
/// Decoded from `REPLICA_IDENTITY_DEFAULT_TAG` (`b'd'`).
Default,
/// Decoded from `REPLICA_IDENTITY_NOTHING_TAG` (`b'n'`).
Nothing,
/// Decoded from `REPLICA_IDENTITY_FULL_TAG` (`b'f'`).
Full,
/// Decoded from `REPLICA_IDENTITY_INDEX_TAG` (`b'i'`).
Index,
}
/// Body of a logical replication `Relation` message.
#[derive(Debug)]
pub struct RelationBody {
    /// Relation id, read as a big-endian `u32`.
    rel_id: u32,
    /// Namespace bytes, without the terminating NUL.
    namespace: Bytes,
    /// Relation name bytes, without the terminating NUL.
    name: Bytes,
    /// Replica identity setting decoded from the message.
    replica_identity: ReplicaIdentity,
    /// Column descriptions, in wire order.
    columns: Vec<Column>,
}

impl RelationBody {
    /// Returns the relation id.
    #[inline]
    pub fn rel_id(&self) -> u32 {
        self.rel_id
    }

    /// Returns the namespace of the relation.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn namespace(&self) -> io::Result<&str> {
        get_str(&self.namespace[..])
    }

    /// Returns the name of the relation.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn name(&self) -> io::Result<&str> {
        get_str(&self.name[..])
    }

    /// Returns the replica identity setting of the relation.
    #[inline]
    pub fn replica_identity(&self) -> &ReplicaIdentity {
        &self.replica_identity
    }

    /// Returns the column descriptions, in wire order.
    #[inline]
    pub fn columns(&self) -> &[Column] {
        self.columns.as_slice()
    }
}
/// Body of a logical replication `Type` message.
#[derive(Debug)]
pub struct TypeBody {
    /// Type id, read as a big-endian `u32`.
    id: u32,
    /// Namespace bytes, without the terminating NUL.
    namespace: Bytes,
    /// Type name bytes, without the terminating NUL.
    name: Bytes,
}

impl TypeBody {
    /// Returns the id of the data type.
    #[inline]
    pub fn id(&self) -> Oid {
        let Self { id, .. } = self;
        *id
    }

    /// Returns the namespace of the data type.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn namespace(&self) -> io::Result<&str> {
        get_str(&self.namespace[..])
    }

    /// Returns the name of the data type.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn name(&self) -> io::Result<&str> {
        get_str(&self.name[..])
    }
}
/// Body of a logical replication `Insert` message.
#[derive(Debug)]
pub struct InsertBody {
    /// Relation id, read as a big-endian `u32`.
    rel_id: u32,
    /// The tuple that followed the "new tuple" tag.
    tuple: Tuple,
}

impl InsertBody {
    /// Returns the id of the relation the row was inserted into.
    #[inline]
    pub fn rel_id(&self) -> u32 {
        let Self { rel_id, .. } = self;
        *rel_id
    }

    /// Returns the inserted tuple.
    #[inline]
    pub fn tuple(&self) -> &Tuple {
        let Self { tuple, .. } = self;
        tuple
    }
}
/// Body of a logical replication `Update` message.
///
/// As parsed in this file, at most one of `old_tuple` and `key_tuple` is
/// set, and `new_tuple` is always present.
#[derive(Debug)]
pub struct UpdateBody {
    /// Relation id, read as a big-endian `u32`.
    rel_id: u32,
    /// Tuple that followed the "old tuple" tag, if the message had one.
    old_tuple: Option<Tuple>,
    /// Tuple that followed the "key tuple" tag, if the message had one.
    key_tuple: Option<Tuple>,
    /// Tuple that followed the "new tuple" tag.
    new_tuple: Tuple,
}

impl UpdateBody {
    /// Returns the id of the relation the update applies to.
    #[inline]
    pub fn rel_id(&self) -> u32 {
        let Self { rel_id, .. } = self;
        *rel_id
    }

    /// Returns the key tuple, if the message carried one.
    #[inline]
    pub fn key_tuple(&self) -> Option<&Tuple> {
        let Self { key_tuple, .. } = self;
        key_tuple.as_ref()
    }

    /// Returns the old tuple, if the message carried one.
    #[inline]
    pub fn old_tuple(&self) -> Option<&Tuple> {
        let Self { old_tuple, .. } = self;
        old_tuple.as_ref()
    }

    /// Returns the new tuple.
    #[inline]
    pub fn new_tuple(&self) -> &Tuple {
        let Self { new_tuple, .. } = self;
        new_tuple
    }
}
/// Body of a logical replication `Delete` message.
///
/// As parsed in this file, exactly one of `old_tuple` and `key_tuple` is set.
#[derive(Debug)]
pub struct DeleteBody {
    /// Relation id, read as a big-endian `u32`.
    rel_id: u32,
    /// Tuple that followed the "old tuple" tag, if that was the tag sent.
    old_tuple: Option<Tuple>,
    /// Tuple that followed the "key tuple" tag, if that was the tag sent.
    key_tuple: Option<Tuple>,
}

impl DeleteBody {
    /// Returns the id of the relation the delete applies to.
    #[inline]
    pub fn rel_id(&self) -> u32 {
        let Self { rel_id, .. } = self;
        *rel_id
    }

    /// Returns the key tuple, if the message carried one.
    #[inline]
    pub fn key_tuple(&self) -> Option<&Tuple> {
        let Self { key_tuple, .. } = self;
        key_tuple.as_ref()
    }

    /// Returns the old tuple, if the message carried one.
    #[inline]
    pub fn old_tuple(&self) -> Option<&Tuple> {
        let Self { old_tuple, .. } = self;
        old_tuple.as_ref()
    }
}
/// Body of a logical replication `Truncate` message.
#[derive(Debug)]
pub struct TruncateBody {
    /// Raw options byte, stored unvalidated.
    // NOTE(review): the PostgreSQL protocol docs describe this as a bit set
    // (1 = CASCADE, 2 = RESTART IDENTITY) — confirm before interpreting.
    options: i8,
    /// Ids of the truncated relations, in wire order.
    rel_ids: Vec<u32>,
}

impl TruncateBody {
    /// Returns the ids of the truncated relations, in wire order.
    #[inline]
    pub fn rel_ids(&self) -> &[u32] {
        self.rel_ids.as_slice()
    }

    /// Returns the raw options byte.
    #[inline]
    pub fn options(&self) -> i8 {
        let Self { options, .. } = self;
        *options
    }
}
/// Read cursor over a `Bytes` message.
///
/// `idx` is the offset of the next unread byte; the methods below only ever
/// move it forward, and never past `bytes.len()` unless `read_cstr`'s
/// `end + 1` is reached, which is itself at most `bytes.len()`.
struct Buffer {
    bytes: Bytes,
    idx: usize,
}

impl Buffer {
    /// Returns the bytes that have not been consumed yet.
    #[inline]
    fn slice(&self) -> &[u8] {
        &self.bytes[self.idx..]
    }

    /// Reads a NUL-terminated string and advances past its terminator.
    ///
    /// The returned `Bytes` excludes the NUL and is a slice of the underlying
    /// buffer, not a copy.
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedEof` when no NUL byte is found in the unread bytes.
    #[inline]
    fn read_cstr(&mut self) -> io::Result<Bytes> {
        let nul_offset = memchr(0, self.slice())
            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF"))?;

        let begin = self.idx;
        let finish = begin + nul_offset;
        // Skip the terminator so the next read starts after it.
        self.idx = finish + 1;
        Ok(self.bytes.slice(begin..finish))
    }

    /// Returns every unread byte as a slice of the underlying buffer and
    /// moves the cursor to the end.
    #[inline]
    fn read_all(&mut self) -> Bytes {
        let begin = self.idx;
        self.idx = self.bytes.len();
        self.bytes.slice(begin..)
    }
}
impl Read for Buffer {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = {
let slice = self.slice();
let len = cmp::min(slice.len(), buf.len());
buf[..len].copy_from_slice(&slice[..len]);
len
};
self.idx += len;
Ok(len)
}
}
/// Interprets `buf` as UTF-8 text without copying.
///
/// # Errors
///
/// Returns an `InvalidInput` error wrapping the `Utf8Error` when `buf` is not
/// valid UTF-8.
#[inline]
fn get_str(buf: &[u8]) -> io::Result<&str> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(utf8_err) => Err(io::Error::new(io::ErrorKind::InvalidInput, utf8_err)),
    }
}