postgres_replication/
protocol.rs

1use std::io::{self, Read};
2use std::{cmp, str};
3
4use byteorder::{BigEndian, ReadBytesExt};
5use bytes::Bytes;
6use memchr::memchr;
7use postgres_protocol::{Lsn, Oid};
8
// replication message tags
// These are compared against the first byte of the buffer in `ReplicationMessage::parse`.
pub const XLOG_DATA_TAG: u8 = b'w';
pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k';

// logical replication message tags
// The first group (BEGIN..TRUNCATE) is compared against the first byte of the buffer in
// `LogicalReplicationMessage::parse`.
const BEGIN_TAG: u8 = b'B';
const MESSAGE_TAG: u8 = b'M';
const COMMIT_TAG: u8 = b'C';
const ORIGIN_TAG: u8 = b'O';
const RELATION_TAG: u8 = b'R';
const TYPE_TAG: u8 = b'Y';
const INSERT_TAG: u8 = b'I';
const UPDATE_TAG: u8 = b'U';
const DELETE_TAG: u8 = b'D';
const TRUNCATE_TAG: u8 = b'T';
// Tuple kind tags, read after the relation id inside Insert/Update/Delete messages.
// NOTE: `TUPLE_OLD_TAG` has the same byte value as `ORIGIN_TAG`; the two are only ever
// matched at different positions of a message, so they do not collide.
const TUPLE_NEW_TAG: u8 = b'N';
const TUPLE_KEY_TAG: u8 = b'K';
const TUPLE_OLD_TAG: u8 = b'O';
// Per-column value tags, matched in `TupleData::parse`.
const TUPLE_DATA_NULL_TAG: u8 = b'n';
const TUPLE_DATA_TOAST_TAG: u8 = b'u';
const TUPLE_DATA_TEXT_TAG: u8 = b't';
const TUPLE_DATA_BINARY_TAG: u8 = b'b';

// replica identity tags
// Matched against the replica identity byte of a Relation message.
const REPLICA_IDENTITY_DEFAULT_TAG: u8 = b'd';
const REPLICA_IDENTITY_NOTHING_TAG: u8 = b'n';
const REPLICA_IDENTITY_FULL_TAG: u8 = b'f';
const REPLICA_IDENTITY_INDEX_TAG: u8 = b'i';
37
/// An enum representing Postgres backend replication messages.
///
/// The type parameter `D` is the type of the payload carried by the `XLogData` variant;
/// `ReplicationMessage::parse` produces it as `Bytes`.
#[non_exhaustive]
#[derive(Debug)]
pub enum ReplicationMessage<D> {
    /// A message tagged `XLOG_DATA_TAG`, carrying WAL positions, a timestamp and a payload.
    XLogData(XLogDataBody<D>),
    /// A message tagged `PRIMARY_KEEPALIVE_TAG`, carrying a WAL position, a timestamp and a
    /// reply byte.
    PrimaryKeepAlive(PrimaryKeepAliveBody),
}
45
46impl ReplicationMessage<Bytes> {
47    #[inline]
48    pub fn parse(buf: &Bytes) -> io::Result<Self> {
49        let mut buf = Buffer {
50            bytes: buf.clone(),
51            idx: 0,
52        };
53
54        let tag = buf.read_u8()?;
55
56        let replication_message = match tag {
57            XLOG_DATA_TAG => {
58                let wal_start = buf.read_u64::<BigEndian>()?;
59                let wal_end = buf.read_u64::<BigEndian>()?;
60                let timestamp = buf.read_i64::<BigEndian>()?;
61                let data = buf.read_all();
62                ReplicationMessage::XLogData(XLogDataBody {
63                    wal_start,
64                    wal_end,
65                    timestamp,
66                    data,
67                })
68            }
69            PRIMARY_KEEPALIVE_TAG => {
70                let wal_end = buf.read_u64::<BigEndian>()?;
71                let timestamp = buf.read_i64::<BigEndian>()?;
72                let reply = buf.read_u8()?;
73                ReplicationMessage::PrimaryKeepAlive(PrimaryKeepAliveBody {
74                    wal_end,
75                    timestamp,
76                    reply,
77                })
78            }
79            tag => {
80                return Err(io::Error::new(
81                    io::ErrorKind::InvalidInput,
82                    format!("unknown replication message tag `{}`", tag),
83                ));
84            }
85        };
86
87        Ok(replication_message)
88    }
89}
90
/// The body of an XLogData replication message.
///
/// `D` is the type of the payload; `ReplicationMessage::parse` produces the raw bytes and
/// [`XLogDataBody::map_data`] converts them into a decoded representation.
#[derive(Debug)]
pub struct XLogDataBody<D> {
    wal_start: u64,
    wal_end: u64,
    timestamp: i64,
    data: D,
}

