timely_communication/
networking.rs1use std::io;
4use std::io::Result;
5use std::net::{TcpListener, TcpStream};
6use std::sync::Arc;
7use std::thread;
8use std::thread::sleep;
9use std::time::Duration;
10
11use byteorder::{ReadBytesExt, WriteBytesExt};
12use columnar::Columnar;
13use serde::{Deserialize, Serialize};
14
15type ByteOrder = byteorder::BigEndian;
17
18#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize, Columnar)]
22pub struct MessageHeader {
23 pub channel: usize,
25 pub source: usize,
27 pub target_lower: usize,
29 pub target_upper: usize,
34 pub length: usize,
36 pub seqno: usize,
38}
39
40impl MessageHeader {
41
42 const FIELDS: usize = 6;
44
45 #[inline]
47 pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> {
48 let mut cursor = io::Cursor::new(bytes);
49 let mut buffer = [0; Self::FIELDS];
50 cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
51 let header = MessageHeader {
52 channel: buffer[0] as usize,
54 source: buffer[1] as usize,
55 target_lower: buffer[2] as usize,
56 target_upper: buffer[3] as usize,
57 length: buffer[4] as usize,
58 seqno: buffer[5] as usize,
59 };
60
61 if bytes.len() >= header.required_bytes() {
62 Some(header)
63 } else {
64 None
65 }
66 }
67
68 #[inline]
70 pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> Result<()> {
71 let mut buffer = [0u8; std::mem::size_of::<u64>() * Self::FIELDS];
72 let mut cursor = io::Cursor::new(&mut buffer[..]);
73 cursor.write_u64::<ByteOrder>(self.channel as u64)?;
75 cursor.write_u64::<ByteOrder>(self.source as u64)?;
76 cursor.write_u64::<ByteOrder>(self.target_lower as u64)?;
77 cursor.write_u64::<ByteOrder>(self.target_upper as u64)?;
78 cursor.write_u64::<ByteOrder>(self.length as u64)?;
79 cursor.write_u64::<ByteOrder>(self.seqno as u64)?;
80
81 writer.write_all(&buffer[..])
82 }
83
84 #[inline]
86 pub fn required_bytes(&self) -> usize {
87 self.header_bytes() + self.length
88 }
89
90 #[inline(always)]
92 pub fn header_bytes(&self) -> usize {
93 std::mem::size_of::<u64>() * Self::FIELDS
94 }
95}
96
97pub fn create_sockets(addresses: Vec<String>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
102
103 let hosts1 = Arc::new(addresses);
104 let hosts2 = Arc::clone(&hosts1);
105
106 let start_task = thread::spawn(move || start_connections(hosts1, my_index, noisy));
107 let await_task = thread::spawn(move || await_connections(hosts2, my_index, noisy));
108
109 let mut results = start_task.join().unwrap()?;
110 results.push(None);
111 let to_extend = await_task.join().unwrap()?;
112 results.extend(to_extend);
113
114 if noisy { println!("worker {}:\tinitialization complete", my_index) }
115
116 Ok(results)
117}
118
119
120pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
122 let results = addresses.iter().take(my_index).enumerate().map(|(index, address)| {
123 loop {
124 match TcpStream::connect(address) {
125 Ok(mut stream) => {
126 stream.set_nodelay(true).expect("set_nodelay call failed");
127 stream.write_u64::<ByteOrder>(my_index as u64).expect("failed to encode/send worker index");
128 if noisy { println!("worker {}:\tconnection to worker {}", my_index, index); }
129 break Some(stream);
130 },
131 Err(error) => {
132 println!("worker {}:\terror connecting to worker {}: {}; retrying", my_index, index, error);
133 sleep(Duration::from_secs(1));
134 },
135 }
136 }
137 }).collect();
138
139 Ok(results)
140}
141
142pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bool) -> Result<Vec<Option<TcpStream>>> {
144 let mut results: Vec<_> = (0..(addresses.len() - my_index - 1)).map(|_| None).collect();
145
146 let listeners = addresses[my_index].split_whitespace().map(TcpListener::bind).collect::<Result<Vec<_>>>()?;
148 for listener in listeners.iter() { listener.set_nonblocking(true).expect("Couldn't set nonblocking"); }
149
150 while results.iter().any(Option::is_none) {
152 let mut received = false;
153 for listener in listeners.iter() {
154 match listener.accept() {
155 Ok((mut stream, _)) => {
156 stream.set_nodelay(true).expect("set_nodelay call failed");
157 let identifier = stream.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
158 results[identifier - my_index - 1] = Some(stream);
159 if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
160 received = true;
161 }
162 Err(e) => { if e.kind() != io::ErrorKind::WouldBlock { return Err(e); } }
163 }
164 }
165 if !received {
166 println!("awaiting connections (at {:?}/{:?})", results.iter().filter(|x| x.is_some()).count(), results.len());
167 sleep(Duration::from_secs(1));
168 }
169 }
170
171 Ok(results)
172}