1#![forbid(unsafe_code)]
2
3use std::cmp;
4use std::collections::VecDeque;
5use std::io::IoSlice;
6use std::iter::FromIterator;
7
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9
10fn chunks_vectored<'s, B, I>(bufs: I, dst: &mut [IoSlice<'s>]) -> usize
11where
12 I: Iterator<Item = &'s B>,
13 B: Buf + 's,
14{
15 let mut filled = 0;
16 for buf in bufs {
17 if filled == dst.len() {
18 break;
19 }
20 filled += buf.chunks_vectored(&mut dst[filled..]);
21 }
22 filled
23}
24
/// A cursor over a mutably borrowed slice of buffers, read as one logical sequence of bytes.
///
/// The segments are borrowed rather than owned; consuming data advances the borrowed buffers in
/// place and moves `idx` forward past the ones that have been emptied.
#[derive(Debug, Default)]
pub struct SegmentedSlice<'a, B> {
    /// Total number of bytes left across all segments.
    remaining: usize,
    /// Index of the first segment that may still hold data; equals `bufs.len()` once all
    /// segments are exhausted.
    idx: usize,
    /// The borrowed segments themselves.
    bufs: &'a mut [B],
}
66
67impl<'a, B: Buf> SegmentedSlice<'a, B> {
68 pub fn new(bufs: &'a mut [B]) -> Self {
76 let remaining = bufs.iter().map(Buf::remaining).sum();
77 let mut me = Self {
78 remaining,
79 idx: 0,
80 bufs,
81 };
82 me.clean_empty();
83 me
84 }
85
86 fn clean_empty(&mut self) {
87 while self.idx < self.bufs.len() && !self.bufs[self.idx].has_remaining() {
88 self.idx += 1;
89 }
90 }
91}
92
93impl<'a, B: Buf> Buf for SegmentedSlice<'a, B> {
94 fn remaining(&self) -> usize {
95 self.remaining
96 }
97
98 fn chunk(&self) -> &[u8] {
99 self.bufs.get(self.idx).map(Buf::chunk).unwrap_or_default()
100 }
101
102 fn advance(&mut self, mut cnt: usize) {
103 self.remaining -= cnt;
104 while cnt > 0 {
105 let first = &mut self.bufs[self.idx];
106 let rem = first.remaining();
107 let segment = cmp::min(rem, cnt);
108 first.advance(segment);
109 cnt -= segment;
110 self.clean_empty();
111 }
112 }
113
114 fn copy_to_bytes(&mut self, len: usize) -> Bytes {
115 assert!(len <= self.remaining(), "`len` greater than remaining");
116 match self.bufs.get_mut(self.idx) {
117 Some(front) if front.remaining() >= len => {
121 self.remaining -= len;
122 let res = front.copy_to_bytes(len);
123 self.clean_empty();
124 res
125 }
126 _ => {
129 let mut res = BytesMut::with_capacity(len);
130 res.put(self.take(len));
131 res.freeze()
132 }
133 }
134 }
135
136 fn chunks_vectored<'s>(&'s self, dst: &mut [IoSlice<'s>]) -> usize {
137 let bufs = self.bufs.get(self.idx..).unwrap_or_default();
138 chunks_vectored(bufs.iter(), dst)
139 }
140}
141
/// An owned queue of buffers, read as one logical sequence of bytes.
///
/// Segments are appended at the back and consumed from the front.
#[derive(Clone, Debug)]
pub struct SegmentedBuf<B> {
    /// The segments, in reading order; the front one is the one currently being read.
    bufs: VecDeque<B>,
    /// Cached total number of bytes left across all segments.
    remaining: usize,
}
246
247impl<B> SegmentedBuf<B> {
248 pub fn new() -> Self {
255 Self::default()
256 }
257
258 pub fn into_inner(self) -> VecDeque<B> {
260 self.into()
261 }
262
263 pub fn segments(&self) -> usize {
265 self.bufs.len()
266 }
267}
268
269impl<B: Buf> SegmentedBuf<B> {
270 pub fn push(&mut self, buf: B) {
274 self.remaining += buf.remaining();
275 self.bufs.push_back(buf);
276 self.clean_empty();
277 }
278 fn update_remaining(&mut self) {
279 self.remaining = self.bufs.iter().map(Buf::remaining).sum();
280 }
281 fn clean_empty(&mut self) {
282 loop {
283 match self.bufs.front() {
284 Some(b) if !b.has_remaining() => {
285 self.bufs.pop_front();
286 }
287 _ => break,
288 }
289 }
290 }
291}
292
293impl<B> Default for SegmentedBuf<B> {
294 fn default() -> Self {
295 Self {
296 bufs: VecDeque::new(),
297 remaining: 0,
298 }
299 }
300}
301
302impl<B: Buf> From<Vec<B>> for SegmentedBuf<B> {
303 fn from(bufs: Vec<B>) -> Self {
304 Self::from(VecDeque::from(bufs))
305 }
306}
307
308impl<B: Buf> From<VecDeque<B>> for SegmentedBuf<B> {
309 fn from(bufs: VecDeque<B>) -> Self {
310 let mut me = Self { bufs, remaining: 0 };
311 me.clean_empty();
312 me.update_remaining();
313 me
314 }
315}
316
317impl<B> From<SegmentedBuf<B>> for VecDeque<B> {
318 fn from(me: SegmentedBuf<B>) -> Self {
319 me.bufs
320 }
321}
322
323impl<B: Buf> Extend<B> for SegmentedBuf<B> {
324 fn extend<T: IntoIterator<Item = B>>(&mut self, iter: T) {
325 self.bufs.extend(iter);
326 self.clean_empty();
327 self.update_remaining();
328 }
329}
330
331impl<B: Buf> FromIterator<B> for SegmentedBuf<B> {
332 fn from_iter<T: IntoIterator<Item = B>>(iter: T) -> Self {
333 let mut me = Self {
334 bufs: VecDeque::from_iter(iter),
335 remaining: 0,
336 };
337 me.clean_empty();
338 me.update_remaining();
339 me
340 }
341}
342
343impl<B: Buf> Buf for SegmentedBuf<B> {
344 fn remaining(&self) -> usize {
345 self.remaining
346 }
347
348 fn chunk(&self) -> &[u8] {
349 self.bufs.front().map(Buf::chunk).unwrap_or_default()
350 }
351
352 fn advance(&mut self, mut cnt: usize) {
353 assert!(cnt <= self.remaining, "Advance past the end of buffer");
354 self.remaining -= cnt;
355 while cnt > 0 {
356 let front = self
357 .bufs
358 .front_mut()
359 .expect("Missing buffers to provide remaining");
360 let front_remaining = front.remaining();
361 if front_remaining >= cnt {
362 front.advance(cnt);
363 break;
364 } else {
365 cnt -= front_remaining;
367 self.bufs.pop_front();
368 }
369 }
370 self.clean_empty();
371 }
372
373 fn copy_to_bytes(&mut self, len: usize) -> Bytes {
374 assert!(len <= self.remaining(), "`len` greater than remaining");
375 match self.bufs.front_mut() {
376 Some(front) if front.remaining() >= len => {
380 self.remaining -= len;
381 let res = front.copy_to_bytes(len);
382 self.clean_empty();
383 res
384 }
385 _ => {
388 let mut res = BytesMut::with_capacity(len);
389 res.put(self.take(len));
390 res.freeze()
391 }
392 }
393 }
394
395 fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
396 chunks_vectored(self.bufs.iter(), dst)
397 }
398}
399
#[cfg(test)]
mod tests {
    use std::io::Read;
    use std::ops::Deref;

