protobuf_native/
io.rs

1// Copyright Materialize, Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Auxiliary classes used for I/O.
17//!
18//! The Protocol Buffer library uses the classes in this package to deal with
19//! I/O and encoding/decoding raw bytes. Most users will not need to deal with
20//! this package. However, users who want to adapt the system to work with their
21//! own I/O abstractions – e.g., to allow Protocol Buffers to be read from a
22//! different kind of input stream without the need for a temporary buffer –
23//! should take a closer look.
24//!
25//! # Zero-copy streams
26//!
27//! The [`ZeroCopyInputStream`] and [`ZeroCopyOutputStream`] interfaces
28//! represent abstract I/O streams to and from which protocol buffers can be
29//! read and written.
30//!
31//! These interfaces are different from classic I/O streams in that they try to
32//! minimize the amount of data copying that needs to be done. To accomplish
33//! this, responsibility for allocating buffers is moved to the stream object,
34//! rather than being the responsibility of the caller. So, the stream can
35//! return a buffer which actually points directly into the final data structure
36//! where the bytes are to be stored, and the caller can interact directly with
37//! that buffer, eliminating an intermediate copy operation.
38//!
39//! As an example, consider the common case in which you are reading bytes from
40//! an array that is already in memory (or perhaps an `mmap`ed file).
41//!
42//! With classic I/O streams, you would do something like:
43//!
44//! ```
45//! # use std::io::Read;
46//! # use protobuf_native::io::ZeroCopyInputStream;
47//! # const BUFFER_SIZE: usize = 1024;
48//! # fn f(input: &mut dyn Read) {
49//! let mut buffer = [0; BUFFER_SIZE];
50//! input.read(&mut buffer);
51//! // Do something with `buffer`.
52//! # }
53//! ```
54//!
55//! Then the stream basically just calls `memcpy` to copy the data from the
56//! array into your buffer. With a `ZeroCopyInputStream`, you would do this
57//! instead:
58//!
59//! ```
60//! # use std::pin::Pin;
61//! # use protobuf_native::io::ZeroCopyInputStream;
62//! # fn f(input: Pin<&mut dyn ZeroCopyInputStream>) {
63//! let buffer = input.next();
64//! // Do something with `buffer`.
65//! # }
66//! ```
67//! Here, no copy is performed. The input stream returns a slice directly into
68//! the backing array, and the caller ends up reading directly from it.
69//
70//! If you want to be able to read the old-fashioned way, you can create a
71//! [`CodedInputStream`] or [`CodedOutputStream`] wrapping these objects and use
72//! their [`Read`]/[`Write`] implementations. These will, of course, add a copy
73//! step, but the coded streams will handle buffering so at least it will be
74//! reasonably efficient.
75//
76//! # Coded streams
77//!
78//! The [`CodedInputStream`] and [`CodedOutputStream`] classes, which wrap a
79//! [`ZeroCopyInputStream`] or [`ZeroCopyOutputStream`], respectively, and allow
80//! you to read or write individual pieces of data in various formats. In
81//! particular, these implement the varint encoding for integers, a simple
82//! variable-length encoding in which smaller numbers take fewer bytes.
83//!
84//! Typically these classes will only be used internally by the protocol buffer
85//! library in order to encode and decode protocol buffers. Clients of the
86//! library only need to know about this class if they wish to write custom
87//! message parsing or serialization procedures.
88//!
89//! For those who are interested, varint encoding is defined as follows:
90//!
91//! The encoding operates on unsigned integers of up to 64 bits in length. Each
92//! byte of the encoded value has the format:
93//!
94//! * bits 0-6: Seven bits of the number being encoded.
95//!
96//! * bit 7: Zero if this is the last byte in the encoding (in which case all
97//!   remaining bits of the number are zero) or 1 if more bytes follow. The
98//!   first byte contains the least-significant 7 bits of the number, the second
99//!   byte (if present) contains the next-least-significant 7 bits, and so on.
100//!   So, the binary number 1011000101011 would be encoded in two bytes as
101//!   "10101011 00101100".
102//!
103//! In theory, varint could be used to encode integers of any length. However,
104//! for practicality we set a limit at 64 bits. The maximum encoded length of a
105//! number is thus 10 bytes.
106
107use std::io::{self, Read, Write};
108use std::marker::{PhantomData, PhantomPinned};
109use std::mem::{self, MaybeUninit};
110use std::pin::Pin;
111use std::slice;
112
113use crate::internal::{unsafe_ffi_conversions, BoolExt, CInt, CVoid, ReadAdaptor, WriteAdaptor};
114use crate::OperationFailedError;
115
116#[cxx::bridge(namespace = "protobuf_native::io")]
117pub(crate) mod ffi {
118    extern "Rust" {
119        type ReadAdaptor<'a>;
120        fn read(self: &mut ReadAdaptor<'_>, buf: &mut [u8]) -> isize;
121
122        type WriteAdaptor<'a>;
123        fn write(self: &mut WriteAdaptor<'_>, buf: &[u8]) -> bool;
124    }
125    unsafe extern "C++" {
126        include!("protobuf-native/src/internal.h");
127        include!("protobuf-native/src/io.h");
128
129        #[namespace = "protobuf_native::internal"]
130        type CVoid = crate::internal::CVoid;
131
132        #[namespace = "protobuf_native::internal"]
133        type CInt = crate::internal::CInt;
134
135        #[namespace = "google::protobuf::io"]
136        type ZeroCopyInputStream;
137        unsafe fn DeleteZeroCopyInputStream(stream: *mut ZeroCopyInputStream);
138        unsafe fn Next(
139            self: Pin<&mut ZeroCopyInputStream>,
140            data: *mut *const CVoid,
141            size: *mut CInt,
142        ) -> bool;
143        fn BackUp(self: Pin<&mut ZeroCopyInputStream>, count: CInt);
144        fn Skip(self: Pin<&mut ZeroCopyInputStream>, count: CInt) -> bool;
145        fn ByteCount(self: &ZeroCopyInputStream) -> i64;
146
147        type ReaderStream;
148        fn NewReaderStream(adaptor: Box<ReadAdaptor<'_>>) -> *mut ReaderStream;
149        unsafe fn DeleteReaderStream(stream: *mut ReaderStream);
150
151        #[namespace = "google::protobuf::io"]
152        type ArrayInputStream;
153        unsafe fn NewArrayInputStream(data: *const u8, size: CInt) -> *mut ArrayInputStream;
154        unsafe fn DeleteArrayInputStream(stream: *mut ArrayInputStream);
155
156        #[namespace = "google::protobuf::io"]
157        type ZeroCopyOutputStream;
158        unsafe fn Next(
159            self: Pin<&mut ZeroCopyOutputStream>,
160            data: *mut *mut CVoid,
161            size: *mut CInt,
162        ) -> bool;
163        fn BackUp(self: Pin<&mut ZeroCopyOutputStream>, count: CInt);
164        fn ByteCount(self: &ZeroCopyOutputStream) -> i64;
165
166        type WriterStream;
167        fn NewWriterStream(adaptor: Box<WriteAdaptor<'_>>) -> *mut WriterStream;
168        unsafe fn DeleteWriterStream(stream: *mut WriterStream);
169
170        #[namespace = "google::protobuf::io"]
171        type ArrayOutputStream;
172        unsafe fn NewArrayOutputStream(data: *mut u8, size: CInt) -> *mut ArrayOutputStream;
173        unsafe fn DeleteArrayOutputStream(stream: *mut ArrayOutputStream);
174
175        type VecOutputStream;
176        fn NewVecOutputStream(target: &mut Vec<u8>) -> *mut VecOutputStream;
177        unsafe fn DeleteVecOutputStream(stream: *mut VecOutputStream);
178
179        #[namespace = "google::protobuf::io"]
180        type CodedInputStream;
181        unsafe fn NewCodedInputStream(ptr: *mut ZeroCopyInputStream) -> *mut CodedInputStream;
182        unsafe fn DeleteCodedInputStream(stream: *mut CodedInputStream);
183        fn IsFlat(self: &CodedInputStream) -> bool;
184        unsafe fn ReadRaw(self: Pin<&mut CodedInputStream>, buffer: *mut CVoid, size: CInt)
185            -> bool;
186        unsafe fn ReadVarint32(self: Pin<&mut CodedInputStream>, value: *mut u32) -> bool;
187        unsafe fn ReadVarint64(self: Pin<&mut CodedInputStream>, value: *mut u64) -> bool;
188        fn ReadTag(self: Pin<&mut CodedInputStream>) -> u32;
189        fn ReadTagNoLastTag(self: Pin<&mut CodedInputStream>) -> u32;
190        fn LastTagWas(self: Pin<&mut CodedInputStream>, expected: u32) -> bool;
191        fn ConsumedEntireMessage(self: Pin<&mut CodedInputStream>) -> bool;
192        fn CurrentPosition(self: &CodedInputStream) -> CInt;
193
194        #[namespace = "google::protobuf::io"]
195        type CodedOutputStream;
196        unsafe fn DeleteCodedOutputStream(stream: *mut CodedOutputStream);
197    }
198
199    impl UniquePtr<ZeroCopyOutputStream> {}
200    impl UniquePtr<CodedOutputStream> {}
201}
202
203/// Abstract interface similar to an input stream but designed to minimize
204/// copying.
205///
206/// # Examples
207///
208/// Read in a file and print its contents to stdout:
209///
210/// ```no_run
211/// use std::fs::File;
212/// use std::io::{self, Write};
213/// use protobuf_native::io::{ReaderStream, ZeroCopyInputStream};
214///
215/// let mut f = File::open("myfile")?;
216/// let mut input = ReaderStream::new(&mut f);
217/// while let Ok(buf) = input.as_mut().next() {
218///     io::stdout().write_all(buf)?;
219/// }
220/// # Ok::<_, io::Error>(())
221/// ```
222pub trait ZeroCopyInputStream: zero_copy_input_stream::Sealed {
223    /// Obtains a chunk of data from the stream.
224    ///
225    /// If the function returns an error, either there is no more data to return
226    /// or an I/O error occurred. All errors are permanent.
227    ///
228    /// It is legal for the returned buffer to have zero size, as long as
229    /// repeatedly calling `next` eventually yields a buffer with non-zero size.
230    fn next(self: Pin<&mut Self>) -> Result<&[u8], OperationFailedError> {
231        let mut data = MaybeUninit::uninit();
232        let mut size = MaybeUninit::uninit();
233        unsafe {
234            // SAFETY: `data` and `size` are non-null, as required.
235            self.upcast_mut()
236                .Next(data.as_mut_ptr(), size.as_mut_ptr())
237                .as_result()?;
238            // SAFETY: `Next` has succeeded and so has promised to provide us
239            // with a valid buffer.
240            let data = data.assume_init() as *const u8;
241            let size = size.assume_init().to_usize()?;
242            Ok(slice::from_raw_parts(data, size))
243        }
244    }
245
246    /// Backs up a number of bytes, so that the next call to [`next`] returns
247    /// data again that was already returned by the last call to `next`.
248    ///
249    /// This is useful when writing procedures that are only supposed to read up
250    /// to a certain point in the input, then return. If `next` returns a buffer
251    /// that goes beyond what you wanted to read, you can use `back_up` to
252    /// return to the point where you intended to finish.
253    ///
254    /// The last method called must have been `next`. The `count` parameter
255    /// must be less than or equal to the size of the last buffer returned
256    /// by `next`.
257    ///
258    /// [`next`]: ZeroCopyInputStream::next
259    fn back_up(self: Pin<&mut Self>, count: usize) {
260        // `count` is required to be less than the size of the buffer returned
261        // by the last call to `next`. Since `count` originated as a C int, if
262        // it's valid it must be representible as a C int. No point doing
263        // something more graceful than panicking since `BackUp` will often
264        // crash the process on too-large input.
265        let count = CInt::try_from(count).expect("count did not fit in a C int");
266        self.upcast_mut().BackUp(count)
267    }
268
269    /// Skips `count` bytes.
270    ///
271    /// Returns an error if the end of stream is reached or an I/O error
272    /// occurred. In the end-of-stream case, the stream is advanced to its end,
273    /// so [`byte_count`] will return the total size of the stream.
274    ///
275    /// [`byte_count`]: ZeroCopyInputStream::byte_count
276    fn skip(self: Pin<&mut Self>, count: usize) -> Result<(), OperationFailedError> {
277        let count = CInt::try_from(count).map_err(|_| OperationFailedError)?;
278        self.upcast_mut().Skip(count).as_result()
279    }
280
281    /// Returns the total number of bytes read since this stream was created.
282    fn byte_count(&self) -> i64 {
283        self.upcast().ByteCount()
284    }
285}
286
287mod zero_copy_input_stream {
288    use std::pin::Pin;
289
290    use crate::io::ffi;
291
292    pub trait Sealed {
293        fn upcast(&self) -> &ffi::ZeroCopyInputStream;
294        fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyInputStream>;
295        unsafe fn upcast_mut_ptr(self: Pin<&mut Self>) -> *mut ffi::ZeroCopyInputStream {
296            self.upcast_mut().get_unchecked_mut() as *mut _
297        }
298    }
299}
300
301/// Converts an [`Read`] implementor to a [`ZeroCopyInputStream`].
302pub struct ReaderStream<'a> {
303    _opaque: PhantomPinned,
304    _lifetime: PhantomData<&'a ()>,
305}
306
307impl<'a> Drop for ReaderStream<'a> {
308    fn drop(&mut self) {
309        unsafe { ffi::DeleteReaderStream(self.as_ffi_mut_ptr_unpinned()) }
310    }
311}
312
313impl<'a> ReaderStream<'a> {
314    /// Creates a reader stream from the specified [`Read`] implementor.
315    pub fn new(reader: &'a mut dyn Read) -> Pin<Box<ReaderStream<'a>>> {
316        let stream = ffi::NewReaderStream(Box::new(ReadAdaptor(reader)));
317        unsafe { Self::from_ffi_owned(stream) }
318    }
319
320    unsafe_ffi_conversions!(ffi::ReaderStream);
321}
322
323impl<'a> ZeroCopyInputStream for ReaderStream<'a> {}
324
325impl<'a> zero_copy_input_stream::Sealed for ReaderStream<'a> {
326    fn upcast(&self) -> &ffi::ZeroCopyInputStream {
327        unsafe { mem::transmute(self) }
328    }
329
330    fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyInputStream> {
331        unsafe { mem::transmute(self) }
332    }
333}
334
335/// A [`ZeroCopyInputStream`] specialized for reading from byte slices.
336///
337/// Using this type is more efficient than using a [`ReaderStream`] when the
338/// underlying reader is a type that exposes a simple byte slice.
339pub struct SliceInputStream<'a> {
340    _opaque: PhantomPinned,
341    _lifetime: PhantomData<&'a ()>,
342}
343
344impl<'a> Drop for SliceInputStream<'a> {
345    fn drop(&mut self) {
346        unsafe { ffi::DeleteArrayInputStream(self.as_ffi_mut_ptr_unpinned()) }
347    }
348}
349
350impl<'a> SliceInputStream<'a> {
351    /// Creates a new `SliceInputStream` from the provided byte slice.
352    pub fn new(slice: &[u8]) -> Pin<Box<SliceInputStream<'a>>> {
353        let size = CInt::expect_from(slice.len());
354        let stream = unsafe { ffi::NewArrayInputStream(slice.as_ptr(), size) };
355        unsafe { Self::from_ffi_owned(stream) }
356    }
357
358    unsafe_ffi_conversions!(ffi::ArrayInputStream);
359}
360
361impl<'a> ZeroCopyInputStream for SliceInputStream<'a> {}
362
363impl<'a> zero_copy_input_stream::Sealed for SliceInputStream<'a> {
364    fn upcast(&self) -> &ffi::ZeroCopyInputStream {
365        unsafe { mem::transmute(self) }
366    }
367
368    fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyInputStream> {
369        unsafe { mem::transmute(self) }
370    }
371}
372
373/// An arbitrary stream that implements [`ZeroCopyInputStream`].
374///
375/// This is like `Box<dyn ZeroCopyInputStream>` but it avoids additional virtual
376/// method calls on the Rust side of the FFI boundary.
377pub struct DynZeroCopyInputStream<'a> {
378    _opaque: PhantomPinned,
379    lifetime_: PhantomData<&'a ()>,
380}
381
382impl<'a> Drop for DynZeroCopyInputStream<'a> {
383    fn drop(&mut self) {
384        unsafe { ffi::DeleteZeroCopyInputStream(self.as_ffi_mut_ptr_unpinned()) }
385    }
386}
387
388impl<'a> DynZeroCopyInputStream<'a> {
389    unsafe_ffi_conversions!(ffi::ZeroCopyInputStream);
390}
391
392impl ZeroCopyInputStream for DynZeroCopyInputStream<'_> {}
393
394impl zero_copy_input_stream::Sealed for DynZeroCopyInputStream<'_> {
395    fn upcast(&self) -> &ffi::ZeroCopyInputStream {
396        unsafe { mem::transmute(self) }
397    }
398
399    fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyInputStream> {
400        unsafe { mem::transmute(self) }
401    }
402}
403
404/// Abstract interface similar to an output stream but designed to minimize
405/// copying.
406///
407/// # Examples
408///
409/// Copy the contents of infile to outfile, using plain [`Read`] for infile
410/// but a `ZeroCopyOutputStream` for outfile:
411///
412/// ```ignore
413/// use std::fs::File;
414/// use std::io::{self, Read, Write};
415/// use protobuf_native::io::{WriterStream, ZeroCopyOutputStream};
416///
417/// let mut infile = File::open("infile")?;
418/// let mut outfile = File::create("outfile")?;
419/// let mut output = WriterStream::new(&mut outfile);
420///
421/// while let Ok(buf) = output.next() {
422///     // Reading into uninitialized memory requires the unstable `ReadBuf` API.
423///     // See: https://rust-lang.github.io/rfcs/2930-read-buf.html
424///     let buf = ReadBuf::uninit(buf);
425///     infile.read_buf(buf)?;
426///     output.back_up(buf.remaining());
427///     if buf.filled().is_empty() {
428///         break;
429///     }
430/// }
431///
432/// # Ok::<_, io::Error>(())
433/// ```
434pub trait ZeroCopyOutputStream: zero_copy_output_stream::Sealed {
435    /// Obtains a buffer into which data can be written.
436    ///
437    /// Any data written into this buffer will eventually (maybe instantly,
438    /// maybe later on) be written to the output.
439    ///
440    /// # Safety
441    ///
442    /// If this function returns `Ok`, you **must** initialize the returned byte
443    /// slice before you either call `next` again or drop the slice. You can
444    /// choose to initialize only a portion of the byte slice by calling
445    /// [`back_up`].
446    ///
447    /// This is a very unusual invariant to maintain in Rust.
448    ///
449    /// [`back_up`]: ZeroCopyOutputStream::back_up
450    unsafe fn next(self: Pin<&mut Self>) -> Result<&mut [MaybeUninit<u8>], OperationFailedError> {
451        let mut data = MaybeUninit::uninit();
452        let mut size = MaybeUninit::uninit();
453        self.upcast_mut()
454            .Next(data.as_mut_ptr(), size.as_mut_ptr())
455            .as_result()?;
456        let data = data.assume_init() as *mut MaybeUninit<u8>;
457        let size = size.assume_init().to_usize()?;
458        Ok(slice::from_raw_parts_mut(data, size))
459    }
460
461    /// Backs up a number of bytes, so that the end of the last buffer returned
462    /// by [`next`] is not actually written.
463    ///
464    /// This is needed when you finish writing all the data you want to write,
465    /// but the last buffer was bigger than you needed. You don't want to write
466    /// a bunch of garbage after the end of your data, so you use `back_up` to
467    /// back up.
468    ///
469    /// [`next`]: ZeroCopyOutputStream::next
470    fn back_up(self: Pin<&mut Self>, count: usize) {
471        // See comment in `ZeroCopyInputStream::back_up` for why we tolerate
472        // panics here.
473        let count = CInt::try_from(count).expect("count did not fit in a C int");
474        self.upcast_mut().BackUp(count)
475    }
476
477    /// Returns the total number of bytes written since this object was created.
478    fn byte_count(&self) -> i64 {
479        self.upcast().ByteCount()
480    }
481}
482
483mod zero_copy_output_stream {
484    use std::pin::Pin;
485
486    use crate::io::ffi;
487
488    pub trait Sealed {
489        fn upcast(&self) -> &ffi::ZeroCopyOutputStream;
490        fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyOutputStream>;
491        unsafe fn upcast_mut_ptr(self: Pin<&mut Self>) -> *mut ffi::ZeroCopyOutputStream {
492            self.upcast_mut().get_unchecked_mut() as *mut _
493        }
494    }
495}
496
497/// Converts an [`Write`] implementor to a [`ZeroCopyOutputStream`].
498pub struct WriterStream<'a> {
499    _opaque: PhantomPinned,
500    _lifetime: PhantomData<&'a mut ()>,
501}
502
503impl<'a> WriterStream<'a> {
504    /// Creates a writer stream from the specified [`Write`] implementor.
505    pub fn new(writer: &'a mut dyn Write) -> Pin<Box<WriterStream<'a>>> {
506        let stream = ffi::NewWriterStream(Box::new(WriteAdaptor(writer)));
507        unsafe { Self::from_ffi_owned(stream) }
508    }
509
510    unsafe_ffi_conversions!(ffi::WriterStream);
511}
512
513impl<'a> Drop for WriterStream<'a> {
514    fn drop(&mut self) {
515        unsafe { ffi::DeleteWriterStream(self.as_ffi_mut_ptr_unpinned()) }
516    }
517}
518
519impl<'a> ZeroCopyOutputStream for WriterStream<'a> {}
520
521impl<'a> zero_copy_output_stream::Sealed for WriterStream<'a> {
522    fn upcast(&self) -> &ffi::ZeroCopyOutputStream {
523        unsafe { mem::transmute(self) }
524    }
525
526    fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyOutputStream> {
527        unsafe { mem::transmute(self) }
528    }
529}
530
531/// A [`ZeroCopyOutputStream`] specialized for writing to byte slices.
532///
533/// Using this type is more efficient than using a [`WriterStream`] when the
534/// underlying writer is a type that exposes a simple mutable byte slice.
535pub struct SliceOutputStream<'a> {
536    _opaque: PhantomPinned,
537    _lifetime: PhantomData<&'a ()>,
538}
539
540impl<'a> SliceOutputStream<'a> {
541    /// Creates a new `SliceOutputStream` from the provided byte slice.
542    pub fn new(slice: &mut [u8]) -> Pin<Box<SliceOutputStream<'a>>> {
543        let size = CInt::expect_from(slice.len());
544        let stream = unsafe { ffi::NewArrayOutputStream(slice.as_mut_ptr(), size) };
545        unsafe { Self::from_ffi_owned(stream) }
546    }
547
548    unsafe_ffi_conversions!(ffi::ArrayOutputStream);
549}
550
551impl<'a> Drop for SliceOutputStream<'a> {
552    fn drop(&mut self) {
553        unsafe { ffi::DeleteArrayOutputStream(self.as_ffi_mut_ptr_unpinned()) }
554    }
555}
556
557impl<'a> ZeroCopyOutputStream for SliceOutputStream<'a> {}
558
559impl<'a> zero_copy_output_stream::Sealed for SliceOutputStream<'a> {
560    fn upcast(&self) -> &ffi::ZeroCopyOutputStream {
561        unsafe { mem::transmute(self) }
562    }
563
564    fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyOutputStream> {
565        unsafe { mem::transmute(self) }
566    }
567}
568
569/// A [`ZeroCopyOutputStream`] specialized for writing to byte vectors.
570///
571/// Using this type is more efficient than using a [`WriterStream`] when the
572/// underlying writer is a byte vector.
573pub struct VecOutputStream<'a> {
574    _opaque: PhantomPinned,
575    _lifetime: PhantomData<&'a ()>,
576}
577
578impl<'a> VecOutputStream<'a> {
579    /// Creates a new `VecOutputStream` from the provided byte vector.
580    pub fn new(vec: &mut Vec<u8>) -> Pin<Box<VecOutputStream<'a>>> {
581        let stream = ffi::NewVecOutputStream(vec);
582        unsafe { Self::from_ffi_owned(stream) }
583    }
584
585    unsafe_ffi_conversions!(ffi::VecOutputStream);
586}
587
588impl<'a> Drop for VecOutputStream<'a> {
589    fn drop(&mut self) {
590        unsafe { ffi::DeleteVecOutputStream(self.as_ffi_mut_ptr_unpinned()) }
591    }
592}
593
594impl<'a> ZeroCopyOutputStream for VecOutputStream<'a> {}
595
596impl<'a> zero_copy_output_stream::Sealed for VecOutputStream<'a> {
597    fn upcast(&self) -> &ffi::ZeroCopyOutputStream {
598        unsafe { mem::transmute(self) }
599    }
600
601    fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyOutputStream> {
602        unsafe { mem::transmute(self) }
603    }
604}
605
606/// Type which reads and decodes binary data which is composed of varint-
607/// encoded integers and fixed-width pieces.
608///
609/// Wraps a [`ZeroCopyInputStream`]. Most users will not need to deal with
610/// `CodedInputStream`.
611///
612/// Most methods of `CodedInputStream` that return a `Result` return an error if
613/// an underlying I/O error occurs or if the data is malformed. Once such a
614/// failure occurs, the `CodedInputStream` is broken and is no longer useful.
615pub struct CodedInputStream<'a> {
616    _opaque: PhantomPinned,
617    _lifetime: PhantomData<&'a ()>,
618}
619
620impl<'a> Drop for CodedInputStream<'a> {
621    fn drop(&mut self) {
622        unsafe { ffi::DeleteCodedInputStream(self.as_ffi_mut_ptr_unpinned()) }
623    }
624}
625
626impl<'a> CodedInputStream<'a> {
627    /// Creates a `CodedInputStream` that reads from the given
628    /// [`ZeroCopyInputStream`].
629    pub fn new(input: Pin<&'a mut dyn ZeroCopyInputStream>) -> Pin<Box<CodedInputStream<'a>>> {
630        let stream = unsafe { ffi::NewCodedInputStream(input.upcast_mut_ptr()) };
631        unsafe { Self::from_ffi_owned(stream) }
632    }
633
634    /// Reports whether this coded input stream reads from a flat array instead
635    /// of a [`ZeroCopyInputStream`].
636    pub fn is_flat(&self) -> bool {
637        self.as_ffi().IsFlat()
638    }
639
640    /// Reads an unsigned integer with varint encoding, truncating to 32 bits.
641    ///
642    /// Reading a 32-bit value is equivalent to reading a 64-bit one and casting
643    /// it to `u32`, but may be more efficient.
644    pub fn read_varint32(self: Pin<&mut Self>) -> Result<u32, OperationFailedError> {
645        let mut value = MaybeUninit::uninit();
646        // SAFETY: `ReadVarint32` promises to initialize `value` if it returns
647        // true.
648        unsafe {
649            match self.as_ffi_mut().ReadVarint32(value.as_mut_ptr()) {
650                true => Ok(value.assume_init()),
651                false => Err(OperationFailedError),
652            }
653        }
654    }
655
656    /// Reads an unsigned 64-bit integer with varint encoding.
657    pub fn read_varint64(self: Pin<&mut Self>) -> Result<u64, OperationFailedError> {
658        let mut value = MaybeUninit::uninit();
659        // SAFETY: `ReadVarint32` promises to initialize `value` if it returns
660        // true.
661        unsafe {
662            match self.as_ffi_mut().ReadVarint64(value.as_mut_ptr()) {
663                true => Ok(value.assume_init()),
664                false => Err(OperationFailedError),
665            }
666        }
667    }
668
669    /// Reads a tag.
670    ///
671    /// This calls [`read_varint32`] and returns the result. Also updates the
672    /// last tag value, which can be checked with [`last_tag_was`].
673    ///
674    /// [`read_varint32`]: CodedInputStream::read_varint32
675    /// [`last_tag_was`]: CodedInputStream::last_tag_was
676    pub fn read_tag(self: Pin<&mut Self>) -> Result<u32, OperationFailedError> {
677        match self.as_ffi_mut().ReadTag() {
678            0 => Err(OperationFailedError), // 0 is error sentinel
679            tag => Ok(tag),
680        }
681    }
682
683    /// Like [`read_tag`], but does not update the last tag
684    /// value.
685    ///
686    /// [`read_tag`]: `CodedInputStream::read_tag`
687    pub fn read_tag_no_last_tag(self: Pin<&mut Self>) -> Result<u32, OperationFailedError> {
688        match self.as_ffi_mut().ReadTag() {
689            0 => Err(OperationFailedError), // 0 is error sentinel
690            tag => Ok(tag),
691        }
692    }
693
694    /// Reports whether the last call to [`read_tag`] or
695    /// [`read_tag_with_cutoff`] returned the given value.
696    ///
697    /// [`read_tag_no_last_tag`] and [`read_tag_with_cutoff_no_last_tag`] do not
698    /// preserve the last returned value.
699    ///
700    /// This is needed because parsers for some types of embedded messages (with
701    /// field type `TYPE_GROUP`) don't actually know that they've reached the
702    /// end of a message until they see an `ENDGROUP` tag, which was actually
703    /// part of the enclosing message. The enclosing message would like to check
704    /// that tag to make sure it had the right number, so it calls
705    /// `last_tag_was` on return from the embedded parser to check.
706    ///
707    /// [`read_tag`]: CodedInputStream::read_tag
708    /// [`read_tag_with_cutoff`]: CodedInputStream::read_tag_with_cutoff
709    /// [`read_tag_no_last_tag`]: CodedInputStream::read_tag_no_last_tag
710    /// [`read_tag_with_cutoff_no_last_tag`]: CodedInputStream::read_tag_with_cutoff_no_last_tag
711    pub fn last_tag_was(self: Pin<&mut Self>, expected: u32) -> bool {
712        self.as_ffi_mut().LastTagWas(expected)
713    }
714
715    /// When parsing a message (but NOT a group), this method must be called
716    /// immediately after [`MessageLite::merge_from_coded_stream`] returns (if
717    /// it returns true) to further verify that the message ended in a
718    /// legitimate way.
719    ///
720    /// For example, this verifies that parsing did not end on an end-group tag.
721    /// It also checks for some cases where, due to optimizations,
722    /// `merge_from_coded_stream` can incorrectly return true.
723    ///
724    /// [`MessageLite::merge_from_coded_stream`]: crate::MessageLite::merge_from_coded_stream
725    pub fn consumed_entire_message(self: Pin<&mut Self>) -> bool {
726        self.as_ffi_mut().ConsumedEntireMessage()
727    }
728
729    /// Returns the stream's current position relative to the beginning of the
730    /// input.
731    pub fn current_position(&self) -> usize {
732        self.as_ffi()
733            .CurrentPosition()
734            .to_usize()
735            .expect("stream position not representable as usize")
736    }
737
738    unsafe_ffi_conversions!(ffi::CodedInputStream);
739}
740
741impl<'a> Read for Pin<&mut CodedInputStream<'a>> {
742    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
743        let start = self.current_position();
744        let data = buf.as_mut_ptr() as *mut CVoid;
745        let size = CInt::try_from(buf.len()).map_err(|_| {
746            io::Error::new(
747                io::ErrorKind::InvalidInput,
748                "buffer exceeds size of a C int",
749            )
750        })?;
751        unsafe { self.as_mut().as_ffi_mut().ReadRaw(data, size) };
752        let end = self.current_position();
753        Ok(end - start)
754    }
755}
756
757/// Type which encodes and writes binary data which is composed of varint-
758/// encoded integers and fixed-width pieces.
759///
760/// Wraps a [`ZeroCopyOutputStream`]. Most users will not need to deal with
761/// `CodedOutputStream`.
762///
763/// Most methods of `CodedOutputStream` which return a `bool` return false if an
764/// underlying I/O error occurs. Once such a failure occurs, the
765/// CodedOutputStream is broken and is no longer useful. The `write_*` methods
766/// do not return the stream status, but will invalidate the stream if an error
767/// occurs. The client can probe `had_error` to determine the status.
768pub struct CodedOutputStream<'a> {
769    _opaque: PhantomPinned,
770    _lifetime: PhantomData<&'a ()>,
771}
772
773impl<'a> CodedOutputStream<'a> {
774    unsafe_ffi_conversions!(ffi::CodedOutputStream);
775}
776
777impl<'a> Drop for CodedOutputStream<'a> {
778    fn drop(&mut self) {
779        unsafe { ffi::DeleteCodedOutputStream(self.as_ffi_mut_ptr_unpinned()) }
780    }
781}