1use std::alloc::Layout;
19use std::fmt::Debug;
20use std::ptr::NonNull;
21use std::sync::Arc;
22
23use crate::alloc::{Allocation, Deallocation, ALIGNMENT};
24use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
25use crate::BufferBuilder;
26use crate::{bytes::Bytes, native::ArrowNativeType};
27
28use super::ops::bitwise_unary_op_helper;
29use super::{MutableBuffer, ScalarBuffer};
30
#[derive(Clone, Debug)]
pub struct Buffer {
    /// Reference-counted backing allocation. All slices/clones of this buffer
    /// share the same `Bytes`; the allocation is freed when the last clone drops.
    data: Arc<Bytes>,

    /// Pointer to the first byte of this buffer's view into `data`.
    /// Invariant: always points within the allocation owned by `data`
    /// (maintained by `advance` / `slice_with_length`).
    ptr: *const u8,

    /// Length in bytes of the view starting at `ptr`.
    /// Invariant: `ptr + length` never exceeds the end of `data`'s allocation.
    length: usize,
}
49
50impl PartialEq for Buffer {
51 fn eq(&self, other: &Self) -> bool {
52 self.as_slice().eq(other.as_slice())
53 }
54}
55
// Byte-wise equality (see `PartialEq` above) is reflexive, so `Eq` is sound.
impl Eq for Buffer {}
57
// SAFETY: `Buffer` is an `Arc<Bytes>` plus a raw pointer/length view into that
// same allocation; the `Arc` keeps the pointee alive, so the buffer may be
// sent/shared across threads whenever `Bytes` itself is `Send`/`Sync`.
unsafe impl Send for Buffer where Bytes: Send {}
unsafe impl Sync for Buffer where Bytes: Sync {}
60
61impl Buffer {
62 #[inline]
64 pub fn from_bytes(bytes: Bytes) -> Self {
65 let length = bytes.len();
66 let ptr = bytes.as_ptr();
67 Buffer {
68 data: Arc::new(bytes),
69 ptr,
70 length,
71 }
72 }
73
74 pub fn ptr_offset(&self) -> usize {
78 unsafe { self.ptr.offset_from(self.data.ptr().as_ptr()) as usize }
80 }
81
82 pub fn data_ptr(&self) -> NonNull<u8> {
84 self.data.ptr()
85 }
86
87 #[inline]
89 pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
90 MutableBuffer::from(vec).into()
91 }
92
93 pub fn from_slice_ref<U: ArrowNativeType, T: AsRef<[U]>>(items: T) -> Self {
95 let slice = items.as_ref();
96 let capacity = std::mem::size_of_val(slice);
97 let mut buffer = MutableBuffer::with_capacity(capacity);
98 buffer.extend_from_slice(slice);
99 buffer.into()
100 }
101
102 #[deprecated(note = "Use Buffer::from_vec")]
116 pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
117 assert!(len <= capacity);
118 let layout = Layout::from_size_align(capacity, ALIGNMENT).unwrap();
119 Buffer::build_with_arguments(ptr, len, Deallocation::Standard(layout))
120 }
121
122 pub unsafe fn from_custom_allocation(
135 ptr: NonNull<u8>,
136 len: usize,
137 owner: Arc<dyn Allocation>,
138 ) -> Self {
139 Buffer::build_with_arguments(ptr, len, Deallocation::Custom(owner, len))
140 }
141
142 unsafe fn build_with_arguments(
144 ptr: NonNull<u8>,
145 len: usize,
146 deallocation: Deallocation,
147 ) -> Self {
148 let bytes = Bytes::new(ptr, len, deallocation);
149 let ptr = bytes.as_ptr();
150 Buffer {
151 ptr,
152 data: Arc::new(bytes),
153 length: len,
154 }
155 }
156
157 #[inline]
159 pub fn len(&self) -> usize {
160 self.length
161 }
162
163 #[inline]
166 pub fn capacity(&self) -> usize {
167 self.data.capacity()
168 }
169
170 #[inline]
172 pub fn is_empty(&self) -> bool {
173 self.length == 0
174 }
175
176 pub fn as_slice(&self) -> &[u8] {
178 unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
179 }
180
181 pub(crate) fn deallocation(&self) -> &Deallocation {
182 self.data.deallocation()
183 }
184
185 pub fn slice(&self, offset: usize) -> Self {
192 let mut s = self.clone();
193 s.advance(offset);
194 s
195 }
196
197 #[inline]
203 pub fn advance(&mut self, offset: usize) {
204 assert!(
205 offset <= self.length,
206 "the offset of the new Buffer cannot exceed the existing length: offset={} length={}",
207 offset,
208 self.length
209 );
210 self.length -= offset;
211 self.ptr = unsafe { self.ptr.add(offset) };
216 }
217
218 pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
224 assert!(
225 offset.saturating_add(length) <= self.length,
226 "the offset of the new Buffer cannot exceed the existing length: slice offset={offset} length={length} selflen={}",
227 self.length
228 );
229 let ptr = unsafe { self.ptr.add(offset) };
232 Self {
233 data: self.data.clone(),
234 ptr,
235 length,
236 }
237 }
238
239 #[inline]
244 pub fn as_ptr(&self) -> *const u8 {
245 self.ptr
246 }
247
248 pub fn typed_data<T: ArrowNativeType>(&self) -> &[T] {
255 let (prefix, offsets, suffix) = unsafe { self.as_slice().align_to::<T>() };
259 assert!(prefix.is_empty() && suffix.is_empty());
260 offsets
261 }
262
263 pub fn bit_slice(&self, offset: usize, len: usize) -> Self {
267 if offset % 8 == 0 {
268 return self.slice(offset / 8);
269 }
270
271 bitwise_unary_op_helper(self, offset, len, |a| a)
272 }
273
274 pub fn bit_chunks(&self, offset: usize, len: usize) -> BitChunks {
278 BitChunks::new(self.as_slice(), offset, len)
279 }
280
281 #[deprecated(note = "use count_set_bits_offset instead")]
283 pub fn count_set_bits(&self) -> usize {
284 let len_in_bits = self.len() * 8;
285 self.count_set_bits_offset(0, len_in_bits)
287 }
288
289 pub fn count_set_bits_offset(&self, offset: usize, len: usize) -> usize {
292 UnalignedBitChunk::new(self.as_slice(), offset, len).count_ones()
293 }
294
295 pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
299 let ptr = self.ptr;
300 let length = self.length;
301 Arc::try_unwrap(self.data)
302 .and_then(|bytes| {
303 assert_eq!(ptr, bytes.ptr().as_ptr());
305 MutableBuffer::from_bytes(bytes).map_err(Arc::new)
306 })
307 .map_err(|bytes| Buffer {
308 data: bytes,
309 ptr,
310 length,
311 })
312 }
313
314 pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
319 let layout = match self.data.deallocation() {
320 Deallocation::Standard(l) => l,
321 _ => return Err(self), };
323
324 if self.ptr != self.data.as_ptr() {
325 return Err(self); }
327
328 let v_capacity = layout.size() / std::mem::size_of::<T>();
329 match Layout::array::<T>(v_capacity) {
330 Ok(expected) if layout == &expected => {}
331 _ => return Err(self), }
333
334 let length = self.length;
335 let ptr = self.ptr;
336 let v_len = self.length / std::mem::size_of::<T>();
337
338 Arc::try_unwrap(self.data)
339 .map(|bytes| unsafe {
340 let ptr = bytes.ptr().as_ptr() as _;
341 std::mem::forget(bytes);
342 Vec::from_raw_parts(ptr, v_len, v_capacity)
345 })
346 .map_err(|bytes| Buffer {
347 data: bytes,
348 ptr,
349 length,
350 })
351 }
352
353 #[inline]
357 pub fn ptr_eq(&self, other: &Self) -> bool {
358 self.ptr == other.ptr && self.length == other.length
359 }
360}
361
362impl From<&[u8]> for Buffer {
371 fn from(p: &[u8]) -> Self {
372 Self::from_slice_ref(p)
373 }
374}
375
376impl<const N: usize> From<[u8; N]> for Buffer {
377 fn from(p: [u8; N]) -> Self {
378 Self::from_slice_ref(p)
379 }
380}
381
382impl<const N: usize> From<&[u8; N]> for Buffer {
383 fn from(p: &[u8; N]) -> Self {
384 Self::from_slice_ref(p)
385 }
386}
387
388impl<T: ArrowNativeType> From<Vec<T>> for Buffer {
389 fn from(value: Vec<T>) -> Self {
390 Self::from_vec(value)
391 }
392}
393
394impl<T: ArrowNativeType> From<ScalarBuffer<T>> for Buffer {
395 fn from(value: ScalarBuffer<T>) -> Self {
396 value.into_inner()
397 }
398}
399
400impl FromIterator<bool> for Buffer {
402 fn from_iter<I>(iter: I) -> Self
403 where
404 I: IntoIterator<Item = bool>,
405 {
406 MutableBuffer::from_iter(iter).into()
407 }
408}
409
410impl std::ops::Deref for Buffer {
411 type Target = [u8];
412
413 fn deref(&self) -> &[u8] {
414 unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len()) }
415 }
416}
417
418impl From<MutableBuffer> for Buffer {
419 #[inline]
420 fn from(buffer: MutableBuffer) -> Self {
421 buffer.into_buffer()
422 }
423}
424
425impl<T: ArrowNativeType> From<BufferBuilder<T>> for Buffer {
426 fn from(mut value: BufferBuilder<T>) -> Self {
427 value.finish()
428 }
429}
430
impl Buffer {
    /// Creates a [`Buffer`] from an iterator of [`ArrowNativeType`] values.
    ///
    /// # Safety
    /// The iterator must accurately report its length (a "trusted length"
    /// iterator) — see the contract of `MutableBuffer::from_trusted_len_iter`,
    /// which this delegates to. An inaccurate length may cause out-of-bounds
    /// access in that constructor.
    #[inline]
    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
        iterator: I,
    ) -> Self {
        MutableBuffer::from_trusted_len_iter(iterator).into()
    }

    /// Fallible version of [`Self::from_trusted_len_iter`]: builds a
    /// [`Buffer`] from an iterator of `Result<T, E>`, returning the first
    /// error encountered (propagated via `?`).
    ///
    /// # Safety
    /// Same trusted-length contract as [`Self::from_trusted_len_iter`].
    #[inline]
    pub unsafe fn try_from_trusted_len_iter<
        E,
        T: ArrowNativeType,
        I: Iterator<Item = Result<T, E>>,
    >(
        iterator: I,
    ) -> Result<Self, E> {
        Ok(MutableBuffer::try_from_trusted_len_iter(iterator)?.into())
    }
}
473
474impl<T: ArrowNativeType> FromIterator<T> for Buffer {
475 fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
476 let vec = Vec::from_iter(iter);
477 Buffer::from_vec(vec)
478 }
479}
480
#[cfg(test)]
mod tests {
    use crate::i256;
    use std::panic::{RefUnwindSafe, UnwindSafe};
    use std::thread;

