use std::collections::VecDeque;
use timely::Container;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout};
use crate::difference::Semigroup;
/// Accumulates pushed items in a flat buffer and hands them out as `Vec` chunks.
pub struct VecChunker<T> {
    /// Items received so far that have not yet been moved into a chunk.
    pending: Vec<T>,
    /// Finished chunks, in the order they were formed, waiting to be handed out.
    ready: VecDeque<Vec<T>>,
    /// Slot holding the chunk most recently handed out to the caller, if any.
    empty: Option<Vec<T>>,
}

impl<T> Default for VecChunker<T> {
    /// Creates a chunker with no buffered items, no finished chunks and an empty slot.
    fn default() -> Self {
        Self {
            pending: Vec::new(),
            ready: VecDeque::new(),
            empty: None,
        }
    }
}
impl<K, V, T, R> VecChunker<((K, V), T, R)>
where
    K: Ord,
    V: Ord,
    T: Ord,
    R: Semigroup,
{
    /// Byte budget used to derive the number of updates per chunk.
    const BUFFER_SIZE_BYTES: usize = 8 << 10;

    /// Number of updates that make up one chunk.
    ///
    /// This is `BUFFER_SIZE_BYTES` divided by the size of one update, with two
    /// special cases: zero-sized updates use `BUFFER_SIZE_BYTES` itself as the
    /// count, and updates larger than the budget get a capacity of one.
    fn chunk_capacity() -> usize {
        match ::std::mem::size_of::<((K, V), T, R)>() {
            0 => Self::BUFFER_SIZE_BYTES,
            size if size <= Self::BUFFER_SIZE_BYTES => Self::BUFFER_SIZE_BYTES / size,
            _ => 1,
        }
    }

    /// Runs `consolidate_updates` on the pending buffer and then moves full
    /// chunks from its front into `ready`.
    ///
    /// Chunks are peeled off only while more than one chunk's worth of updates
    /// remains, so on return `pending` holds at most `chunk_capacity()` updates.
    fn form_chunk(&mut self) {
        consolidate_updates(&mut self.pending);
        let capacity = Self::chunk_capacity();
        while self.pending.len() > capacity {
            let mut chunk = Vec::with_capacity(capacity);
            // Bound the drain by the requested capacity rather than by
            // `chunk.capacity()`: `Vec::with_capacity` may report more than was
            // asked for, which would make the range exceed `pending.len()` and panic.
            chunk.extend(self.pending.drain(..capacity));
            self.ready.push_back(chunk);
        }
    }
}
impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
where
    K: Ord + Clone,
    V: Ord + Clone,
    T: Ord + Clone,
    R: Semigroup + Clone,
{
    /// Drains `container` into the pending buffer, calling `form_chunk` every
    /// time the buffer becomes completely full.
    ///
    /// `container` is left empty; its allocation stays with the caller.
    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
        // Make sure the staging buffer can hold two chunks' worth of updates.
        let target = Self::chunk_capacity() * 2;
        if self.pending.capacity() < target {
            self.pending.reserve(target - self.pending.len());
        }

        let mut incoming = container.drain(..).peekable();
        while incoming.peek().is_some() {
            // Take only as many items as fit in the spare capacity.
            let room = self.pending.capacity() - self.pending.len();
            self.pending.extend(incoming.by_ref().take(room));
            if self.pending.len() == self.pending.capacity() {
                self.form_chunk();
            }
        }
    }
}
impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
where
    K: Ord + Clone + 'static,
    V: Ord + Clone + 'static,
    T: Ord + Clone + 'static,
    R: Semigroup + Clone + 'static,
{
    type Container = Vec<((K, V), T, R)>;

    /// Hands out the oldest finished chunk, or `None` when no chunk is ready.
    ///
    /// The chunk is parked in `self.empty` so a mutable reference to it can be
    /// returned; when nothing is ready, `self.empty` is left untouched.
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let chunk = self.ready.pop_front()?;
        Some(self.empty.insert(chunk))
    }

    /// Flushes everything still pending into chunks and hands out the oldest
    /// finished chunk, or `None` when there is nothing left.
    ///
    /// Unlike `extract`, this overwrites `self.empty` even when no chunk remains.
    fn finish(&mut self) -> Option<&mut Self::Container> {
        if !self.pending.is_empty() {
            consolidate_updates(&mut self.pending);
            while !self.pending.is_empty() {
                let mut chunk = Vec::with_capacity(Self::chunk_capacity());
                // The final chunk may be only partially filled.
                let count = self.pending.len().min(chunk.capacity());
                chunk.extend(self.pending.drain(..count));
                self.ready.push_back(chunk);
            }
        }
        self.empty = self.ready.pop_front();
        self.empty.as_mut()
    }
}
/// Accumulates pushed items in a flat buffer and hands them out as `TimelyStack` chunks.
pub struct ColumnationChunker<T: Columnation> {
    /// Items received so far that have not yet been copied into a chunk.
    pending: Vec<T>,
    /// Finished chunks, in the order they were formed, waiting to be handed out.
    ready: VecDeque<TimelyStack<T>>,
    /// Slot holding the chunk most recently handed out to the caller, if any.
    empty: Option<TimelyStack<T>>,
}

