1use std::io::{self, Read};
2use std::{cmp, str};
3
4use byteorder::{BigEndian, ReadBytesExt};
5use bytes::Bytes;
6use memchr::memchr;
7use postgres_protocol::{Lsn, Oid};
8
/// Tag byte that `ReplicationMessage::parse` decodes as an `XLogData` message.
pub const XLOG_DATA_TAG: u8 = b'w';
/// Tag byte that `ReplicationMessage::parse` decodes as a `PrimaryKeepAlive` message.
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';

// Leading tag bytes matched by `LogicalReplicationMessage::parse` to select the message variant.
const BEGIN_TAG: u8 = b'B';
const MESSAGE_TAG: u8 = b'M';
const COMMIT_TAG: u8 = b'C';
const ORIGIN_TAG: u8 = b'O';
const RELATION_TAG: u8 = b'R';
const TYPE_TAG: u8 = b'Y';
const INSERT_TAG: u8 = b'I';
const UPDATE_TAG: u8 = b'U';
const DELETE_TAG: u8 = b'D';
const TRUNCATE_TAG: u8 = b'T';
// Tag bytes that introduce a tuple inside insert, update and delete messages.
// `TUPLE_OLD_TAG` shares its value with `ORIGIN_TAG`; the two are matched in different positions.
const TUPLE_NEW_TAG: u8 = b'N';
const TUPLE_KEY_TAG: u8 = b'K';
const TUPLE_OLD_TAG: u8 = b'O';
// Tag bytes matched by `TupleData::parse` to decide how a single column value is encoded.
const TUPLE_DATA_NULL_TAG: u8 = b'n';
const TUPLE_DATA_TOAST_TAG: u8 = b'u';
const TUPLE_DATA_TEXT_TAG: u8 = b't';
const TUPLE_DATA_BINARY_TAG: u8 = b'b';

// Tag bytes accepted for the replica identity field of a relation message.
const REPLICA_IDENTITY_DEFAULT_TAG: u8 = b'd';
const REPLICA_IDENTITY_NOTHING_TAG: u8 = b'n';
const REPLICA_IDENTITY_FULL_TAG: u8 = b'f';
const REPLICA_IDENTITY_INDEX_TAG: u8 = b'i';
37
/// A message decoded by `ReplicationMessage::parse`, generic over the payload type `D`
/// carried by the `XLogData` variant.
#[non_exhaustive]
#[derive(Debug)]
pub enum ReplicationMessage<D> {
    /// Message introduced by `XLOG_DATA_TAG`; carries WAL positions, a timestamp and the payload.
    XLogData(XLogDataBody<D>),
    /// Message introduced by `PRIMARY_KEEPALIVE_TAG`; carries a WAL position, a timestamp and a
    /// reply byte.
    PrimaryKeepAlive(PrimaryKeepAliveBody),
}
45
46impl ReplicationMessage<Bytes> {
47 #[inline]
48 pub fn parse(buf: &Bytes) -> io::Result<Self> {
49 let mut buf = Buffer {
50 bytes: buf.clone(),
51 idx: 0,
52 };
53
54 let tag = buf.read_u8()?;
55
56 let replication_message = match tag {
57 XLOG_DATA_TAG => {
58 let wal_start = buf.read_u64::<BigEndian>()?;
59 let wal_end = buf.read_u64::<BigEndian>()?;
60 let timestamp = buf.read_i64::<BigEndian>()?;
61 let data = buf.read_all();
62 ReplicationMessage::XLogData(XLogDataBody {
63 wal_start,
64 wal_end,
65 timestamp,
66 data,
67 })
68 }
69 PRIMARY_KEEPALIVE_TAG => {
70 let wal_end = buf.read_u64::<BigEndian>()?;
71 let timestamp = buf.read_i64::<BigEndian>()?;
72 let reply = buf.read_u8()?;
73 ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
74 wal_end,
75 timestamp,
76 reply,
77 })
78 }
79 tag => {
80 return Err(io::Error::new(
81 io::ErrorKind::InvalidInput,
82 format!("unknown replication message tag `{}`", tag),
83 ));
84 }
85 };
86
87 Ok(replication_message)
88 }
89}
90
/// Body of an `XLogData` replication message, generic over the payload type `D`.
#[derive(Debug)]
pub struct XLogDataBody<D> {
    wal_start: u64,
    wal_end: u64,
    timestamp: i64,
    data: D,
}

