1use std::io::{self, Read};
2use std::{cmp, str};
3
4use byteorder::{BigEndian, ReadBytesExt};
5use bytes::Bytes;
6use memchr::memchr;
7use postgres_protocol::{Lsn, Oid};
8
/// Leading byte that makes `ReplicationMessage::parse` decode an `XLogData` message.
pub const XLOG_DATA_TAG: u8 = b'w';
/// Leading byte that makes `ReplicationMessage::parse` decode a `PrimaryKeepAlive` message.
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';

// Leading bytes matched by `LogicalReplicationMessage::parse`; each one selects the
// message variant of the same name.
const BEGIN_TAG: u8 = b'B';
const COMMIT_TAG: u8 = b'C';
const ORIGIN_TAG: u8 = b'O';
const RELATION_TAG: u8 = b'R';
const TYPE_TAG: u8 = b'Y';
const INSERT_TAG: u8 = b'I';
const UPDATE_TAG: u8 = b'U';
const DELETE_TAG: u8 = b'D';
const TRUNCATE_TAG: u8 = b'T';
// Bytes that introduce a tuple inside insert, update and delete messages.
const TUPLE_NEW_TAG: u8 = b'N';
const TUPLE_KEY_TAG: u8 = b'K';
// Shares its value with `ORIGIN_TAG`; the two are matched at different positions of a
// message (first byte vs. after the relation id), so they never compete.
const TUPLE_OLD_TAG: u8 = b'O';
// Per-column bytes matched by `TupleData::parse`.
const TUPLE_DATA_NULL_TAG: u8 = b'n';
const TUPLE_DATA_TOAST_TAG: u8 = b'u';
const TUPLE_DATA_TEXT_TAG: u8 = b't';
const TUPLE_DATA_BINARY_TAG: u8 = b'b';

// Replica identity bytes matched while decoding a relation message; each maps to the
// `ReplicaIdentity` variant of the same name.
const REPLICA_IDENTITY_DEFAULT_TAG: u8 = b'd';
const REPLICA_IDENTITY_NOTHING_TAG: u8 = b'n';
const REPLICA_IDENTITY_FULL_TAG: u8 = b'f';
const REPLICA_IDENTITY_INDEX_TAG: u8 = b'i';
36
37#[non_exhaustive]
39#[derive(Debug)]
40pub enum ReplicationMessage<D> {
41 XLogData(XLogDataBody<D>),
42 PrimaryKeepAlive(PrimaryKeepAliveBody),
43}
44
45impl ReplicationMessage<Bytes> {
46 #[inline]
47 pub fn parse(buf: &Bytes) -> io::Result<Self> {
48 let mut buf = Buffer {
49 bytes: buf.clone(),
50 idx: 0,
51 };
52
53 let tag = buf.read_u8()?;
54
55 let replication_message = match tag {
56 XLOG_DATA_TAG => {
57 let wal_start = buf.read_u64::<BigEndian>()?;
58 let wal_end = buf.read_u64::<BigEndian>()?;
59 let timestamp = buf.read_i64::<BigEndian>()?;
60 let data = buf.read_all();
61 ReplicationMessage::XLogData(XLogDataBody {
62 wal_start,
63 wal_end,
64 timestamp,
65 data,
66 })
67 }
68 PRIMARY_KEEPALIVE_TAG => {
69 let wal_end = buf.read_u64::<BigEndian>()?;
70 let timestamp = buf.read_i64::<BigEndian>()?;
71 let reply = buf.read_u8()?;
72 ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
73 wal_end,
74 timestamp,
75 reply,
76 })
77 }
78 tag => {
79 return Err(io::Error::new(
80 io::ErrorKind::InvalidInput,
81 format!("unknown replication message tag `{}`", tag),
82 ));
83 }
84 };
85
86 Ok(replication_message)
87 }
88}
89
/// Body of an `XLogData` replication message: three fixed header fields plus a payload
/// of type `D`.
#[derive(Debug)]
pub struct XLogDataBody<D> {
    wal_start: u64,
    wal_end: u64,
    timestamp: i64,
    data: D,
}