impl<D> XLogDataBody<D> {
    /// The `wal_start` header field of the message.
    #[inline]
    pub fn wal_start(&self) -> u64 {
        self.wal_start
    }

    /// The `wal_end` header field of the message.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        self.wal_end
    }

    /// The `timestamp` header field of the message.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        self.timestamp
    }

    /// A reference to the payload.
    #[inline]
    pub fn data(&self) -> &D {
        &self.data
    }

    /// Consumes the body and returns the payload.
    #[inline]
    pub fn into_data(self) -> D {
        self.data
    }

    /// Converts the payload with `f`, keeping the header fields unchanged.
    ///
    /// `f` is invoked exactly once, so any `FnOnce` closure is accepted (including closures
    /// that move captured state into the result).
    ///
    /// # Errors
    ///
    /// Returns the error produced by `f`, if any.
    pub fn map_data<F, D2, E>(self, f: F) -> Result<XLogDataBody<D2>, E>
    where
        F: FnOnce(D) -> Result<D2, E>,
    {
        let XLogDataBody {
            wal_start,
            wal_end,
            timestamp,
            data,
        } = self;

        Ok(XLogDataBody {
            wal_start,
            wal_end,
            timestamp,
            data: f(data)?,
        })
    }
}
138
/// The body of a primary keepalive replication message.
#[derive(Debug)]
pub struct PrimaryKeepAliveBody {
    wal_end: u64,
    timestamp: i64,
    reply: u8,
}

impl PrimaryKeepAliveBody {
    /// The `wal_end` field of the message.
    #[inline]
    pub fn wal_end(&self) -> u64 {
        let PrimaryKeepAliveBody { wal_end, .. } = *self;
        wal_end
    }

    /// The `timestamp` field of the message.
    #[inline]
    pub fn timestamp(&self) -> i64 {
        let PrimaryKeepAliveBody { timestamp, .. } = *self;
        timestamp
    }

