1pub use self::dictionary::DatumContainer;
17pub use self::dictionary::DatumSeq;
18pub use self::offset_opt::OffsetOptimized;
19pub use self::spines::{
20 RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowColPagedBuilder, RowRowSpine,
21 RowSpine, RowValBatcher, RowValBuilder, RowValSpine, ValRowBatcher, ValRowBuilder,
22 ValRowColPagedBuilder, ValRowSpine,
23};
24use differential_dataflow::trace::implementations::OffsetList;
25
/// Process-wide switch for dictionary compression of row containers.
///
/// Starts out `false`. It is read with relaxed ordering when a `DatumContainer`
/// is created or cleared (to decide whether to gather statistics) and by the
/// builders' `build_codec` (which returns `None` while the flag is off).
pub static DICTIONARY_COMPRESSION: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
29
/// Spine, batcher, and builder type aliases for collections whose keys and/or
/// values are `Row`s, together with the `Layout` implementations that choose
/// the batch containers backing them.
mod spines {
    use std::rc::Rc;

    use columnation::Columnation;
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
    use differential_dataflow::trace::implementations::ord_neu::{
        OrdKeyBatch, OrdValBatch, OrdValBuilder,
    };
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;
    use mz_timely_util::columnar::Column;
    use mz_timely_util::columnation::{ColInternalMerger, ColumnationStack};

    use crate::{DatumContainer, OffsetOptimized};

    /// Merge batcher over `((K, V), T, D)` updates, merged by `ColInternalMerger`.
    type KeyValBatcher<K, V, T, D> = MergeBatcher<ColInternalMerger<(K, V), T, D>>;
    /// Key-only batcher: a `KeyValBatcher` whose value type is `()`.
    type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;

    /// Spine of reference-counted batches with `Row` keys and `Row` values.
    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    /// Batcher for `RowRowSpine` updates.
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    /// Builder for `RowRowSpine` batches, wrapping this crate's dictionary-aware builder.
    pub type RowRowBuilder<T, R> = RcBuilder<crate::dictionary::builders::RowRowBuilder<T, R>>;

    /// Builder for `RowRowSpine` batches whose input chunks are `Column`s.
    pub type RowRowColPagedBuilder<T, R> =
        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, Column<((Row, Row), T, R)>>>;

    /// Spine of reference-counted batches with `Row` keys and values of type `V`.
    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    /// Batcher for `RowValSpine` updates.
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    /// Builder for `RowValSpine` batches, wrapping this crate's dictionary-aware builder.
    pub type RowValBuilder<V, T, R> =
        RcBuilder<crate::dictionary::builders::RowValBuilder<V, T, R>>;

    /// Spine of reference-counted key-only batches with `Row` keys.
    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    /// Batcher for `RowSpine` updates.
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    /// Builder for `RowSpine` batches, wrapping this crate's dictionary-aware builder.
    pub type RowBuilder<T, R> = RcBuilder<crate::dictionary::builders::RowBuilder<T, R>>;

    /// Spine of reference-counted batches with keys of type `K` and `Row` values.
    pub type ValRowSpine<K, T, R> = Spine<Rc<OrdValBatch<ValRowLayout<((K, Row), T, R)>>>>;
    /// Batcher for `ValRowSpine` updates.
    pub type ValRowBatcher<K, T, R> = KeyValBatcher<K, Row, T, R>;
    /// Builder for `ValRowSpine` batches, wrapping this crate's dictionary-aware builder.
    pub type ValRowBuilder<K, T, R> =
        RcBuilder<crate::dictionary::builders::ValRowBuilder<K, T, R>>;

    /// Builder for `ValRowSpine` batches whose input chunks are `Column`s.
    pub type ValRowColPagedBuilder<K, T, R> =
        RcBuilder<OrdValBuilder<ValRowLayout<((K, Row), T, R)>, Column<((K, Row), T, R)>>>;

    /// Layout marker for updates whose key and value are both `Row`.
    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        // Zero-sized; only ties the layout to the update type `U`.
        phantom: std::marker::PhantomData<U>,
    }
    /// Layout marker for updates whose key is a `Row` (any value type).
    pub struct RowValLayout<U: Update<Key = Row>> {
        // Zero-sized; only ties the layout to the update type `U`.
        phantom: std::marker::PhantomData<U>,
    }
    /// Layout marker for updates whose key is a `Row` and whose value is `()`.
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        // Zero-sized; only ties the layout to the update type `U`.
        phantom: std::marker::PhantomData<U>,
    }
    /// Layout marker for updates whose value is a `Row` (any key type).
    pub struct ValRowLayout<U: Update<Val = Row>> {
        // Zero-sized; only ties the layout to the update type `U`.
        phantom: std::marker::PhantomData<U>,
    }

    /// Keys and values live in `DatumContainer`s; times and diffs in columnation stacks.
    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    /// Keys live in a `DatumContainer`; values, times, and diffs in columnation stacks.
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = ColumnationStack<U::Val>;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    /// Keys live in a `DatumContainer`; the unit values, times, and diffs in columnation stacks.
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = ColumnationStack<()>;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    /// Values live in a `DatumContainer`; keys, times, and diffs in columnation stacks.
    impl<U: Update<Val = Row>> Layout for ValRowLayout<U>
    where
        U::Key: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = ColumnationStack<U::Key>;
        type ValContainer = DatumContainer;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}