    use proptest::prelude::*;

    use super::*;

    /// A freshly created `SegmentedBuf` behaves like an empty buffer.
    #[test]
    fn empty() {
        let mut b = SegmentedBuf::<Bytes>::new();

        assert!(!b.has_remaining());
        assert_eq!(0, b.remaining());
        assert!(b.chunk().is_empty());
        assert_eq!(0, b.segments());

        b.copy_to_slice(&mut []);
        b.advance(0);
        assert_eq!(0, b.reader().read(&mut [0; 10]).unwrap());
    }

    /// A default `SegmentedSlice` behaves like an empty buffer.
    #[test]
    fn empty_slices() {
        let mut b = SegmentedSlice::<&[u8]>::default();

        assert!(!b.has_remaining());
        assert_eq!(0, b.remaining());
        assert!(b.chunk().is_empty());

        b.copy_to_slice(&mut []);
        b.advance(0);
        assert_eq!(0, b.reader().read(&mut [0; 10]).unwrap());
    }

    /// "Hello World" split into segments, with an empty segment in the middle.
    fn segmented() -> SegmentedBuf<Bytes> {
        vec![
            Bytes::from("Hello"),
            Bytes::from(" "),
            Bytes::new(),
            Bytes::from("World"),
        ]
        .into()
    }