impl<T: Columnation> Default for ColumnationChunker<T> {
    /// Creates a chunker with no buffered items, no finished chunks and an empty slot.
    fn default() -> Self {
        Self {
            pending: Vec::new(),
            ready: VecDeque::new(),
            empty: None,
        }
    }
}
impl<K, V, T, R> ColumnationChunker<((K, V), T, R)>
where
    K: Columnation + Ord,
    V: Columnation + Ord,
    T: Columnation + Ord,
    R: Columnation + Semigroup,
{
    /// Byte budget used to derive the number of updates per chunk.
    const BUFFER_SIZE_BYTES: usize = 64 << 10;

    /// Number of updates that make up one chunk.
    ///
    /// This is `BUFFER_SIZE_BYTES` divided by the size of one update, with two
    /// special cases: zero-sized updates use `BUFFER_SIZE_BYTES` itself as the
    /// count, and updates larger than the budget get a capacity of one.
    fn chunk_capacity() -> usize {
        match ::std::mem::size_of::<((K, V), T, R)>() {
            0 => Self::BUFFER_SIZE_BYTES,
            size if size <= Self::BUFFER_SIZE_BYTES => Self::BUFFER_SIZE_BYTES / size,
            _ => 1,
        }
    }

    /// Runs `consolidate_updates` on the pending buffer and then copies full
    /// chunks from its front into `TimelyStack`s appended to `ready`.
    ///
    /// Chunks are peeled off only while more than one chunk's worth of updates
    /// remains, so on return `pending` holds at most `chunk_capacity()` updates.
    fn form_chunk(&mut self) {
        consolidate_updates(&mut self.pending);
        let capacity = Self::chunk_capacity();
        while self.pending.len() > capacity {
            let mut chunk = TimelyStack::with_capacity(capacity);
            // Bound the drain by the requested capacity rather than by
            // `chunk.capacity()`, so the range cannot exceed `pending.len()`
            // if the stack reports more capacity than was asked for.
            for item in self.pending.drain(..capacity) {
                chunk.copy(&item);
            }
            self.ready.push_back(chunk);
        }
    }
}
impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for ColumnationChunker<((K, V), T, R)>
where
    K: Columnation + Ord + Clone,
    V: Columnation + Ord + Clone,
    T: Columnation + Ord + Clone,
    R: Columnation + Semigroup + Clone,
{
    /// Drains `container` into the pending buffer, calling `form_chunk` every
    /// time the buffer becomes completely full.
    ///
    /// `container` is left empty; its allocation stays with the caller.
    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
        // Make sure the staging buffer can hold two chunks' worth of updates.
        let target = Self::chunk_capacity() * 2;
        if self.pending.capacity() < target {
            self.pending.reserve(target - self.pending.len());
        }

        let mut incoming = container.drain(..).peekable();
        while incoming.peek().is_some() {
            // Take only as many items as fit in the spare capacity.
            let room = self.pending.capacity() - self.pending.len();
            self.pending.extend(incoming.by_ref().take(room));
            if self.pending.len() == self.pending.capacity() {
                self.form_chunk();
            }
        }
    }
}
impl<K, V, T, R> ContainerBuilder for ColumnationChunker<((K, V), T, R)>
where
    K: Columnation + Ord + Clone + 'static,
    V: Columnation + Ord + Clone + 'static,
    T: Columnation + Ord + Clone + 'static,
    R: Columnation + Semigroup + Clone + 'static,
{
    type Container = TimelyStack<((K, V), T, R)>;

    /// Hands out the oldest finished chunk, or `None` when no chunk is ready.
    ///
    /// The chunk is parked in `self.empty` so a mutable reference to it can be
    /// returned; when nothing is ready, `self.empty` is left untouched.
    fn extract(&mut self) -> Option<&mut Self::Container> {
        let chunk = self.ready.pop_front()?;
        Some(self.empty.insert(chunk))
    }

    /// Flushes everything still pending into chunks and hands out the oldest
    /// finished chunk, or `None` when there is nothing left.
    ///
    /// Unlike `extract`, this overwrites `self.empty` even when no chunk remains.
    fn finish(&mut self) -> Option<&mut Self::Container> {
        // Called unconditionally, even when the pending buffer is empty.
        consolidate_updates(&mut self.pending);
        while !self.pending.is_empty() {
            let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
            // The final chunk may be only partially filled.
            let count = self.pending.len().min(chunk.capacity());
            for item in self.pending.drain(..count) {
                chunk.copy(&item);
            }
            self.ready.push_back(chunk);
        }
        self.empty = self.ready.pop_front();
        self.empty.as_mut()
    }
}
/// Accumulates pushed items in a container of type `Output` and hands out
/// containers of that same type.
pub struct ContainerChunker<Output> {
    /// Container currently being filled with incoming items.
    pending: Output,
    /// Finished containers, in the order they were formed, waiting to be handed out.
    ready: VecDeque<Output>,
    /// Spare container: used as the second argument to `consolidate_container`
    /// and as the slot for the container most recently handed out.
    empty: Output,
}

