Skip to main content

timely_communication/
networking.rs

1//! Networking code for sending and receiving fixed size `Vec<u8>` between machines.
2
3use std::io;
4use std::io::Result;
5use std::net::{TcpListener, TcpStream};
6use std::sync::Arc;
7use std::thread;
8use std::thread::sleep;
9use std::time::Duration;
10
11use byteorder::{ReadBytesExt, WriteBytesExt};
12use columnar::Columnar;
13use serde::{Deserialize, Serialize};
14
/// The byte order used on the wire, both for message headers and for the worker index
/// exchanged during stream initialization.
type ByteOrder = byteorder::BigEndian;
17
/// Framing data for each `Vec<u8>` transmission, indicating a typed channel, the source and
/// destination workers, and the length in bytes.
///
/// The binary encoding (see `try_read` and `write_to`) stores every field as a `u64`, in
/// declaration order.
// *Warning*: Adding, removing, or altering fields requires adjusting the implementation below
// (`FIELDS`, `try_read` and `write_to`)!
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
pub struct MessageHeader {
    /// Index of the channel.
    pub channel:    usize,
    /// Index of the worker sending the message.
    pub source:     usize,
    /// Lower bound of the indices of the workers receiving the message.
    pub target_lower:     usize,
    /// Upper bound of the indices of the workers receiving the message.
    ///
    /// This is often `self.target_lower + 1` for point to point messages,
    /// but can be larger for broadcast messages.
    pub target_upper:     usize,
    /// Number of payload bytes in the message (the header itself is not counted; see
    /// `required_bytes`).
    pub length:     usize,
    /// Sequence number.
    pub seqno:      usize,
}
39
40impl MessageHeader {
41
42    /// The number of `usize` fields in [MessageHeader].
43    const FIELDS: usize = 6;
44
45    /// Returns a header when there is enough supporting data
46    #[inline]
47    pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> {
48        let mut cursor = io::Cursor::new(bytes);
49        let mut buffer = [0; Self::FIELDS];
50        cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
51        let header = MessageHeader {
52            // Order must match writing order.
53            channel: buffer[0] as usize,
54            source: buffer[1] as usize,
55            target_lower: buffer[2] as usize,
56            target_upper: buffer[3] as usize,
57            length: buffer[4] as usize,
58            seqno: buffer[5] as usize,
59        };
60
61        if bytes.len() >= header.required_bytes() {
62            Some(header)
63        } else {
64            None
65        }
66    }
67
68    /// Writes the header as binary data.
69    #[inline]
70    pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> Result<()> {
71        let mut buffer = [0u8; std::mem::size_of::<u64>() * Self::FIELDS];
72        let mut cursor = io::Cursor::new(&mut buffer[..]);
73        // Order must match reading order.
74        cursor.write_u64::<ByteOrder>(self.channel as u64)?;
75        cursor.write_u64::<ByteOrder>(self.source as u64)?;
76        cursor.write_u64::<ByteOrder>(self.target_lower as u64)?;
77        cursor.write_u64::<ByteOrder>(self.target_upper as u64)?;
78        cursor.write_u64::<ByteOrder>(self.length as u64)?;
79        cursor.write_u64::<ByteOrder>(self.seqno as u64)?;
80
81        writer.write_all(&buffer[..])
82    }
83
84    /// The number of bytes required for the header and data.
85    #[inline]
86    pub fn required_bytes(&self) -> usize {
87        self.header_bytes() + self.length
88    }
89
90    /// The number of bytes required for the header.
91    #[inline(always)]
92    pub fn header_bytes(&self) -> usize {
93        std::mem::size_of::<u64>() * Self::FIELDS
94    }
95}
96
97/// Creates socket connections from a list of host addresses.
98///
99/// The item at index `i` in the resulting vec, is a `Some(TcpSocket)` to process `i`, except
100/// for item `my_index` which is `None` (no socket to self).
101pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
102
103    let hosts1 = Arc::new(addresses);
104    let hosts2 = Arc::clone(&hosts1);
105
106    let start_task = thread::spawn(move || start_connections(hosts1, my_index, noisy));
107    let await_task = thread::spawn(move || await_connections(hosts2, my_index, noisy));
108
109    let mut results = start_task.join().unwrap()?;
110    results.push(None);
111    let to_extend = await_task.join().unwrap()?;
112    results.extend(to_extend);
113
114    if noisy { println!("worker {}:\tinitialization complete", my_index) }
115
116    Ok(results)
117}
118
119
120/// Result contains connections `[0, my_index - 1]`.
121pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
122    let results = addresses.iter().take(my_index).enumerate().map(|(index, address)| {
123        loop {
124            match TcpStream::connect(address) {
125                Ok(mut stream) => {
126                    stream.set_nodelay(true).expect("set_nodelay call failed");
127                    stream.write_u64::<ByteOrder>(my_index as u64).expect("failed to encode/send worker index");
128                    if noisy { println!("worker {}:\tconnection to worker {}", my_index, index); }
129                    break Some(stream);
130                },
131                Err(error) => {
132                    println!("worker {}:\terror connecting to worker {}: {}; retrying", my_index, index, error);
133                    sleep(Duration::from_secs(1));
134                },
135            }
136        }
137    }).collect();
138
139    Ok(results)
140}
141
142/// Result contains connections `[my_index + 1, addresses.len() - 1]`.
143pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
144    let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
145
146    // We may have multiple addresses to bind to, and will listen on each of them until all received.
147    let listeners = addresses[my_index].split_whitespace().map(TcpListener::bind).collect::<Result<Vec<_>>>()?;
148    for listener in listeners.iter() { listener.set_nonblocking(true).expect("Couldn't set nonblocking"); }
149
150    // Until we have all intended connections, poll each listener, sleeping briefly if none have accepted a new stream.
151    while results.iter().any(Option::is_none) {
152        let mut received = false;
153        for listener in listeners.iter() {
154            match listener.accept() {
155                Ok((mut stream, _)) => {
156                    stream.set_nodelay(true).expect("set_nodelay call failed");
157                    let identifier = stream.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
158                    results[identifier - my_index - 1] = Some(stream);
159                    if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
160                    received = true;
161                }
162                Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { return Err(e); } }
163            }
164        }
165        if !received {
166            println!("awaiting connections (at {:?}/{:?})", results.iter().filter(|x| x.is_some()).count(), results.len());
167            sleep(Duration::from_secs(1));
168        }
169    }
170
171    Ok(results)
172}