    #[test]
    fn segments() {
        let mut b = segmented();
        assert_eq!(11, b.remaining());
        assert_eq!(b"Hello", b.chunk());
        assert_eq!(4, b.segments());
        b.advance(3);
        assert_eq!(8, b.remaining());
        assert_eq!(b"lo", b.chunk());
        assert_eq!(4, b.segments());
    }

    #[test]
    fn to_bytes_all() {
        let mut b = segmented();
        let bytes = b.copy_to_bytes(11);
        assert_eq!("Hello World", &bytes);
    }

    #[test]
    fn advance_within() {
        let mut b = segmented();
        b.advance(2);
        assert_eq!(4, b.segments());
        assert_eq!(9, b.remaining());
        assert_eq!(b"llo", b.chunk());
    }

    #[test]
    fn advance_border() {
        let mut b = segmented();
        b.advance(5);
        assert_eq!(3, b.segments());
        assert_eq!(6, b.remaining());
        assert_eq!(b" ", b.chunk());
    }

    #[test]
    fn advance_across() {
        let mut b = segmented();
        b.advance(7);
        assert_eq!(1, b.segments());
        assert_eq!(4, b.remaining());
        assert_eq!(b"orld", b.chunk());
    }

    /// Advancing exactly to a border also removes the empty segment that follows it.
    #[test]
    fn empty_at_border() {
        let mut b = segmented();
        b.advance(6);
        assert_eq!(1, b.segments());
        assert_eq!(5, b.remaining());
        assert_eq!(b"World", b.chunk());
    }

    /// Every way of building a buffer out of only-empty segments yields no segments at all.
    #[test]
    fn empty_bufs() {
        fn is_empty(b: &SegmentedBuf<Bytes>) {
            assert_eq!(0, b.segments());
            assert_eq!(0, b.remaining());
            assert_eq!(b"", b.chunk());
        }

        is_empty(&vec![].into());
        is_empty(&vec![Bytes::new(), Bytes::new()].into());
        is_empty(&vec![Bytes::new(), Bytes::new()].into_iter().collect());

        let mut b = SegmentedBuf::new();
        is_empty(&b);
        b.push(Bytes::new());
        is_empty(&b);
        b.extend(vec![Bytes::new(), Bytes::new()]);
        is_empty(&b);
    }

    #[test]
    fn sliced_hello() {
        let mut buffers = [b"Hello" as &[_], b"", b" ", b"", b"World"];
        let buf = SegmentedSlice::new(&mut buffers);

        assert_eq!(11, buf.remaining());
        assert_eq!(b"Hello", buf.chunk());

        let mut out = String::new();
        buf.reader()
            .read_to_string(&mut out)
            .expect("Doesn't cause IO errors");
        assert_eq!("Hello World", out);
    }