150
151#[cfg(test)]
152mod tests {
153 use crate::DatumContainer;
154 use differential_dataflow::trace::implementations::BatchContainer;
155 use mz_repr::adt::date::Date;
156 use mz_repr::adt::interval::Interval;
157 use mz_repr::{Datum, Row, SqlScalarType};
158
159 #[mz_ore::test]
160 #[cfg_attr(miri, ignore)] fn test_round_trip() {
162 fn round_trip(datums: Vec<Datum>) {
163 let row = Row::pack(datums.clone());
164
165 let mut container = DatumContainer::with_capacity(row.byte_len());
166 container.push_own(&row);
167
168 println!("{:?}", container.index(0).iter.data);
171
172 let datums2 = container.index(0).collect::<Vec<_>>();
173 assert_eq!(datums, datums2);
174 }
175
176 round_trip(vec![]);
177 round_trip(
178 SqlScalarType::enumerate()
179 .iter()
180 .flat_map(|r#type| r#type.interesting_datums())
181 .collect(),
182 );
183 round_trip(vec![
184 Datum::Null,
185 Datum::Null,
186 Datum::False,
187 Datum::True,
188 Datum::Int16(-21),
189 Datum::Int32(-42),
190 Datum::Int64(-2_147_483_648 - 42),
191 Datum::UInt8(0),
192 Datum::UInt8(1),
193 Datum::UInt16(0),
194 Datum::UInt16(1),
195 Datum::UInt16(1 << 8),
196 Datum::UInt32(0),
197 Datum::UInt32(1),
198 Datum::UInt32(1 << 8),
199 Datum::UInt32(1 << 16),
200 Datum::UInt32(1 << 24),
201 Datum::UInt64(0),
202 Datum::UInt64(1),
203 Datum::UInt64(1 << 8),
204 Datum::UInt64(1 << 16),
205 Datum::UInt64(1 << 24),
206 Datum::UInt64(1 << 32),
207 Datum::UInt64(1 << 40),
208 Datum::UInt64(1 << 48),
209 Datum::UInt64(1 << 56),
210 Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
211 Datum::Interval(Interval {
212 months: 312,
213 ..Default::default()
214 }),
215 Datum::Interval(Interval::new(0, 0, 1_012_312)),
216 Datum::Bytes(&[]),
217 Datum::Bytes(&[0, 2, 1, 255]),
218 Datum::String(""),
219 Datum::String("العَرَبِيَّة"),
220 ]);
221 }
222
223 #[mz_ore::test]
231 #[cfg_attr(miri, ignore)] fn test_codec_round_trip() {
233 use crate::row_codec::ColumnsCodec;
234
235 let values = ["apple", "banana", "cherry"];
240 let rows: Vec<Row> = (0..3_000)
241 .map(|i| {
242 Row::pack_slice(&[
243 Datum::String(values[i % values.len()]),
244 Datum::Int64(i64::try_from(i).unwrap()),
245 Datum::String(values[(i / 7) % values.len()]),
246 ])
247 })
248 .collect();
249
250 let mut stats1 = ColumnsCodec::default();
253 let mut stats2 = ColumnsCodec::default();
254 let mut scratch = Vec::new();
255 for (i, row) in rows.iter().enumerate() {
256 scratch.clear();
257 let stats = if i % 2 == 0 { &mut stats1 } else { &mut stats2 };
258 stats.encode(ColumnsCodec::borrow_row(row), &mut scratch);
259 }
260
261 let merged = ColumnsCodec::new_from([&stats1, &stats2]);
262 let safe = stats1.new_safe();
263 for mut codec in [merged, safe] {
264 let mut compressed_any = false;
265 for row in &rows {
266 let mut buf = Vec::new();
267 codec.encode(ColumnsCodec::borrow_row(row), &mut buf);
268
269 let decoded = codec.decode(&buf).collect::<Vec<_>>();
270 let expected = ColumnsCodec::borrow_row(row).collect::<Vec<_>>();
271 assert_eq!(decoded, expected, "round-trip mismatch for {row:?}");
272
273 compressed_any |= buf.len() < row.data().len();
274 }
275 assert!(
276 compressed_any,
277 "dictionary never engaged; test no longer covers the compressed path",
278 );
279 }
280 }
281
282 #[mz_ore::test]
303 #[cfg_attr(miri, ignore)] fn test_safe_codec_merge_bitmap_carryover() {
305 use crate::row_codec::ColumnsCodec;
306
307 let short_rows: Vec<Row> = (0..256)
311 .map(|i| Row::pack_slice(&[Datum::String(&format!("s{i}"))]))
312 .collect();
313 let long_values: Vec<String> = (0..64).map(|i| format!("{i:0>300}")).collect();
319
320 let mut stats = ColumnsCodec::default();
322 let mut scratch = Vec::new();
323 for row in &short_rows {
324 scratch.clear();
325 stats.encode(ColumnsCodec::borrow_row(row), &mut scratch);
326 }
327
328 let mut safe = stats.new_safe();
331 for _ in 0..8 {
332 for v in &long_values {
333 let row = Row::pack_slice(&[Datum::String(v)]);
334 scratch.clear();
335 safe.encode(ColumnsCodec::borrow_row(&row), &mut scratch);
336 }
337 }
338
339 let mut merged = ColumnsCodec::new_from([&safe]);
343 for row in &short_rows {
344 let mut buf = Vec::new();
345 merged.encode(ColumnsCodec::borrow_row(row), &mut buf);
346 let decoded = merged.decode(&buf).collect::<Vec<_>>();
347 let expected = ColumnsCodec::borrow_row(row).collect::<Vec<_>>();
348 assert_eq!(decoded, expected, "round-trip mismatch for {row:?}");
349 }
350 }
351
352 #[mz_ore::test]
358 fn test_safe_tag_base() {
359 use crate::row_codec::SAFE_TAG_BASE;
360 let check = |datum: Datum| {
361 let row = Row::pack_slice(&[datum]);
362 let data = row.data();
363 assert!(!data.is_empty(), "empty encoding for {datum:?}");
364 assert!(
365 data[0] < SAFE_TAG_BASE,
366 "datum {datum:?} encodes with first byte {} >= SAFE_TAG_BASE ({}); \
367 a new row tag has crossed the safe boundary",
368 data[0],
369 SAFE_TAG_BASE,
370 );
371 };
372 for ty in SqlScalarType::enumerate().iter() {
373 for datum in ty.interesting_datums() {
374 check(datum);
375 }
376 }
377 }
378}
379
380mod bytes_container {
382
383 use differential_dataflow::trace::implementations::BatchContainer;
384 use timely::container::PushInto;
385
386 use mz_ore::region::Region;
387
388 pub struct BytesContainer {
390 length: usize,
392 batches: Vec<BytesBatch>,
393 }
394
395 impl BytesContainer {
396 #[inline]
398 pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
399 callback(
401 self.batches.len() * std::mem::size_of::<BytesBatch>(),
402 self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
403 );
404 for batch in self.batches.iter() {
405 batch.offsets.heap_size(&mut callback);
406 callback(batch.storage.len(), batch.storage.capacity());
407 }
408 }
409 }
410
411 impl BatchContainer for BytesContainer {
412 type Owned = Vec<u8>;
413 type ReadItem<'a> = &'a [u8];
414
415 #[inline]
416 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
417 item.to_vec()
418 }
419
420 #[inline]
421 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
422 other.clear();
423 other.extend_from_slice(item);
424 }
425
426 #[inline(always)]
427 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
428 self.push_into(item);
429 }
430
431 #[inline(always)]
432 fn push_own(&mut self, item: &Self::Owned) {
433 self.push_into(item.as_slice())
434 }
435
436 fn clear(&mut self) {
437 self.batches.clear();
438 self.batches.push(BytesBatch::with_capacities(0, 0));
439 self.length = 0;
440 }
441
442 fn with_capacity(size: usize) -> Self {
443 Self {
444 length: 0,
445 batches: vec![BytesBatch::with_capacities(size, size)],
446 }
447 }
448
449 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
450 let mut item_cap = 1;
451 let mut byte_cap = 0;
452 for batch in cont1.batches.iter() {
453 item_cap += batch.offsets.len() - 1;
454 byte_cap += batch.storage.len();
455 }
456 for batch in cont2.batches.iter() {
457 item_cap += batch.offsets.len() - 1;
458 byte_cap += batch.storage.len();
459 }
460 Self {
461 length: 0,
462 batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
463 }
464 }
465
466 #[inline(always)]
467 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
468 item
469 }
470
471 #[inline]
472 fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
473 for batch in self.batches.iter() {
474 if index < batch.len() {
475 return batch.index(index);
476 }
477 index -= batch.len();
478 }
479 panic!("Index out of bounds");
480 }
481
482 #[inline(always)]
483 fn len(&self) -> usize {
484 self.length
485 }
486 }
487
488 impl PushInto<&[u8]> for BytesContainer {
489 #[inline]
490 fn push_into(&mut self, item: &[u8]) {
491 self.length += 1;
492 if let Some(batch) = self.batches.last_mut() {
493 let success = batch.try_push(item);
494 if !success {
495 let item_cap = 2 * batch.offsets.len();
497 let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
498 let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
499 assert!(new_batch.try_push(item));
500 self.batches.push(new_batch);
501 }
502 }
503 }
504 }
505
506 pub struct BytesBatch {
510 offsets: crate::OffsetOptimized,
511 storage: Region<u8>,
512 len: usize,
513 }
514
515 impl BytesBatch {
516 fn try_push(&mut self, slice: &[u8]) -> bool {
519 if self.storage.len() + slice.len() <= self.storage.capacity() {
520 self.storage.extend_from_slice(slice);
521 self.offsets.push_into(self.storage.len());
522 self.len += 1;
523 true
524 } else {
525 false
526 }
527 }
528 #[inline]
529 fn index(&self, index: usize) -> &[u8] {
530 let lower = self.offsets.index(index);
531 let upper = self.offsets.index(index + 1);
532 &self.storage[lower..upper]
533 }
534 #[inline(always)]
535 fn len(&self) -> usize {
536 debug_assert_eq!(self.len, self.offsets.len() - 1);
537 self.len
538 }
539
540 fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
541 let mut offsets = crate::OffsetOptimized::with_capacity(item_cap + 1);
543 offsets.push_into(0);
544 Self {
545 offsets,
546 storage: Region::new_auto(byte_cap.next_power_of_two()),
547 len: 0,
548 }
549 }
550 }
551}
552
553mod offset_opt {
554 use differential_dataflow::trace::implementations::BatchContainer;
555 use differential_dataflow::trace::implementations::OffsetList;
556 use timely::container::PushInto;
557
558 enum OffsetStride {
559 Empty,
560 Zero,
561 Striding(usize, usize),
562 Saturated(usize, usize, usize),
563 }
564
565 impl OffsetStride {
566 #[inline]
568 fn push(&mut self, item: usize) -> bool {
569 match self {
570 OffsetStride::Empty => {
571 if item == 0 {
572 *self = OffsetStride::Zero;
573 true
574 } else {
575 false
576 }
577 }
578 OffsetStride::Zero => {
579 *self = OffsetStride::Striding(item, 2);
580 true
581 }
582 OffsetStride::Striding(stride, count) => {
583 if item == *stride * *count {
584 *count += 1;
585 true
586 } else if item == *stride * (*count - 1) {
587 *self = OffsetStride::Saturated(*stride, *count, 1);
588 true
589 } else {
590 false
591 }
592 }
593 OffsetStride::Saturated(stride, count, reps) => {
594 if item == *stride * (*count - 1) {
595 *reps += 1;
596 true
597 } else {
598 false
599 }
600 }
601 }
602 }
603
604 #[inline]
605 fn index(&self, index: usize) -> usize {
606 match self {
607 OffsetStride::Empty => {
608 panic!("Empty OffsetStride")
609 }
610 OffsetStride::Zero => 0,
611 OffsetStride::Striding(stride, _steps) => *stride * index,
612 OffsetStride::Saturated(stride, steps, _reps) => {
613 if index < *steps {
614 *stride * index
615 } else {
616 *stride * (*steps - 1)
617 }
618 }
619 }
620 }
621
622 #[inline]
623 fn len(&self) -> usize {
624 match self {
625 OffsetStride::Empty => 0,
626 OffsetStride::Zero => 1,
627 OffsetStride::Striding(_stride, steps) => *steps,
628 OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
629 }
630 }
631 }
632
633 pub struct OffsetOptimized {
634 strided: OffsetStride,
635 spilled: OffsetList,
636 }
637
638 impl BatchContainer for OffsetOptimized {
639 type Owned = usize;
640 type ReadItem<'a> = usize;
641
642 #[inline]
643 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
644 item
645 }
646
647 #[inline]
648 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
649 self.push_into(item)
650 }
651
652 #[inline]
653 fn push_own(&mut self, item: &Self::Owned) {
654 self.push_into(*item)
655 }
656
657 fn clear(&mut self) {
658 self.strided = OffsetStride::Empty;
659 self.spilled.clear();
660 }
661
662 fn with_capacity(_size: usize) -> Self {
663 Self {
664 strided: OffsetStride::Empty,
665 spilled: OffsetList::with_capacity(0),
666 }
667 }
668
669 fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
670 Self {
671 strided: OffsetStride::Empty,
672 spilled: OffsetList::with_capacity(0),
673 }
674 }
675
676 #[inline]
677 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
678 item
679 }
680
681 #[inline]
682 fn index(&self, index: usize) -> Self::ReadItem<'_> {
683 if index < self.strided.len() {
684 self.strided.index(index)
685 } else {
686 self.spilled.index(index - self.strided.len())
687 }
688 }
689
690 #[inline]
691 fn len(&self) -> usize {
692 self.strided.len() + self.spilled.len()
693 }
694 }
695
696 impl PushInto<usize> for OffsetOptimized {
697 #[inline]
698 fn push_into(&mut self, item: usize) {
699 if !self.spilled.is_empty() {
700 self.spilled.push(item);
701 } else {
702 let inserted = self.strided.push(item);
703 if !inserted {
704 self.spilled.push(item);
705 }
706 }
707 }
708 }
709
710 impl OffsetOptimized {
711 pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
712 crate::offset_list_size(&self.spilled, callback);
713 }
714 }
715}
716
717#[inline]
719pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
720 #[inline(always)]
723 fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
724 let size_of_t = std::mem::size_of::<T>();
725 callback(data.len() * size_of_t, data.capacity() * size_of_t);
726 }
727
728 vec_size(&data.smol, &mut callback);
729 vec_size(&data.chonk, callback);
730}
731
732mod dictionary {
755
756 use differential_dataflow::trace::implementations::BatchContainer;
757
758 use mz_repr::{Row, RowRef};
759
760 use super::row_codec::{ColumnsCodec, ColumnsIter};
761
762 pub mod builders {
768
769 use columnation::Columnation;
770 use differential_dataflow::difference::Semigroup;
771 use differential_dataflow::lattice::Lattice;
772 use differential_dataflow::trace::Builder;
773 use differential_dataflow::trace::Description;
774 use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
775 use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
776 use mz_timely_util::columnation::ColumnationStack as TimelyStack;
777 use timely::progress::Timestamp;
778
779 use mz_repr::Row;
780
781 use super::super::row_codec::ColumnsCodec;
782 use super::{DatumContainer, DatumSeq};
783 use crate::DICTIONARY_COMPRESSION;
784 use crate::spines::{RowLayout, RowRowLayout, RowValLayout, ValRowLayout};
785
786 fn build_codec<'a>(rows: impl IntoIterator<Item = &'a Row>) -> Option<ColumnsCodec> {
790 if !DICTIONARY_COMPRESSION.load(std::sync::atomic::Ordering::Relaxed) {
791 return None;
792 }
793 let mut stats = ColumnsCodec::default();
794 let mut buf = Vec::default();
795 for row in rows {
796 if !row.is_empty() {
797 stats.encode(DatumSeq::borrow_as(row).bytes_iter(), &mut buf);
798 buf.clear();
799 }
800 }
801 Some(ColumnsCodec::new_from([&stats]))
802 }
803
804 pub struct RowRowBuilder<
805 T: Lattice + Timestamp + Columnation,
806 R: Ord + Semigroup + Columnation + 'static,
807 > {
808 inner: OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>,
809 }
810
811 impl<T: Lattice + Timestamp + Columnation, R: Ord + Semigroup + Columnation + 'static>
812 Builder for RowRowBuilder<T, R>
813 {
814 type Input = TimelyStack<((Row, Row), T, R)>;
815 type Time = T;
816 type Output = OrdValBatch<RowRowLayout<((Row, Row), T, R)>>;
817
818 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
819 Self {
820 inner: Builder::with_capacity(keys, vals, upds),
821 }
822 }
823 fn push(&mut self, chunk: &mut Self::Input) {
824 self.inner.push(chunk)
825 }
826 fn done(self, description: Description<Self::Time>) -> Self::Output {
827 self.inner.done(description)
828 }
829 fn seal(
830 chain: &mut Vec<Self::Input>,
831 description: Description<Self::Time>,
832 ) -> Self::Output {
833 let key_codec = build_codec(
834 chain
835 .iter()
836 .flat_map(|link| link.iter().map(|((k, _), _, _)| k)),
837 );
838 let val_codec = build_codec(
839 chain
840 .iter()
841 .flat_map(|link| link.iter().map(|((_, v), _, _)| v)),
842 );
843
844 use differential_dataflow::trace::implementations::BuilderInput;
845
846 let (keys, vals, upds) = <Self::Input as BuilderInput<
847 DatumContainer,
848 DatumContainer,
849 >>::key_val_upd_counts(&chain[..]);
850 let mut builder = Self::with_capacity(keys, vals, upds);
851 builder.inner.result.keys.codec = key_codec;
855 builder.inner.result.keys.stats = None;
856 builder.inner.result.vals.vals.codec = val_codec;
857 builder.inner.result.vals.vals.stats = None;
858
859 for mut chunk in chain.drain(..) {
860 builder.push(&mut chunk);
861 }
862
863 builder.done(description)
864 }
865 }
866
867 pub struct RowValBuilder<
868 V: Ord + Clone + Columnation + 'static,
869 T: Lattice + Timestamp + Columnation,
870 R: Ord + Semigroup + Columnation + 'static,
871 > {
872 inner: OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>,
873 }
874
875 impl<
876 V: Ord + Clone + Columnation,
877 T: Lattice + Timestamp + Columnation,
878 R: Ord + Semigroup + Columnation + 'static,
879 > Builder for RowValBuilder<V, T, R>
880 {
881 type Input = TimelyStack<((Row, V), T, R)>;
882 type Time = T;
883 type Output = OrdValBatch<RowValLayout<((Row, V), T, R)>>;
884
885 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
886 Self {
887 inner: Builder::with_capacity(keys, vals, upds),
888 }
889 }
890 fn push(&mut self, chunk: &mut Self::Input) {
891 self.inner.push(chunk)
892 }
893 fn done(self, description: Description<Self::Time>) -> Self::Output {
894 self.inner.done(description)
895 }
896 fn seal(
897 chain: &mut Vec<Self::Input>,
898 description: Description<Self::Time>,
899 ) -> Self::Output {
900 let key_codec = build_codec(
901 chain
902 .iter()
903 .flat_map(|link| link.iter().map(|((k, _), _, _)| k)),
904 );
905
906 use differential_dataflow::trace::implementations::BuilderInput;
907
908 let (keys, vals, upds) = <Self::Input as BuilderInput<
909 DatumContainer,
910 TimelyStack<V>,
911 >>::key_val_upd_counts(&chain[..]);
912 let mut builder = Self::with_capacity(keys, vals, upds);
913 builder.inner.result.keys.codec = key_codec;
915 builder.inner.result.keys.stats = None;
916
917 for mut chunk in chain.drain(..) {
918 builder.push(&mut chunk);
919 }
920
921 builder.done(description)
922 }
923 }
924
925 pub struct RowBuilder<
926 T: Lattice + Timestamp + Columnation,
927 R: Ord + Semigroup + Columnation + 'static,
928 > {
929 inner: OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>,
930 }
931
932 impl<T: Lattice + Timestamp + Columnation, R: Ord + Semigroup + Columnation + 'static>
933 Builder for RowBuilder<T, R>
934 {
935 type Input = TimelyStack<((Row, ()), T, R)>;
936 type Time = T;
937 type Output = OrdKeyBatch<RowLayout<((Row, ()), T, R)>>;
938
939 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
940 Self {
941 inner: Builder::with_capacity(keys, vals, upds),
942 }
943 }
944 fn push(&mut self, chunk: &mut Self::Input) {
945 self.inner.push(chunk)
946 }
947 fn done(self, description: Description<Self::Time>) -> Self::Output {
948 self.inner.done(description)
949 }
950 fn seal(
951 chain: &mut Vec<Self::Input>,
952 description: Description<Self::Time>,
953 ) -> Self::Output {
954 let key_codec = build_codec(
955 chain
956 .iter()
957 .flat_map(|link| link.iter().map(|((k, _), _, _)| k)),
958 );
959
960 use differential_dataflow::trace::implementations::BuilderInput;
961
962 let (keys, vals, upds) = <Self::Input as BuilderInput<
963 DatumContainer,
964 TimelyStack<()>,
965 >>::key_val_upd_counts(&chain[..]);
966 let mut builder = Self::with_capacity(keys, vals, upds);
967 builder.inner.result.keys.codec = key_codec;
969 builder.inner.result.keys.stats = None;
970
971 for mut chunk in chain.drain(..) {
972 builder.push(&mut chunk);
973 }
974
975 builder.done(description)
976 }
977 }
978
979 pub struct ValRowBuilder<
983 K: Ord + Clone + Columnation + 'static,
984 T: Lattice + Timestamp + Columnation,
985 R: Ord + Semigroup + Columnation + 'static,
986 > {
987 inner: OrdValBuilder<ValRowLayout<((K, Row), T, R)>, TimelyStack<((K, Row), T, R)>>,
988 }
989
990 impl<
991 K: Ord + Clone + Columnation,
992 T: Lattice + Timestamp + Columnation,
993 R: Ord + Semigroup + Columnation + 'static,
994 > Builder for ValRowBuilder<K, T, R>
995 {
996 type Input = TimelyStack<((K, Row), T, R)>;
997 type Time = T;
998 type Output = OrdValBatch<ValRowLayout<((K, Row), T, R)>>;
999
1000 fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
1001 Self {
1002 inner: Builder::with_capacity(keys, vals, upds),
1003 }
1004 }
1005 fn push(&mut self, chunk: &mut Self::Input) {
1006 self.inner.push(chunk)
1007 }
1008 fn done(self, description: Description<Self::Time>) -> Self::Output {
1009 self.inner.done(description)
1010 }
1011 fn seal(
1012 chain: &mut Vec<Self::Input>,
1013 description: Description<Self::Time>,
1014 ) -> Self::Output {
1015 let val_codec = build_codec(
1016 chain
1017 .iter()
1018 .flat_map(|link| link.iter().map(|((_, v), _, _)| v)),
1019 );
1020
1021 use differential_dataflow::trace::implementations::BuilderInput;
1022
1023 let (keys, vals, upds) = <Self::Input as BuilderInput<
1024 TimelyStack<K>,
1025 DatumContainer,
1026 >>::key_val_upd_counts(&chain[..]);
1027 let mut builder = Self::with_capacity(keys, vals, upds);
1028 builder.inner.result.vals.vals.codec = val_codec;
1030 builder.inner.result.vals.vals.stats = None;
1031
1032 for mut chunk in chain.drain(..) {
1033 builder.push(&mut chunk);
1034 }
1035
1036 builder.done(description)
1037 }
1038 }
1039 }
1040
    /// A batch container of rows, stored as byte slices that are optionally
    /// passed through a `ColumnsCodec`.
    pub struct DatumContainer {
        /// Codec used to encode pushed items and decode indexed ones; `None`
        /// means entries are stored without a codec.
        codec: Option<ColumnsCodec>,
        /// The stored bytes, one slice per item.
        inner: super::bytes_container::BytesContainer,
        /// Scratch buffer an item is assembled in before being copied into `inner`.
        staging: Vec<u8>,
        /// Statistics collector fed by pushes while no codec is installed;
        /// `None` when statistics are not being gathered.
        stats: Option<ColumnsCodec>,
    }