impl<D> XLogDataBody<D> {
    /// Returns the first u64 header field of the message.
    // NOTE(review): presumably the WAL position where this payload starts — confirm against
    // the PostgreSQL protocol documentation.
    #[inline]
    pub fn wal_start(&self) -> u64 {
        self.wal_start
    }

    /// Returns the second u64 header field of the message.
    // NOTE(review): presumably the server's current end of WAL — confirm.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        self.wal_end
    }

    /// Returns the raw i64 timestamp from the message header.
    // NOTE(review): unit and epoch are not visible in this file — confirm before converting.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Borrows the payload.
    #[inline]
    pub fn data(&self) -> &D {
        &self.data
    }

    /// Consumes the message and returns the payload.
    #[inline]
    pub fn into_data(self) -> D {
        self.data
    }

    /// Transforms the payload with `f`, keeping the header fields unchanged.
    ///
    /// `f` is invoked exactly once, so any `FnOnce` closure is accepted (every `Fn` and
    /// `FnMut` closure still qualifies).
    ///
    /// # Errors
    ///
    /// Returns the error produced by `f`; the header fields are dropped in that case.
    pub fn map_data<F, D2, E>(self, f: F) -> Result<XLogDataBody<D2>, E>
    where
        F: FnOnce(D) -> Result<D2, E>,
    {
        let XLogDataBody {
            wal_start,
            wal_end,
            timestamp,
            data,
        } = self;

        Ok(XLogDataBody {
            wal_start,
            wal_end,
            timestamp,
            data: f(data)?,
        })
    }
}
138
/// Body of a `PrimaryKeepAlive` replication message.
#[derive(Debug)]
pub struct PrimaryKeepAliveBody {
    wal_end: u64,
    timestamp: i64,
    reply: u8,
}

