timely_communication/allocator/zero_copy/tcp.rs

//! Methods related to reading from and writing to TCP connections

use std::io::{self, Write};
use std::sync::mpsc::{Sender, Receiver};

use crate::networking::MessageHeader;

use super::bytes_slab::{BytesRefill, BytesSlab};
use super::bytes_exchange::MergeQueue;
use super::stream::Stream;

use timely_logging::Logger;

use crate::logging::{CommunicationEvent, CommunicationEventBuilder, MessageEvent, StateEvent};

fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
    // NOTE: some downstream crates sniff out "timely communication error:" from
    // the panic message. Avoid removing or rewording this message if possible.
    // It'd be nice to instead use `panic_any` here with a structured error
    // type, but the panic message for `panic_any` is no good (Box<dyn Any>).
    panic!("timely communication error: {}: {}", context, cause)
}
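// For illustration only (an assumption, not part of this crate): a supervising
// process that wants to distinguish communication failures could inspect the
// panic payload for the prefix above. A minimal sketch, where `run_worker` is a
// hypothetical stand-in for the worker body:
//
//     let result = std::panic::catch_unwind(|| run_worker());
//     if let Err(payload) = result {
//         let is_comm_error = payload
//             .downcast_ref::<String>()
//             .map_or(false, |m| m.starts_with("timely communication error:"));
//         // React to `is_comm_error`, e.g. by tearing down the computation.
//     }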

/// Repeatedly reads from a TcpStream and carves out messages.
///
/// The intended communication pattern is a sequence of (header, message)^* for valid
/// messages, followed by a header for a zero-length message indicating the end of stream.
///
/// If the stream ends without being shut down, or if reading from the stream fails, the
/// receive thread panics with a message that starts with "timely communication error:"
/// in an attempt to take down the computation and cause the failures to cascade.
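///
/// The framing is illustrated below. This sketch is for exposition rather than a compiled
/// doctest; it assumes only the `MessageHeader` methods already used in this module
/// (`write_to`, `try_read`, and `required_bytes`).
///
/// ```ignore
/// // Frame a header followed by `length` payload bytes ...
/// let header = MessageHeader { channel: 0, source: 0, target_lower: 0, target_upper: 1, length: 3, seqno: 0 };
/// let mut framed = Vec::new();
/// header.write_to(&mut framed).unwrap();
/// framed.extend_from_slice(&[1, 2, 3]);
/// // ... and carve it back out, exactly as the loop below does.
/// if let Some(read_back) = MessageHeader::try_read(&framed[..]) {
///     let frame = &framed[..read_back.required_bytes()];
///     assert_eq!(read_back.length, 3);
///     assert_eq!(frame.len(), framed.len());
/// }
/// ```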
pub fn recv_loop<S>(
    mut reader: S,
    targets: Vec<Receiver<MergeQueue>>,
    worker_offset: usize,
    process: usize,
    remote: usize,
    refill: BytesRefill,
    logger: Option<Logger<CommunicationEventBuilder>>
)
where
    S: Stream,
{
    let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
    // Log the receive thread's start.
    logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));

    let mut targets: Vec<MergeQueue> = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect();

    let mut buffer = BytesSlab::new(20, refill);

    // Where we stash Bytes before handing them off.
    let mut stageds = Vec::with_capacity(targets.len());
    for _ in 0 .. targets.len() {
        stageds.push(Vec::new());
    }

    // Each loop iteration reads additional bytes into `buffer` and consumes all complete
    // messages. At the start of each iteration, `buffer.valid()` holds data read but not
    // yet consumed, and `buffer.empty()` is the remaining capacity available for reads.
    //
    // Once the buffer fills, the `BytesSlab` copies any incomplete message into a new
    // shared allocation and retires the existing bytes to its in-progress list, so that
    // the allocation can be recycled once all readers have read what they need from it.
    let mut active = true;
    while active {

        buffer.ensure_capacity(1);

        assert!(!buffer.empty().is_empty());

        // Attempt to read some more bytes into `buffer`.
        let read = match reader.read(buffer.empty()) {
            Err(x) => tcp_panic("reading data", x),
            Ok(0) => {
                tcp_panic(
                    "reading data",
                    std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
                );
            }
            Ok(n) => n,
        };

        buffer.make_valid(read);

        // Consume complete messages from the front of `buffer`.
        while let Some(header) = MessageHeader::try_read(buffer.valid()) {

            // TODO: Consolidate message sequences sent to the same worker?
            let peeled_bytes = header.required_bytes();
            let bytes = buffer.extract(peeled_bytes);

            // Record message receipt.
            logger.as_mut().map(|logger| {
                logger.log(MessageEvent { is_send: false, header, });
            });

            if header.length > 0 {
                for target in header.target_lower .. header.target_upper {
                    stageds[target - worker_offset].push(bytes.clone());
                }
            }
            else {
                // Shutting down; confirm absence of subsequent data.
                active = false;
                if !buffer.valid().is_empty() {
                    panic!("Clean shutdown followed by data.");
                }
                buffer.ensure_capacity(1);
                if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
                    panic!("Clean shutdown followed by data.");
                }
            }
        }

        // Pass bytes along to targets.
        for (index, staged) in stageds.iter_mut().enumerate() {
            // FIXME: try to merge `staged` before handing it to BytesPush::extend
            use crate::allocator::zero_copy::bytes_exchange::BytesPush;
            targets[index].extend(staged.drain(..));
        }
    }

    // Log the receive thread's end.
    logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, }));
}

/// Repeatedly sends messages into a TcpStream.
///
/// The intended communication pattern is a sequence of (header, message)^* for valid
/// messages, followed by a header for a zero-length message indicating the end of stream.
///
/// If writing to the stream fails, the send thread panics with a message that starts with
/// "timely communication error:" in an attempt to take down the computation and cause the
/// failures to cascade.
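///
/// Callers hand in `Sender<MergeQueue>` endpoints; this thread constructs one `MergeQueue`
/// per source and sends a clone back through each endpoint, so that the worker side obtains
/// a handle it can push outgoing byte buffers into. A sketch of the caller's side (not a
/// compiled doctest), assuming a connected `stream: TcpStream` and `process`/`remote`
/// indices in scope:
///
/// ```ignore
/// let (tx, rx) = std::sync::mpsc::channel();
/// std::thread::spawn(move || send_loop(stream, vec![tx], process, remote, None));
/// // The queue this worker pushes outgoing byte buffers into.
/// let queue = rx.recv().expect("send_loop did not provide a MergeQueue");
/// ```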
pub fn send_loop<S: Stream>(
    // TODO: Maybe we don't need BufWriter with consolidation in writes.
    writer: S,
    sources: Vec<Sender<MergeQueue>>,
    process: usize,
    remote: usize,
    logger: Option<Logger<CommunicationEventBuilder>>)
{
    let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
    // Log the send thread's start.
    logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));

    let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
        let buzzer = crate::buzzer::Buzzer::default();
        let queue = MergeQueue::new(buzzer);
        x.send(queue.clone()).expect("failed to send MergeQueue");
        queue
    }).collect();

    let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);
    let mut stash = Vec::new();

    while !sources.is_empty() {

        // TODO: Round-robin better, to release resources fairly when overloaded.
        for source in sources.iter_mut() {
            use crate::allocator::zero_copy::bytes_exchange::BytesPull;
            source.drain_into(&mut stash);
        }

        if stash.is_empty() {
            // No evidence of records to read, but sources were not yet empty at the start
            // of the loop. Flush the writer (to move along any buffered data), re-check the
            // sources for completion, and park awaiting a signal only if at least one
            // incomplete source remains that could still send one.
            //
            // We could be woken by more data, by a channel closing, or perhaps spuriously.
            writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
            sources.retain(|source| !source.is_complete());
            if !sources.is_empty() {
                std::thread::park();
            }
        }
        else {
            // TODO: Could do scatter/gather write here.
            for bytes in stash.drain(..) {

                // Record message sends.
                logger.as_mut().map(|logger| {
                    let mut offset = 0;
                    while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
                        logger.log(MessageEvent { is_send: true, header, });
                        offset += header.required_bytes();
                    }
                });

                writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e));
            }
        }
    }

    // Write final zero-length header.
    // Would be better with meaningful metadata, but as this stream merges many
    // workers it isn't clear that there is anything specific to write here.
    let header = MessageHeader {
        channel:      0,
        source:       0,
        target_lower: 0,
        target_upper: 0,
        length:       0,
        seqno:        0,
    };
    header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e));
    writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
    writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e));
    logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));

    // Log the send thread's end.
    logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, }));
}