use crate::communication::Push;
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
use crate::dataflow::channels::{Bundle, Message};
use crate::dataflow::operators::Capability;
use crate::progress::Timestamp;
use crate::Container;
/// Stages items in a container builder and sends the resulting containers to a pusher.
///
/// Items are pushed into `builder`; containers the builder hands back are sent to
/// `pusher` together with a clone of the currently recorded `time`.
#[derive(Debug)]
pub struct Buffer<T, CB, P> {
/// Timestamp recorded by the most recently opened session; `None` until a session is opened.
time: Option<T>,
/// Builder that accumulates pushed items into containers.
builder: CB,
/// Destination that receives the containers produced by `builder`.
pusher: P,
}
impl<T, CB: Default, P> Buffer<T, CB, P> {
    /// Wraps `pusher` in a new buffer.
    ///
    /// The builder is default-constructed and no timestamp is recorded yet.
    pub fn new(pusher: P) -> Self {
        let builder = CB::default();
        Self {
            time: None,
            builder,
            pusher,
        }
    }

    /// Gives mutable access to the wrapped pusher.
    pub fn inner(&mut self) -> &mut P {
        &mut self.pusher
    }

    /// Gives shared access to the container builder.
    pub fn builder(&self) -> &CB {
        &self.builder
    }
}
impl<T, C: Container, P: Push<Bundle<T, C>>> Buffer<T, CapacityContainerBuilder<C>, P>
where
    T: Eq + Clone,
{
    /// Opens a session at `time` using the capacity-based container builder.
    ///
    /// Delegates to `session_with_builder`.
    #[inline]
    pub fn session(&mut self, time: &T) -> Session<T, CapacityContainerBuilder<C>, P> {
        self.session_with_builder(time)
    }

    /// Opens an auto-flushing session at the time of `cap` using the capacity-based
    /// container builder.
    ///
    /// Delegates to `autoflush_session_with_builder`.
    #[inline]
    pub fn autoflush_session(
        &mut self,
        cap: Capability<T>,
    ) -> AutoflushSession<T, CapacityContainerBuilder<C>, P>
    where
        T: Timestamp,
    {
        self.autoflush_session_with_builder(cap)
    }

    /// Hands a whole container to the builder and sends whatever the builder extracts.
    ///
    /// Empty containers are ignored entirely.
    fn give_container(&mut self, container: &mut C) {
        if container.is_empty() {
            return;
        }
        self.builder.push_container(container);
        self.extract_and_send();
    }
}
impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P>
where
    T: Eq + Clone,
{
    /// Opens a session at `time`.
    ///
    /// If a different time was recorded previously, the buffer is flushed first so that
    /// pending data is sent at the old time. The recorded time is then set to `time`.
    pub fn session_with_builder(&mut self, time: &T) -> Session<T, CB, P> {
        // Only an already-recorded, different time triggers a flush; `None` never does.
        let time_changed = matches!(&self.time, Some(current) if current != time);
        if time_changed {
            self.flush();
        }
        self.time = Some(time.clone());
        Session { buffer: self }
    }

    /// Opens an auto-flushing session at the time of `cap`.
    ///
    /// If a different time was recorded previously, the buffer is flushed first. The
    /// capability is stored in the returned session for as long as the session lives.
    pub fn autoflush_session_with_builder(
        &mut self,
        cap: Capability<T>,
    ) -> AutoflushSession<T, CB, P>
    where
        T: Timestamp,
    {
        // Only an already-recorded, different time triggers a flush; `None` never does.
        let time_changed = matches!(&self.time, Some(current) if current != cap.time());
        if time_changed {
            self.flush();
        }
        self.time = Some(cap.time().clone());
        AutoflushSession {
            buffer: self,
            _capability: cap,
        }
    }
}
impl<T, CB: ContainerBuilder, P: Push<Bundle<T, CB::Container>>> Buffer<T, CB, P>
where
    T: Eq + Clone,
{
    /// Flushes the builder and then pushes `None` into the pusher.
    // NOTE(review): `None` presumably signals a flush to the receiving side; confirm
    // against the `Push` contract.
    pub fn cease(&mut self) {
        self.flush();
        self.pusher.push(&mut None);
    }

    /// Sends every container the builder yields from `extract`, at the recorded time.
    ///
    /// # Panics
    ///
    /// Panics if the builder yields a container while no time has been recorded.
    #[inline]
    fn extract_and_send(&mut self) {
        // Borrow the fields separately so the extracted container (which borrows the
        // builder) can coexist with the time and the pusher.
        let Self { time, builder, pusher } = self;
        while let Some(container) = builder.extract() {
            let stamp = time.as_ref().unwrap().clone();
            Message::push_at(container, stamp, &mut *pusher);
        }
    }

    /// Sends every container the builder yields from `finish`, at the recorded time.
    ///
    /// # Panics
    ///
    /// Panics if the builder yields a container while no time has been recorded.
    #[inline]
    fn flush(&mut self) {
        let Self { time, builder, pusher } = self;
        while let Some(container) = builder.finish() {
            let stamp = time.as_ref().unwrap().clone();
            Message::push_at(container, stamp, &mut *pusher);
        }
    }
}
/// Pushing an item into the buffer forwards it to the builder and then sends any
/// containers the builder yields from `extract`.
impl<T, CB, P, D> PushInto<D> for Buffer<T, CB, P>
where
T: Eq+Clone,
CB: ContainerBuilder + PushInto<D>,
P: Push<Bundle<T, CB::Container>>
{
/// Pushes `item` into the builder, then calls `extract_and_send`.
///
/// `extract_and_send` unwraps the recorded time, so this panics if the builder
/// yields a container before any session has recorded a time.
#[inline]
fn push_into(&mut self, item: D) {
self.builder.push_into(item);
self.extract_and_send();
}
}
/// A handle for pushing data into a [`Buffer`] at the time recorded when the session was opened.
///
/// Holds an exclusive borrow of the buffer for the lifetime `'a`.
pub struct Session<'a, T, CB, P> {
/// The buffer this session writes into.
buffer: &'a mut Buffer<T, CB, P>,
}
impl<'a, T, C: Container, P> Session<'a, T, CapacityContainerBuilder<C>, P>
where
T: Eq + Clone + 'a,
P: Push<Bundle<T, C>> + 'a,
{
/// Provides a whole container to the session.
///
/// Delegates to the buffer's `give_container`, which ignores empty containers and
/// otherwise hands the container to the builder and sends what the builder extracts.
pub fn give_container(&mut self, container: &mut C) {
self.buffer.give_container(container)
}
}
impl<'a, T, CB, P> Session<'a, T, CB, P>
where
    T: Eq + Clone + 'a,
    CB: ContainerBuilder + 'a,
    P: Push<Bundle<T, CB::Container>> + 'a,
{
    /// Gives shared access to the underlying buffer's container builder.
    pub fn builder(&self) -> &CB {
        self.buffer.builder()
    }

    /// Provides one item to the session.
    ///
    /// The item is pushed through this session's `PushInto` implementation.
    #[inline]
    pub fn give<D>(&mut self, data: D)
    where
        CB: PushInto<D>,
    {
        self.push_into(data);
    }

    /// Provides every item produced by `iter` to the session, in iteration order.
    ///
    /// Accepts anything that implements `IntoIterator`, so collections such as `Vec`
    /// can be passed directly; plain iterators keep working because every `Iterator`
    /// is also `IntoIterator`.
    #[inline]
    pub fn give_iterator<I>(&mut self, iter: I)
    where
        I: IntoIterator,
        CB: PushInto<I::Item>,
    {
        for item in iter {
            self.push_into(item);
        }
    }
}
/// Pushing into a session pushes into the buffer it borrows.
impl<'a, T, CB, P, D> PushInto<D> for Session<'a, T, CB, P>
where
T: Eq + Clone + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
{
/// Forwards `item` to the underlying buffer's `push_into`.
#[inline]
fn push_into(&mut self, item: D) {
self.buffer.push_into(item);
}
}
/// A session that calls `cease` on its buffer when dropped, and owns a capability
/// for as long as it lives.
///
/// Holds an exclusive borrow of the buffer for the lifetime `'a`.
pub struct AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
{
/// The buffer this session writes into.
buffer: &'a mut Buffer<T, CB, P>,
/// Capability the session was opened with; never read, only kept until the session is dropped.
_capability: Capability<T>,
}
impl<'a, T, CB, P> AutoflushSession<'a, T, CB, P>
where
    T: Timestamp + 'a,
    CB: ContainerBuilder + 'a,
    P: Push<Bundle<T, CB::Container>> + 'a,
{
    /// Provides one item to the session.
    ///
    /// The item is pushed through this session's `PushInto` implementation.
    #[inline]
    pub fn give<D>(&mut self, data: D)
    where
        CB: PushInto<D>,
    {
        self.push_into(data);
    }

    /// Provides every item produced by `iter` to the session, in iteration order.
    ///
    /// Accepts anything that implements `IntoIterator`, so collections such as `Vec`
    /// can be passed directly; plain iterators keep working because every `Iterator`
    /// is also `IntoIterator`.
    #[inline]
    pub fn give_iterator<I, D>(&mut self, iter: I)
    where
        I: IntoIterator<Item = D>,
        CB: PushInto<D>,
    {
        for item in iter {
            self.push_into(item);
        }
    }
}
/// Pushing into an auto-flushing session pushes into the buffer it borrows.
impl<'a, T, CB, P, D> PushInto<D> for AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + PushInto<D> + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
{
/// Forwards `item` to the underlying buffer's `push_into`.
#[inline]
fn push_into(&mut self, item: D) {
self.buffer.push_into(item);
}
}
/// Dropping the session ceases the buffer, which is what makes the session "auto-flushing".
impl<'a, T, CB, P> Drop for AutoflushSession<'a, T, CB, P>
where
T: Timestamp + 'a,
CB: ContainerBuilder + 'a,
P: Push<Bundle<T, CB::Container>> + 'a,
{
/// Calls `cease` on the buffer, which flushes the builder and then pushes `None`
/// into the pusher.
fn drop(&mut self) {
self.buffer.cease();
}
}