impl PrimaryKeepAliveBody {
    /// Returns the u64 WAL position carried by the message.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        self.wal_end
    }

    /// Returns the raw i64 timestamp carried by the message.
    // NOTE(review): unit and epoch are not visible in this file — confirm before converting.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Returns the raw reply byte.
    // NOTE(review): presumably non-zero means the sender wants a prompt reply — confirm.
    #[inline]
    pub fn reply(&self) -> u8 {
        self.reply
    }
}
162
/// A logical replication message decoded by `LogicalReplicationMessage::parse`.
///
/// Each variant corresponds to one leading tag byte (`BEGIN_TAG`, `MESSAGE_TAG`, ...).
#[non_exhaustive]
#[derive(Debug)]
pub enum LogicalReplicationMessage {
    /// Message introduced by `BEGIN_TAG`.
    Begin(BeginBody),
    /// Message introduced by `MESSAGE_TAG`.
    Message(MessageBody),
    /// Message introduced by `COMMIT_TAG`.
    Commit(CommitBody),
    /// Message introduced by `ORIGIN_TAG`.
    Origin(OriginBody),
    /// Message introduced by `RELATION_TAG`.
    Relation(RelationBody),
    /// Message introduced by `TYPE_TAG`.
    Type(TypeBody),
    /// Message introduced by `INSERT_TAG`.
    Insert(InsertBody),
    /// Message introduced by `UPDATE_TAG`.
    Update(UpdateBody),
    /// Message introduced by `DELETE_TAG`.
    Delete(DeleteBody),
    /// Message introduced by `TRUNCATE_TAG`.
    Truncate(TruncateBody),
}
189
190impl LogicalReplicationMessage {
191 pub fn parse(buf: &Bytes) -> io::Result<Self> {
192 let mut buf = Buffer {
193 bytes: buf.clone(),
194 idx: 0,
195 };
196
197 let tag = buf.read_u8()?;
198
199 let logical_replication_message = match tag {
200 BEGIN_TAG => Self::Begin(BeginBody {
201 final_lsn: buf.read_u64::<BigEndian>()?,
202 timestamp: buf.read_i64::<BigEndian>()?,
203 xid: buf.read_u32::<BigEndian>()?,
204 }),
205 MESSAGE_TAG => Self::Message(MessageBody {
206 flags: buf.read_i8()?,
207 message_lsn: buf.read_u64::<BigEndian>()?,
208 prefix: buf.read_cstr()?,
209 content: match buf.read_i32::<BigEndian>()? {
210 len if len > 0 => buf.read_buf(len as usize)?,
211 0 => Bytes::new(),
212 len => {
213 return Err(io::Error::new(
214 io::ErrorKind::InvalidInput,
215 format!("unexpected message content length `{len}`"),
216 ))
217 }
218 },
219 }),
220 COMMIT_TAG => Self::Commit(CommitBody {
221 flags: buf.read_i8()?,
222 commit_lsn: buf.read_u64::<BigEndian>()?,
223 end_lsn: buf.read_u64::<BigEndian>()?,
224 timestamp: buf.read_i64::<BigEndian>()?,
225 }),
226 ORIGIN_TAG => Self::Origin(OriginBody {
227 commit_lsn: buf.read_u64::<BigEndian>()?,
228 name: buf.read_cstr()?,
229 }),
230 RELATION_TAG => {
231 let rel_id = buf.read_u32::<BigEndian>()?;
232 let namespace = buf.read_cstr()?;
233 let name = buf.read_cstr()?;
234 let replica_identity = match buf.read_u8()? {
235 REPLICA_IDENTITY_DEFAULT_TAG => ReplicaIdentity::Default,
236 REPLICA_IDENTITY_NOTHING_TAG => ReplicaIdentity::Nothing,
237 REPLICA_IDENTITY_FULL_TAG => ReplicaIdentity::Full,
238 REPLICA_IDENTITY_INDEX_TAG => ReplicaIdentity::Index,
239 tag => {
240 return Err(io::Error::new(
241 io::ErrorKind::InvalidInput,
242 format!("unknown replica identity tag `{}`", tag),
243 ));
244 }
245 };
246 let column_len = buf.read_i16::<BigEndian>()?;
247
248 let mut columns = Vec::with_capacity(column_len as usize);
249 for _ in 0..column_len {
250 columns.push(Column::parse(&mut buf)?);
251 }
252
253 Self::Relation(RelationBody {
254 rel_id,
255 namespace,
256 name,
257 replica_identity,
258 columns,
259 })
260 }
261 TYPE_TAG => Self::Type(TypeBody {
262 id: buf.read_u32::<BigEndian>()?,
263 namespace: buf.read_cstr()?,
264 name: buf.read_cstr()?,
265 }),
266 INSERT_TAG => {
267 let rel_id = buf.read_u32::<BigEndian>()?;
268 let tag = buf.read_u8()?;
269
270 let tuple = match tag {
271 TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
272 tag => {
273 return Err(io::Error::new(
274 io::ErrorKind::InvalidInput,
275 format!("unexpected tuple tag `{}`", tag),
276 ));
277 }
278 };
279
280 Self::Insert(InsertBody { rel_id, tuple })
281 }
282 UPDATE_TAG => {
283 let rel_id = buf.read_u32::<BigEndian>()?;
284 let tag = buf.read_u8()?;
285
286 let mut key_tuple = None;
287 let mut old_tuple = None;
288
289 let new_tuple = match tag {
290 TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
291 TUPLE_OLD_TAG | TUPLE_KEY_TAG => {
292 if tag == TUPLE_OLD_TAG {
293 old_tuple = Some(Tuple::parse(&mut buf)?);
294 } else {
295 key_tuple = Some(Tuple::parse(&mut buf)?);
296 }
297
298 match buf.read_u8()? {
299 TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
300 tag => {
301 return Err(io::Error::new(
302 io::ErrorKind::InvalidInput,
303 format!("unexpected tuple tag `{}`", tag),
304 ));
305 }
306 }
307 }
308 tag => {
309 return Err(io::Error::new(
310 io::ErrorKind::InvalidInput,
311 format!("unknown tuple tag `{}`", tag),
312 ));
313 }
314 };
315
316 Self::Update(UpdateBody {
317 rel_id,
318 key_tuple,
319 old_tuple,
320 new_tuple,
321 })
322 }
323 DELETE_TAG => {
324 let rel_id = buf.read_u32::<BigEndian>()?;
325 let tag = buf.read_u8()?;
326
327 let mut key_tuple = None;
328 let mut old_tuple = None;
329
330 match tag {
331 TUPLE_OLD_TAG => old_tuple = Some(Tuple::parse(&mut buf)?),
332 TUPLE_KEY_TAG => key_tuple = Some(Tuple::parse(&mut buf)?),
333 tag => {
334 return Err(io::Error::new(
335 io::ErrorKind::InvalidInput,
336 format!("unknown tuple tag `{}`", tag),
337 ));
338 }
339 }
340
341 Self::Delete(DeleteBody {
342 rel_id,
343 key_tuple,
344 old_tuple,
345 })
346 }
347 TRUNCATE_TAG => {
348 let relation_len = buf.read_i32::<BigEndian>()?;
349 let options = buf.read_i8()?;
350
351 let mut rel_ids = Vec::with_capacity(relation_len as usize);
352 for _ in 0..relation_len {
353 rel_ids.push(buf.read_u32::<BigEndian>()?);
354 }
355
356 Self::Truncate(TruncateBody { options, rel_ids })
357 }
358 tag => {
359 return Err(io::Error::new(
360 io::ErrorKind::InvalidInput,
361 format!("unknown replication message tag `{}`", tag),
362 ));
363 }
364 };
365
366 Ok(logical_replication_message)
367 }
368}
369
/// An ordered list of column values, as produced by `Tuple::parse`.
#[derive(Debug)]
pub struct Tuple(Vec<TupleData>);

