postgres_replication/
protocol.rs

1use std::io::{self, Read};
2use std::{cmp, str};
3
4use byteorder::{BigEndian, ReadBytesExt};
5use bytes::Bytes;
6use memchr::memchr;
7use postgres_protocol::{Lsn, Oid};
8
// replication message tags
// First byte of a message on the replication stream; matched in
// `ReplicationMessage::parse`.
pub const XLOG_DATA_TAG: u8 = b'w';
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';

// logical replication message tags
// First byte of a logical replication message; matched in
// `LogicalReplicationMessage::parse`.
const BEGIN_TAG: u8 = b'B';
const COMMIT_TAG: u8 = b'C';
// NOTE: `ORIGIN_TAG` and `TUPLE_OLD_TAG` share the byte `O`. They are matched at
// different positions (message tag vs. tuple tag), so they never collide.
const ORIGIN_TAG: u8 = b'O';
const RELATION_TAG: u8 = b'R';
const TYPE_TAG: u8 = b'Y';
const INSERT_TAG: u8 = b'I';
const UPDATE_TAG: u8 = b'U';
const DELETE_TAG: u8 = b'D';
const TRUNCATE_TAG: u8 = b'T';
// Tags introducing a tuple inside Insert/Update/Delete messages.
const TUPLE_NEW_TAG: u8 = b'N';
const TUPLE_KEY_TAG: u8 = b'K';
const TUPLE_OLD_TAG: u8 = b'O';
// Tags describing the kind of a single column value inside a tuple.
const TUPLE_DATA_NULL_TAG: u8 = b'n';
const TUPLE_DATA_TOAST_TAG: u8 = b'u';
const TUPLE_DATA_TEXT_TAG: u8 = b't';
const TUPLE_DATA_BINARY_TAG: u8 = b'b';

// replica identity tags
// Byte found in a Relation message; mapped onto `ReplicaIdentity`.
const REPLICA_IDENTITY_DEFAULT_TAG: u8 = b'd';
const REPLICA_IDENTITY_NOTHING_TAG: u8 = b'n';
const REPLICA_IDENTITY_FULL_TAG: u8 = b'f';
const REPLICA_IDENTITY_INDEX_TAG: u8 = b'i';
36
/// An enum representing Postgres backend replication messages.
///
/// `D` is the type of the payload carried by the `XLogData` variant.
#[non_exhaustive]
#[derive(Debug)]
pub enum ReplicationMessage<D> {
    /// A WAL data message (wire tag `w`).
    XLogData(XLogDataBody<D>),
    /// A primary keepalive message (wire tag `k`).
    PrimaryKeepAlive(PrimaryKeepAliveBody),
}
44
45impl ReplicationMessage<Bytes> {
46    #[inline]
47    pub fn parse(buf: &Bytes) -> io::Result<Self> {
48        let mut buf = Buffer {
49            bytes: buf.clone(),
50            idx: 0,
51        };
52
53        let tag = buf.read_u8()?;
54
55        let replication_message = match tag {
56            XLOG_DATA_TAG => {
57                let wal_start = buf.read_u64::<BigEndian>()?;
58                let wal_end = buf.read_u64::<BigEndian>()?;
59                let timestamp = buf.read_i64::<BigEndian>()?;
60                let data = buf.read_all();
61                ReplicationMessage::XLogData(XLogDataBody {
62                    wal_start,
63                    wal_end,
64                    timestamp,
65                    data,
66                })
67            }
68            PRIMARY_KEEPALIVE_TAG => {
69                let wal_end = buf.read_u64::<BigEndian>()?;
70                let timestamp = buf.read_i64::<BigEndian>()?;
71                let reply = buf.read_u8()?;
72                ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
73                    wal_end,
74                    timestamp,
75                    reply,
76                })
77            }
78            tag => {
79                return Err(io::Error::new(
80                    io::ErrorKind::InvalidInput,
81                    format!("unknown replication message tag `{}`", tag),
82                ));
83            }
84        };
85
86        Ok(replication_message)
87    }
88}
89
/// Body of an `XLogData` replication message.
///
/// `D` is the payload type: raw bytes straight after parsing, or whatever
/// [`XLogDataBody::map_data`] converted it into.
#[derive(Debug)]
pub struct XLogDataBody<D> {
    // First 8-byte big-endian field of the message.
    wal_start: u64,
    // Second 8-byte big-endian field of the message.
    wal_end: u64,
    // Third 8-byte big-endian field of the message (signed).
    timestamp: i64,
    // Everything following the fixed-size header.
    data: D,
}