1053
1054 impl BatchContainer for DatumContainer {
1055 type Owned = Row;
1056 type ReadItem<'a> = DatumSeq<'a>;
1057
1058 fn with_capacity(size: usize) -> Self {
1059 let stats = if crate::DICTIONARY_COMPRESSION.load(std::sync::atomic::Ordering::Relaxed)
1060 {
1061 Some(Default::default())
1062 } else {
1063 None
1064 };
1065
1066 Self {
1067 codec: None,
1068 inner: BatchContainer::with_capacity(size),
1069 staging: Vec::new(),
1070 stats,
1071 }
1072 }
1073 fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
1074 let codec = match (&cont1.codec, &cont2.codec) {
1081 (Some(c1), Some(c2)) => Some(ColumnsCodec::new_from([c1, c2])),
1082 _ => None,
1083 };
1084
1085 Self {
1086 codec,
1087 inner: BatchContainer::merge_capacity(&cont1.inner, &cont2.inner),
1088 staging: Vec::new(),
1089 stats: None,
1090 }
1091 }
1092 #[inline]
1093 fn index(&self, index: usize) -> Self::ReadItem<'_> {
1094 let data = self.inner.index(index);
1095 let iter = if let Some(codec) = &self.codec {
1096 codec.decode(data)
1097 } else {
1098 unsafe { ColumnsIter::without_codec(data) }
1102 };
1103 DatumSeq { iter }
1104 }
1105 #[inline(always)]
1106 fn len(&self) -> usize {
1107 self.inner.len()
1108 }
1109
1110 #[inline(always)]
1111 fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
1112 item
1113 }
1114
1115 #[inline(always)]
1116 fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
1117 if item.iter.index.is_none() {
1119 return unsafe { Row::from_bytes_unchecked(item.iter.data) };
1121 }
1122 Row::pack(item)
1123 }
1124
1125 #[inline(always)]
1126 fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
1127 if item.iter.index.is_none() {
1129 let mut packer = other.packer();
1130 unsafe { packer.extend_by_slice_unchecked(item.iter.data) };
1132 return;
1133 }
1134 other.packer().extend(item);
1135 }
1136
1137 #[inline(always)]
1138 fn push_ref(&mut self, item: Self::ReadItem<'_>) {
1139 if self.codec.is_none() && self.stats.is_none() && item.iter.index.is_none() {
1141 self.inner.push_ref(item.iter.data);
1142 return;
1143 }
1144 self.push_into(item);
1145 }
1146
1147 #[inline(always)]
1148 fn push_own(&mut self, item: &Self::Owned) {
1149 if self.codec.is_none() && self.stats.is_none() {
1151 self.inner.push_ref(item.data());
1152 return;
1153 }
1154 self.push_into(item);
1155 }
1156
1157 #[inline(always)]
1158 fn clear(&mut self) {
1159 self.inner.clear();
1160 self.staging.clear();
1161 self.codec = None;
1167 self.stats = if crate::DICTIONARY_COMPRESSION.load(std::sync::atomic::Ordering::Relaxed)
1168 {
1169 Some(Default::default())
1170 } else {
1171 None
1172 };
1173 }
1174 }
1175
1176 impl DatumContainer {
1177 #[inline]
1179 pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
1180 self.inner.heap_size(&mut callback);
1181 callback(self.staging.len(), self.staging.capacity());
1184 if let Some(codec) = &self.codec {
1185 codec.heap_size(&mut callback);
1186 }
1187 if let Some(stats) = &self.stats {
1188 stats.heap_size(&mut callback);
1189 }
1190 }
1191 }
1192
1193 use timely::container::PushInto;
1194 impl PushInto<Row> for DatumContainer {
1195 #[inline(always)]
1196 fn push_into(&mut self, item: Row) {
1197 self.push_into(&item);
1198 }
1199 }
1200
1201 impl PushInto<&Row> for DatumContainer {
1202 #[inline(always)]
1203 fn push_into(&mut self, item: &Row) {
1204 self.push_into(DatumSeq::borrow_as(item));
1205 }
1206 }
1207
1208 impl PushInto<&RowRef> for DatumContainer {
1209 #[inline(always)]
1210 fn push_into(&mut self, item: &RowRef) {
1211 self.push_into(DatumSeq::borrow_as(item));
1212 }
1213 }
1214
1215 const STATS_THRESHOLD: usize = 64 * 1024;
1226
1227 impl PushInto<DatumSeq<'_>> for DatumContainer {
1228 #[inline]
1229 fn push_into(&mut self, item: DatumSeq<'_>) {
1230 if self.codec.is_none() && self.stats.is_none() && item.iter.index.is_none() {
1233 self.inner.push_ref(item.iter.data);
1234 return;
1235 }
1236
1237 if self.codec.is_none() && self.stats.is_some() && self.inner.len() >= STATS_THRESHOLD {
1239 let stats = self.stats.take().unwrap();
1240 self.codec = Some(stats.new_safe());
1241 }
1242
1243 if let Some(codec) = &mut self.codec {
1244 codec.encode(item.bytes_iter(), &mut self.staging);
1246 } else if let Some(stats) = &mut self.stats {
1247 stats.observe(item.bytes_iter());
1252 for slice in item.bytes_iter() {
1253 self.staging.extend_from_slice(slice);
1254 }
1255 } else {
1256 for slice in item.bytes_iter() {
1258 self.staging.extend_from_slice(slice);
1259 }
1260 }
1261 self.inner.push_ref(&self.staging[..]);
1262 self.staging.clear();
1263 }
1264 }
1265
1266 use mz_repr::{Datum, read_datum};
1267
1268 #[derive(Debug)]
1275 pub struct DatumSeq<'a> {
1276 pub iter: ColumnsIter<'a>,
1277 }
1278
1279 impl<'a> DatumSeq<'a> {
1280 #[inline(always)]
1281 fn borrow_as(other: &'a RowRef) -> Self {
1282 Self {
1283 iter: ColumnsCodec::borrow_row(other),
1284 }
1285 }
1286
1287 #[inline]
1290 pub fn from_row(row: &'a Row) -> Self {
1291 Self::borrow_as(row)
1292 }
1293
1294 #[inline]
1295 pub fn to_row(&self) -> Row {
1296 if self.iter.index.is_none() {
1298 return unsafe { Row::from_bytes_unchecked(self.iter.data) };
1299 }
1300 Row::pack(*self)
1301 }
1302 }
1303
    // `DatumSeq` is a plain copyable view; cloning is a bitwise copy.
    impl<'a> Copy for DatumSeq<'a> {}
    impl<'a> Clone for DatumSeq<'a> {
        /// Returns a copy of the view; nothing is allocated.
        #[inline(always)]
        fn clone(&self) -> Self {
            *self
        }
    }