    #[test]
    fn chunk_vectored() {
        let mut b = segmented();
        assert_eq!(b.chunks_vectored(&mut []), 0);
        let mut slices = [IoSlice::new(&[]); 5];
        assert_eq!(b.segments(), 4);
        // The empty segment contributes no slice.
        assert_eq!(b.chunks_vectored(&mut slices), 3);
        assert_eq!(&*slices[0], b"Hello");
        assert_eq!(&*slices[1], b" ");
        assert_eq!(&*slices[2], b"World");
        b.advance(2);
        let mut slices = [IoSlice::new(&[]); 1];
        assert_eq!(b.chunks_vectored(&mut slices), 1);
        assert_eq!(&*slices[0], b"llo");
    }

    #[test]
    fn chunk_vectored_nested() {
        let mut bufs = [segmented(), segmented()];
        let mut bufs = SegmentedSlice::new(&mut bufs);
        let mut slices = [IoSlice::new(&[]); 10];
        assert_eq!(bufs.chunks_vectored(&mut slices), 6);
        assert_eq!(&*slices[0], b"Hello");
        assert_eq!(&*slices[1], b" ");
        assert_eq!(&*slices[2], b"World");
        assert_eq!(&*slices[3], b"Hello");
        assert_eq!(&*slices[4], b" ");
        assert_eq!(&*slices[5], b"World");
        bufs.advance(2);
        let mut slices = [IoSlice::new(&[]); 1];
        assert_eq!(bufs.chunks_vectored(&mut slices), 1);
        assert_eq!(&*slices[0], b"llo");
    }

    proptest! {
        /// Reads the same random data through a plain `Bytes`, a pre-built `SegmentedBuf`, a
        /// `SegmentedSlice` and a `SegmentedBuf` that is fed on demand, and checks that all of
        /// them yield identical pieces.
        #[test]
        fn random(bufs: Vec<Vec<u8>>, splits in proptest::collection::vec(0..10usize, 1..10)) {
            let concat: Vec<u8> = bufs.iter().flat_map(|b| b.iter()).copied().collect();
            let mut segmented = bufs.iter()
                .map(|b| &b[..])
                .collect::<SegmentedBuf<_>>();
            assert_eq!(concat.len(), segmented.remaining());
            assert!(segmented.segments() <= bufs.len());
            assert!(concat.starts_with(segmented.chunk()));
            let mut bytes = segmented.clone().copy_to_bytes(segmented.remaining());
            assert_eq!(&concat[..], &bytes[..]);
            let mut sliced = bufs.iter().map(Deref::deref).collect::<Vec<&[u8]>>();
            let mut sliced = SegmentedSlice::new(&mut sliced);

            let mut fifo = SegmentedBuf::new();
            let mut buf_pos = bufs.iter();

            for split in splits {
                if !bytes.has_remaining() {
                    break;
                }
                let split = cmp::min(bytes.remaining(), split);
                // Top the queue up just enough to serve this read. The bytes pushed so far never
                // exceed the bytes consumed plus `split`, which is bounded by the total, so a
                // further input buffer is always available here.
                while fifo.remaining() < split {
                    fifo.push(&buf_pos.next().unwrap()[..]);
                }
                let c1 = bytes.copy_to_bytes(split);
                let c2 = segmented.copy_to_bytes(split);
                let c3 = sliced.copy_to_bytes(split);
                // Consume the incrementally filled queue in lockstep; without this the `push`
                // path would be exercised but never verified.
                let c4 = fifo.copy_to_bytes(split);
                assert_eq!(c1, c2);
                assert_eq!(c1, c3);
                assert_eq!(c1, c4);
                assert_eq!(bytes.remaining(), segmented.remaining());
                assert_eq!(bytes.remaining(), sliced.remaining());
            }
        }
    }
}