timely_communication/allocator/zero_copy/
tcp.rs1use std::io::{self, Write};
4use std::sync::mpsc::{Sender, Receiver};
5
6use crate::networking::MessageHeader;
7
8use super::bytes_slab::{BytesRefill, BytesSlab};
9use super::bytes_exchange::MergeQueue;
10use super::spill::SpillPolicyFn;
11use super::stream::Stream;
12
13use timely_logging::Logger;
14
15use crate::logging::{CommunicationEvent, CommunicationEventBuilder, MessageEvent, StateEvent};
16
17fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
18 panic!("timely communication error: {}: {}", context, cause)
23}
24
25pub fn recv_loop<S>(
34 mut reader: S,
35 targets: Vec<Receiver<MergeQueue>>,
36 worker_offset: usize,
37 process: usize,
38 remote: usize,
39 refill: BytesRefill,
40 logger: Option<Logger<CommunicationEventBuilder>>
41)
42where
43 S: Stream,
44{
45 let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
46 logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));
48
49 let mut targets: Vec<MergeQueue> = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect();
50
51 let mut buffer = BytesSlab::new(20, refill);
52
53 let mut stageds = Vec::with_capacity(targets.len());
55 for _ in 0 .. targets.len() {
56 stageds.push(Vec::new());
57 }
58
59 let mut active = true;
67 while active {
68
69 buffer.ensure_capacity(1);
70
71 assert!(!buffer.empty().is_empty());
72
73 let read = match reader.read(buffer.empty()) {
75 Err(x) => tcp_panic("reading data", x),
76 Ok(0) => {
77 tcp_panic(
78 "reading data",
79 std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
80 );
81 }
82 Ok(n) => n,
83 };
84
85 buffer.make_valid(read);
86
87 while let Some(header) = MessageHeader::try_read(buffer.valid()) {
89
90 let peeled_bytes = header.required_bytes();
92 let bytes = buffer.extract(peeled_bytes);
93
94 logger.as_mut().map(|logger| {
96 logger.log(MessageEvent { is_send: false, header, });
97 });
98
99 if header.length > 0 {
100 for target in header.target_lower .. header.target_upper {
101 stageds[target - worker_offset].push(bytes.clone());
102 }
103 }
104 else {
105 active = false;
107 if !buffer.valid().is_empty() {
108 panic!("Clean shutdown followed by data.");
109 }
110 buffer.ensure_capacity(1);
111 if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
112 panic!("Clean shutdown followed by data.");
113 }
114 }
115 }
116
117 for (index, staged) in stageds.iter_mut().enumerate() {
119 use crate::allocator::zero_copy::bytes_exchange::BytesPush;
121 targets[index].extend(staged.drain(..));
122 }
123 }
124
125 logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, }));
127}
128
129pub fn send_loop<S: Stream>(
138 writer: S,
140 sources: Vec<Sender<MergeQueue>>,
141 process: usize,
142 remote: usize,
143 spill: Option<SpillPolicyFn>,
144 logger: Option<Logger<CommunicationEventBuilder>>)
145{
146 let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
147 logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));
149
150 let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
153 let buzzer = crate::buzzer::Buzzer::default();
154 let (writer, reader) = match spill.as_ref() {
155 Some(build_fn) => {
156 let (w, r) = build_fn();
157 MergeQueue::new_pair(buzzer, Some(w), Some(r))
158 }
159 None => MergeQueue::new_pair(buzzer, None, None),
160 };
161 x.send(writer).expect("failed to send MergeQueue");
162 reader
163 }).collect();
164
165 let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);
166 let mut stash = Vec::new();
167
168 while !sources.is_empty() {
169
170 for source in sources.iter_mut() {
172 use crate::allocator::zero_copy::bytes_exchange::BytesPull;
173 source.drain_into(&mut stash);
174 }
175
176 if stash.is_empty() {
177 writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
184 sources.retain(|source| !source.is_complete());
185 if !sources.is_empty() {
186 std::thread::park();
187 }
188 }
189 else {
190 for bytes in stash.drain(..) {
192
193 logger.as_mut().map(|logger| {
195 let mut offset = 0;
196 while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
197 logger.log(MessageEvent { is_send: true, header, });
198 offset += header.required_bytes();
199 }
200 });
201
202 writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e));
203 }
204 }
205 }
206
207 let header = MessageHeader {
211 channel: 0,
212 source: 0,
213 target_lower: 0,
214 target_upper: 0,
215 length: 0,
216 seqno: 0,
217 };
218 header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e));
219 writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
220 writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e));
221 logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));
222
223 logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, }));
225}