1311
1312 use std::cmp::Ordering;
1313 impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
1314 #[inline(always)]
1315 fn eq(&self, other: &DatumSeq<'a>) -> bool {
1316 if self.iter.index.is_none() && other.iter.index.is_none() {
1318 return self.iter.data == other.iter.data;
1319 }
1320 Iterator::eq(self.iter, other.iter)
1321 }
1322 }
1323 impl<'a> Eq for DatumSeq<'a> {}
1324 impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
1325 #[inline(always)]
1326 fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
1327 if self.iter.index.is_none() && other.iter.index.is_none() {
1329 let left = self.iter.data;
1330 let right = other.iter.data;
1331 return Some(match left.len().cmp(&right.len()) {
1332 Ordering::Equal => left.cmp(right),
1333 other => other,
1334 });
1335 }
1336 let mut left = self.iter.flatten();
1349 let mut right = other.iter.flatten();
1350 let mut first_diff = Ordering::Equal;
1351 loop {
1352 match (left.next(), right.next()) {
1353 (Some(l), Some(r)) => {
1354 if first_diff == Ordering::Equal {
1355 first_diff = l.cmp(r);
1356 }
1357 }
1358 (None, Some(_)) => return Some(Ordering::Less),
1360 (Some(_), None) => return Some(Ordering::Greater),
1362 (None, None) => return Some(first_diff),
1364 }
1365 }
1366 }
1367 }
1368 impl<'a> Ord for DatumSeq<'a> {
1369 #[inline(always)]
1370 fn cmp(&self, other: &Self) -> Ordering {
1371 self.partial_cmp(other).unwrap()
1372 }
1373 }
1374
1375 impl<'a> PartialEq<&'a Row> for DatumSeq<'a> {
1376 #[inline(always)]
1377 fn eq(&self, other: &&'a Row) -> bool {
1378 self.eq(&Self::borrow_as(*other))
1379 }
1380 }
1381
1382 impl<'a, 'b> PartialEq<&'b RowRef> for DatumSeq<'a> {
1386 #[inline(always)]
1387 fn eq(&self, other: &&'b RowRef) -> bool {
1388 self.eq(&DatumSeq::borrow_as(*other))
1389 }
1390 }
1391
1392 impl<'a> DatumSeq<'a> {
1393 #[inline(always)]
1394 pub fn bytes_iter(self) -> ColumnsIter<'a> {
1395 self.iter
1396 }
1397 }
1398
1399 impl<'a> Iterator for DatumSeq<'a> {
1400 type Item = Datum<'a>;
1401 #[inline(always)]
1402 fn next(&mut self) -> Option<Self::Item> {
1403 self.iter
1409 .next()
1410 .map(|mut bytes| unsafe { read_datum(&mut bytes) })
1411 }
1412 }
1413
1414 use mz_repr::RowArena;
1415 use mz_repr::fixed_length::ExtendDatums;
1416 impl<'long> ExtendDatums for DatumSeq<'long> {
1417 #[inline]
1418 fn extend_datums<'a>(
1419 &'a self,
1420 _arena: &'a RowArena,
1421 target: &mut Vec<Datum<'a>>,
1422 max: Option<usize>,
1423 ) {
1424 if self.iter.index.is_none() {
1430 let mut data = self.iter.data;
1431 match max {
1432 Some(max) => {
1433 let mut n = 0;
1434 while n < max && !data.is_empty() {
1435 target.push(unsafe { read_datum(&mut data) });
1436 n += 1;
1437 }
1438 }
1439 None => {
1440 while !data.is_empty() {
1441 target.push(unsafe { read_datum(&mut data) });
1442 }
1443 }
1444 }
1445 } else {
1446 match max {
1447 Some(max) => target.extend((*self).take(max)),
1448 None => target.extend(*self),
1449 }
1450 }
1451 }
1452 }
1453}
1454
1455mod row_codec {
1460
1461 pub use self::misra_gries::MisraGries;
1462 pub use columns::{ColumnsCodec, ColumnsIter};
1463 pub use dictionary::DictionaryCodec;
1464 #[cfg(test)]
1465 pub use dictionary::SAFE_TAG_BASE;
1466
1467 mod columns {
1475
1476 use mz_repr::{RowRef, read_datum};
1477
1478 use super::DictionaryCodec;
1479
1480 #[derive(Default, Debug)]
1482 pub struct ColumnsCodec {
1483 columns: Vec<DictionaryCodec>,
1484 }
1485
1486 impl ColumnsCodec {
1487 pub(crate) fn decode<'a>(&'a self, bytes: &'a [u8]) -> ColumnsIter<'a> {
1489 ColumnsIter {
1490 index: Some(self),
1491 column: 0,
1492 data: bytes,
1493 }
1494 }
1495 pub(crate) fn encode<'a, I>(&mut self, iter: I, output: &mut Vec<u8>)
1497 where
1498 I: IntoIterator<Item = &'a [u8]>,
1499 {
1500 for (index, bytes) in iter.into_iter().enumerate() {
1501 if self.columns.len() <= index {
1502 self.columns.push(Default::default());
1503 }
1504 self.columns[index].encode(std::iter::once(bytes), output);
1505 }
1506 }
1507
1508 pub(crate) fn new_from<'a>(stats: impl IntoIterator<Item = &'a Self>) -> Self {
1510 let stats = stats.into_iter().collect::<Vec<_>>();
1513 let cols = stats.iter().map(|s| s.columns.len()).max().unwrap_or(0);
1514 let mut columns = Vec::with_capacity(cols);
1515 let default: DictionaryCodec = Default::default();
1516 for index in 0..cols {
1517 columns.push(DictionaryCodec::new_from(
1518 stats
1519 .iter()
1520 .map(|s| s.columns.get(index).unwrap_or(&default)),
1521 ));
1522 }
1523 Self { columns }
1524 }
1525
1526 #[inline(always)]
1528 pub(crate) fn borrow_row(row: &RowRef) -> ColumnsIter<'_> {
1529 ColumnsIter {
1530 index: None,
1531 column: 0,
1532 data: row.data(),
1533 }
1534 }
1535 }
1536
1537 impl ColumnsCodec {
1538 pub(crate) fn heap_size(&self, callback: &mut impl FnMut(usize, usize)) {
1540 let elem = std::mem::size_of::<DictionaryCodec>();
1541 callback(self.columns.len() * elem, self.columns.capacity() * elem);
1542 for column in &self.columns {
1543 column.heap_size(callback);
1544 }
1545 }
1546 }
1547
1548 impl ColumnsCodec {
1549 #[inline]
1555 pub(crate) fn observe<'a, I>(&mut self, iter: I)
1556 where
1557 I: IntoIterator<Item = &'a [u8]>,
1558 {
1559 for (index, bytes) in iter.into_iter().enumerate() {
1560 if self.columns.len() <= index {
1561 self.columns.push(Default::default());
1562 }
1563 self.columns[index].observe(bytes);
1564 }
1565 }
1566 }
1567
1568 impl ColumnsCodec {
1569 pub(crate) fn new_safe(self) -> Self {
1575 let columns = self
1576 .columns
1577 .into_iter()
1578 .map(DictionaryCodec::new_safe)
1579 .collect();
1580 Self { columns }
1581 }
1582 }
1583
1584 #[derive(Debug, Copy, Clone)]
1585 pub struct ColumnsIter<'a> {
1586 pub index: Option<&'a ColumnsCodec>,
1588 pub column: usize,
1589 pub data: &'a [u8],
1590 }
1591
1592 impl<'a> Iterator for ColumnsIter<'a> {
1593 type Item = &'a [u8];
1594 #[inline(always)]
1595 fn next(&mut self) -> Option<Self::Item> {
1596 if self.data.is_empty() {
1597 None
1598 } else if let Some(bytes) = self
1599 .index
1600 .as_ref()
1601 .and_then(|i| i.columns.get(self.column))
1602 .and_then(|i| i.decode.get(self.data[0].into()))
1603 {
1604 self.data = &self.data[1..];
1605 self.column += 1;
1606 Some(bytes)
1607 } else {
1608 let mut data = self.data;
1609 let data_len = data.len();
1610 unsafe {
1611 read_datum(&mut data);
1612 }
1613 let (prev, next) = self.data.split_at(data_len - data.len());
1614 self.data = next;
1615 self.column += 1;
1616 Some(prev)
1617 }
1618 }
1619 }
1620
1621 impl<'a> ColumnsIter<'a> {
1622 #[inline(always)]
1626 pub unsafe fn without_codec(data: &'a [u8]) -> Self {
1627 Self {
1628 index: None,
1629 column: 0,
1630 data,
1631 }
1632 }
1633 }
1634 }
1635
1636 mod dictionary {
1651
1652 use std::collections::BTreeMap;
1653
1654 pub use super::{BytesMap, MisraGries};
1655
1656 pub const SAFE_TAG_BASE: u8 = 122;
1668
1669 #[derive(Default, Debug)]
1673 pub struct DictionaryCodec {
1674 encode: BTreeMap<Vec<u8>, u8>,
1675 pub decode: BytesMap,
1676 stats: (MisraGries<Vec<u8>>, [u64; 4]),
1677 }
1678
1679 impl DictionaryCodec {
1680 pub(super) fn encode<'a, I>(&mut self, iter: I, output: &mut Vec<u8>)
1687 where
1688 I: IntoIterator<Item = &'a [u8]>,
1689 {
1690 for bytes in iter.into_iter() {
1691 debug_assert!(
1692 !bytes.is_empty(),
1693 "row encoding never yields empty column slices",
1694 );
1695 if let Some(b) = self.encode.get(bytes) {
1697 output.push(*b);
1698 } else {
1699 debug_assert!(
1709 self.decode.get(bytes[0].into()).is_none(),
1710 "raw datum first-byte {} collides with a dictionary tag; \
1711 decode would be ambiguous",
1712 bytes[0],
1713 );
1714 output.extend(bytes);
1715 }
1716 self.observe(bytes);
1717 }
1718 }
1719
1720 pub(super) fn new_from<'a>(stats: impl IntoIterator<Item = &'a Self>) -> Self {
1722 let mut mg = MisraGries::default();
1724 let mut tags: [u64; 4] = [0; 4];
1725 for stat in stats.into_iter() {
1726 for (thing, count) in stat.stats.0.clone().done() {
1727 mg.update(thing, count);
1728 }
1729 tags[0] |= stat.stats.1[0];
1730 tags[1] |= stat.stats.1[1];
1731 tags[2] |= stat.stats.1[2];
1732 tags[3] |= stat.stats.1[3];
1733 }
1734 let mut mg = mg
1735 .done()
1736 .into_iter()
1737 .filter(|(next_bytes, count)| next_bytes.len() > 1 && count > &1);
1738 let mut encode = BTreeMap::new();
1740 let mut decode = BytesMap::default();
1741 for tag in 0..=255 {
1742 let tag_idx: usize = (tag % 4).into();
1743 let shift = tag >> 2;
1744 if (tags[tag_idx] >> shift) & 0x01 != 0 {
1745 decode.push(None);
1747 } else if let Some((next_bytes, _count)) = mg.next() {
1748 decode.push(Some(&next_bytes[..]));
1749 encode.insert(next_bytes, tag);
1750 } else {
1751 decode.push(None);
1756 }
1757 }
1758
1759 Self {
1760 encode,
1761 decode,
1762 stats: (MisraGries::default(), [0u64; 4]),
1763 }
1764 }
1765 }
1766
1767 impl DictionaryCodec {
1768 pub fn heap_size(&self, callback: &mut impl FnMut(usize, usize)) {
1774 let entry = std::mem::size_of::<(Vec<u8>, u8)>();
1775 callback(self.encode.len() * entry, self.encode.len() * entry);
1776 for key in self.encode.keys() {
1777 callback(key.len(), key.capacity());
1778 }
1779 self.decode.heap_size(callback);
1780 self.stats.0.heap_size(callback);
1781 }
1782
1783 #[inline]
1805 pub fn observe(&mut self, bytes: &[u8]) {
1806 debug_assert!(
1807 !bytes.is_empty(),
1808 "row encoding never yields empty column slices",
1809 );
1810 let tag = bytes[0];
1811 let tag_idx: usize = (tag % 4).into();
1812 self.stats.1[tag_idx] |= 1 << (tag >> 2);
1813 self.stats.0.insert_ref(bytes);
1814 }
1815
1816 pub(super) fn new_safe(stats: Self) -> Self {
1820 let (mg, observed_tags) = stats.stats;
1831 let mut mg = mg
1832 .done()
1833 .into_iter()
1834 .filter(|(next_bytes, count)| next_bytes.len() > 1 && count > &1);
1835 let mut encode = BTreeMap::new();
1836 let mut decode = BytesMap::default();
1837 for _ in 0..SAFE_TAG_BASE {
1839 decode.push(None);
1840 }
1841 for tag in SAFE_TAG_BASE..=255 {
1843 if let Some((next_bytes, _count)) = mg.next() {
1844 decode.push(Some(&next_bytes[..]));
1845 encode.insert(next_bytes, tag);
1846 }
1847 }
1848 Self {
1849 encode,
1850 decode,
1851 stats: (MisraGries::default(), observed_tags),
1852 }
1853 }
1854 }
1855 }
1856
/// An append-only table of optional byte strings, addressed by position.
///
/// All strings are stored back to back in `bytes`; `offsets[i]..offsets[i + 1]`
/// is the range of entry `i`. An empty range means "no entry".
#[derive(Debug)]
pub struct BytesMap {
    /// Entry boundaries into `bytes`; always starts with a leading `0`.
    offsets: Vec<usize>,
    /// Concatenation of all stored byte strings.
    bytes: Vec<u8>,
}
impl Default for BytesMap {
    /// Creates an empty table (a single `0` offset and no bytes).
    #[inline(always)]
    fn default() -> Self {
        Self {
            bytes: Vec::new(),
            offsets: vec![0],
        }
    }
}
impl BytesMap {
    /// Appends one entry. `None` (or an empty slice) occupies a position but
    /// stores no bytes, so `get` reports it as absent.
    #[inline]
    fn push(&mut self, input: Option<&[u8]>) {
        self.bytes.extend_from_slice(input.unwrap_or(&[]));
        let end = self.bytes.len();
        self.offsets.push(end);
    }
    /// Reports heap usage through `callback(used_bytes, reserved_bytes)`:
    /// first the offsets vector, then the byte storage.
    fn heap_size(&self, callback: &mut impl FnMut(usize, usize)) {
        let word = std::mem::size_of::<usize>();
        let Self { offsets, bytes } = self;
        callback(offsets.len() * word, offsets.capacity() * word);
        callback(bytes.len(), bytes.capacity());
    }
    /// Returns the bytes stored at `index`, or `None` when the position is out
    /// of range or holds no bytes.
    #[inline]
    fn get(&self, index: usize) -> Option<&[u8]> {
        // Entry `index` exists only if both of its boundaries are present.
        match self.offsets.get(index..) {
            Some(&[lower, upper, ..]) if lower < upper => Some(&self.bytes[lower..upper]),
            _ => None,
        }
    }
}
1904
mod misra_gries {