impl<D> XLogDataBody<D> {
    /// Returns the `wal_start` header field (first 64-bit value after the tag).
    #[inline]
    pub fn wal_start(&self) -> u64 {
        self.wal_start
    }

    /// Returns the `wal_end` header field (second 64-bit value after the tag).
    #[inline]
    pub fn wal_end(&self) -> u64 {
        self.wal_end
    }

    /// Returns the raw timestamp header field exactly as it was read; no epoch or unit
    /// conversion is applied here.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Borrows the payload.
    #[inline]
    pub fn data(&self) -> &D {
        &self.data
    }

    /// Consumes the body and returns the payload, discarding the header fields.
    #[inline]
    pub fn into_data(self) -> D {
        self.data
    }

    /// Converts the payload with `f`, keeping the header fields unchanged.
    ///
    /// `f` is invoked exactly once, so any `FnOnce` closure is accepted (every `Fn`
    /// closure still qualifies).
    ///
    /// # Errors
    ///
    /// Returns whatever error `f` returns; in that case the body is dropped.
    pub fn map_data<F, D2, E>(self, f: F) -> Result<XLogDataBody<D2>, E>
    where
        F: FnOnce(D) -> Result<D2, E>,
    {
        let data = f(self.data)?;
        Ok(XLogDataBody {
            wal_start: self.wal_start,
            wal_end: self.wal_end,
            timestamp: self.timestamp,
            data,
        })
    }
}
137
/// Body of a primary keepalive message: two 64-bit header fields and a one-byte
/// reply flag, stored exactly as read.
#[derive(Debug)]
pub struct PrimaryKeepAliveBody {
    wal_end: u64,
    timestamp: i64,
    reply: u8,
}

