timely/dataflow/channels/
mod.rs

1//! Structured communication between timely dataflow operators.
2
3use serde::{Deserialize, Serialize};
4use crate::communication::Push;
5use crate::Container;
6
7/// A collection of types that may be pushed at.
8pub mod pushers;
9/// A collection of types that may be pulled from.
10pub mod pullers;
11/// Parallelization contracts, describing how data must be exchanged between operators.
12pub mod pact;
13
/// A timestamped batch of data exchanged between operators.
///
/// Pairs a container of records with the timestamp the records belong to,
/// plus routing metadata identifying the sending worker and the position
/// of the message within that worker-to-worker stream.
#[derive(Clone)]
pub struct Message<T, C> {
    /// Timestamp shared by everything in `data`.
    pub time: T,
    /// Container holding the message payload.
    pub data: C,
    /// Index of the worker that produced the message.
    pub from: usize,
    /// Sequence number within this worker-to-worker stream.
    pub seq: usize,
}
26
27impl<T, C> Message<T, C> {
28    /// Default buffer size.
29    #[deprecated = "Use timely::buffer::default_capacity instead"]
30    pub fn default_length() -> usize {
31        crate::container::buffer::default_capacity::<C>()
32    }
33}
34
35impl<T, C: Container> Message<T, C> {
36    /// Creates a new message instance from arguments.
37    pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
38        Message { time, data, from, seq }
39    }
40
41    /// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
42    /// leaves in place, or the container's default element. The buffer is left in an undefined state.
43    #[inline]
44    pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
45
46        let data = ::std::mem::take(buffer);
47        let message = Message::new(time, data, 0, 0);
48        let mut bundle = Some(message);
49
50        pusher.push(&mut bundle);
51
52        if let Some(message) = bundle {
53            *buffer = message.data;
54        }
55    }
56}
57
58// Instructions for serialization of `Message`.
59//
60// Serialization of each field is meant to be `u64` aligned, so that each has the ability
61// to be decoded using safe transmutation, e.g. `bytemuck`.
62impl<T, C> crate::communication::Bytesable for Message<T, C>
63where
64    T: Serialize + for<'a> Deserialize<'a>,
65    C: ContainerBytes,
66{
67    fn from_bytes(mut bytes: crate::bytes::arc::Bytes) -> Self {
68        use byteorder::ReadBytesExt;
69        let mut slice = &bytes[..];
70        let from: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
71        let seq: usize = slice.read_u64::<byteorder::LittleEndian>().unwrap().try_into().unwrap();
72        let time: T = ::bincode::deserialize_from(&mut slice).expect("bincode::deserialize() failed");
73        let time_size = ::bincode::serialized_size(&time).expect("bincode::serialized_size() failed") as usize;
74        // We expect to find the `data` payload at `8 + 8 + round_up(time_size)`;
75        let bytes_read = 8 + 8 + ((time_size + 7) & !7);
76        bytes.extract_to(bytes_read);
77        let data: C = ContainerBytes::from_bytes(bytes);
78        Self { time, data, from, seq }
79    }
80
81    fn length_in_bytes(&self) -> usize {
82        let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
83        // 16 comes from the two `u64` fields: `from` and `seq`.
84        16 + ((time_size + 7) & !7) + self.data.length_in_bytes()
85    }
86
87    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
88        use byteorder::WriteBytesExt;
89        writer.write_u64::<byteorder::LittleEndian>(self.from.try_into().unwrap()).unwrap();
90        writer.write_u64::<byteorder::LittleEndian>(self.seq.try_into().unwrap()).unwrap();
91        ::bincode::serialize_into(&mut *writer, &self.time).expect("bincode::serialize_into() failed");
92        let time_size = ::bincode::serialized_size(&self.time).expect("bincode::serialized_size() failed") as usize;
93        let time_slop = ((time_size + 7) & !7) - time_size;
94        writer.write_all(&[0u8; 8][..time_slop]).unwrap();
95        self.data.into_bytes(&mut *writer);
96    }
97}
98
99
100/// A container-oriented version of `Bytesable` that can be implemented here for `Vec<T>` and other containers.
101pub trait ContainerBytes {
102    /// Wrap bytes as `Self`.
103    fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self;
104
105    /// The number of bytes required to serialize the data.
106    fn length_in_bytes(&self) -> usize;
107
108    /// Writes the binary representation into `writer`.
109    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
110}
111
112mod implementations {
113
114    use std::io::Write;
115
116    use serde::{Serialize, Deserialize};
117    use crate::dataflow::channels::ContainerBytes;
118
119    impl<T: Serialize + for<'a> Deserialize<'a>> ContainerBytes for Vec<T> {
120        fn from_bytes(bytes: crate::bytes::arc::Bytes) -> Self {
121            ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed")
122        }
123
124        fn length_in_bytes(&self) -> usize {
125            let length = ::bincode::serialized_size(&self).expect("bincode::serialized_size() failed") as usize;
126            (length + 7) & !7
127        }
128
129        fn into_bytes<W: Write>(&self, writer: &mut W) {
130            let mut counter = WriteCounter::new(writer);
131            ::bincode::serialize_into(&mut counter, &self).expect("bincode::serialize_into() failed");
132            let written = counter.count;
133            let written_slop = ((written + 7) & !7) - written;
134            counter.write_all(&[0u8; 8][..written_slop]).unwrap();
135        }
136    }
137
138    use write_counter::WriteCounter;
139    /// A `Write` wrapper that counts the bytes written.
140    mod write_counter {
141
142        use ::std::io::{Write, IoSlice, Result};
143        use std::fmt::Arguments;
144
145        /// A write wrapper that tracks the bytes written.
146        pub struct WriteCounter<W> {
147            inner: W,
148            pub count: usize,
149        }
150
151        impl<W> WriteCounter<W> {
152            /// Creates a new counter wrapper from a writer.
153            pub fn new(inner: W) -> Self {
154                Self { inner, count: 0 }
155            }
156        }
157
158        impl<W: Write> Write for WriteCounter<W> {
159            fn write(&mut self, buf: &[u8]) -> Result<usize> {
160                let written = self.inner.write(buf)?;
161                self.count += written;
162                Ok(written)
163            }
164            fn flush(&mut self) -> Result<()> {
165                self.inner.flush()
166            }
167            fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
168                let written = self.inner.write_vectored(bufs)?;
169                self.count += written;
170                Ok(written)
171            }
172            fn write_all(&mut self, buf: &[u8]) -> Result<()> {
173                self.count += buf.len();
174                self.inner.write_all(buf)
175            }
176            fn write_fmt(&mut self, _fmt: Arguments<'_>) -> Result<()> {
177                unimplemented!()
178            }
179        }
180    }
181}