    use std::cmp::Reverse;
    use std::collections::BTreeMap;

    /// A bounded frequency summary in the style of the Misra-Gries algorithm.
    ///
    /// Elements are counted in a map. Whenever the map holds more than `2 * k`
    /// distinct elements, the `(k + 1)`-th largest count is subtracted from
    /// every entry and entries that reach zero are dropped.
    #[derive(Clone, Debug)]
    pub struct MisraGries<T: Ord> {
        /// Current (possibly reduced) count per element.
        inner: BTreeMap<T, usize>,
        /// Size parameter; the map is pruned once it exceeds `2 * k` entries.
        k: usize,
    }

    impl<T: Ord> Default for MisraGries<T> {
        /// Creates an empty summary with `k = 512`.
        #[inline(always)]
        fn default() -> Self {
            Self {
                k: 512,
                inner: BTreeMap::new(),
            }
        }
    }

    impl<T: Ord> MisraGries<T> {
        /// Counts one occurrence of `element`.
        #[inline(always)]
        pub fn insert(&mut self, element: T) {
            self.update(element, 1)
        }

        /// Adds `count` occurrences of `element`, pruning the summary if it
        /// has grown beyond `2 * k` distinct elements.
        #[inline]
        pub fn update(&mut self, element: T, count: usize) {
            self.inner
                .entry(element)
                .and_modify(|weight| *weight += count)
                .or_insert(count);
            let limit = 2 * self.k;
            if self.inner.len() > limit {
                self.tidy();
            }
        }