impl<Output> Default for ContainerChunker<Output>
where
    Output: Default,
{
    /// Creates a chunker whose containers are all `Output::default()` and whose
    /// queue of finished containers is empty.
    fn default() -> Self {
        Self {
            pending: Default::default(),
            ready: VecDeque::new(),
            empty: Default::default(),
        }
    }
}
impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
where
    Input: Container,
    Output: SizableContainer
        + ConsolidateLayout
        + PushInto<Input::Item<'a>>
        + PushInto<Input::ItemRef<'a>>,
{
    /// Drains `container` one item at a time into the pending container,
    /// consolidating whenever the pending container is exactly full.
    ///
    /// After a consolidation, the pending container is moved to `ready` only if
    /// it is still more than half full; otherwise it keeps accumulating.
    fn push_into(&mut self, container: &'a mut Input) {
        let preferred = Output::preferred_capacity();
        if self.pending.capacity() < preferred {
            self.pending.reserve(preferred - self.pending.len());
        }

        for item in container.drain() {
            self.pending.push(item);

            // Nothing to do until the pending container is exactly full.
            if self.pending.len() != self.pending.capacity() {
                continue;
            }

            // After the swap, `pending` is whatever `consolidate_container` left
            // in `empty` (presumably the consolidated data; confirm against its
            // definition), and the old `pending` is cleared as the next spare.
            consolidate_container(&mut self.pending, &mut self.empty);
            std::mem::swap(&mut self.pending, &mut self.empty);
            self.empty.clear();

            // A non-full container may be queued here, because the whole
            // pending container is moved out rather than a prefix of it.
            if self.pending.len() > self.pending.capacity() / 2 {
                self.ready.push_back(std::mem::take(&mut self.pending));
            }
        }
    }
}
impl<Output> ContainerBuilder for ContainerChunker<Output>
where
    Output: SizableContainer + ConsolidateLayout,
{
    type Container = Output;

    /// Hands out the oldest finished container, or `None` when none is ready.
    ///
    /// The container is moved into `self.empty`, replacing what was there, so a
    /// mutable reference to it can be returned.
    fn extract(&mut self) -> Option<&mut Self::Container> {
        self.empty = self.ready.pop_front()?;
        Some(&mut self.empty)
    }

    /// Consolidates and queues whatever is still pending, then hands out the
    /// oldest finished container, or `None` when there is nothing left.
    fn finish(&mut self) -> Option<&mut Self::Container> {
        if !self.pending.is_empty() {
            // Same consolidate / swap / clear sequence as in `push_into`.
            consolidate_container(&mut self.pending, &mut self.empty);
            std::mem::swap(&mut self.pending, &mut self.empty);
            self.empty.clear();
            // Queue the result only if something is left after that sequence.
            if !self.pending.is_empty() {
                self.ready.push_back(std::mem::take(&mut self.pending));
            }
        }

        self.empty = self.ready.pop_front()?;
        Some(&mut self.empty)
    }
}