impl Tuple {
    /// Returns the column values in the order they were parsed.
    #[inline]
    pub fn tuple_data(&self) -> &[TupleData] {
        &self.0
    }
}
381
382impl Tuple {
383 fn parse(buf: &mut Buffer) -> io::Result<Self> {
384 let col_len = buf.read_i16::<BigEndian>()?;
385 let mut tuple = Vec::with_capacity(col_len as usize);
386 for _ in 0..col_len {
387 tuple.push(TupleData::parse(buf)?);
388 }
389
390 Ok(Tuple(tuple))
391 }
392}
393
/// One column description inside a relation message, as read by `Column::parse`.
#[derive(Debug)]
pub struct Column {
    flags: i8,
    name: Bytes,
    type_id: i32,
    type_modifier: i32,
}

impl Column {
    /// Returns the raw flags byte of the column.
    // NOTE(review): the meaning of individual flag bits is not visible in this file.
    #[inline]
    pub fn flags(&self) -> i8 {
        self.flags
    }

    /// Returns the column name.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn name(&self) -> io::Result<&str> {
        get_str(&self.name)
    }

    /// Returns the column's type id as the signed 32-bit value read from the wire.
    #[inline]
    pub fn type_id(&self) -> i32 {
        self.type_id
    }

    /// Returns the column's type modifier as the signed 32-bit value read from the wire.
    #[inline]
    pub fn type_modifier(&self) -> i32 {
        self.type_modifier
    }
}
429
430impl Column {
431 fn parse(buf: &mut Buffer) -> io::Result<Self> {
432 Ok(Self {
433 flags: buf.read_i8()?,
434 name: buf.read_cstr()?,
435 type_id: buf.read_i32::<BigEndian>()?,
436 type_modifier: buf.read_i32::<BigEndian>()?,
437 })
438 }
439}
440
/// A single column value inside a `Tuple`, as decoded by `TupleData::parse`.
#[derive(Debug)]
pub enum TupleData {
    /// Value introduced by `TUPLE_DATA_NULL_TAG`; carries no payload.
    Null,
    /// Value introduced by `TUPLE_DATA_TOAST_TAG`; carries no payload.
    UnchangedToast,
    /// Value introduced by `TUPLE_DATA_TEXT_TAG`; holds the length-prefixed payload bytes.
    Text(Bytes),
    /// Value introduced by `TUPLE_DATA_BINARY_TAG`; holds the length-prefixed payload bytes.
    Binary(Bytes),
}
453
454impl TupleData {
455 fn parse(buf: &mut Buffer) -> io::Result<Self> {
456 let type_tag = buf.read_u8()?;
457
458 let tuple = match type_tag {
459 TUPLE_DATA_NULL_TAG => TupleData::Null,
460 TUPLE_DATA_TOAST_TAG => TupleData::UnchangedToast,
461 TUPLE_DATA_TEXT_TAG => {
462 let len = buf.read_i32::<BigEndian>()?;
463 let data = buf.read_buf(len as usize)?;
464 TupleData::Text(data)
465 }
466 TUPLE_DATA_BINARY_TAG => {
467 let len = buf.read_i32::<BigEndian>()?;
468 let mut data = vec![0; len as usize];
469 buf.read_exact(&mut data)?;
470 TupleData::Binary(data.into())
471 }
472 tag => {
473 return Err(io::Error::new(
474 io::ErrorKind::InvalidInput,
475 format!("unknown replication message tag `{}`", tag),
476 ));
477 }
478 };
479
480 Ok(tuple)
481 }
482}
483
/// Body of a `Begin` logical replication message.
#[derive(Debug)]
pub struct BeginBody {
    final_lsn: u64,
    timestamp: i64,
    xid: u32,
}

