use serde::{Deserialize, Serialize};
use crate::communication::Push;
use crate::Container;
pub mod pushers;
pub mod pullers;
pub mod pact;
#[derive(Clone)]
pub struct Message<T, C> {
pub time: T,
pub data: C,
pub from: usize,
pub seq: usize,
}
impl<T, C> Message<T, C> {
#[deprecated = "Use timely::buffer::default_capacity instead"]
pub fn default_length() -> usize {
crate::container::buffer::default_capacity::<C>()
}
}
impl<T, C: Container> Message<T, C> {
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
Message { time, data, from, seq }
}
#[inline]
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(message);
pusher.push(&mut bundle);
if let Some(message) = bundle {
*buffer = message.data;
buffer.clear();
}
}
}
impl<T, C> crate::communication::Bytesable for Message<T, C>
where
T: Serialize + for<'a> Deserialize<'a>,
C: ContainerBytes,
{
fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self {
use byteorder::ReadBytesExt;
let mut slice = &bytes[..];
let from: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
let seq: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
let time: T = ::bincode::deserialize_from(&mut slice).expect("bincode::deserialize() failed");
let bytes_read = bytes.len() - slice.len();
bytes.extract_to(bytes_read);
let data: C = ContainerBytes::from_bytes(bytes);
Self { time, data, from, seq }
}
fn length_in_bytes(&self) -> usize {
16 +
::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize +
self.data.length_in_bytes()
}
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
use byteorder::WriteBytesExt;
writer.write_u64::<byteorder::LittleEndian>(self.from.try_into().unwrap()).unwrap();
writer.write_u64::<byteorder::LittleEndian>(self.seq.try_into().unwrap()).unwrap();
::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
self.data.into_bytes(&mut *writer);
}
}
pub trait ContainerBytes {
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self;
fn length_in_bytes(&self) -> usize;
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
}
mod implementations {
use serde::{Serialize, Deserialize};
use crate::dataflow::channels::ContainerBytes;
impl<T: Serialize + for<'a> Deserialize<'a>> ContainerBytes for Vec<T> {
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
}
fn length_in_bytes(&self) -> usize {
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
}
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
}
}
use crate::container::flatcontainer::FlatStack;
impl<T: Serialize + for<'a> Deserialize<'a> + crate::container::flatcontainer::Region> ContainerBytes for FlatStack<T> {
fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
}
fn length_in_bytes(&self) -> usize {
::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize
}
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
::bincode::serialize_into(writer, &self).expect("bincode::serialize_into() failed");
}
}
}