timely/dataflow/channels/
mod.rs

1//! Structured communication between timely dataflow operators.
2
3use serde::{Deserialize, Serialize};
4use crate::communication::Push;
5use crate::Container;
6
7/// A collection of types that may be pushed at.
8pub mod pushers;
9/// A collection of types that may be pulled from.
10pub mod pullers;
11/// Parallelization contracts, describing how data must be exchanged between operators.
12pub mod pact;
13
14/// A serializable representation of timestamped data.
15#[derive(Clone)]
16pub struct Message<T, C> {
17    /// The timestamp associated with the message.
18    pub time: T,
19    /// The data in the message.
20    pub data: C,
21    /// The source worker.
22    pub from: usize,
23    /// A sequence number for this worker-to-worker stream.
24    pub seq: usize,
25}
26
27impl<T, C> Message<T, C> {
28    /// Default buffer size.
29    #[deprecated = "Use timely::buffer::default_capacity instead"]
30    pub fn default_length() -> usize {
31        crate::container::buffer::default_capacity::<C>()
32    }
33}
34
35impl<T, C: Container> Message<T, C> {
36    /// Creates a new message instance from arguments.
37    pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
38        Message { time, data, from, seq }
39    }
40
41    /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
42    /// leaves in place, or the container's default element. The buffer is cleared.
43    #[inline]
44    pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
45
46        let data = ::std::mem::take(buffer);
47        let message = Message::new(time, data, 0, 0);
48        let mut bundle = Some(message);
49
50        pusher.push(&mut bundle);
51
52        if let Some(message) = bundle {
53            *buffer = message.data;
54            buffer.clear();
55        }
56    }
57}
58
59// Instructions for serialization of `Message`.
60//
61// Serialization of each field is meant to be `u64` aligned, so that each has the ability
62// to be decoded using safe transmutation, e.g. `bytemuck`.
63impl<T, C> crate::communication::Bytesable for Message<T, C>
64where
65    T: Serialize + for<'a> Deserialize<'a>,
66    C: ContainerBytes,
67{
68    fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self {
69        use byteorder::ReadBytesExt;
70        let mut slice = &bytes[..];
71        let from: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
72        let seq: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
73        let time: T = ::bincode::deserialize_from(&mut slice).expect("bincode::deserialize() failed");
74        let time_size = ::bincode::serialized_size(&time).expect("bincode::serialized_size() failed") as usize;
75        // We expect to find the `data` payload at `8 + 8 + round_up(time_size)`;
76        let bytes_read = 8 + 8 + ((time_size + 7) & !7);
77        bytes.extract_to(bytes_read);
78        let data: C = ContainerBytes::from_bytes(bytes);
79        Self { time, data, from, seq }
80    }
81
82    fn length_in_bytes(&self) -> usize {
83        let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
84        // 16 comes from the two `u64` fields: `from` and `seq`.
85        16 + ((time_size + 7) & !7) + self.data.length_in_bytes()
86    }
87
88    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
89        use byteorder::WriteBytesExt;
90        writer.write_u64::<byteorder::LittleEndian>(self.from.try_into().unwrap()).unwrap();
91        writer.write_u64::<byteorder::LittleEndian>(self.seq.try_into().unwrap()).unwrap();
92        ::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
93        let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
94        let time_slop = ((time_size + 7) & !7) - time_size;
95        writer.write_all(&[0u8; 8][..time_slop]).unwrap();
96        self.data.into_bytes(&mut *writer);
97    }
98}
99
100
101/// A container-oriented version of `Bytesable` that can be implemented here for `Vec<T>` and other containers.
102pub trait ContainerBytes {
103    /// Wrap bytes as `Self`.
104    fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self;
105
106    /// The number of bytes required to serialize the data.
107    fn length_in_bytes(&self) -> usize;
108
109    /// Writes the binary representation into `writer`.
110    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
111}
112
113mod implementations {
114
115    use std::io::Write;
116
117    use serde::{Serialize, Deserialize};
118    use crate::dataflow::channels::ContainerBytes;
119
120    impl<T: Serialize + for<'a> Deserialize<'a>> ContainerBytes for Vec<T> {
121        fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
122            ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
123        }
124
125        fn length_in_bytes(&self) -> usize {
126            let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize;
127            (length + 7) & !7
128        }
129
130        fn into_bytes<W: Write>(&self, writer: &mut W) {
131            let mut counter = WriteCounter::new(writer);
132            ::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed");
133            let written = counter.count;
134            let written_slop = ((written + 7) & !7) - written;
135            counter.write_all(&[0u8; 8][..written_slop]).unwrap();
136        }
137    }
138
139    use write_counter::WriteCounter;
140    /// A `Write` wrapper that counts the bytes written.
141    mod write_counter {
142
143        use ::std::io::{Write, IoSlice, Result};
144        use std::fmt::Arguments;
145
146        /// A write wrapper that tracks the bytes written.
147        pub struct WriteCounter<W> {
148            inner: W,
149            pub count: usize,
150        }
151
152        impl<W> WriteCounter<W> {
153            /// Creates a new counter wrapper from a writer.
154            pub fn new(inner: W) -> Self {
155                Self { inner, count: 0 }
156            }
157        }
158
159        impl<W: Write> Write for WriteCounter<W> {
160            fn write(&mut self, buf: &[u8]) -> Result<usize> {
161                let written = self.inner.write(buf)?;
162                self.count += written;
163                Ok(written)
164            }
165            fn flush(&mut self) -> Result<()> {
166                self.inner.flush()
167            }
168            fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
169                let written = self.inner.write_vectored(bufs)?;
170                self.count += written;
171                Ok(written)
172            }
173            fn write_all(&mut self, buf: &[u8]) -> Result<()> {
174                self.count += buf.len();
175                self.inner.write_all(buf)
176            }
177            fn write_fmt(&mut self, _fmt: Arguments<'_>) -> Result<()> {
178                unimplemented!()
179            }
180        }
181    }
182}