timely/dataflow/channels/
mod.rs
1use serde::{Deserialize, Serialize};
4use crate::communication::Push;
5use crate::Container;
6
7pub mod pushers;
9pub mod pullers;
11pub mod pact;
13
14#[derive(Clone)]
16pub struct Message<T, C> {
17 pub time: T,
19 pub data: C,
21 pub from: usize,
23 pub seq: usize,
25}
26
27impl<T, C> Message<T, C> {
28 #[deprecated = "Use timely::buffer::default_capacity instead"]
30 pub fn default_length() -> usize {
31 crate::container::buffer::default_capacity::<C>()
32 }
33}
34
35impl<T, C: Container> Message<T, C> {
36 pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
38 Message { time, data, from, seq }
39 }
40
41 #[inline]
44 pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
45
46 let data = ::std::mem::take(buffer);
47 let message = Message::new(time, data, 0, 0);
48 let mut bundle = Some(message);
49
50 pusher.push(&mut bundle);
51
52 if let Some(message) = bundle {
53 *buffer = message.data;
54 buffer.clear();
55 }
56 }
57}
58
59impl<T, C> crate::communication::Bytesable for Message<T, C>
64where
65 T: Serialize + for<'a> Deserialize<'a>,
66 C: ContainerBytes,
67{
68 fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self {
69 use byteorder::ReadBytesExt;
70 let mut slice = &bytes[..];
71 let from: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
72 let seq: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
73 let time: T = ::bincode::deserialize_from(&mut slice).expect("bincode::deserialize() failed");
74 let time_size = ::bincode::serialized_size(&time).expect("bincode::serialized_size() failed") as usize;
75 let bytes_read = 8 + 8 + ((time_size + 7) & !7);
77 bytes.extract_to(bytes_read);
78 let data: C = ContainerBytes::from_bytes(bytes);
79 Self { time, data, from, seq }
80 }
81
82 fn length_in_bytes(&self) -> usize {
83 let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
84 16 + ((time_size + 7) & !7) + self.data.length_in_bytes()
86 }
87
88 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
89 use byteorder::WriteBytesExt;
90 writer.write_u64::<byteorder::LittleEndian>(self.from.try_into().unwrap()).unwrap();
91 writer.write_u64::<byteorder::LittleEndian>(self.seq.try_into().unwrap()).unwrap();
92 ::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
93 let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
94 let time_slop = ((time_size + 7) & !7) - time_size;
95 writer.write_all(&[0u8; 8][..time_slop]).unwrap();
96 self.data.into_bytes(&mut *writer);
97 }
98}
99
100
101pub trait ContainerBytes {
103 fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self;
105
106 fn length_in_bytes(&self) -> usize;
108
109 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
111}
112
113mod implementations {
114
115 use std::io::Write;
116
117 use serde::{Serialize, Deserialize};
118 use crate::dataflow::channels::ContainerBytes;
119
120 impl<T: Serialize + for<'a> Deserialize<'a>> ContainerBytes for Vec<T> {
121 fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
122 ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
123 }
124
125 fn length_in_bytes(&self) -> usize {
126 let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize;
127 (length + 7) & !7
128 }
129
130 fn into_bytes<W: Write>(&self, writer: &mut W) {
131 let mut counter = WriteCounter::new(writer);
132 ::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed");
133 let written = counter.count;
134 let written_slop = ((written + 7) & !7) - written;
135 counter.write_all(&[0u8; 8][..written_slop]).unwrap();
136 }
137 }
138
139 use write_counter::WriteCounter;
140 mod write_counter {
142
143 use ::std::io::{Write, IoSlice, Result};
144 use std::fmt::Arguments;
145
146 pub struct WriteCounter<W> {
148 inner: W,
149 pub count: usize,
150 }
151
152 impl<W> WriteCounter<W> {
153 pub fn new(inner: W) -> Self {
155 Self { inner, count: 0 }
156 }
157 }
158
159 impl<W: Write> Write for WriteCounter<W> {
160 fn write(&mut self, buf: &[u8]) -> Result<usize> {
161 let written = self.inner.write(buf)?;
162 self.count += written;
163 Ok(written)
164 }
165 fn flush(&mut self) -> Result<()> {
166 self.inner.flush()
167 }
168 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
169 let written = self.inner.write_vectored(bufs)?;
170 self.count += written;
171 Ok(written)
172 }
173 fn write_all(&mut self, buf: &[u8]) -> Result<()> {
174 self.count += buf.len();
175 self.inner.write_all(buf)
176 }
177 fn write_fmt(&mut self, _fmt: Arguments<'_>) -> Result<()> {
178 unimplemented!()
179 }
180 }
181 }
182}