timely/dataflow/channels/
mod.rs1use serde::{Deserialize, Serialize};
4use crate::communication::Push;
5use crate::Container;
6
7pub mod pushers;
9pub mod pullers;
11pub mod pact;
13
/// A batch of data exchanged between operators, tagged with routing metadata.
///
/// `T` is the type of the timestamp and `C` is the type of the container that
/// carries the records.
#[derive(Clone)]
pub struct Message<T, C> {
    /// The timestamp associated with the contents of `data`.
    pub time: T,
    /// The payload carried by this message.
    pub data: C,
    /// Index stored alongside the payload; presumably identifies the sender
    /// (verify against the pushers that populate it).
    pub from: usize,
    /// Counter stored alongside the payload; presumably a per-channel sequence
    /// number (verify against the pushers that populate it).
    pub seq: usize,
}
26
27impl<T, C> Message<T, C> {
28 #[deprecated = "Use timely::buffer::default_capacity instead"]
30 pub fn default_length() -> usize {
31 crate::container::buffer::default_capacity::<C>()
32 }
33}
34
35impl<T, C: Container> Message<T, C> {
36 pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
38 Message { time, data, from, seq }
39 }
40
41 #[inline]
44 pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
45
46 let data = ::std::mem::take(buffer);
47 let message = Message::new(time, data, 0, 0);
48 let mut bundle = Some(message);
49
50 pusher.push(&mut bundle);
51
52 if let Some(message) = bundle {
53 *buffer = message.data;
54 }
55 }
56}
57
58impl<T, C> crate::communication::Bytesable for Message<T, C>
63where
64 T: Serialize + for<'a> Deserialize<'a>,
65 C: ContainerBytes,
66{
67 fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self {
68 use byteorder::ReadBytesExt;
69 let mut slice = &bytes[..];
70 let from: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
71 let seq: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
72 let time: T = ::bincode::deserialize_from(&mut slice).expect("bincode::deserialize() failed");
73 let time_size = ::bincode::serialized_size(&time).expect("bincode::serialized_size() failed") as usize;
74 let bytes_read = 8 + 8 + ((time_size + 7) & !7);
76 bytes.extract_to(bytes_read);
77 let data: C = ContainerBytes::from_bytes(bytes);
78 Self { time, data, from, seq }
79 }
80
81 fn length_in_bytes(&self) -> usize {
82 let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
83 16 + ((time_size + 7) & !7) + self.data.length_in_bytes()
85 }
86
87 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
88 use byteorder::WriteBytesExt;
89 writer.write_u64::<byteorder::LittleEndian>(self.from.try_into().unwrap()).unwrap();
90 writer.write_u64::<byteorder::LittleEndian>(self.seq.try_into().unwrap()).unwrap();
91 ::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
92 let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
93 let time_slop = ((time_size + 7) & !7) - time_size;
94 writer.write_all(&[0u8; 8][..time_slop]).unwrap();
95 self.data.into_bytes(&mut *writer);
96 }
97}
98
99
100pub trait ContainerBytes {
102 fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self;
104
105 fn length_in_bytes(&self) -> usize;
107
108 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
110}
111
112mod implementations {
113
114 use std::io::Write;
115
116 use serde::{Serialize, Deserialize};
117 use crate::dataflow::channels::ContainerBytes;
118
119 impl<T: Serialize + for<'a> Deserialize<'a>> ContainerBytes for Vec<T> {
120 fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
121 ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
122 }
123
124 fn length_in_bytes(&self) -> usize {
125 let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize;
126 (length + 7) & !7
127 }
128
129 fn into_bytes<W: Write>(&self, writer: &mut W) {
130 let mut counter = WriteCounter::new(writer);
131 ::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed");
132 let written = counter.count;
133 let written_slop = ((written + 7) & !7) - written;
134 counter.write_all(&[0u8; 8][..written_slop]).unwrap();
135 }
136 }
137
138 use write_counter::WriteCounter;
/// A pass-through writer that records how many bytes have gone through it.
mod write_counter {

    use ::std::io::{IoSlice, Result, Write};

    /// Wraps a writer and tallies the bytes written through the wrapper.
    pub struct WriteCounter<W> {
        /// The writer that actually receives the bytes.
        inner: W,
        /// Running total of bytes written through this wrapper.
        pub count: usize,
    }

    impl<W> WriteCounter<W> {
        /// Wraps `inner` with the byte count starting at zero.
        pub fn new(inner: W) -> Self {
            Self { inner, count: 0 }
        }
    }

    impl<W: Write> Write for WriteCounter<W> {
        /// Forwards to the inner writer and adds the number of bytes it accepted.
        fn write(&mut self, buf: &[u8]) -> Result<usize> {
            let written = self.inner.write(buf)?;
            self.count += written;
            Ok(written)
        }

        /// Flushes the inner writer; the count is unaffected.
        fn flush(&mut self) -> Result<()> {
            self.inner.flush()
        }

        /// Forwards a vectored write and adds the number of bytes it accepted.
        fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
            let written = self.inner.write_vectored(bufs)?;
            self.count += written;
            Ok(written)
        }

        /// Adds the full buffer length to the count, then forwards to the inner writer.
        fn write_all(&mut self, buf: &[u8]) -> Result<()> {
            self.count += buf.len();
            self.inner.write_all(buf)
        }

        // `write_fmt` is intentionally not overridden: the standard default
        // routes formatted output through `write_all` above, so `write!` and
        // `writeln!` are both forwarded and counted instead of panicking.
    }
}
181}