    use super::*;

    #[test]
    fn test_buffer_data_equality() {
        let buf1 = Buffer::from(&[0, 1, 2, 3, 4]);
        let buf2 = Buffer::from(&[0, 1, 2, 3, 4]);
        assert_eq!(buf1, buf2);

        // Slicing changes the visible bytes; equal slices compare equal.
        let buf3 = buf1.slice(2);
        assert_ne!(buf1, buf3);
        let buf4 = buf2.slice_with_length(2, 3);
        assert_eq!(buf3, buf4);

        // Different capacities should still preserve equality.
        let mut buf2 = MutableBuffer::new(65);
        buf2.extend_from_slice(&[0u8, 1, 2, 3, 4]);

        let buf2 = buf2.into();
        assert_eq!(buf1, buf2);

        // Unequal: different element values.
        let buf2 = Buffer::from(&[0, 0, 2, 3, 4]);
        assert_ne!(buf1, buf2);

        // Unequal: different lengths.
        let buf2 = Buffer::from(&[0, 1, 2, 3]);
        assert_ne!(buf1, buf2);
    }

    #[test]
    fn test_from_raw_parts() {
        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
        assert_eq!(5, buf.len());
        assert!(!buf.as_ptr().is_null());
        assert_eq!([0, 1, 2, 3, 4], buf.as_slice());
    }