impl<D> XLogDataBody<D> {
    /// The WAL start position carried by this message.
    #[inline]
    pub fn wal_start(&self) -> u64 {
        self.wal_start
    }

    /// The WAL end position carried by this message.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        self.wal_end
    }

    /// The timestamp carried by this message, exactly as read from the wire.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// Borrows the payload.
    #[inline]
    pub fn data(&self) -> &D {
        &self.data
    }

    /// Consumes the message and returns the payload.
    #[inline]
    pub fn into_data(self) -> D {
        self.data
    }

    /// Converts the payload with `f`, keeping the WAL positions and timestamp.
    ///
    /// `f` is invoked exactly once, so any `FnOnce` is accepted. Closures that
    /// implement `Fn` or `FnMut` still work.
    ///
    /// # Errors
    ///
    /// Returns whatever error `f` returns; the message is dropped in that case.
    pub fn map_data<F, D2, E>(self, f: F) -> Result<XLogDataBody<D2>, E>
    where
        F: FnOnce(D) -> Result<D2, E>,
    {
        let data = f(self.data)?;
        Ok(XLogDataBody {
            wal_start: self.wal_start,
            wal_end: self.wal_end,
            timestamp: self.timestamp,
            data,
        })
    }
}
137
/// Body of a primary keepalive replication message.
#[derive(Debug)]
pub struct PrimaryKeepAliveBody {
    // 8-byte big-endian field following the tag.
    wal_end: u64,
    // 8-byte big-endian signed field following `wal_end`.
    timestamp: i64,
    // Final single byte of the message.
    reply: u8,
}

