timely_communication/
networking.rs
1use std::io;
4use std::io::{Read, Result};
5use std::net::{TcpListener, TcpStream};
6use std::sync::Arc;
7use std::thread;
8use std::thread::sleep;
9use std::time::Duration;
10
11use byteorder::{ReadBytesExt, WriteBytesExt};
12use columnar::Columnar;
13use serde::{Deserialize, Serialize};
14
15const HANDSHAKE_MAGIC: u64 = 0xc2f1fb770118add9;
19
20type ByteOrder = byteorder::BigEndian;
22
23#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
27pub struct MessageHeader {
28 pub channel: usize,
30 pub source: usize,
32 pub target_lower: usize,
34 pub target_upper: usize,
39 pub length: usize,
41 pub seqno: usize,
43}
44
45impl MessageHeader {
46
47 const FIELDS: usize = 6;
49
50 #[inline]
52 pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> {
53 let mut cursor = io::Cursor::new(bytes);
54 let mut buffer = [0; Self::FIELDS];
55 cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
56 let header = MessageHeader {
57 channel: buffer[0] as usize,
59 source: buffer[1] as usize,
60 target_lower: buffer[2] as usize,
61 target_upper: buffer[3] as usize,
62 length: buffer[4] as usize,
63 seqno: buffer[5] as usize,
64 };
65
66 if bytes.len() >= header.required_bytes() {
67 Some(header)
68 } else {
69 None
70 }
71 }
72
73 #[inline]
75 pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> Result<()> {
76 let mut buffer = [0u8; std::mem::size_of::<u64>() * Self::FIELDS];
77 let mut cursor = io::Cursor::new(&mut buffer[..]);
78 cursor.write_u64::<ByteOrder>(self.channel as u64)?;
80 cursor.write_u64::<ByteOrder>(self.source as u64)?;
81 cursor.write_u64::<ByteOrder>(self.target_lower as u64)?;
82 cursor.write_u64::<ByteOrder>(self.target_upper as u64)?;
83 cursor.write_u64::<ByteOrder>(self.length as u64)?;
84 cursor.write_u64::<ByteOrder>(self.seqno as u64)?;
85
86 writer.write_all(&buffer[..])
87 }
88
89 #[inline]
91 pub fn required_bytes(&self) -> usize {
92 self.header_bytes() + self.length
93 }
94
95 #[inline(always)]
97 pub fn header_bytes(&self) -> usize {
98 std::mem::size_of::<u64>() * Self::FIELDS
99 }
100}
101
102pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
107
108 let hosts1 = Arc::new(addresses);
109 let hosts2 = Arc::clone(&hosts1);
110
111 let start_task = thread::spawn(move || start_connections(hosts1, my_index, noisy));
112 let await_task = thread::spawn(move || await_connections(hosts2, my_index, noisy));
113
114 let mut results = start_task.join().unwrap()?;
115 results.push(None);
116 let to_extend = await_task.join().unwrap()?;
117 results.extend(to_extend);
118
119 if noisy { println!("worker {}:\tinitialization complete", my_index) }
120
121 Ok(results)
122}
123
124
125pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
127 let results = addresses.iter().take(my_index).enumerate().map(|(index, address)| {
128 loop {
129 match TcpStream::connect(address) {
130 Ok(mut stream) => {
131 stream.set_nodelay(true).expect("set_nodelay call failed");
132 stream.write_u64::<ByteOrder>(HANDSHAKE_MAGIC).expect("failed to encode/send handshake magic");
133 stream.write_u64::<ByteOrder>(my_index as u64).expect("failed to encode/send worker index");
134 if noisy { println!("worker {}:\tconnection to worker {}", my_index, index); }
135 break Some(stream);
136 },
137 Err(error) => {
138 println!("worker {}:\terror connecting to worker {}: {}; retrying", my_index, index, error);
139 sleep(Duration::from_secs(1));
140 },
141 }
142 }
143 }).collect();
144
145 Ok(results)
146}
147
148pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
150 let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
151 let listener = TcpListener::bind(&addresses[my_index][..])?;
152
153 for _ in (my_index + 1) .. addresses.len() {
154 let mut stream = listener.accept()?.0;
155 stream.set_nodelay(true).expect("set_nodelay call failed");
156 let mut buffer = [0u8;16];
157 stream.read_exact(&mut buffer)?;
158 let mut cursor = io::Cursor::new(buffer);
159 let magic = cursor.read_u64::<ByteOrder>().expect("failed to decode magic");
160 if magic != HANDSHAKE_MAGIC {
161 return Err(io::Error::new(io::ErrorKind::InvalidData,
162 "received incorrect timely handshake"));
163 }
164 let identifier = cursor.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
165 results[identifier - my_index - 1] = Some(stream);
166 if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
167 }
168
169 Ok(results)
170}