    #[test]
    fn test_from_vec() {
        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
        assert_eq!(5, buf.len());
        assert!(!buf.as_ptr().is_null());
        assert_eq!([0, 1, 2, 3, 4], buf.as_slice());
    }

    #[test]
    fn test_copy() {
        // Moving a Buffer preserves contents; capacity is the (rounded-up)
        // allocation size, not the logical length.
        let buf = Buffer::from(&[0, 1, 2, 3, 4]);
        let buf2 = buf;
        assert_eq!(5, buf2.len());
        assert_eq!(64, buf2.capacity());
        assert!(!buf2.as_ptr().is_null());
        assert_eq!([0, 1, 2, 3, 4], buf2.as_slice());
    }

    #[test]
    fn test_slice() {
        let buf = Buffer::from(&[2, 4, 6, 8, 10]);
        let buf2 = buf.slice(2);

        assert_eq!([6, 8, 10], buf2.as_slice());
        assert_eq!(3, buf2.len());
        // Zero-copy: the slice points into the original allocation.
        assert_eq!(unsafe { buf.as_ptr().offset(2) }, buf2.as_ptr());

        let buf3 = buf2.slice_with_length(1, 2);
        assert_eq!([8, 10], buf3.as_slice());
        assert_eq!(2, buf3.len());
        assert_eq!(unsafe { buf.as_ptr().offset(3) }, buf3.as_ptr());

        // Slicing at exactly len yields an empty buffer.
        let buf4 = buf.slice(5);
        let empty_slice: [u8; 0] = [];
        assert_eq!(empty_slice, buf4.as_slice());
        assert_eq!(0, buf4.len());
        assert!(buf4.is_empty());
        assert_eq!(buf2.slice_with_length(2, 1).as_slice(), &[10]);
    }