impl PrimaryKeepAliveBody {
    /// The WAL end position carried by this keepalive.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        let Self { wal_end, .. } = self;
        *wal_end
    }

    /// The timestamp carried by this keepalive, exactly as read from the wire.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        let Self { timestamp, .. } = self;
        *timestamp
    }

    /// The raw reply byte of the message.
    ///
    /// NOTE(review): presumably `1` asks the client to reply as soon as
    /// possible — confirm against the PostgreSQL protocol reference.
    #[inline]
    pub fn reply(&self) -> u8 {
        let Self { reply, .. } = self;
        *reply
    }
}
161
/// A message of the logical replication stream
#[non_exhaustive]
#[derive(Debug)]
pub enum LogicalReplicationMessage {
    /// A BEGIN statement
    Begin(BeginBody),
    /// A COMMIT statement
    Commit(CommitBody),
    /// An Origin replication message
    /// Note that there can be multiple Origin messages inside a single transaction.
    Origin(OriginBody),
    /// A Relation replication message
    Relation(RelationBody),
    /// A Type replication message
    Type(TypeBody),
    /// An INSERT statement
    Insert(InsertBody),
    /// An UPDATE statement
    Update(UpdateBody),
    /// A DELETE statement
    Delete(DeleteBody),
    /// A TRUNCATE statement
    Truncate(TruncateBody),
}
186
187impl LogicalReplicationMessage {
188    pub fn parse(buf: &Bytes) -> io::Result<Self> {
189        let mut buf = Buffer {
190            bytes: buf.clone(),
191            idx: 0,
192        };
193
194        let tag = buf.read_u8()?;
195
196        let logical_replication_message = match tag {
197            BEGIN_TAG => Self::Begin(BeginBody {
198                final_lsn: buf.read_u64::<BigEndian>()?,
199                timestamp: buf.read_i64::<BigEndian>()?,
200                xid: buf.read_u32::<BigEndian>()?,
201            }),
202            COMMIT_TAG => Self::Commit(CommitBody {
203                flags: buf.read_i8()?,
204                commit_lsn: buf.read_u64::<BigEndian>()?,
205                end_lsn: buf.read_u64::<BigEndian>()?,
206                timestamp: buf.read_i64::<BigEndian>()?,
207            }),
208            ORIGIN_TAG => Self::Origin(OriginBody {
209                commit_lsn: buf.read_u64::<BigEndian>()?,
210                name: buf.read_cstr()?,
211            }),
212            RELATION_TAG => {
213                let rel_id = buf.read_u32::<BigEndian>()?;
214                let namespace = buf.read_cstr()?;
215                let name = buf.read_cstr()?;
216                let replica_identity = match buf.read_u8()? {
217                    REPLICA_IDENTITY_DEFAULT_TAG => ReplicaIdentity::Default,
218                    REPLICA_IDENTITY_NOTHING_TAG => ReplicaIdentity::Nothing,
219                    REPLICA_IDENTITY_FULL_TAG => ReplicaIdentity::Full,
220                    REPLICA_IDENTITY_INDEX_TAG => ReplicaIdentity::Index,
221                    tag => {
222                        return Err(io::Error::new(
223                            io::ErrorKind::InvalidInput,
224                            format!("unknown replica identity tag `{}`", tag),
225                        ));
226                    }
227                };
228                let column_len = buf.read_i16::<BigEndian>()?;
229
230                let mut columns = Vec::with_capacity(column_len as usize);
231                for _ in 0..column_len {
232                    columns.push(Column::parse(&mut buf)?);
233                }
234
235                Self::Relation(RelationBody {
236                    rel_id,
237                    namespace,
238                    name,
239                    replica_identity,
240                    columns,
241                })
242            }
243            TYPE_TAG => Self::Type(TypeBody {
244                id: buf.read_u32::<BigEndian>()?,
245                namespace: buf.read_cstr()?,
246                name: buf.read_cstr()?,
247            }),
248            INSERT_TAG => {
249                let rel_id = buf.read_u32::<BigEndian>()?;
250                let tag = buf.read_u8()?;
251
252                let tuple = match tag {
253                    TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
254                    tag => {
255                        return Err(io::Error::new(
256                            io::ErrorKind::InvalidInput,
257                            format!("unexpected tuple tag `{}`", tag),
258                        ));
259                    }
260                };
261
262                Self::Insert(InsertBody { rel_id, tuple })
263            }
264            UPDATE_TAG => {
265                let rel_id = buf.read_u32::<BigEndian>()?;
266                let tag = buf.read_u8()?;
267
268                let mut key_tuple = None;
269                let mut old_tuple = None;
270
271                let new_tuple = match tag {
272                    TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
273                    TUPLE_OLD_TAG | TUPLE_KEY_TAG => {
274                        if tag == TUPLE_OLD_TAG {
275                            old_tuple = Some(Tuple::parse(&mut buf)?);
276                        } else {
277                            key_tuple = Some(Tuple::parse(&mut buf)?);
278                        }
279
280                        match buf.read_u8()? {
281                            TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
282                            tag => {
283                                return Err(io::Error::new(
284                                    io::ErrorKind::InvalidInput,
285                                    format!("unexpected tuple tag `{}`", tag),
286                                ));
287                            }
288                        }
289                    }
290                    tag => {
291                        return Err(io::Error::new(
292                            io::ErrorKind::InvalidInput,
293                            format!("unknown tuple tag `{}`", tag),
294                        ));
295                    }
296                };
297
298                Self::Update(UpdateBody {
299                    rel_id,
300                    key_tuple,
301                    old_tuple,
302                    new_tuple,
303                })
304            }
305            DELETE_TAG => {
306                let rel_id = buf.read_u32::<BigEndian>()?;
307                let tag = buf.read_u8()?;
308
309                let mut key_tuple = None;
310                let mut old_tuple = None;
311
312                match tag {
313                    TUPLE_OLD_TAG => old_tuple = Some(Tuple::parse(&mut buf)?),
314                    TUPLE_KEY_TAG => key_tuple = Some(Tuple::parse(&mut buf)?),
315                    tag => {
316                        return Err(io::Error::new(
317                            io::ErrorKind::InvalidInput,
318                            format!("unknown tuple tag `{}`", tag),
319                        ));
320                    }
321                }
322
323                Self::Delete(DeleteBody {
324                    rel_id,
325                    key_tuple,
326                    old_tuple,
327                })
328            }
329            TRUNCATE_TAG => {
330                let relation_len = buf.read_i32::<BigEndian>()?;
331                let options = buf.read_i8()?;
332
333                let mut rel_ids = Vec::with_capacity(relation_len as usize);
334                for _ in 0..relation_len {
335                    rel_ids.push(buf.read_u32::<BigEndian>()?);
336                }
337
338                Self::Truncate(TruncateBody { options, rel_ids })
339            }
340            tag => {
341                return Err(io::Error::new(
342                    io::ErrorKind::InvalidInput,
343                    format!("unknown replication message tag `{}`", tag),
344                ));
345            }
346        };
347
348        Ok(logical_replication_message)
349    }
350}
351
352/// A row as it appears in the replication stream
353#[derive(Debug)]
354pub struct Tuple(Vec<TupleData>);
355
356impl Tuple {
357    #[inline]
358    /// The tuple data of this tuple
359    pub fn tuple_data(&self) -> &[TupleData] {
360        &self.0
361    }
362}
363
364impl Tuple {
365    fn parse(buf: &mut Buffer) -> io::Result<Self> {
366        let col_len = buf.read_i16::<BigEndian>()?;
367        let mut tuple = Vec::with_capacity(col_len as usize);
368        for _ in 0..col_len {
369            tuple.push(TupleData::parse(buf)?);
370        }
371
372        Ok(Tuple(tuple))
373    }
374}
375
376/// A column as it appears in the replication stream
377#[derive(Debug)]
378pub struct Column {
379    flags: i8,
380    name: Bytes,
381    type_id: i32,
382    type_modifier: i32,
383}
384
385impl Column {
386    #[inline]
387    /// Flags for the column. Currently can be either 0 for no flags or 1 which marks the column as
388    /// part of the key.
389    pub fn flags(&self) -> i8 {
390        self.flags
391    }
392
393    #[inline]
394    /// Name of the column.
395    pub fn name(&self) -> io::Result<&str> {
396        get_str(&self.name)
397    }
398
399    #[inline]
400    /// ID of the column's data type.
401    pub fn type_id(&self) -> i32 {
402        self.type_id
403    }
404
405    #[inline]
406    /// Type modifier of the column (`atttypmod`).
407    pub fn type_modifier(&self) -> i32 {
408        self.type_modifier
409    }
410}
411
412impl Column {
413    fn parse(buf: &mut Buffer) -> io::Result<Self> {
414        Ok(Self {
415            flags: buf.read_i8()?,
416            name: buf.read_cstr()?,
417            type_id: buf.read_i32::<BigEndian>()?,
418            type_modifier: buf.read_i32::<BigEndian>()?,
419        })
420    }
421}
422
423/// The data of an individual column as it appears in the replication stream
424#[derive(Debug)]
425pub enum TupleData {
426    /// Represents a NULL value
427    Null,
428    /// Represents an unchanged TOASTed value (the actual value is not sent).
429    UnchangedToast,
430    /// Column data as text formatted value.
431    Text(Bytes),
432    /// Column data as binary formatted value.
433    Binary(Bytes),
434}
435
436impl TupleData {
437    fn parse(buf: &mut Buffer) -> io::Result<Self> {
438        let type_tag = buf.read_u8()?;
439
440        let tuple = match type_tag {
441            TUPLE_DATA_NULL_TAG => TupleData::Null,
442            TUPLE_DATA_TOAST_TAG => TupleData::UnchangedToast,
443            TUPLE_DATA_TEXT_TAG => {
444                let len = buf.read_i32::<BigEndian>()?;
445                let data = buf.read_buf(len as usize)?;
446                TupleData::Text(data)
447            }
448            TUPLE_DATA_BINARY_TAG => {
449                let len = buf.read_i32::<BigEndian>()?;
450                let mut data = vec![0; len as usize];
451                buf.read_exact(&mut data)?;
452                TupleData::Binary(data.into())
453            }
454            tag => {
455                return Err(io::Error::new(
456                    io::ErrorKind::InvalidInput,
457                    format!("unknown replication message tag `{}`", tag),
458                ));
459            }
460        };
461
462        Ok(tuple)
463    }
464}
465
466/// A BEGIN statement
467#[derive(Debug)]
468pub struct BeginBody {
469    final_lsn: u64,
470    timestamp: i64,
471    xid: u32,
472}
473
474impl BeginBody {
475    #[inline]
476    /// Gets the final lsn of the transaction
477    pub fn final_lsn(&self) -> Lsn {
478        self.final_lsn
479    }
480
481    #[inline]
482    /// Commit timestamp of the transaction. The value is in number of microseconds since PostgreSQL epoch (2000-01-01).
483    pub fn timestamp(&self) -> i64 {
484        self.timestamp
485    }
486
487    #[inline]
488    /// Xid of the transaction.
489    pub fn xid(&self) -> u32 {
490        self.xid
491    }
492}
493
494/// A COMMIT statement
495#[derive(Debug)]
496pub struct CommitBody {
497    flags: i8,
498    commit_lsn: u64,
499    end_lsn: u64,
500    timestamp: i64,
501}
502
503impl CommitBody {
504    #[inline]
505    /// The LSN of the commit.
506    pub fn commit_lsn(&self) -> Lsn {
507        self.commit_lsn
508    }
509
510    #[inline]
511    /// The end LSN of the transaction.
512    pub fn end_lsn(&self) -> Lsn {
513        self.end_lsn
514    }
515
516    #[inline]
517    /// Commit timestamp of the transaction. The value is in number of microseconds since PostgreSQL epoch (2000-01-01).
518    pub fn timestamp(&self) -> i64 {
519        self.timestamp
520    }
521
522    #[inline]
523    /// Flags; currently unused (will be 0).
524    pub fn flags(&self) -> i8 {
525        self.flags
526    }
527}
528
529/// An Origin replication message
530///
531/// Note that there can be multiple Origin messages inside a single transaction.
532#[derive(Debug)]
533pub struct OriginBody {
534    commit_lsn: u64,
535    name: Bytes,
536}
537
538impl OriginBody {
539    #[inline]
540    /// The LSN of the commit on the origin server.
541    pub fn commit_lsn(&self) -> Lsn {
542        self.commit_lsn
543    }
544
545    #[inline]
546    /// Name of the origin.
547    pub fn name(&self) -> io::Result<&str> {
548        get_str(&self.name)
549    }
550}
551
/// Describes the REPLICA IDENTITY setting of a table
#[derive(Debug)]
pub enum ReplicaIdentity {
    /// default selection for replica identity (primary key or nothing);
    /// wire tag `d`
    Default,
    /// no replica identity is logged for this relation; wire tag `n`
    Nothing,
    /// all columns are logged as replica identity; wire tag `f`
    Full,
    /// An explicitly chosen candidate key's columns are used as replica identity;
    /// wire tag `i`.
    /// Note this will still be set if the index has been dropped; in that case it
    /// has the same meaning as `Default` (tag `d`).
    Index,
}
566
567/// A Relation replication message
568#[derive(Debug)]
569pub struct RelationBody {
570    rel_id: u32,
571    namespace: Bytes,
572    name: Bytes,
573    replica_identity: ReplicaIdentity,
574    columns: Vec<Column>,
575}
576
577impl RelationBody {
578    #[inline]
579    /// ID of the relation.
580    pub fn rel_id(&self) -> u32 {
581        self.rel_id
582    }
583
584    #[inline]
585    /// Namespace (empty string for pg_catalog).
586    pub fn namespace(&self) -> io::Result<&str> {
587        get_str(&self.namespace)
588    }
589
590    #[inline]
591    /// Relation name.
592    pub fn name(&self) -> io::Result<&str> {
593        get_str(&self.name)
594    }
595
596    #[inline]
597    /// Replica identity setting for the relation
598    pub fn replica_identity(&self) -> &ReplicaIdentity {
599        &self.replica_identity
600    }
601
602    #[inline]
603    /// The column definitions of this relation
604    pub fn columns(&self) -> &[Column] {
605        &self.columns
606    }
607}
608
609/// A Type replication message
610#[derive(Debug)]
611pub struct TypeBody {
612    id: u32,
613    namespace: Bytes,
614    name: Bytes,
615}
616
617impl TypeBody {
618    #[inline]
619    /// ID of the data type.
620    pub fn id(&self) -> Oid {
621        self.id
622    }
623
624    #[inline]
625    /// Namespace (empty string for pg_catalog).
626    pub fn namespace(&self) -> io::Result<&str> {
627        get_str(&self.namespace)
628    }
629
630    #[inline]
631    /// Name of the data type.
632    pub fn name(&self) -> io::Result<&str> {
633        get_str(&self.name)
634    }
635}
636
637/// An INSERT statement
638#[derive(Debug)]
639pub struct InsertBody {
640    rel_id: u32,
641    tuple: Tuple,
642}
643
644impl InsertBody {
645    #[inline]
646    /// ID of the relation corresponding to the ID in the relation message.
647    pub fn rel_id(&self) -> u32 {
648        self.rel_id
649    }
650
651    #[inline]
652    /// The inserted tuple
653    pub fn tuple(&self) -> &Tuple {
654        &self.tuple
655    }
656}
657
658/// An UPDATE statement
659#[derive(Debug)]
660pub struct UpdateBody {
661    rel_id: u32,
662    old_tuple: Option<Tuple>,
663    key_tuple: Option<Tuple>,
664    new_tuple: Tuple,
665}
666
667impl UpdateBody {
668    #[inline]
669    /// ID of the relation corresponding to the ID in the relation message.
670    pub fn rel_id(&self) -> u32 {
671        self.rel_id
672    }
673
674    #[inline]
675    /// This field is optional and is only present if the update changed data in any of the
676    /// column(s) that are part of the REPLICA IDENTITY index.
677    pub fn key_tuple(&self) -> Option<&Tuple> {
678        self.key_tuple.as_ref()
679    }
680
681    #[inline]
682    /// This field is optional and is only present if table in which the update happened has
683    /// REPLICA IDENTITY set to FULL.
684    pub fn old_tuple(&self) -> Option<&Tuple> {
685        self.old_tuple.as_ref()
686    }
687
688    #[inline]
689    /// The new tuple
690    pub fn new_tuple(&self) -> &Tuple {
691        &self.new_tuple
692    }
693}
694
695/// A DELETE statement
696#[derive(Debug)]
697pub struct DeleteBody {
698    rel_id: u32,
699    old_tuple: Option<Tuple>,
700    key_tuple: Option<Tuple>,
701}
702
703impl DeleteBody {
704    #[inline]
705    /// ID of the relation corresponding to the ID in the relation message.
706    pub fn rel_id(&self) -> u32 {
707        self.rel_id
708    }
709
710    #[inline]
711    /// This field is present if the table in which the delete has happened uses an index as
712    /// REPLICA IDENTITY.
713    pub fn key_tuple(&self) -> Option<&Tuple> {
714        self.key_tuple.as_ref()
715    }
716
717    #[inline]
718    /// This field is present if the table in which the delete has happened has REPLICA IDENTITY
719    /// set to FULL.
720    pub fn old_tuple(&self) -> Option<&Tuple> {
721        self.old_tuple.as_ref()
722    }
723}
724
/// Body of a Truncate message, which corresponds to a TRUNCATE statement.
#[derive(Debug)]
pub struct TruncateBody {
    options: i8,
    rel_ids: Vec<u32>,
}

