timely_communication/allocator/zero_copy/
tcp.rs1use std::io::{self, Write};
4use std::sync::mpsc::{Sender, Receiver};
5
6use crate::networking::MessageHeader;
7
8use super::bytes_slab::{BytesRefill, BytesSlab};
9use super::bytes_exchange::MergeQueue;
10use super::stream::Stream;
11
12use timely_logging::Logger;
13
14use crate::logging::{CommunicationEvent, CommunicationEventBuilder, MessageEvent, StateEvent};
15
16fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
17 panic!("timely communication error: {}: {}", context, cause)
22}
23
24pub fn recv_loop<S>(
33 mut reader: S,
34 targets: Vec<Receiver<MergeQueue>>,
35 worker_offset: usize,
36 process: usize,
37 remote: usize,
38 refill: BytesRefill,
39 logger: Option<Logger<CommunicationEventBuilder>>
40)
41where
42 S: Stream,
43{
44 let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
45 logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));
47
48 let mut targets: Vec<MergeQueue> = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect();
49
50 let mut buffer = BytesSlab::new(20, refill);
51
52 let mut stageds = Vec::with_capacity(targets.len());
54 for _ in 0 .. targets.len() {
55 stageds.push(Vec::new());
56 }
57
58 let mut active = true;
66 while active {
67
68 buffer.ensure_capacity(1);
69
70 assert!(!buffer.empty().is_empty());
71
72 let read = match reader.read(buffer.empty()) {
74 Err(x) => tcp_panic("reading data", x),
75 Ok(0) => {
76 tcp_panic(
77 "reading data",
78 std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
79 );
80 }
81 Ok(n) => n,
82 };
83
84 buffer.make_valid(read);
85
86 while let Some(header) = MessageHeader::try_read(buffer.valid()) {
88
89 let peeled_bytes = header.required_bytes();
91 let bytes = buffer.extract(peeled_bytes);
92
93 logger.as_mut().map(|logger| {
95 logger.log(MessageEvent { is_send: false, header, });
96 });
97
98 if header.length > 0 {
99 for target in header.target_lower .. header.target_upper {
100 stageds[target - worker_offset].push(bytes.clone());
101 }
102 }
103 else {
104 active = false;
106 if !buffer.valid().is_empty() {
107 panic!("Clean shutdown followed by data.");
108 }
109 buffer.ensure_capacity(1);
110 if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
111 panic!("Clean shutdown followed by data.");
112 }
113 }
114 }
115
116 for (index, staged) in stageds.iter_mut().enumerate() {
118 use crate::allocator::zero_copy::bytes_exchange::BytesPush;
120 targets[index].extend(staged.drain(..));
121 }
122 }
123
124 logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, }));
126}
127
128pub fn send_loop<S: Stream>(
137 writer: S,
139 sources: Vec<Sender<MergeQueue>>,
140 process: usize,
141 remote: usize,
142 logger: Option<Logger<CommunicationEventBuilder>>)
143{
144 let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
145 logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));
147
148 let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
149 let buzzer = crate::buzzer::Buzzer::default();
150 let queue = MergeQueue::new(buzzer);
151 x.send(queue.clone()).expect("failed to send MergeQueue");
152 queue
153 }).collect();
154
155 let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);
156 let mut stash = Vec::new();
157
158 while !sources.is_empty() {
159
160 for source in sources.iter_mut() {
162 use crate::allocator::zero_copy::bytes_exchange::BytesPull;
163 source.drain_into(&mut stash);
164 }
165
166 if stash.is_empty() {
167 writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
174 sources.retain(|source| !source.is_complete());
175 if !sources.is_empty() {
176 std::thread::park();
177 }
178 }
179 else {
180 for bytes in stash.drain(..) {
182
183 logger.as_mut().map(|logger| {
185 let mut offset = 0;
186 while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
187 logger.log(MessageEvent { is_send: true, header, });
188 offset += header.required_bytes();
189 }
190 });
191
192 writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e));
193 }
194 }
195 }
196
197 let header = MessageHeader {
201 channel: 0,
202 source: 0,
203 target_lower: 0,
204 target_upper: 0,
205 length: 0,
206 seqno: 0,
207 };
208 header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e));
209 writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
210 writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e));
211 logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));
212
213 logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, }));
215}