impl BeginBody {
    /// Returns the LSN carried by the message.
    // NOTE(review): presumably the final LSN of the transaction — confirm against the protocol docs.
    #[inline]
    pub fn final_lsn(&self) -> Lsn {
        self.final_lsn
    }

    /// Returns the raw i64 timestamp carried by the message.
    // NOTE(review): unit and epoch are not visible in this file — confirm before converting.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Returns the transaction id carried by the message.
    #[inline]
    pub fn xid(&self) -> u32 {
        self.xid
    }
}
511
/// Body of a `Message` logical replication message.
#[derive(Debug)]
pub struct MessageBody {
    message_lsn: u64,
    flags: i8,
    prefix: Bytes,
    content: Bytes,
}

impl MessageBody {
    /// Returns the LSN carried by the message.
    #[inline]
    pub fn message_lsn(&self) -> Lsn {
        self.message_lsn
    }

    /// Returns the raw flags byte.
    // NOTE(review): the meaning of individual flag bits is not visible in this file.
    #[inline]
    pub fn flags(&self) -> i8 {
        self.flags
    }

    /// Returns the message prefix.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn prefix(&self) -> io::Result<&str> {
        get_str(&self.prefix)
    }

    /// Returns the raw message content; it is empty when the announced length was zero.
    #[inline]
    pub fn content(&self) -> &Bytes {
        &self.content
    }
}
546
/// Body of a `Commit` logical replication message.
#[derive(Debug)]
pub struct CommitBody {
    flags: i8,
    commit_lsn: u64,
    end_lsn: u64,
    timestamp: i64,
}

impl CommitBody {
    /// Returns the first LSN carried by the message (the commit LSN).
    #[inline]
    pub fn commit_lsn(&self) -> Lsn {
        self.commit_lsn
    }

    /// Returns the second LSN carried by the message (the end LSN).
    #[inline]
    pub fn end_lsn(&self) -> Lsn {
        self.end_lsn
    }

    /// Returns the raw i64 timestamp carried by the message.
    // NOTE(review): unit and epoch are not visible in this file — confirm before converting.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Returns the raw flags byte.
    // NOTE(review): the meaning of individual flag bits is not visible in this file.
    #[inline]
    pub fn flags(&self) -> i8 {
        self.flags
    }
}
581
/// Body of an `Origin` logical replication message.
#[derive(Debug)]
pub struct OriginBody {
    commit_lsn: u64,
    name: Bytes,
}