impl PrimaryKeepAliveBody {
    /// Returns the `wal_end` field.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        let Self { wal_end, .. } = self;
        *wal_end
    }

    /// Returns the raw timestamp field; no epoch or unit conversion is applied.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        let Self { timestamp, .. } = self;
        *timestamp
    }

    /// Returns the reply byte exactly as it appeared in the message.
    #[inline]
    pub fn reply(&self) -> u8 {
        let Self { reply, .. } = self;
        *reply
    }
}
161
162#[non_exhaustive]
163#[derive(Debug)]
165pub enum LogicalReplicationMessage {
166 Begin(BeginBody),
168 Commit(CommitBody),
170 Origin(OriginBody),
173 Relation(RelationBody),
175 Type(TypeBody),
177 Insert(InsertBody),
179 Update(UpdateBody),
181 Delete(DeleteBody),
183 Truncate(TruncateBody),
185}
186
187impl LogicalReplicationMessage {
188 pub fn parse(buf: &Bytes) -> io::Result<Self> {
189 let mut buf = Buffer {
190 bytes: buf.clone(),
191 idx: 0,
192 };
193
194 let tag = buf.read_u8()?;
195
196 let logical_replication_message = match tag {
197 BEGIN_TAG => Self::Begin(BeginBody {
198 final_lsn: buf.read_u64::<BigEndian>()?,
199 timestamp: buf.read_i64::<BigEndian>()?,
200 xid: buf.read_u32::<BigEndian>()?,
201 }),
202 COMMIT_TAG => Self::Commit(CommitBody {
203 flags: buf.read_i8()?,
204 commit_lsn: buf.read_u64::<BigEndian>()?,
205 end_lsn: buf.read_u64::<BigEndian>()?,
206 timestamp: buf.read_i64::<BigEndian>()?,
207 }),
208 ORIGIN_TAG => Self::Origin(OriginBody {
209 commit_lsn: buf.read_u64::<BigEndian>()?,
210 name: buf.read_cstr()?,
211 }),
212 RELATION_TAG => {
213 let rel_id = buf.read_u32::<BigEndian>()?;
214 let namespace = buf.read_cstr()?;
215 let name = buf.read_cstr()?;
216 let replica_identity = match buf.read_u8()? {
217 REPLICA_IDENTITY_DEFAULT_TAG => ReplicaIdentity::Default,
218 REPLICA_IDENTITY_NOTHING_TAG => ReplicaIdentity::Nothing,
219 REPLICA_IDENTITY_FULL_TAG => ReplicaIdentity::Full,
220 REPLICA_IDENTITY_INDEX_TAG => ReplicaIdentity::Index,
221 tag => {
222 return Err(io::Error::new(
223 io::ErrorKind::InvalidInput,
224 format!("unknown replica identity tag `{}`", tag),
225 ));
226 }
227 };
228 let column_len = buf.read_i16::<BigEndian>()?;
229
230 let mut columns = Vec::with_capacity(column_len as usize);
231 for _ in 0..column_len {
232 columns.push(Column::parse(&mut buf)?);
233 }
234
235 Self::Relation(RelationBody {
236 rel_id,
237 namespace,
238 name,
239 replica_identity,
240 columns,
241 })
242 }
243 TYPE_TAG => Self::Type(TypeBody {
244 id: buf.read_u32::<BigEndian>()?,
245 namespace: buf.read_cstr()?,
246 name: buf.read_cstr()?,
247 }),
248 INSERT_TAG => {
249 let rel_id = buf.read_u32::<BigEndian>()?;
250 let tag = buf.read_u8()?;
251
252 let tuple = match tag {
253 TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
254 tag => {
255 return Err(io::Error::new(
256 io::ErrorKind::InvalidInput,
257 format!("unexpected tuple tag `{}`", tag),
258 ));
259 }
260 };
261
262 Self::Insert(InsertBody { rel_id, tuple })
263 }
264 UPDATE_TAG => {
265 let rel_id = buf.read_u32::<BigEndian>()?;
266 let tag = buf.read_u8()?;
267
268 let mut key_tuple = None;
269 let mut old_tuple = None;
270
271 let new_tuple = match tag {
272 TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
273 TUPLE_OLD_TAG | TUPLE_KEY_TAG => {
274 if tag == TUPLE_OLD_TAG {
275 old_tuple = Some(Tuple::parse(&mut buf)?);
276 } else {
277 key_tuple = Some(Tuple::parse(&mut buf)?);
278 }
279
280 match buf.read_u8()? {
281 TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
282 tag => {
283 return Err(io::Error::new(
284 io::ErrorKind::InvalidInput,
285 format!("unexpected tuple tag `{}`", tag),
286 ));
287 }
288 }
289 }
290 tag => {
291 return Err(io::Error::new(
292 io::ErrorKind::InvalidInput,
293 format!("unknown tuple tag `{}`", tag),
294 ));
295 }
296 };
297
298 Self::Update(UpdateBody {
299 rel_id,
300 key_tuple,
301 old_tuple,
302 new_tuple,
303 })
304 }
305 DELETE_TAG => {
306 let rel_id = buf.read_u32::<BigEndian>()?;
307 let tag = buf.read_u8()?;
308
309 let mut key_tuple = None;
310 let mut old_tuple = None;
311
312 match tag {
313 TUPLE_OLD_TAG => old_tuple = Some(Tuple::parse(&mut buf)?),
314 TUPLE_KEY_TAG => key_tuple = Some(Tuple::parse(&mut buf)?),
315 tag => {
316 return Err(io::Error::new(
317 io::ErrorKind::InvalidInput,
318 format!("unknown tuple tag `{}`", tag),
319 ));
320 }
321 }
322
323 Self::Delete(DeleteBody {
324 rel_id,
325 key_tuple,
326 old_tuple,
327 })
328 }
329 TRUNCATE_TAG => {
330 let relation_len = buf.read_i32::<BigEndian>()?;
331 let options = buf.read_i8()?;
332
333 let mut rel_ids = Vec::with_capacity(relation_len as usize);
334 for _ in 0..relation_len {
335 rel_ids.push(buf.read_u32::<BigEndian>()?);
336 }
337
338 Self::Truncate(TruncateBody { options, rel_ids })
339 }
340 tag => {
341 return Err(io::Error::new(
342 io::ErrorKind::InvalidInput,
343 format!("unknown replication message tag `{}`", tag),
344 ));
345 }
346 };
347
348 Ok(logical_replication_message)
349 }
350}
351
352#[derive(Debug)]
354pub struct Tuple(Vec<TupleData>);
355
356impl Tuple {
357 #[inline]
358 pub fn tuple_data(&self) -> &[TupleData] {
360 &self.0
361 }
362}
363
364impl Tuple {
365 fn parse(buf: &mut Buffer) -> io::Result<Self> {
366 let col_len = buf.read_i16::<BigEndian>()?;
367 let mut tuple = Vec::with_capacity(col_len as usize);
368 for _ in 0..col_len {
369 tuple.push(TupleData::parse(buf)?);
370 }
371
372 Ok(Tuple(tuple))
373 }
374}
375
376#[derive(Debug)]
378pub struct Column {
379 flags: i8,
380 name: Bytes,
381 type_id: i32,
382 type_modifier: i32,
383}
384
385impl Column {
386 #[inline]
387 pub fn flags(&self) -> i8 {
390 self.flags
391 }
392
393 #[inline]
394 pub fn name(&self) -> io::Result<&str> {
396 get_str(&self.name)
397 }
398
399 #[inline]
400 pub fn type_id(&self) -> i32 {
402 self.type_id
403 }
404
405 #[inline]
406 pub fn type_modifier(&self) -> i32 {
408 self.type_modifier
409 }
410}
411
412impl Column {
413 fn parse(buf: &mut Buffer) -> io::Result<Self> {
414 Ok(Self {
415 flags: buf.read_i8()?,
416 name: buf.read_cstr()?,
417 type_id: buf.read_i32::<BigEndian>()?,
418 type_modifier: buf.read_i32::<BigEndian>()?,
419 })
420 }
421}
422
423#[derive(Debug)]
425pub enum TupleData {
426 Null,
428 UnchangedToast,
430 Text(Bytes),
432 Binary(Bytes),
434}
435
436impl TupleData {
437 fn parse(buf: &mut Buffer) -> io::Result<Self> {
438 let type_tag = buf.read_u8()?;
439
440 let tuple = match type_tag {
441 TUPLE_DATA_NULL_TAG => TupleData::Null,
442 TUPLE_DATA_TOAST_TAG => TupleData::UnchangedToast,
443 TUPLE_DATA_TEXT_TAG => {
444 let len = buf.read_i32::<BigEndian>()?;
445 let data = buf.read_buf(len as usize)?;
446 TupleData::Text(data)
447 }
448 TUPLE_DATA_BINARY_TAG => {
449 let len = buf.read_i32::<BigEndian>()?;
450 let mut data = vec![0; len as usize];
451 buf.read_exact(&mut data)?;
452 TupleData::Binary(data.into())
453 }
454 tag => {
455 return Err(io::Error::new(
456 io::ErrorKind::InvalidInput,
457 format!("unknown replication message tag `{}`", tag),
458 ));
459 }
460 };
461
462 Ok(tuple)
463 }
464}
465
466#[derive(Debug)]
468pub struct BeginBody {
469 final_lsn: u64,
470 timestamp: i64,
471 xid: u32,
472}
473
474impl BeginBody {
475 #[inline]
476 pub fn final_lsn(&self) -> Lsn {
478 self.final_lsn
479 }
480
481 #[inline]
482 pub fn timestamp(&self) -> i64 {
484 self.timestamp
485 }
486
487 #[inline]
488 pub fn xid(&self) -> u32 {
490 self.xid
491 }
492}
493
494#[derive(Debug)]
496pub struct CommitBody {
497 flags: i8,
498 commit_lsn: u64,
499 end_lsn: u64,
500 timestamp: i64,
501}
502
503impl CommitBody {
504 #[inline]
505 pub fn commit_lsn(&self) -> Lsn {
507 self.commit_lsn
508 }
509
510 #[inline]
511 pub fn end_lsn(&self) -> Lsn {
513 self.end_lsn
514 }
515
516 #[inline]
517 pub fn timestamp(&self) -> i64 {
519 self.timestamp
520 }
521
522 #[inline]
523 pub fn flags(&self) -> i8 {
525 self.flags
526 }
527}
528
529#[derive(Debug)]
533pub struct OriginBody {
534 commit_lsn: u64,
535 name: Bytes,
536}
537
538impl OriginBody {
539 #[inline]
540 pub fn commit_lsn(&self) -> Lsn {
542 self.commit_lsn
543 }
544
545 #[inline]
546 pub fn name(&self) -> io::Result<&str> {
548 get_str(&self.name)
549 }
550}
551
/// Replica identity setting carried by a relation message.
///
/// Each variant is produced from the `REPLICA_IDENTITY_*_TAG` byte of the same name
/// while a relation message is parsed.
#[derive(Debug)]
pub enum ReplicaIdentity {
    /// Decoded from `REPLICA_IDENTITY_DEFAULT_TAG` (`b'd'`).
    Default,
    /// Decoded from `REPLICA_IDENTITY_NOTHING_TAG` (`b'n'`).
    Nothing,
    /// Decoded from `REPLICA_IDENTITY_FULL_TAG` (`b'f'`).
    Full,
    /// Decoded from `REPLICA_IDENTITY_INDEX_TAG` (`b'i'`).
    Index,
}
566
567#[derive(Debug)]
569pub struct RelationBody {
570 rel_id: u32,
571 namespace: Bytes,
572 name: Bytes,
573 replica_identity: ReplicaIdentity,
574 columns: Vec<Column>,
575}
576
577impl RelationBody {
578 #[inline]
579 pub fn rel_id(&self) -> u32 {
581 self.rel_id
582 }
583
584 #[inline]
585 pub fn namespace(&self) -> io::Result<&str> {
587 get_str(&self.namespace)
588 }
589
590 #[inline]
591 pub fn name(&self) -> io::Result<&str> {
593 get_str(&self.name)
594 }
595
596 #[inline]
597 pub fn replica_identity(&self) -> &ReplicaIdentity {
599 &self.replica_identity
600 }
601
602 #[inline]
603 pub fn columns(&self) -> &[Column] {
605 &self.columns
606 }
607}
608
609#[derive(Debug)]
611pub struct TypeBody {
612 id: u32,
613 namespace: Bytes,
614 name: Bytes,
615}
616
617impl TypeBody {
618 #[inline]
619 pub fn id(&self) -> Oid {
621 self.id
622 }
623
624 #[inline]
625 pub fn namespace(&self) -> io::Result<&str> {
627 get_str(&self.namespace)
628 }
629
630 #[inline]
631 pub fn name(&self) -> io::Result<&str> {
633 get_str(&self.name)
634 }
635}
636
637#[derive(Debug)]
639pub struct InsertBody {
640 rel_id: u32,
641 tuple: Tuple,
642}
643
644impl InsertBody {
645 #[inline]
646 pub fn rel_id(&self) -> u32 {
648 self.rel_id
649 }
650
651 #[inline]
652 pub fn tuple(&self) -> &Tuple {
654 &self.tuple
655 }
656}
657
658#[derive(Debug)]
660pub struct UpdateBody {
661 rel_id: u32,
662 old_tuple: Option<Tuple>,
663 key_tuple: Option<Tuple>,
664 new_tuple: Tuple,
665}
666
667impl UpdateBody {
668 #[inline]
669 pub fn rel_id(&self) -> u32 {
671 self.rel_id
672 }
673
674 #[inline]
675 pub fn key_tuple(&self) -> Option<&Tuple> {
678 self.key_tuple.as_ref()
679 }
680
681 #[inline]
682 pub fn old_tuple(&self) -> Option<&Tuple> {
685 self.old_tuple.as_ref()
686 }
687
688 #[inline]
689 pub fn new_tuple(&self) -> &Tuple {
691 &self.new_tuple
692 }
693}
694
695#[derive(Debug)]
697pub struct DeleteBody {
698 rel_id: u32,
699 old_tuple: Option<Tuple>,
700 key_tuple: Option<Tuple>,
701}
702
703impl DeleteBody {
704 #[inline]
705 pub fn rel_id(&self) -> u32 {
707 self.rel_id
708 }
709
710 #[inline]
711 pub fn key_tuple(&self) -> Option<&Tuple> {
714 self.key_tuple.as_ref()
715 }
716
717 #[inline]
718 pub fn old_tuple(&self) -> Option<&Tuple> {
721 self.old_tuple.as_ref()
722 }
723}
724
/// Body of a truncate message: an options byte and the ids of the affected relations.
#[derive(Debug)]
pub struct TruncateBody {
    options: i8,
    rel_ids: Vec<u32>,
}