impl TruncateBody {
    /// Option bits for TRUNCATE: 1 for CASCADE, 2 for RESTART IDENTITY.
    #[inline]
    pub fn options(&self) -> i8 {
        let Self { options, .. } = self;
        *options
    }

    /// IDs of the truncated relations, matching the IDs in the relation messages.
    #[inline]
    pub fn rel_ids(&self) -> &[u32] {
        self.rel_ids.as_slice()
    }
}
745
746struct Buffer {
747    bytes: Bytes,
748    idx: usize,
749}
750
751impl Buffer {
752    #[inline]
753    fn slice(&self) -> &[u8] {
754        &self.bytes[self.idx..]
755    }
756
757    #[inline]
758    fn read_cstr(&mut self) -> io::Result<Bytes> {
759        match memchr(0, self.slice()) {
760            Some(pos) => {
761                let start = self.idx;
762                let end = start + pos;
763                let cstr = self.bytes.slice(start..end);
764                self.idx = end + 1;
765                Ok(cstr)
766            }
767            None => Err(io::Error::new(
768                io::ErrorKind::UnexpectedEof,
769                "unexpected EOF",
770            )),
771        }
772    }
773
774    #[inline]
775    fn read_all(&mut self) -> Bytes {
776        let buf = self.bytes.slice(self.idx..);
777        self.idx = self.bytes.len();
778        buf
779    }
780
781    #[inline]
782    fn read_buf(&mut self, len: usize) -> io::Result<Bytes> {
783        if self.idx + len <= self.bytes.len() {
784            let buf = self.bytes.slice(self.idx..(self.idx + len));
785            self.idx += len;
786            Ok(buf)
787        } else {
788            Err(io::Error::new(
789                io::ErrorKind::UnexpectedEof,
790                "unexpected EOF",
791            ))
792        }
793    }
794}
795
796impl Read for Buffer {
797    #[inline]
798    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
799        let len = {
800            let slice = self.slice();
801            let len = cmp::min(slice.len(), buf.len());
802            buf[..len].copy_from_slice(&slice[..len]);
803            len
804        };
805        self.idx += len;
806        Ok(len)
807    }
808}
809
/// Interprets `buf` as UTF-8 text, turning a decoding failure into an
/// `io::Error` of kind `InvalidInput` that wraps the original UTF-8 error.
#[inline]
fn get_str(buf: &[u8]) -> io::Result<&str> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(utf8_err) => Err(io::Error::new(io::ErrorKind::InvalidInput, utf8_err)),
    }
}
813}