impl OriginBody {
    /// Returns the LSN carried by the message.
    #[inline]
    pub fn commit_lsn(&self) -> Lsn {
        self.commit_lsn
    }

    /// Returns the origin name.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn name(&self) -> io::Result<&str> {
        get_str(&self.name)
    }
}
604
/// Replica identity setting of a relation, decoded from a single tag byte in a relation
/// message.
#[derive(Debug)]
pub enum ReplicaIdentity {
    /// Decoded from `REPLICA_IDENTITY_DEFAULT_TAG`.
    Default,
    /// Decoded from `REPLICA_IDENTITY_NOTHING_TAG`.
    Nothing,
    /// Decoded from `REPLICA_IDENTITY_FULL_TAG`.
    Full,
    /// Decoded from `REPLICA_IDENTITY_INDEX_TAG`.
    Index,
}
619
/// Body of a `Relation` logical replication message.
#[derive(Debug)]
pub struct RelationBody {
    rel_id: u32,
    namespace: Bytes,
    name: Bytes,
    replica_identity: ReplicaIdentity,
    columns: Vec<Column>,
}

impl RelationBody {
    /// Returns the relation id carried by the message.
    #[inline]
    pub fn rel_id(&self) -> u32 {
        self.rel_id
    }

    /// Returns the namespace of the relation.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn namespace(&self) -> io::Result<&str> {
        get_str(&self.namespace)
    }

    /// Returns the name of the relation.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn name(&self) -> io::Result<&str> {
        get_str(&self.name)
    }

    /// Returns the replica identity setting decoded for the relation.
    #[inline]
    pub fn replica_identity(&self) -> &ReplicaIdentity {
        &self.replica_identity
    }

    /// Returns the column descriptions in the order they were parsed.
    #[inline]
    pub fn columns(&self) -> &[Column] {
        &self.columns
    }
}
661
/// Body of a `Type` logical replication message.
#[derive(Debug)]
pub struct TypeBody {
    id: u32,
    namespace: Bytes,
    name: Bytes,
}

impl TypeBody {
    /// Returns the id of the data type.
    #[inline]
    pub fn id(&self) -> Oid {
        self.id
    }

    /// Returns the namespace of the data type.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn namespace(&self) -> io::Result<&str> {
        get_str(&self.namespace)
    }

    /// Returns the name of the data type.
    ///
    /// # Errors
    ///
    /// Returns `InvalidInput` when the stored bytes are not valid UTF-8.
    #[inline]
    pub fn name(&self) -> io::Result<&str> {
        get_str(&self.name)
    }
}
689
/// Body of an `Insert` logical replication message.
#[derive(Debug)]
pub struct InsertBody {
    rel_id: u32,
    tuple: Tuple,
}

impl InsertBody {
    /// Returns the relation id carried by the message.
    #[inline]
    pub fn rel_id(&self) -> u32 {
        self.rel_id
    }

    /// Returns the tuple that followed the `TUPLE_NEW_TAG` byte.
    #[inline]
    pub fn tuple(&self) -> &Tuple {
        &self.tuple
    }
}
710
/// Body of an `Update` logical replication message.
///
/// The parser fills at most one of `old_tuple` and `key_tuple`; `new_tuple` is always present.
#[derive(Debug)]
pub struct UpdateBody {
    rel_id: u32,
    old_tuple: Option<Tuple>,
    key_tuple: Option<Tuple>,
    new_tuple: Tuple,
}

impl UpdateBody {
    /// Returns the relation id carried by the message.
    #[inline]
    pub fn rel_id(&self) -> u32 {
        self.rel_id
    }

    /// Returns the tuple that followed a `TUPLE_KEY_TAG` byte, if the message had one.
    #[inline]
    pub fn key_tuple(&self) -> Option<&Tuple> {
        self.key_tuple.as_ref()
    }