    #[test]
    #[should_panic(expected = "the offset of the new Buffer cannot exceed the existing length")]
    fn test_slice_offset_out_of_bound() {
        let buf = Buffer::from(&[2, 4, 6, 8, 10]);
        buf.slice(6);
    }

    #[test]
    fn test_access_concurrently() {
        let buffer = Buffer::from([1, 2, 3, 4, 5]);
        let buffer2 = buffer.clone();
        assert_eq!([1, 2, 3, 4, 5], buffer.as_slice());

        // Move the buffer into another thread and back (exercises Send).
        let buffer_copy = thread::spawn(move || {
            buffer
        })
        .join();

        assert!(buffer_copy.is_ok());
        assert_eq!(buffer2, buffer_copy.ok().unwrap());
    }

    // Round-trips a slice through `from_slice_ref` / `typed_data`.
    macro_rules! check_as_typed_data {
        ($input: expr, $native_t: ty) => {{
            let buffer = Buffer::from_slice_ref($input);
            let slice: &[$native_t] = buffer.typed_data::<$native_t>();
            assert_eq!($input, slice);
        }};
    }

    #[test]
    #[allow(clippy::float_cmp)]
    fn test_as_typed_data() {
        check_as_typed_data!(&[1i8, 3i8, 6i8], i8);
        check_as_typed_data!(&[1u8, 3u8, 6u8], u8);
        check_as_typed_data!(&[1i16, 3i16, 6i16], i16);
        check_as_typed_data!(&[1i32, 3i32, 6i32], i32);
        check_as_typed_data!(&[1i64, 3i64, 6i64], i64);
        check_as_typed_data!(&[1u16, 3u16, 6u16], u16);
        check_as_typed_data!(&[1u32, 3u32, 6u32], u32);
        check_as_typed_data!(&[1u64, 3u64, 6u64], u64);
        check_as_typed_data!(&[1f32, 3f32, 6f32], f32);
        check_as_typed_data!(&[1f64, 3f64, 6f64], f64);
    }

    #[test]
    fn test_count_bits() {
        assert_eq!(0, Buffer::from(&[0b00000000]).count_set_bits_offset(0, 8));
        assert_eq!(8, Buffer::from(&[0b11111111]).count_set_bits_offset(0, 8));
        assert_eq!(3, Buffer::from(&[0b00001101]).count_set_bits_offset(0, 8));
        assert_eq!(
            6,
            Buffer::from(&[0b01001001, 0b01010010]).count_set_bits_offset(0, 16)
        );
        assert_eq!(
            16,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(0, 16)
        );
    }