    /// The raw `reply` byte of the message.
    #[inline]
    pub fn reply(&self) -> u8 {
        let PrimaryKeepAliveBody { reply, .. } = *self;
        reply
    }
}
162
#[non_exhaustive]
/// A message of the logical replication stream.
///
/// Each variant corresponds to one message tag recognised by
/// `LogicalReplicationMessage::parse`.
#[derive(Debug)]
pub enum LogicalReplicationMessage {
    /// A BEGIN statement
    Begin(BeginBody),
    /// A logical decoding message
    Message(MessageBody),
    /// A COMMIT statement
    Commit(CommitBody),
    /// An Origin replication message
    /// Note that there can be multiple Origin messages inside a single transaction.
    Origin(OriginBody),
    /// A Relation replication message
    Relation(RelationBody),
    /// A Type replication message
    Type(TypeBody),
    /// An INSERT statement
    Insert(InsertBody),
    /// An UPDATE statement
    Update(UpdateBody),
    /// A DELETE statement
    Delete(DeleteBody),
    /// A TRUNCATE statement
    Truncate(TruncateBody),
}
189
190impl LogicalReplicationMessage {
191    pub fn parse(buf: &Bytes) -> io::Result<Self> {
192        let mut buf = Buffer {
193            bytes: buf.clone(),
194            idx: 0,
195        };
196
197        let tag = buf.read_u8()?;
198
199        let logical_replication_message = match tag {
200            BEGIN_TAG => Self::Begin(BeginBody {
201                final_lsn: buf.read_u64::<BigEndian>()?,
202                timestamp: buf.read_i64::<BigEndian>()?,
203                xid: buf.read_u32::<BigEndian>()?,
204            }),
205            MESSAGE_TAG => Self::Message(MessageBody {
206                flags: buf.read_i8()?,
207                message_lsn: buf.read_u64::<BigEndian>()?,
208                prefix: buf.read_cstr()?,
209                content: match buf.read_i32::<BigEndian>()? {
210                    len if len > 0 => buf.read_buf(len as usize)?,
211                    0 => Bytes::new(),
212                    len => {
213                        return Err(io::Error::new(
214                            io::ErrorKind::InvalidInput,
215                            format!("unexpected message content length `{len}`"),
216                        ))
217                    }
218                },
219            }),
220            COMMIT_TAG => Self::Commit(CommitBody {
221                flags: buf.read_i8()?,
222                commit_lsn: buf.read_u64::<BigEndian>()?,
223                end_lsn: buf.read_u64::<BigEndian>()?,
224                timestamp: buf.read_i64::<BigEndian>()?,
225            }),
226            ORIGIN_TAG => Self::Origin(OriginBody {
227                commit_lsn: buf.read_u64::<BigEndian>()?,
228                name: buf.read_cstr()?,
229            }),
230            RELATION_TAG => {
231                let rel_id = buf.read_u32::<BigEndian>()?;
232                let namespace = buf.read_cstr()?;
233                let name = buf.read_cstr()?;
234                let replica_identity = match buf.read_u8()? {
235                    REPLICA_IDENTITY_DEFAULT_TAG => ReplicaIdentity::Default,
236                    REPLICA_IDENTITY_NOTHING_TAG => ReplicaIdentity::Nothing,
237                    REPLICA_IDENTITY_FULL_TAG => ReplicaIdentity::Full,
238                    REPLICA_IDENTITY_INDEX_TAG => ReplicaIdentity::Index,
239                    tag => {
240                        return Err(io::Error::new(
241                            io::ErrorKind::InvalidInput,
242                            format!("unknown replica identity tag `{}`", tag),
243                        ));
244                    }
245                };
246                let column_len = buf.read_i16::<BigEndian>()?;
247
248                let mut columns = Vec::with_capacity(column_len as usize);
249                for _ in 0..column_len {
250                    columns.push(Column::parse(&mut buf)?);
251                }
252
253                Self::Relation(RelationBody {
254                    rel_id,
255                    namespace,
256                    name,
257                    replica_identity,
258                    columns,
259                })
260            }
261            TYPE_TAG => Self::Type(TypeBody {
262                id: buf.read_u32::<BigEndian>()?,
263                namespace: buf.read_cstr()?,
264                name: buf.read_cstr()?,
265            }),
266            INSERT_TAG => {
267                let rel_id = buf.read_u32::<BigEndian>()?;
268                let tag = buf.read_u8()?;
269
270                let tuple = match tag {
271                    TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
272                    tag => {
273                        return Err(io::Error::new(
274                            io::ErrorKind::InvalidInput,
275                            format!("unexpected tuple tag `{}`", tag),
276                        ));
277                    }
278                };
279
280                Self::Insert(InsertBody { rel_id, tuple })
281            }
282            UPDATE_TAG => {
283                let rel_id = buf.read_u32::<BigEndian>()?;
284                let tag = buf.read_u8()?;
285
286                let mut key_tuple = None;
287                let mut old_tuple = None;
288
289                let new_tuple = match tag {
290                    TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
291                    TUPLE_OLD_TAG | TUPLE_KEY_TAG => {
292                        if tag == TUPLE_OLD_TAG {
293                            old_tuple = Some(Tuple::parse(&mut buf)?);
294                        } else {
295                            key_tuple = Some(Tuple::parse(&mut buf)?);
296                        }
297
298                        match buf.read_u8()? {
299                            TUPLE_NEW_TAG => Tuple::parse(&mut buf)?,
300                            tag => {
301                                return Err(io::Error::new(
302                                    io::ErrorKind::InvalidInput,
303                                    format!("unexpected tuple tag `{}`", tag),
304                                ));
305                            }
306                        }
307                    }
308                    tag => {
309                        return Err(io::Error::new(
310                            io::ErrorKind::InvalidInput,
311                            format!("unknown tuple tag `{}`", tag),
312                        ));
313                    }
314                };
315
316                Self::Update(UpdateBody {
317                    rel_id,
318                    key_tuple,
319                    old_tuple,
320                    new_tuple,
321                })
322            }
323            DELETE_TAG => {
324                let rel_id = buf.read_u32::<BigEndian>()?;
325                let tag = buf.read_u8()?;
326
327                let mut key_tuple = None;
328                let mut old_tuple = None;
329
330                match tag {
331                    TUPLE_OLD_TAG => old_tuple = Some(Tuple::parse(&mut buf)?),
332                    TUPLE_KEY_TAG => key_tuple = Some(Tuple::parse(&mut buf)?),
333                    tag => {
334                        return Err(io::Error::new(
335                            io::ErrorKind::InvalidInput,
336                            format!("unknown tuple tag `{}`", tag),
337                        ));
338                    }
339                }
340
341                Self::Delete(DeleteBody {
342                    rel_id,
343                    key_tuple,
344                    old_tuple,
345                })
346            }
347            TRUNCATE_TAG => {
348                let relation_len = buf.read_i32::<BigEndian>()?;
349                let options = buf.read_i8()?;
350
351                let mut rel_ids = Vec::with_capacity(relation_len as usize);
352                for _ in 0..relation_len {
353                    rel_ids.push(buf.read_u32::<BigEndian>()?);
354                }
355
356                Self::Truncate(TruncateBody { options, rel_ids })
357            }
358            tag => {
359                return Err(io::Error::new(
360                    io::ErrorKind::InvalidInput,
361                    format!("unknown replication message tag `{}`", tag),
362                ));
363            }
364        };
365
366        Ok(logical_replication_message)
367    }
368}
369
370/// A row as it appears in the replication stream
371#[derive(Debug)]
372pub struct Tuple(Vec<TupleData>);
373
374impl Tuple {
375    #[inline]
376    /// The tuple data of this tuple
377    pub fn tuple_data(&self) -> &[TupleData] {
378        &self.0
379    }
380}
381
382impl Tuple {
383    fn parse(buf: &mut Buffer) -> io::Result<Self> {
384        let col_len = buf.read_i16::<BigEndian>()?;
385        let mut tuple = Vec::with_capacity(col_len as usize);
386        for _ in 0..col_len {
387            tuple.push(TupleData::parse(buf)?);
388        }
389
390        Ok(Tuple(tuple))
391    }
392}
393
394/// A column as it appears in the replication stream
395#[derive(Debug)]
396pub struct Column {
397    flags: i8,
398    name: Bytes,
399    type_id: i32,
400    type_modifier: i32,
401}
402
403impl Column {
404    #[inline]
405    /// Flags for the column. Currently can be either 0 for no flags or 1 which marks the column as
406    /// part of the key.
407    pub fn flags(&self) -> i8 {
408        self.flags
409    }
410
411    #[inline]
412    /// Name of the column.
413    pub fn name(&self) -> io::Result<&str> {
414        get_str(&self.name)
415    }
416
417    #[inline]
418    /// ID of the column's data type.
419    pub fn type_id(&self) -> i32 {
420        self.type_id
421    }
422
423    #[inline]
424    /// Type modifier of the column (`atttypmod`).
425    pub fn type_modifier(&self) -> i32 {
426        self.type_modifier
427    }
428}
429
430impl Column {
431    fn parse(buf: &mut Buffer) -> io::Result<Self> {
432        Ok(Self {
433            flags: buf.read_i8()?,
434            name: buf.read_cstr()?,
435            type_id: buf.read_i32::<BigEndian>()?,
436            type_modifier: buf.read_i32::<BigEndian>()?,
437        })
438    }
439}
440
441/// The data of an individual column as it appears in the replication stream
442#[derive(Debug)]
443pub enum TupleData {
444    /// Represents a NULL value
445    Null,
446    /// Represents an unchanged TOASTed value (the actual value is not sent).
447    UnchangedToast,
448    /// Column data as text formatted value.
449    Text(Bytes),
450    /// Column data as binary formatted value.
451    Binary(Bytes),
452}
453
454impl TupleData {
455    fn parse(buf: &mut Buffer) -> io::Result<Self> {
456        let type_tag = buf.read_u8()?;
457
458        let tuple = match type_tag {
459            TUPLE_DATA_NULL_TAG => TupleData::Null,
460            TUPLE_DATA_TOAST_TAG => TupleData::UnchangedToast,
461            TUPLE_DATA_TEXT_TAG => {
462                let len = buf.read_i32::<BigEndian>()?;
463                let data = buf.read_buf(len as usize)?;
464                TupleData::Text(data)
465            }
466            TUPLE_DATA_BINARY_TAG => {
467                let len = buf.read_i32::<BigEndian>()?;
468                let mut data = vec![0; len as usize];
469                buf.read_exact(&mut data)?;
470                TupleData::Binary(data.into())
471            }
472            tag => {
473                return Err(io::Error::new(
474                    io::ErrorKind::InvalidInput,
475                    format!("unknown replication message tag `{}`", tag),
476                ));
477            }
478        };
479
480        Ok(tuple)
481    }
482}
483
484/// A BEGIN statement
485#[derive(Debug)]
486pub struct BeginBody {
487    final_lsn: u64,
488    timestamp: i64,
489    xid: u32,
490}
491
492impl BeginBody {
493    #[inline]
494    /// Gets the final lsn of the transaction
495    pub fn final_lsn(&self) -> Lsn {
496        self.final_lsn
497    }
498
499    #[inline]
500    /// Commit timestamp of the transaction. The value is in number of microseconds since PostgreSQL epoch (2000-01-01).
501    pub fn timestamp(&self) -> i64 {
502        self.timestamp
503    }
504
505    #[inline]
506    /// Xid of the transaction.
507    pub fn xid(&self) -> u32 {
508        self.xid
509    }
510}
511
512/// A logical decoding message
513#[derive(Debug)]
514pub struct MessageBody {
515    message_lsn: u64,
516    flags: i8,
517    prefix: Bytes,
518    content: Bytes,
519}
520
521impl MessageBody {
522    #[inline]
523    /// The LSN of the logical decoding message.
524    pub fn message_lsn(&self) -> Lsn {
525        self.message_lsn
526    }
527
528    #[inline]
529    /// Flags. Currently can be either 0 for no flags or 1 if the logical decoding message is transactional.
530    pub fn flags(&self) -> i8 {
531        self.flags
532    }
533
534    #[inline]
535    /// The prefix of the logical decoding message.
536    pub fn prefix(&self) -> io::Result<&str> {
537        get_str(&self.prefix)
538    }
539
540    #[inline]
541    /// The content of the logical decoding message.
542    pub fn content(&self) -> &Bytes {
543        &self.content
544    }
545}
546
547/// A COMMIT statement
548#[derive(Debug)]
549pub struct CommitBody {
550    flags: i8,
551    commit_lsn: u64,
552    end_lsn: u64,
553    timestamp: i64,
554}
555
556impl CommitBody {
557    #[inline]
558    /// The LSN of the commit.
559    pub fn commit_lsn(&self) -> Lsn {
560        self.commit_lsn
561    }
562
563    #[inline]
564    /// The end LSN of the transaction.
565    pub fn end_lsn(&self) -> Lsn {
566        self.end_lsn
567    }
568
569    #[inline]
570    /// Commit timestamp of the transaction. The value is in number of microseconds since PostgreSQL epoch (2000-01-01).
571    pub fn timestamp(&self) -> i64 {
572        self.timestamp
573    }
574
575    #[inline]
576    /// Flags; currently unused (will be 0).
577    pub fn flags(&self) -> i8 {
578        self.flags
579    }
580}
581
582/// An Origin replication message
583///
584/// Note that there can be multiple Origin messages inside a single transaction.
585#[derive(Debug)]
586pub struct OriginBody {
587    commit_lsn: u64,
588    name: Bytes,
589}
590
591impl OriginBody {
592    #[inline]
593    /// The LSN of the commit on the origin server.
594    pub fn commit_lsn(&self) -> Lsn {
595        self.commit_lsn
596    }
597
598    #[inline]
599    /// Name of the origin.
600    pub fn name(&self) -> io::Result<&str> {
601        get_str(&self.name)
602    }
603}
604
/// Describes the REPLICA IDENTITY setting of a table
///
/// Decoded from the one-byte replica identity tag of a Relation message.
#[derive(Debug)]
pub enum ReplicaIdentity {
    /// Default selection for replica identity (primary key or nothing); tag `d`.
    Default,
    /// No replica identity is logged for this relation; tag `n`.
    Nothing,
    /// All columns are logged as replica identity; tag `f`.
    Full,
    /// An explicitly chosen candidate key's columns are used as replica identity; tag `i`.
    /// Note this will still be set if the index has been dropped; in that case it
    /// has the same meaning as 'd'.
    Index,
}
619
620/// A Relation replication message
621#[derive(Debug)]
622pub struct RelationBody {
623    rel_id: u32,
624    namespace: Bytes,
625    name: Bytes,
626    replica_identity: ReplicaIdentity,
627    columns: Vec<Column>,
628}
629
630impl RelationBody {
631    #[inline]
632    /// ID of the relation.
633    pub fn rel_id(&self) -> u32 {
634        self.rel_id
635    }
636
637    #[inline]
638    /// Namespace (empty string for pg_catalog).
639    pub fn namespace(&self) -> io::Result<&str> {
640        get_str(&self.namespace)
641    }
642
643    #[inline]
644    /// Relation name.
645    pub fn name(&self) -> io::Result<&str> {
646        get_str(&self.name)
647    }
648
649    #[inline]
650    /// Replica identity setting for the relation
651    pub fn replica_identity(&self) -> &ReplicaIdentity {
652        &self.replica_identity
653    }
654
655    #[inline]
656    /// The column definitions of this relation
657    pub fn columns(&self) -> &[Column] {
658        &self.columns
659    }
660}
661
662/// A Type replication message
663#[derive(Debug)]
664pub struct TypeBody {
665    id: u32,
666    namespace: Bytes,
667    name: Bytes,
668}
669
670impl TypeBody {
671    #[inline]
672    /// ID of the data type.
673    pub fn id(&self) -> Oid {
674        self.id
675    }
676
677    #[inline]
678    /// Namespace (empty string for pg_catalog).
679    pub fn namespace(&self) -> io::Result<&str> {
680        get_str(&self.namespace)
681    }
682
683    #[inline]
684    /// Name of the data type.
685    pub fn name(&self) -> io::Result<&str> {
686        get_str(&self.name)
687    }
688}
689
690/// An INSERT statement
691#[derive(Debug)]
692pub struct InsertBody {
693    rel_id: u32,
694    tuple: Tuple,
695}
696
697impl InsertBody {
698    #[inline]
699    /// ID of the relation corresponding to the ID in the relation message.
700    pub fn rel_id(&self) -> u32 {
701        self.rel_id
702    }
703
704    #[inline]
705    /// The inserted tuple
706    pub fn tuple(&self) -> &Tuple {
707        &self.tuple
708    }
709}
710
711/// An UPDATE statement
712#[derive(Debug)]
713pub struct UpdateBody {
714    rel_id: u32,
715    old_tuple: Option<Tuple>,
716    key_tuple: Option<Tuple>,
717    new_tuple: Tuple,
718}
719
720impl UpdateBody {
721    #[inline]
722    /// ID of the relation corresponding to the ID in the relation message.
723    pub fn rel_id(&self) -> u32 {
724        self.rel_id
725    }
726
727    #[inline]
728    /// This field is optional and is only present if the update changed data in any of the
729    /// column(s) that are part of the REPLICA IDENTITY index.
730    pub fn key_tuple(&self) -> Option<&Tuple> {
731        self.key_tuple.as_ref()
732    }
733
734    #[inline]
735    /// This field is optional and is only present if table in which the update happened has
736    /// REPLICA IDENTITY set to FULL.
737    pub fn old_tuple(&self) -> Option<&Tuple> {
738        self.old_tuple.as_ref()
739    }
740
741    #[inline]
742    /// The new tuple
743    pub fn new_tuple(&self) -> &Tuple {
744        &self.new_tuple
745    }
746}
747
748/// A DELETE statement
749#[derive(Debug)]
750pub struct DeleteBody {
751    rel_id: u32,
752    old_tuple: Option<Tuple>,
753    key_tuple: Option<Tuple>,
754}
755
756impl DeleteBody {
757    #[inline]
758    /// ID of the relation corresponding to the ID in the relation message.
759    pub fn rel_id(&self) -> u32 {
760        self.rel_id
761    }
762
763    #[inline]
764    /// This field is present if the table in which the delete has happened uses an index as
765    /// REPLICA IDENTITY.
766    pub fn key_tuple(&self) -> Option<&Tuple> {
767        self.key_tuple.as_ref()
768    }
769
770    #[inline]
771    /// This field is present if the table in which the delete has happened has REPLICA IDENTITY
772    /// set to FULL.
773    pub fn old_tuple(&self) -> Option<&Tuple> {
774        self.old_tuple.as_ref()
775    }
776}
777
/// A TRUNCATE statement
#[derive(Debug)]
pub struct TruncateBody {
    options: i8,
    rel_ids: Vec<u32>,
}

