protobuf_native/io.rs
1// Copyright Materialize, Inc. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Auxiliary classes used for I/O.
17//!
18//! The Protocol Buffer library uses the classes in this package to deal with
19//! I/O and encoding/decoding raw bytes. Most users will not need to deal with
20//! this package. However, users who want to adapt the system to work with their
21//! own I/O abstractions – e.g., to allow Protocol Buffers to be read from a
22//! different kind of input stream without the need for a temporary buffer –
23//! should take a closer look.
24//!
25//! # Zero-copy streams
26//!
27//! The [`ZeroCopyInputStream`] and [`ZeroCopyOutputStream`] interfaces
28//! represent abstract I/O streams to and from which protocol buffers can be
29//! read and written.
30//!
31//! These interfaces are different from classic I/O streams in that they try to
32//! minimize the amount of data copying that needs to be done. To accomplish
33//! this, responsibility for allocating buffers is moved to the stream object,
34//! rather than being the responsibility of the caller. So, the stream can
35//! return a buffer which actually points directly into the final data structure
36//! where the bytes are to be stored, and the caller can interact directly with
37//! that buffer, eliminating an intermediate copy operation.
38//!
39//! As an example, consider the common case in which you are reading bytes from
40//! an array that is already in memory (or perhaps an `mmap`ed file).
41//!
42//! With classic I/O streams, you would do something like:
43//!
44//! ```
45//! # use std::io::Read;
46//! # use protobuf_native::io::ZeroCopyInputStream;
47//! # const BUFFER_SIZE: usize = 1024;
48//! # fn f(input: &mut dyn Read) {
49//! let mut buffer = [0; BUFFER_SIZE];
50//! input.read(&mut buffer);
51//! // Do something with `buffer`.
52//! # }
53//! ```
54//!
55//! Then the stream basically just calls `memcpy` to copy the data from the
56//! array into your buffer. With a `ZeroCopyInputStream`, you would do this
57//! instead:
58//!
59//! ```
60//! # use std::pin::Pin;
61//! # use protobuf_native::io::ZeroCopyInputStream;
62//! # fn f(input: Pin<&mut dyn ZeroCopyInputStream>) {
63//! let buffer = input.next();
64//! // Do something with `buffer`.
65//! # }
66//! ```
67//! Here, no copy is performed. The input stream returns a slice directly into
68//! the backing array, and the caller ends up reading directly from it.
69//
70//! If you want to be able to read the old-fashioned way, you can create a
71//! [`CodedInputStream`] or [`CodedOutputStream`] wrapping these objects and use
72//! their [`Read`]/[`Write`] implementations. These will, of course, add a copy
73//! step, but the coded streams will handle buffering so at least it will be
74//! reasonably efficient.
75//
76//! # Coded streams
77//!
78//! The [`CodedInputStream`] and [`CodedOutputStream`] classes, which wrap a
79//! [`ZeroCopyInputStream`] or [`ZeroCopyOutputStream`], respectively, and allow
80//! you to read or write individual pieces of data in various formats. In
81//! particular, these implement the varint encoding for integers, a simple
82//! variable-length encoding in which smaller numbers take fewer bytes.
83//!
84//! Typically these classes will only be used internally by the protocol buffer
85//! library in order to encode and decode protocol buffers. Clients of the
86//! library only need to know about this class if they wish to write custom
87//! message parsing or serialization procedures.
88//!
89//! For those who are interested, varint encoding is defined as follows:
90//!
91//! The encoding operates on unsigned integers of up to 64 bits in length. Each
92//! byte of the encoded value has the format:
93//!
94//! * bits 0-6: Seven bits of the number being encoded.
95//!
96//! * bit 7: Zero if this is the last byte in the encoding (in which case all
97//! remaining bits of the number are zero) or 1 if more bytes follow. The
98//! first byte contains the least-significant 7 bits of the number, the second
99//! byte (if present) contains the next-least-significant 7 bits, and so on.
100//! So, the binary number 1011000101011 would be encoded in two bytes as
101//! "10101011 00101100".
102//!
103//! In theory, varint could be used to encode integers of any length. However,
104//! for practicality we set a limit at 64 bits. The maximum encoded length of a
105//! number is thus 10 bytes.
106
107use std::io::{self, Read, Write};
108use std::marker::{PhantomData, PhantomPinned};
109use std::mem::{self, MaybeUninit};
110use std::pin::Pin;
111use std::slice;
112
113use crate::internal::{unsafe_ffi_conversions, BoolExt, CInt, CVoid, ReadAdaptor, WriteAdaptor};
114use crate::OperationFailedError;
115
116#[cxx::bridge(namespace = "protobuf_native::io")]
117pub(crate) mod ffi {
118 extern "Rust" {
119 type ReadAdaptor<'a>;
120 fn read(self: &mut ReadAdaptor<'_>, buf: &mut [u8]) -> isize;
121
122 type WriteAdaptor<'a>;
123 fn write(self: &mut WriteAdaptor<'_>, buf: &[u8]) -> bool;
124 }
125 unsafe extern "C++" {
126 include!("protobuf-native/src/internal.h");
127 include!("protobuf-native/src/io.h");
128
129 #[namespace = "protobuf_native::internal"]
130 type CVoid = crate::internal::CVoid;
131
132 #[namespace = "protobuf_native::internal"]
133 type CInt = crate::internal::CInt;
134
135 #[namespace = "google::protobuf::io"]
136 type ZeroCopyInputStream;
137 unsafe fn DeleteZeroCopyInputStream(stream: *mut ZeroCopyInputStream);
138 unsafe fn Next(
139 self: Pin<&mut ZeroCopyInputStream>,
140 data: *mut *const CVoid,
141 size: *mut CInt,
142 ) -> bool;
143 fn BackUp(self: Pin<&mut ZeroCopyInputStream>, count: CInt);
144 fn Skip(self: Pin<&mut ZeroCopyInputStream>, count: CInt) -> bool;
145 fn ByteCount(self: &ZeroCopyInputStream) -> i64;
146
147 type ReaderStream;
148 fn NewReaderStream(adaptor: Box<ReadAdaptor<'_>>) -> *mut ReaderStream;
149 unsafe fn DeleteReaderStream(stream: *mut ReaderStream);
150
151 #[namespace = "google::protobuf::io"]
152 type ArrayInputStream;
153 unsafe fn NewArrayInputStream(data: *const u8, size: CInt) -> *mut ArrayInputStream;
154 unsafe fn DeleteArrayInputStream(stream: *mut ArrayInputStream);
155
156 #[namespace = "google::protobuf::io"]
157 type ZeroCopyOutputStream;
158 unsafe fn Next(
159 self: Pin<&mut ZeroCopyOutputStream>,
160 data: *mut *mut CVoid,
161 size: *mut CInt,
162 ) -> bool;
163 fn BackUp(self: Pin<&mut ZeroCopyOutputStream>, count: CInt);
164 fn ByteCount(self: &ZeroCopyOutputStream) -> i64;
165
166 type WriterStream;
167 fn NewWriterStream(adaptor: Box<WriteAdaptor<'_>>) -> *mut WriterStream;
168 unsafe fn DeleteWriterStream(stream: *mut WriterStream);
169
170 #[namespace = "google::protobuf::io"]
171 type ArrayOutputStream;
172 unsafe fn NewArrayOutputStream(data: *mut u8, size: CInt) -> *mut ArrayOutputStream;
173 unsafe fn DeleteArrayOutputStream(stream: *mut ArrayOutputStream);
174
175 type VecOutputStream;
176 fn NewVecOutputStream(target: &mut Vec<u8>) -> *mut VecOutputStream;
177 unsafe fn DeleteVecOutputStream(stream: *mut VecOutputStream);
178
179 #[namespace = "google::protobuf::io"]
180 type CodedInputStream;
181 unsafe fn NewCodedInputStream(ptr: *mut ZeroCopyInputStream) -> *mut CodedInputStream;
182 unsafe fn DeleteCodedInputStream(stream: *mut CodedInputStream);
183 fn IsFlat(self: &CodedInputStream) -> bool;
184 unsafe fn ReadRaw(self: Pin<&mut CodedInputStream>, buffer: *mut CVoid, size: CInt)
185 -> bool;
186 unsafe fn ReadVarint32(self: Pin<&mut CodedInputStream>, value: *mut u32) -> bool;
187 unsafe fn ReadVarint64(self: Pin<&mut CodedInputStream>, value: *mut u64) -> bool;
188 fn ReadTag(self: Pin<&mut CodedInputStream>) -> u32;
189 fn ReadTagNoLastTag(self: Pin<&mut CodedInputStream>) -> u32;
190 fn LastTagWas(self: Pin<&mut CodedInputStream>, expected: u32) -> bool;
191 fn ConsumedEntireMessage(self: Pin<&mut CodedInputStream>) -> bool;
192 fn CurrentPosition(self: &CodedInputStream) -> CInt;
193
194 #[namespace = "google::protobuf::io"]
195 type CodedOutputStream;
196 unsafe fn DeleteCodedOutputStream(stream: *mut CodedOutputStream);
197 }
198
199 impl UniquePtr<ZeroCopyOutputStream> {}
200 impl UniquePtr<CodedOutputStream> {}
201}
202
203/// Abstract interface similar to an input stream but designed to minimize
204/// copying.
205///
206/// # Examples
207///
208/// Read in a file and print its contents to stdout:
209///
210/// ```no_run
211/// use std::fs::File;
212/// use std::io::{self, Write};
213/// use protobuf_native::io::{ReaderStream, ZeroCopyInputStream};
214///
215/// let mut f = File::open("myfile")?;
216/// let mut input = ReaderStream::new(&mut f);
217/// while let Ok(buf) = input.as_mut().next() {
218/// io::stdout().write_all(buf)?;
219/// }
220/// # Ok::<_, io::Error>(())
221/// ```
222pub trait ZeroCopyInputStream: zero_copy_input_stream::Sealed {
223 /// Obtains a chunk of data from the stream.
224 ///
225 /// If the function returns an error, either there is no more data to return
226 /// or an I/O error occurred. All errors are permanent.
227 ///
228 /// It is legal for the returned buffer to have zero size, as long as
229 /// repeatedly calling `next` eventually yields a buffer with non-zero size.
230 fn next(self: Pin<&mut Self>) -> Result<&[u8], OperationFailedError> {
231 let mut data = MaybeUninit::uninit();
232 let mut size = MaybeUninit::uninit();
233 unsafe {
234 // SAFETY: `data` and `size` are non-null, as required.
235 self.upcast_mut()
236 .Next(data.as_mut_ptr(), size.as_mut_ptr())
237 .as_result()?;
238 // SAFETY: `Next` has succeeded and so has promised to provide us
239 // with a valid buffer.
240 let data = data.assume_init() as *const u8;
241 let size = size.assume_init().to_usize()?;
242 Ok(slice::from_raw_parts(data, size))
243 }
244 }
245
246 /// Backs up a number of bytes, so that the next call to [`next`] returns
247 /// data again that was already returned by the last call to `next`.
248 ///
249 /// This is useful when writing procedures that are only supposed to read up
250 /// to a certain point in the input, then return. If `next` returns a buffer
251 /// that goes beyond what you wanted to read, you can use `back_up` to
252 /// return to the point where you intended to finish.
253 ///
254 /// The last method called must have been `next`. The `count` parameter
255 /// must be less than or equal to the size of the last buffer returned
256 /// by `next`.
257 ///
258 /// [`next`]: ZeroCopyInputStream::next
259 fn back_up(self: Pin<&mut Self>, count: usize) {
260 // `count` is required to be less than the size of the buffer returned
261 // by the last call to `next`. Since `count` originated as a C int, if
262 // it's valid it must be representible as a C int. No point doing
263 // something more graceful than panicking since `BackUp` will often
264 // crash the process on too-large input.
265 let count = CInt::try_from(count).expect("count did not fit in a C int");
266 self.upcast_mut().BackUp(count)
267 }
268
269 /// Skips `count` bytes.
270 ///
271 /// Returns an error if the end of stream is reached or an I/O error
272 /// occurred. In the end-of-stream case, the stream is advanced to its end,
273 /// so [`byte_count`] will return the total size of the stream.
274 ///
275 /// [`byte_count`]: ZeroCopyInputStream::byte_count
276 fn skip(self: Pin<&mut Self>, count: usize) -> Result<(), OperationFailedError> {
277 let count = CInt::try_from(count).map_err(|_| OperationFailedError)?;
278 self.upcast_mut().Skip(count).as_result()
279 }
280
281 /// Returns the total number of bytes read since this stream was created.
282 fn byte_count(&self) -> i64 {
283 self.upcast().ByteCount()
284 }
285}
286
287mod zero_copy_input_stream {
288 use std::pin::Pin;
289
290 use crate::io::ffi;
291
292 pub trait Sealed {
293 fn upcast(&self) -> &ffi::ZeroCopyInputStream;
294 fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyInputStream>;
295 unsafe fn upcast_mut_ptr(self: Pin<&mut Self>) -> *mut ffi::ZeroCopyInputStream {
296 self.upcast_mut().get_unchecked_mut() as *mut _
297 }
298 }
299}
300
301/// Converts an [`Read`] implementor to a [`ZeroCopyInputStream`].
302pub struct ReaderStream<'a> {
303 _opaque: PhantomPinned,
304 _lifetime: PhantomData<&'a ()>,
305}
306
307impl<'a> Drop for ReaderStream<'a> {
308 fn drop(&mut self) {
309 unsafe { ffi::DeleteReaderStream(self.as_ffi_mut_ptr_unpinned()) }
310 }
311}
312
313impl<'a> ReaderStream<'a> {
314 /// Creates a reader stream from the specified [`Read`] implementor.
315 pub fn new(reader: &'a mut dyn Read) -> Pin<Box<ReaderStream<'a>>> {
316 let stream = ffi::NewReaderStream(Box::new(ReadAdaptor(reader)));
317 unsafe { Self::from_ffi_owned(stream) }
318 }
319
320 unsafe_ffi_conversions!(ffi::ReaderStream);
321}
322
323impl<'a> ZeroCopyInputStream for ReaderStream<'a> {}
324
325impl<'a> zero_copy_input_stream::Sealed for ReaderStream<'a> {
326 fn upcast(&self) -> &ffi::ZeroCopyInputStream {
327 unsafe { mem::transmute(self) }
328 }
329
330 fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyInputStream> {
331 unsafe { mem::transmute(self) }
332 }
333}
334
335/// A [`ZeroCopyInputStream`] specialized for reading from byte slices.
336///
337/// Using this type is more efficient than using a [`ReaderStream`] when the
338/// underlying reader is a type that exposes a simple byte slice.
339pub struct SliceInputStream<'a> {
340 _opaque: PhantomPinned,
341 _lifetime: PhantomData<&'a ()>,
342}
343
344impl<'a> Drop for SliceInputStream<'a> {
345 fn drop(&mut self) {
346 unsafe { ffi::DeleteArrayInputStream(self.as_ffi_mut_ptr_unpinned()) }
347 }
348}
349
350impl<'a> SliceInputStream<'a> {
351 /// Creates a new `SliceInputStream` from the provided byte slice.
352 pub fn new(slice: &[u8]) -> Pin<Box<SliceInputStream<'a>>> {
353 let size = CInt::expect_from(slice.len());
354 let stream = unsafe { ffi::NewArrayInputStream(slice.as_ptr(), size) };
355 unsafe { Self::from_ffi_owned(stream) }
356 }
357
358 unsafe_ffi_conversions!(ffi::ArrayInputStream);
359}
360
361impl<'a> ZeroCopyInputStream for SliceInputStream<'a> {}
362
363impl<'a> zero_copy_input_stream::Sealed for SliceInputStream<'a> {
364 fn upcast(&self) -> &ffi::ZeroCopyInputStream {
365 unsafe { mem::transmute(self) }
366 }
367
368 fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyInputStream> {
369 unsafe { mem::transmute(self) }
370 }
371}
372
373/// An arbitrary stream that implements [`ZeroCopyInputStream`].
374///
375/// This is like `Box<dyn ZeroCopyInputStream>` but it avoids additional virtual
376/// method calls on the Rust side of the FFI boundary.
377pub struct DynZeroCopyInputStream<'a> {
378 _opaque: PhantomPinned,
379 lifetime_: PhantomData<&'a ()>,
380}
381
382impl<'a> Drop for DynZeroCopyInputStream<'a> {
383 fn drop(&mut self) {
384 unsafe { ffi::DeleteZeroCopyInputStream(self.as_ffi_mut_ptr_unpinned()) }
385 }
386}
387
388impl<'a> DynZeroCopyInputStream<'a> {
389 unsafe_ffi_conversions!(ffi::ZeroCopyInputStream);
390}
391
392impl ZeroCopyInputStream for DynZeroCopyInputStream<'_> {}
393
394impl zero_copy_input_stream::Sealed for DynZeroCopyInputStream<'_> {
395 fn upcast(&self) -> &ffi::ZeroCopyInputStream {
396 unsafe { mem::transmute(self) }
397 }
398
399 fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyInputStream> {
400 unsafe { mem::transmute(self) }
401 }
402}
403
404/// Abstract interface similar to an output stream but designed to minimize
405/// copying.
406///
407/// # Examples
408///
409/// Copy the contents of infile to outfile, using plain [`Read`] for infile
410/// but a `ZeroCopyOutputStream` for outfile:
411///
412/// ```ignore
413/// use std::fs::File;
414/// use std::io::{self, Read, Write};
415/// use protobuf_native::io::{WriterStream, ZeroCopyOutputStream};
416///
417/// let mut infile = File::open("infile")?;
418/// let mut outfile = File::create("outfile")?;
419/// let mut output = WriterStream::new(&mut outfile);
420///
421/// while let Ok(buf) = output.next() {
422/// // Reading into uninitialized memory requires the unstable `ReadBuf` API.
423/// // See: https://rust-lang.github.io/rfcs/2930-read-buf.html
424/// let buf = ReadBuf::uninit(buf);
425/// infile.read_buf(buf)?;
426/// output.back_up(buf.remaining());
427/// if buf.filled().is_empty() {
428/// break;
429/// }
430/// }
431///
432/// # Ok::<_, io::Error>(())
433/// ```
434pub trait ZeroCopyOutputStream: zero_copy_output_stream::Sealed {
435 /// Obtains a buffer into which data can be written.
436 ///
437 /// Any data written into this buffer will eventually (maybe instantly,
438 /// maybe later on) be written to the output.
439 ///
440 /// # Safety
441 ///
442 /// If this function returns `Ok`, you **must** initialize the returned byte
443 /// slice before you either call `next` again or drop the slice. You can
444 /// choose to initialize only a portion of the byte slice by calling
445 /// [`back_up`].
446 ///
447 /// This is a very unusual invariant to maintain in Rust.
448 ///
449 /// [`back_up`]: ZeroCopyOutputStream::back_up
450 unsafe fn next(self: Pin<&mut Self>) -> Result<&mut [MaybeUninit<u8>], OperationFailedError> {
451 let mut data = MaybeUninit::uninit();
452 let mut size = MaybeUninit::uninit();
453 self.upcast_mut()
454 .Next(data.as_mut_ptr(), size.as_mut_ptr())
455 .as_result()?;
456 let data = data.assume_init() as *mut MaybeUninit<u8>;
457 let size = size.assume_init().to_usize()?;
458 Ok(slice::from_raw_parts_mut(data, size))
459 }
460
461 /// Backs up a number of bytes, so that the end of the last buffer returned
462 /// by [`next`] is not actually written.
463 ///
464 /// This is needed when you finish writing all the data you want to write,
465 /// but the last buffer was bigger than you needed. You don't want to write
466 /// a bunch of garbage after the end of your data, so you use `back_up` to
467 /// back up.
468 ///
469 /// [`next`]: ZeroCopyOutputStream::next
470 fn back_up(self: Pin<&mut Self>, count: usize) {
471 // See comment in `ZeroCopyInputStream::back_up` for why we tolerate
472 // panics here.
473 let count = CInt::try_from(count).expect("count did not fit in a C int");
474 self.upcast_mut().BackUp(count)
475 }
476
477 /// Returns the total number of bytes written since this object was created.
478 fn byte_count(&self) -> i64 {
479 self.upcast().ByteCount()
480 }
481}
482
483mod zero_copy_output_stream {
484 use std::pin::Pin;
485
486 use crate::io::ffi;
487
488 pub trait Sealed {
489 fn upcast(&self) -> &ffi::ZeroCopyOutputStream;
490 fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyOutputStream>;
491 unsafe fn upcast_mut_ptr(self: Pin<&mut Self>) -> *mut ffi::ZeroCopyOutputStream {
492 self.upcast_mut().get_unchecked_mut() as *mut _
493 }
494 }
495}
496
497/// Converts an [`Write`] implementor to a [`ZeroCopyOutputStream`].
498pub struct WriterStream<'a> {
499 _opaque: PhantomPinned,
500 _lifetime: PhantomData<&'a mut ()>,
501}
502
503impl<'a> WriterStream<'a> {
504 /// Creates a writer stream from the specified [`Write`] implementor.
505 pub fn new(writer: &'a mut dyn Write) -> Pin<Box<WriterStream<'a>>> {
506 let stream = ffi::NewWriterStream(Box::new(WriteAdaptor(writer)));
507 unsafe { Self::from_ffi_owned(stream) }
508 }
509
510 unsafe_ffi_conversions!(ffi::WriterStream);
511}
512
513impl<'a> Drop for WriterStream<'a> {
514 fn drop(&mut self) {
515 unsafe { ffi::DeleteWriterStream(self.as_ffi_mut_ptr_unpinned()) }
516 }
517}
518
519impl<'a> ZeroCopyOutputStream for WriterStream<'a> {}
520
521impl<'a> zero_copy_output_stream::Sealed for WriterStream<'a> {
522 fn upcast(&self) -> &ffi::ZeroCopyOutputStream {
523 unsafe { mem::transmute(self) }
524 }
525
526 fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyOutputStream> {
527 unsafe { mem::transmute(self) }
528 }
529}
530
531/// A [`ZeroCopyOutputStream`] specialized for writing to byte slices.
532///
533/// Using this type is more efficient than using a [`WriterStream`] when the
534/// underlying writer is a type that exposes a simple mutable byte slice.
535pub struct SliceOutputStream<'a> {
536 _opaque: PhantomPinned,
537 _lifetime: PhantomData<&'a ()>,
538}
539
540impl<'a> SliceOutputStream<'a> {
541 /// Creates a new `SliceOutputStream` from the provided byte slice.
542 pub fn new(slice: &mut [u8]) -> Pin<Box<SliceOutputStream<'a>>> {
543 let size = CInt::expect_from(slice.len());
544 let stream = unsafe { ffi::NewArrayOutputStream(slice.as_mut_ptr(), size) };
545 unsafe { Self::from_ffi_owned(stream) }
546 }
547
548 unsafe_ffi_conversions!(ffi::ArrayOutputStream);
549}
550
551impl<'a> Drop for SliceOutputStream<'a> {
552 fn drop(&mut self) {
553 unsafe { ffi::DeleteArrayOutputStream(self.as_ffi_mut_ptr_unpinned()) }
554 }
555}
556
557impl<'a> ZeroCopyOutputStream for SliceOutputStream<'a> {}
558
559impl<'a> zero_copy_output_stream::Sealed for SliceOutputStream<'a> {
560 fn upcast(&self) -> &ffi::ZeroCopyOutputStream {
561 unsafe { mem::transmute(self) }
562 }
563
564 fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyOutputStream> {
565 unsafe { mem::transmute(self) }
566 }
567}
568
569/// A [`ZeroCopyOutputStream`] specialized for writing to byte vectors.
570///
571/// Using this type is more efficient than using a [`WriterStream`] when the
572/// underlying writer is a byte vector.
573pub struct VecOutputStream<'a> {
574 _opaque: PhantomPinned,
575 _lifetime: PhantomData<&'a ()>,
576}
577
578impl<'a> VecOutputStream<'a> {
579 /// Creates a new `VecOutputStream` from the provided byte vector.
580 pub fn new(vec: &mut Vec<u8>) -> Pin<Box<VecOutputStream<'a>>> {
581 let stream = ffi::NewVecOutputStream(vec);
582 unsafe { Self::from_ffi_owned(stream) }
583 }
584
585 unsafe_ffi_conversions!(ffi::VecOutputStream);
586}
587
588impl<'a> Drop for VecOutputStream<'a> {
589 fn drop(&mut self) {
590 unsafe { ffi::DeleteVecOutputStream(self.as_ffi_mut_ptr_unpinned()) }
591 }
592}
593
594impl<'a> ZeroCopyOutputStream for VecOutputStream<'a> {}
595
596impl<'a> zero_copy_output_stream::Sealed for VecOutputStream<'a> {
597 fn upcast(&self) -> &ffi::ZeroCopyOutputStream {
598 unsafe { mem::transmute(self) }
599 }
600
601 fn upcast_mut(self: Pin<&mut Self>) -> Pin<&mut ffi::ZeroCopyOutputStream> {
602 unsafe { mem::transmute(self) }
603 }
604}
605
606/// Type which reads and decodes binary data which is composed of varint-
607/// encoded integers and fixed-width pieces.
608///
609/// Wraps a [`ZeroCopyInputStream`]. Most users will not need to deal with
610/// `CodedInputStream`.
611///
612/// Most methods of `CodedInputStream` that return a `Result` return an error if
613/// an underlying I/O error occurs or if the data is malformed. Once such a
614/// failure occurs, the `CodedInputStream` is broken and is no longer useful.
615pub struct CodedInputStream<'a> {
616 _opaque: PhantomPinned,
617 _lifetime: PhantomData<&'a ()>,
618}
619
620impl<'a> Drop for CodedInputStream<'a> {
621 fn drop(&mut self) {
622 unsafe { ffi::DeleteCodedInputStream(self.as_ffi_mut_ptr_unpinned()) }
623 }
624}
625
626impl<'a> CodedInputStream<'a> {
627 /// Creates a `CodedInputStream` that reads from the given
628 /// [`ZeroCopyInputStream`].
629 pub fn new(input: Pin<&'a mut dyn ZeroCopyInputStream>) -> Pin<Box<CodedInputStream<'a>>> {
630 let stream = unsafe { ffi::NewCodedInputStream(input.upcast_mut_ptr()) };
631 unsafe { Self::from_ffi_owned(stream) }
632 }
633
634 /// Reports whether this coded input stream reads from a flat array instead
635 /// of a [`ZeroCopyInputStream`].
636 pub fn is_flat(&self) -> bool {
637 self.as_ffi().IsFlat()
638 }
639
640 /// Reads an unsigned integer with varint encoding, truncating to 32 bits.
641 ///
642 /// Reading a 32-bit value is equivalent to reading a 64-bit one and casting
643 /// it to `u32`, but may be more efficient.
644 pub fn read_varint32(self: Pin<&mut Self>) -> Result<u32, OperationFailedError> {
645 let mut value = MaybeUninit::uninit();
646 // SAFETY: `ReadVarint32` promises to initialize `value` if it returns
647 // true.
648 unsafe {
649 match self.as_ffi_mut().ReadVarint32(value.as_mut_ptr()) {
650 true => Ok(value.assume_init()),
651 false => Err(OperationFailedError),
652 }
653 }
654 }
655
656 /// Reads an unsigned 64-bit integer with varint encoding.
657 pub fn read_varint64(self: Pin<&mut Self>) -> Result<u64, OperationFailedError> {
658 let mut value = MaybeUninit::uninit();
659 // SAFETY: `ReadVarint32` promises to initialize `value` if it returns
660 // true.
661 unsafe {
662 match self.as_ffi_mut().ReadVarint64(value.as_mut_ptr()) {
663 true => Ok(value.assume_init()),
664 false => Err(OperationFailedError),
665 }
666 }
667 }
668
669 /// Reads a tag.
670 ///
671 /// This calls [`read_varint32`] and returns the result. Also updates the
672 /// last tag value, which can be checked with [`last_tag_was`].
673 ///
674 /// [`read_varint32`]: CodedInputStream::read_varint32
675 /// [`last_tag_was`]: CodedInputStream::last_tag_was
676 pub fn read_tag(self: Pin<&mut Self>) -> Result<u32, OperationFailedError> {
677 match self.as_ffi_mut().ReadTag() {
678 0 => Err(OperationFailedError), // 0 is error sentinel
679 tag => Ok(tag),
680 }
681 }
682
683 /// Like [`read_tag`], but does not update the last tag
684 /// value.
685 ///
686 /// [`read_tag`]: `CodedInputStream::read_tag`
687 pub fn read_tag_no_last_tag(self: Pin<&mut Self>) -> Result<u32, OperationFailedError> {
688 match self.as_ffi_mut().ReadTag() {
689 0 => Err(OperationFailedError), // 0 is error sentinel
690 tag => Ok(tag),
691 }
692 }
693
694 /// Reports whether the last call to [`read_tag`] or
695 /// [`read_tag_with_cutoff`] returned the given value.
696 ///
697 /// [`read_tag_no_last_tag`] and [`read_tag_with_cutoff_no_last_tag`] do not
698 /// preserve the last returned value.
699 ///
700 /// This is needed because parsers for some types of embedded messages (with
701 /// field type `TYPE_GROUP`) don't actually know that they've reached the
702 /// end of a message until they see an `ENDGROUP` tag, which was actually
703 /// part of the enclosing message. The enclosing message would like to check
704 /// that tag to make sure it had the right number, so it calls
705 /// `last_tag_was` on return from the embedded parser to check.
706 ///
707 /// [`read_tag`]: CodedInputStream::read_tag
708 /// [`read_tag_with_cutoff`]: CodedInputStream::read_tag_with_cutoff
709 /// [`read_tag_no_last_tag`]: CodedInputStream::read_tag_no_last_tag
710 /// [`read_tag_with_cutoff_no_last_tag`]: CodedInputStream::read_tag_with_cutoff_no_last_tag
711 pub fn last_tag_was(self: Pin<&mut Self>, expected: u32) -> bool {
712 self.as_ffi_mut().LastTagWas(expected)
713 }
714
715 /// When parsing a message (but NOT a group), this method must be called
716 /// immediately after [`MessageLite::merge_from_coded_stream`] returns (if
717 /// it returns true) to further verify that the message ended in a
718 /// legitimate way.
719 ///
720 /// For example, this verifies that parsing did not end on an end-group tag.
721 /// It also checks for some cases where, due to optimizations,
722 /// `merge_from_coded_stream` can incorrectly return true.
723 ///
724 /// [`MessageLite::merge_from_coded_stream`]: crate::MessageLite::merge_from_coded_stream
725 pub fn consumed_entire_message(self: Pin<&mut Self>) -> bool {
726 self.as_ffi_mut().ConsumedEntireMessage()
727 }
728
729 /// Returns the stream's current position relative to the beginning of the
730 /// input.
731 pub fn current_position(&self) -> usize {
732 self.as_ffi()
733 .CurrentPosition()
734 .to_usize()
735 .expect("stream position not representable as usize")
736 }
737
738 unsafe_ffi_conversions!(ffi::CodedInputStream);
739}
740
741impl<'a> Read for Pin<&mut CodedInputStream<'a>> {
742 fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
743 let start = self.current_position();
744 let data = buf.as_mut_ptr() as *mut CVoid;
745 let size = CInt::try_from(buf.len()).map_err(|_| {
746 io::Error::new(
747 io::ErrorKind::InvalidInput,
748 "buffer exceeds size of a C int",
749 )
750 })?;
751 unsafe { self.as_mut().as_ffi_mut().ReadRaw(data, size) };
752 let end = self.current_position();
753 Ok(end - start)
754 }
755}
756
757/// Type which encodes and writes binary data which is composed of varint-
758/// encoded integers and fixed-width pieces.
759///
760/// Wraps a [`ZeroCopyOutputStream`]. Most users will not need to deal with
761/// `CodedOutputStream`.
762///
763/// Most methods of `CodedOutputStream` which return a `bool` return false if an
764/// underlying I/O error occurs. Once such a failure occurs, the
765/// CodedOutputStream is broken and is no longer useful. The `write_*` methods
766/// do not return the stream status, but will invalidate the stream if an error
767/// occurs. The client can probe `had_error` to determine the status.
768pub struct CodedOutputStream<'a> {
769 _opaque: PhantomPinned,
770 _lifetime: PhantomData<&'a ()>,
771}
772
773impl<'a> CodedOutputStream<'a> {
774 unsafe_ffi_conversions!(ffi::CodedOutputStream);
775}
776
777impl<'a> Drop for CodedOutputStream<'a> {
778 fn drop(&mut self) {
779 unsafe { ffi::DeleteCodedOutputStream(self.as_ffi_mut_ptr_unpinned()) }
780 }
781}