impl TruncateBody {
    /// Returns the relation ids in the order they were decoded.
    #[inline]
    pub fn rel_ids(&self) -> &[u32] {
        self.rel_ids.as_slice()
    }

    /// Returns the raw options byte.
    #[inline]
    pub fn options(&self) -> i8 {
        let Self { options, .. } = self;
        *options
    }
}
745
746struct Buffer {
747 bytes: Bytes,
748 idx: usize,
749}
750
751impl Buffer {
752 #[inline]
753 fn slice(&self) -> &[u8] {
754 &self.bytes[self.idx..]
755 }
756
757 #[inline]
758 fn read_cstr(&mut self) -> io::Result<Bytes> {
759 match memchr(0, self.slice()) {
760 Some(pos) => {
761 let start = self.idx;
762 let end = start + pos;
763 let cstr = self.bytes.slice(start..end);
764 self.idx = end + 1;
765 Ok(cstr)
766 }
767 None => Err(io::Error::new(
768 io::ErrorKind::UnexpectedEof,
769 "unexpected EOF",
770 )),
771 }
772 }
773
774 #[inline]
775 fn read_all(&mut self) -> Bytes {
776 let buf = self.bytes.slice(self.idx..);
777 self.idx = self.bytes.len();
778 buf
779 }
780
781 #[inline]
782 fn read_buf(&mut self, len: usize) -> io::Result<Bytes> {
783 if self.idx + len <= self.bytes.len() {
784 let buf = self.bytes.slice(self.idx..(self.idx + len));
785 self.idx += len;
786 Ok(buf)
787 } else {
788 Err(io::Error::new(
789 io::ErrorKind::UnexpectedEof,
790 "unexpected EOF",
791 ))
792 }
793 }
794}
795
796impl Read for Buffer {
797 #[inline]
798 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
799 let len = {
800 let slice = self.slice();
801 let len = cmp::min(slice.len(), buf.len());
802 buf[..len].copy_from_slice(&slice[..len]);
803 len
804 };
805 self.idx += len;
806 Ok(len)
807 }
808}
809
/// Interprets `buf` as UTF-8 text.
///
/// Invalid UTF-8 is reported as an `io::Error` of kind `InvalidInput` that wraps the
/// original `Utf8Error`.
#[inline]
fn get_str(buf: &[u8]) -> io::Result<&str> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(utf8_err) => Err(io::Error::new(io::ErrorKind::InvalidInput, utf8_err)),
    }
}