impl TruncateBody {
    /// The IDs of the relations corresponding to the ID in the relation messages
    #[inline]
    pub fn rel_ids(&self) -> &[u32] {
        self.rel_ids.as_slice()
    }

    /// Option bits for TRUNCATE: 1 for CASCADE, 2 for RESTART IDENTITY
    #[inline]
    pub fn options(&self) -> i8 {
        let TruncateBody { options, .. } = self;
        *options
    }
}
798
799struct Buffer {
800    bytes: Bytes,
801    idx: usize,
802}
803
804impl Buffer {
805    #[inline]
806    fn slice(&self) -> &[u8] {
807        &self.bytes[self.idx..]
808    }
809
810    #[inline]
811    fn read_cstr(&mut self) -> io::Result<Bytes> {
812        match memchr(0, self.slice()) {
813            Some(pos) => {
814                let start = self.idx;
815                let end = start + pos;
816                let cstr = self.bytes.slice(start..end);
817                self.idx = end + 1;
818                Ok(cstr)
819            }
820            None => Err(io::Error::new(
821                io::ErrorKind::UnexpectedEof,
822                "unexpected EOF",
823            )),
824        }
825    }
826
827    #[inline]
828    fn read_all(&mut self) -> Bytes {
829        let buf = self.bytes.slice(self.idx..);
830        self.idx = self.bytes.len();
831        buf
832    }
833
834    #[inline]
835    fn read_buf(&mut self, len: usize) -> io::Result<Bytes> {
836        if self.idx + len <= self.bytes.len() {
837            let buf = self.bytes.slice(self.idx..(self.idx + len));
838            self.idx += len;
839            Ok(buf)
840        } else {
841            Err(io::Error::new(
842                io::ErrorKind::UnexpectedEof,
843                "unexpected EOF",
844            ))
845        }
846    }
847}
848
849impl Read for Buffer {
850    #[inline]
851    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
852        let len = {
853            let slice = self.slice();
854            let len = cmp::min(slice.len(), buf.len());
855            buf[..len].copy_from_slice(&slice[..len]);
856            len
857        };
858        self.idx += len;
859        Ok(len)
860    }
861}
862
/// Interprets `buf` as UTF-8 text.
///
/// Returns the borrowed string on success; invalid UTF-8 is reported as an `io::Error` of
/// kind `InvalidInput` wrapping the original `Utf8Error`.
#[inline]
fn get_str(buf: &[u8]) -> io::Result<&str> {
    match str::from_utf8(buf) {
        Ok(text) => Ok(text),
        Err(utf8_error) => Err(io::Error::new(io::ErrorKind::InvalidInput, utf8_error)),
    }
}