    /// Returns the tuple that followed a `TUPLE_OLD_TAG` byte, if the message had one.
    #[inline]
    pub fn old_tuple(&self) -> Option<&Tuple> {
        self.old_tuple.as_ref()
    }

    /// Returns the tuple that followed the `TUPLE_NEW_TAG` byte.
    #[inline]
    pub fn new_tuple(&self) -> &Tuple {
        &self.new_tuple
    }
}
747
/// Body of a `Delete` logical replication message.
///
/// The parser fills exactly one of `old_tuple` and `key_tuple`.
#[derive(Debug)]
pub struct DeleteBody {
    rel_id: u32,
    old_tuple: Option<Tuple>,
    key_tuple: Option<Tuple>,
}

impl DeleteBody {
    /// Returns the relation id carried by the message.
    #[inline]
    pub fn rel_id(&self) -> u32 {
        self.rel_id
    }

    /// Returns the tuple that followed a `TUPLE_KEY_TAG` byte, if the message had one.
    #[inline]
    pub fn key_tuple(&self) -> Option<&Tuple> {
        self.key_tuple.as_ref()
    }

    /// Returns the tuple that followed a `TUPLE_OLD_TAG` byte, if the message had one.
    #[inline]
    pub fn old_tuple(&self) -> Option<&Tuple> {
        self.old_tuple.as_ref()
    }
}
777
/// Body of a `Truncate` logical replication message.
#[derive(Debug)]
pub struct TruncateBody {
    options: i8,
    rel_ids: Vec<u32>,
}

impl TruncateBody {
    /// Returns the relation ids listed in the message, in wire order.
    #[inline]
    pub fn rel_ids(&self) -> &[u32] {
        &self.rel_ids
    }

    /// Returns the raw options byte.
    // NOTE(review): the meaning of individual option bits is not visible in this file.
    #[inline]
    pub fn options(&self) -> i8 {
        self.options
    }
}
798
/// Read cursor over a `Bytes` value, used by the parsers in this file.
struct Buffer {
    // The complete input.
    bytes: Bytes,
    // Offset into `bytes` of the next unread byte.
    idx: usize,
}
803
804impl Buffer {
805 #[inline]
806 fn slice(&self) -> &[u8] {
807 &self.bytes[self.idx..]
808 }
809
810 #[inline]
811 fn read_cstr(&mut self) -> io::Result<Bytes> {
812 match memchr(0, self.slice()) {
813 Some(pos) => {
814 let start = self.idx;
815 let end = start + pos;
816 let cstr = self.bytes.slice(start..end);
817 self.idx = end + 1;
818 Ok(cstr)
819 }
820 None => Err(io::Error::new(
821 io::ErrorKind::UnexpectedEof,
822 "unexpected EOF",
823 )),
824 }
825 }
826
827 #[inline]
828 fn read_all(&mut self) -> Bytes {
829 let buf = self.bytes.slice(self.idx..);
830 self.idx = self.bytes.len();
831 buf
832 }
833
834 #[inline]
835 fn read_buf(&mut self, len: usize) -> io::Result<Bytes> {
836 if self.idx + len <= self.bytes.len() {
837 let buf = self.bytes.slice(self.idx..(self.idx + len));
838 self.idx += len;
839 Ok(buf)
840 } else {
841 Err(io::Error::new(
842 io::ErrorKind::UnexpectedEof,
843 "unexpected EOF",
844 ))
845 }
846 }
847}
848
849impl Read for Buffer {
850 #[inline]
851 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
852 let len = {
853 let slice = self.slice();
854 let len = cmp::min(slice.len(), buf.len());
855 buf[..len].copy_from_slice(&slice[..len]);
856 len
857 };
858 self.idx += len;
859 Ok(len)
860 }
861}
862
/// Interprets `buf` as UTF-8 text.
///
/// Invalid UTF-8 is reported as an `InvalidInput` I/O error that wraps the decoding error.
#[inline]
fn get_str(buf: &[u8]) -> io::Result<&str> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(utf8_err) => Err(io::Error::new(io::ErrorKind::InvalidInput, utf8_err)),
    }
}