    #[test]
    fn test_count_bits_slice() {
        // Counting starts from the sliced view, not the original buffer.
        assert_eq!(
            0,
            Buffer::from(&[0b11111111, 0b00000000])
                .slice(1)
                .count_set_bits_offset(0, 8)
        );
        assert_eq!(
            8,
            Buffer::from(&[0b11111111, 0b11111111])
                .slice_with_length(1, 1)
                .count_set_bits_offset(0, 8)
        );
        assert_eq!(
            3,
            Buffer::from(&[0b11111111, 0b11111111, 0b00001101])
                .slice(2)
                .count_set_bits_offset(0, 8)
        );
        assert_eq!(
            6,
            Buffer::from(&[0b11111111, 0b01001001, 0b01010010])
                .slice_with_length(1, 2)
                .count_set_bits_offset(0, 16)
        );
        assert_eq!(
            16,
            Buffer::from(&[0b11111111, 0b11111111, 0b11111111, 0b11111111])
                .slice(2)
                .count_set_bits_offset(0, 16)
        );
    }

    #[test]
    fn test_count_bits_offset_slice() {
        // Both offset and length are expressed in bits.
        assert_eq!(8, Buffer::from(&[0b11111111]).count_set_bits_offset(0, 8));
        assert_eq!(3, Buffer::from(&[0b11111111]).count_set_bits_offset(0, 3));
        assert_eq!(5, Buffer::from(&[0b11111111]).count_set_bits_offset(3, 5));
        assert_eq!(1, Buffer::from(&[0b11111111]).count_set_bits_offset(3, 1));
        assert_eq!(0, Buffer::from(&[0b11111111]).count_set_bits_offset(8, 0));
        assert_eq!(2, Buffer::from(&[0b01010101]).count_set_bits_offset(0, 3));
        assert_eq!(
            16,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(0, 16)
        );
        assert_eq!(
            10,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(0, 10)
        );
        assert_eq!(
            10,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(3, 10)
        );
        assert_eq!(
            8,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(8, 8)
        );
        assert_eq!(
            5,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(11, 5)
        );
        assert_eq!(
            0,
            Buffer::from(&[0b11111111, 0b11111111]).count_set_bits_offset(16, 0)
        );
        assert_eq!(
            2,
            Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 5)
        );
        assert_eq!(
            4,
            Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 9)
        );
    }

    #[test]
    fn test_unwind_safe() {
        // Compile-time check that Buffer is unwind-safe.
        fn assert_unwind_safe<T: RefUnwindSafe + UnwindSafe>() {}
        assert_unwind_safe::<Buffer>()
    }

    #[test]
    fn test_from_foreign_vec() {
        // A Vec kept alive by an Arc owner via `from_custom_allocation`.
        let mut vector = vec![1_i32, 2, 3, 4, 5];
        let buffer = unsafe {
            Buffer::from_custom_allocation(
                NonNull::new_unchecked(vector.as_mut_ptr() as *mut u8),
                vector.len() * std::mem::size_of::<i32>(),
                Arc::new(vector),
            )
        };

        let slice = buffer.typed_data::<i32>();
        assert_eq!(slice, &[1, 2, 3, 4, 5]);

        // `slice` takes a byte offset; skip one i32.
        let buffer = buffer.slice(std::mem::size_of::<i32>());

        let slice = buffer.typed_data::<i32>();
        assert_eq!(slice, &[2, 3, 4, 5]);
    }

    #[test]
    #[should_panic(expected = "the offset of the new Buffer cannot exceed the existing length")]
    fn slice_overflow() {
        // `offset + length` saturates instead of wrapping, so this panics.
        let buffer = Buffer::from(MutableBuffer::from_len_zeroed(12));
        buffer.slice_with_length(2, usize::MAX);
    }

    #[test]
    fn test_vec_interop() {
        // Empty vec round-trips.
        let a: Vec<i128> = Vec::new();
        let b = Buffer::from_vec(a);
        b.into_vec::<i128>().unwrap();

        // Spare capacity is preserved through the round-trip.
        let a: Vec<i128> = Vec::with_capacity(20);
        let b = Buffer::from_vec(a);
        let back = b.into_vec::<i128>().unwrap();
        assert_eq!(back.len(), 0);
        assert_eq!(back.capacity(), 20);

        // Values round-trip.
        let mut a: Vec<i128> = Vec::with_capacity(3);
        a.extend_from_slice(&[1, 2, 3]);
        let b = Buffer::from_vec(a);
        let back = b.into_vec::<i128>().unwrap();
        assert_eq!(back.len(), 3);
        assert_eq!(back.capacity(), 3);

        // Values plus spare capacity round-trip.
        let mut a: Vec<i128> = Vec::with_capacity(20);
        a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
        let b = Buffer::from_vec(a);
        let back = b.into_vec::<i128>().unwrap();
        assert_eq!(back.len(), 7);
        assert_eq!(back.capacity(), 20);

        // Layout mismatch (different align/size) fails the Layout::array check.
        let a: Vec<i128> = Vec::new();
        let b = Buffer::from_vec(a);
        let b = b.into_vec::<i32>().unwrap_err();
        b.into_vec::<i8>().unwrap_err();

        // Same size and alignment converts (i64 -> u64).
        let a: Vec<i64> = vec![1, 2, 3, 4];
        let b = Buffer::from_vec(a);
        let back = b.into_vec::<u64>().unwrap();
        assert_eq!(back.len(), 4);
        assert_eq!(back.capacity(), 4);

        // 4 x i128 (16 bytes) == 2 x i256 (32 bytes).
        let mut b: Vec<i128> = Vec::with_capacity(4);
        b.extend_from_slice(&[1, 2, 3, 4]);
        let b = Buffer::from_vec(b);
        let back = b.into_vec::<i256>().unwrap();
        assert_eq!(back.len(), 2);
        assert_eq!(back.capacity(), 2);

        // Capacity not a whole number of the larger type -> error.
        let b: Vec<i128> = vec![1, 2, 3];
        let b = Buffer::from_vec(b);
        b.into_vec::<i256>().unwrap_err();

        let mut b: Vec<i128> = Vec::with_capacity(5);
        b.extend_from_slice(&[1, 2, 3, 4]);
        let b = Buffer::from_vec(b);
        b.into_vec::<i256>().unwrap_err();

        // Length truncates down to whole elements of the larger type.
        let mut b: Vec<i128> = Vec::with_capacity(4);
        b.extend_from_slice(&[1, 2, 3]);
        let b = Buffer::from_vec(b);
        let back = b.into_vec::<i256>().unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back.capacity(), 2);

        // A MutableBuffer-backed allocation has a different layout
        // (presumably ALIGNMENT-aligned — fails the Layout::array check).
        let b = Buffer::from(MutableBuffer::new(10));
        let b = b.into_vec::<u8>().unwrap_err();
        b.into_vec::<u64>().unwrap_err();

        let mut a: Vec<i128> = Vec::with_capacity(20);
        a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
        let b = Buffer::from_vec(a);
        let slice = b.slice_with_length(0, 64);

        // Shared reference (b still alive) prevents conversion...
        let slice = slice.into_vec::<i128>().unwrap_err();
        drop(b);

        // ...but succeeds once the other clone is dropped.
        let back = slice.into_vec::<i128>().unwrap();
        assert_eq!(&back, &[1, 4, 7, 8]);
        assert_eq!(back.capacity(), 20);

        // A byte length that is not a multiple of size_of::<T>() truncates.
        let mut a: Vec<i128> = Vec::with_capacity(8);
        a.extend_from_slice(&[1, 4, 7, 3]);

        let b = Buffer::from_vec(a);
        let slice = b.slice_with_length(0, 34);
        drop(b);

        let back = slice.into_vec::<i128>().unwrap();
        assert_eq!(&back, &[1, 4]);
        assert_eq!(back.capacity(), 8);

        // An offset (sliced) buffer cannot be converted back.
        let a: Vec<u32> = vec![1, 3, 4, 6];
        let b = Buffer::from_vec(a).slice(2);
        b.into_vec::<u32>().unwrap_err();

        // MutableBuffer-backed: into_vec fails but into_mutable succeeds.
        let b = MutableBuffer::new(16).into_buffer();
        let b = b.into_vec::<u8>().unwrap_err();
        let b = b.into_vec::<u32>().unwrap_err();
        b.into_mutable().unwrap();

        // Vec -> Buffer -> MutableBuffer -> Buffer -> Vec round trip.
        let b = Buffer::from_vec(vec![1_u32, 3, 5]);
        let b = b.into_mutable().unwrap();
        let b = Buffer::from(b);
        let b = b.into_vec::<u32>().unwrap();
        assert_eq!(b, &[1, 3, 5]);
    }

    #[test]
    #[should_panic(expected = "capacity overflow")]
    fn test_from_iter_overflow() {
        // Requesting more bytes than the address space must panic, not UB.
        let iter_len = usize::MAX / std::mem::size_of::<u64>() + 1;
        let _ = Buffer::from_iter(std::iter::repeat(0_u64).take(iter_len));
    }
}
863}