        /// Consumes the summary and returns its entries ordered by descending
        /// count; entries with equal counts stay in ascending element order.
        pub fn done(self) -> Vec<(T, usize)> {
            let mut ranked: Vec<(T, usize)> = self.inner.into_iter().collect();
            // Stable sort, so ties keep the map's key order.
            ranked.sort_by_key(|(_, weight)| Reverse(*weight));
            ranked
        }

        /// Subtracts the `(k + 1)`-th largest count from every entry and drops
        /// the entries that reach zero. Does nothing when there are at most
        /// `k` entries or when that count is zero.
        fn tidy(&mut self) {
            let mut weights: Vec<usize> = self.inner.values().copied().collect();
            weights.sort_unstable_by_key(|&weight| Reverse(weight));
            let threshold = match weights.get(self.k) {
                Some(&weight) if weight > 0 => weight,
                _ => return,
            };
            self.inner.retain(|_, weight| {
                *weight = weight.saturating_sub(threshold);
                *weight != 0
            });
        }
    }

    impl MisraGries<Vec<u8>> {
        /// Reports heap usage through `callback(used_bytes, reserved_bytes)`:
        /// first the map entries, then each key's buffer.
        pub fn heap_size(&self, callback: &mut impl FnMut(usize, usize)) {
            let entry_bytes = self.inner.len() * std::mem::size_of::<(Vec<u8>, usize)>();
            // The map exposes no capacity, so the same figure is reported twice.
            callback(entry_bytes, entry_bytes);
            for key in self.inner.keys() {
                callback(key.len(), key.capacity());
            }
        }

        /// Counts one occurrence of `element`, allocating an owned key only
        /// when the element is not tracked yet.
        #[inline]
        pub fn insert_ref(&mut self, element: &[u8]) {
            match self.inner.get_mut(element) {
                Some(weight) => *weight += 1,
                None => self.insert(element.to_vec()),
            }
        }
    }

    /// Merges `rhs` into `self` by replaying its entries, highest count first.
    impl<T: Ord> std::ops::AddAssign for MisraGries<T> {
        fn add_assign(&mut self, rhs: Self) {
            rhs.done()
                .into_iter()
                .for_each(|(element, count)| self.update(element, count));
        }
    }
}
2002}