timely_communication/
networking.rs

1//! Networking code for sending and receiving fixed size `Vec<u8>` between machines.
2
3use std::io;
4use std::io::{Read, Result};
5use std::net::{TcpListener, TcpStream};
6use std::sync::Arc;
7use std::thread;
8use std::thread::sleep;
9use std::time::Duration;
10
11use byteorder::{ReadBytesExt, WriteBytesExt};
12use columnar::Columnar;
13use serde::{Deserialize, Serialize};
14
// Magic number written first on every freshly established TCP stream. Seeing it
// lets a receiver recognise Timely traffic even when that traffic is
// multiplexed with other protocols on the same port.
const HANDSHAKE_MAGIC: u64 = 0xc2f1_fb77_0118_add9;
19
/// The byte order for writing message headers and stream initialization.
///
/// Both the connection handshake values and every `MessageHeader` field are
/// encoded and decoded with this order, so the two ends of a stream agree.
type ByteOrder = byteorder::BigEndian;
22
23/// Framing data for each `Vec<u8>` transmission, indicating a typed channel, the source and
24/// destination workers, and the length in bytes.
25// *Warning*: Adding, removing and altering fields requires to adjust the implementation below!
26#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
27pub struct MessageHeader {
28    /// index of channel.
29    pub channel:    usize,
30    /// index of worker sending message.
31    pub source:     usize,
32    /// lower bound of index of worker receiving message.
33    pub target_lower:     usize,
34    /// upper bound of index of worker receiving message.
35    ///
36    /// This is often `self.target_lower + 1` for point to point messages,
37    /// but can be larger for broadcast messages.
38    pub target_upper:     usize,
39    /// number of bytes in message.
40    pub length:     usize,
41    /// sequence number.
42    pub seqno:      usize,
43}
44
45impl MessageHeader {
46
47    /// The number of `usize` fields in [MessageHeader].
48    const FIELDS: usize = 6;
49
50    /// Returns a header when there is enough supporting data
51    #[inline]
52    pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> {
53        let mut cursor = io::Cursor::new(bytes);
54        let mut buffer = [0; Self::FIELDS];
55        cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
56        let header = MessageHeader {
57            // Order must match writing order.
58            channel: buffer[0] as usize,
59            source: buffer[1] as usize,
60            target_lower: buffer[2] as usize,
61            target_upper: buffer[3] as usize,
62            length: buffer[4] as usize,
63            seqno: buffer[5] as usize,
64        };
65
66        if bytes.len() >= header.required_bytes() {
67            Some(header)
68        } else {
69            None
70        }
71    }
72
73    /// Writes the header as binary data.
74    #[inline]
75    pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> Result<()> {
76        let mut buffer = [0u8; std::mem::size_of::<u64>() * Self::FIELDS];
77        let mut cursor = io::Cursor::new(&mut buffer[..]);
78        // Order must match reading order.
79        cursor.write_u64::<ByteOrder>(self.channel as u64)?;
80        cursor.write_u64::<ByteOrder>(self.source as u64)?;
81        cursor.write_u64::<ByteOrder>(self.target_lower as u64)?;
82        cursor.write_u64::<ByteOrder>(self.target_upper as u64)?;
83        cursor.write_u64::<ByteOrder>(self.length as u64)?;
84        cursor.write_u64::<ByteOrder>(self.seqno as u64)?;
85
86        writer.write_all(&buffer[..])
87    }
88
89    /// The number of bytes required for the header and data.
90    #[inline]
91    pub fn required_bytes(&self) -> usize {
92        self.header_bytes() + self.length
93    }
94
95    /// The number of bytes required for the header.
96    #[inline(always)]
97    pub fn header_bytes(&self) -> usize {
98        std::mem::size_of::<u64>() * Self::FIELDS
99    }
100}
101
102/// Creates socket connections from a list of host addresses.
103///
104/// The item at index `i` in the resulting vec, is a `Some(TcpSocket)` to process `i`, except
105/// for item `my_index` which is `None` (no socket to self).
106pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
107
108    let hosts1 = Arc::new(addresses);
109    let hosts2 = Arc::clone(&hosts1);
110
111    let start_task = thread::spawn(move || start_connections(hosts1, my_index, noisy));
112    let await_task = thread::spawn(move || await_connections(hosts2, my_index, noisy));
113
114    let mut results = start_task.join().unwrap()?;
115    results.push(None);
116    let to_extend = await_task.join().unwrap()?;
117    results.extend(to_extend);
118
119    if noisy { println!("worker {}:\tinitialization complete", my_index) }
120
121    Ok(results)
122}
123
124
125/// Result contains connections `[0, my_index - 1]`.
126pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
127    let results = addresses.iter().take(my_index).enumerate().map(|(index, address)| {
128        loop {
129            match TcpStream::connect(address) {
130                Ok(mut stream) => {
131                    stream.set_nodelay(true).expect("set_nodelay call failed");
132                    stream.write_u64::<ByteOrder>(HANDSHAKE_MAGIC).expect("failed to encode/send handshake magic");
133                    stream.write_u64::<ByteOrder>(my_index as u64).expect("failed to encode/send worker index");
134                    if noisy { println!("worker {}:\tconnection to worker {}", my_index, index); }
135                    break Some(stream);
136                },
137                Err(error) => {
138                    println!("worker {}:\terror connecting to worker {}: {}; retrying", my_index, index, error);
139                    sleep(Duration::from_secs(1));
140                },
141            }
142        }
143    }).collect();
144
145    Ok(results)
146}
147
148/// Result contains connections `[my_index + 1, addresses.len() - 1]`.
149pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
150    let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
151    let listener = TcpListener::bind(&addresses[my_index][..])?;
152
153    for _ in (my_index + 1) .. addresses.len() {
154        let mut stream = listener.accept()?.0;
155        stream.set_nodelay(true).expect("set_nodelay call failed");
156        let mut buffer = [0u8;16];
157        stream.read_exact(&mut buffer)?;
158        let mut cursor = io::Cursor::new(buffer);
159        let magic = cursor.read_u64::<ByteOrder>().expect("failed to decode magic");
160        if magic != HANDSHAKE_MAGIC {
161            return Err(io::Error::new(io::ErrorKind::InvalidData,
162                "received incorrect timely handshake"));
163        }
164        let identifier = cursor.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
165        results[identifier - my_index - 1] = Some(stream);
166        if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
167    }
168
169    Ok(results)
170}