Skip to main content

timely_communication/allocator/zero_copy/
tcp.rs

1//! Methods related to reading from and writing to TCP connections
2
3use std::io::{self, Write};
4use std::sync::mpsc::{Sender, Receiver};
5
6use crate::networking::MessageHeader;
7
8use super::bytes_slab::{BytesRefill, BytesSlab};
9use super::bytes_exchange::MergeQueue;
10use super::spill::SpillPolicyFn;
11use super::stream::Stream;
12
13use timely_logging::Logger;
14
15use crate::logging::{CommunicationEvent, CommunicationEventBuilder, MessageEvent, StateEvent};
16
/// Panics with a uniformly formatted message describing a failed TCP operation.
///
/// `context` names the operation that failed (for example "reading data") and
/// `cause` is the I/O error that triggered the failure; both appear in the message.
///
/// Marked `#[cold]` because this is only ever reached on the failure path of the
/// read and write loops.
///
/// # Panics
///
/// Always. The panic message is `timely communication error: <context>: <cause>`.
#[cold]
fn tcp_panic(context: &'static str, cause: io::Error) -> ! {
    // NOTE: some downstream crates sniff out "timely communication error:" from
    // the panic message. Avoid removing or rewording this message if possible.
    // It'd be nice to instead use `panic_any` here with a structured error
    // type, but the panic message for `panic_any` is no good (Box<dyn Any>).
    panic!("timely communication error: {}: {}", context, cause)
}
24
25/// Repeatedly reads from a TcpStream and carves out messages.
26///
27/// The intended communication pattern is a sequence of (header, message)^* for valid
28/// messages, followed by a header for a zero length message indicating the end of stream.
29///
30/// If the stream ends without being shut down, or if reading from the stream fails, the
31/// receive thread panics with a message that starts with "timely communication error:"
32/// in an attempt to take down the computation and cause the failures to cascade.
33pub fn recv_loop<S>(
34    mut reader: S,
35    targets: Vec<Receiver<MergeQueue>>,
36    worker_offset: usize,
37    process: usize,
38    remote: usize,
39    refill: BytesRefill,
40    logger: Option<Logger<CommunicationEventBuilder>>
41)
42where
43    S: Stream,
44{
45    let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
46    // Log the receive thread's start.
47    logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: true }));
48
49    let mut targets: Vec<MergeQueue> = targets.into_iter().map(|x| x.recv().expect("Failed to receive MergeQueue")).collect();
50
51    let mut buffer = BytesSlab::new(20, refill);
52
53    // Where we stash Bytes before handing them off.
54    let mut stageds = Vec::with_capacity(targets.len());
55    for _ in 0 .. targets.len() {
56        stageds.push(Vec::new());
57    }
58
59    // Each loop iteration adds to `self.Bytes` and consumes all complete messages.
60    // At the start of each iteration, `self.buffer[..self.length]` represents valid
61    // data, and the remaining capacity is available for reading from the reader.
62    //
63    // Once the buffer fills, we need to copy incomplete messages to a new shared
64    // allocation and place the existing Bytes into `self.in_progress`, so that it
65    // can be recovered once all readers have read what they need to.
66    let mut active = true;
67    while active {
68
69        buffer.ensure_capacity(1);
70
71        assert!(!buffer.empty().is_empty());
72
73        // Attempt to read some more bytes into self.buffer.
74        let read = match reader.read(buffer.empty()) {
75            Err(x) => tcp_panic("reading data", x),
76            Ok(0) => {
77                tcp_panic(
78                    "reading data",
79                    std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "socket closed"),
80                );
81            }
82            Ok(n) => n,
83        };
84
85        buffer.make_valid(read);
86
87        // Consume complete messages from the front of self.buffer.
88        while let Some(header) = MessageHeader::try_read(buffer.valid()) {
89
90            // TODO: Consolidate message sequences sent to the same worker?
91            let peeled_bytes = header.required_bytes();
92            let bytes = buffer.extract(peeled_bytes);
93
94            // Record message receipt.
95            logger.as_mut().map(|logger| {
96                logger.log(MessageEvent { is_send: false, header, });
97            });
98
99            if header.length > 0 {
100                for target in header.target_lower .. header.target_upper {
101                    stageds[target - worker_offset].push(bytes.clone());
102                }
103            }
104            else {
105                // Shutting down; confirm absence of subsequent data.
106                active = false;
107                if !buffer.valid().is_empty() {
108                    panic!("Clean shutdown followed by data.");
109                }
110                buffer.ensure_capacity(1);
111                if reader.read(buffer.empty()).unwrap_or_else(|e| tcp_panic("reading EOF", e)) > 0 {
112                    panic!("Clean shutdown followed by data.");
113                }
114            }
115        }
116
117        // Pass bytes along to targets.
118        for (index, staged) in stageds.iter_mut().enumerate() {
119            // FIXME: try to merge `staged` before handing it to BytesPush::extend
120            use crate::allocator::zero_copy::bytes_exchange::BytesPush;
121            targets[index].extend(staged.drain(..));
122        }
123    }
124
125    // Log the receive thread's end.
126    logger.as_mut().map(|l| l.log(StateEvent { send: false, process, remote, start: false, }));
127}
128
129/// Repeatedly sends messages into a TcpStream.
130///
131/// The intended communication pattern is a sequence of (header, message)^* for valid
132/// messages, followed by a header for a zero length message indicating the end of stream.
133///
134/// If writing to the stream fails, the send thread panics with a message that starts with
135/// "timely communication error:" in an attempt to take down the computation and cause the
136/// failures to cascade.
137pub fn send_loop<S: Stream>(
138    // TODO: Maybe we don't need BufWriter with consolidation in writes.
139    writer: S,
140    sources: Vec<Sender<MergeQueue>>,
141    process: usize,
142    remote: usize,
143    spill: Option<SpillPolicyFn>,
144    logger: Option<Logger<CommunicationEventBuilder>>)
145{
146    let mut logger = logger.map(|logger| logger.into_typed::<CommunicationEvent>());
147    // Log the send thread's start.
148    logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: true, }));
149
150    // Send the policy-bearing queue to the worker (writer); keep a
151    // reader-side handle for ourselves.
152    let mut sources: Vec<MergeQueue> = sources.into_iter().map(|x| {
153        let buzzer = crate::buzzer::Buzzer::default();
154        let (writer, reader) = match spill.as_ref() {
155            Some(build_fn) => {
156                let (w, r) = build_fn();
157                MergeQueue::new_pair(buzzer, Some(w), Some(r))
158            }
159            None => MergeQueue::new_pair(buzzer, None, None),
160        };
161        x.send(writer).expect("failed to send MergeQueue");
162        reader
163    }).collect();
164
165    let mut writer = ::std::io::BufWriter::with_capacity(1 << 16, writer);
166    let mut stash = Vec::new();
167
168    while !sources.is_empty() {
169
170        // TODO: Round-robin better, to release resources fairly when overloaded.
171        for source in sources.iter_mut() {
172            use crate::allocator::zero_copy::bytes_exchange::BytesPull;
173            source.drain_into(&mut stash);
174        }
175
176        if stash.is_empty() {
177            // No evidence of records to read, but sources not yet empty (at start of loop).
178            // We are going to flush our writer (to move buffered data), double check on the
179            // sources for emptiness and wait on a signal only if we are sure that there will
180            // still be a signal incoming.
181            //
182            // We could get awoken by more data, a channel closing, or spuriously perhaps.
183            writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
184            sources.retain(|source| !source.is_complete());
185            if !sources.is_empty() {
186                std::thread::park();
187            }
188        }
189        else {
190            // TODO: Could do scatter/gather write here.
191            for bytes in stash.drain(..) {
192
193                // Record message sends.
194                logger.as_mut().map(|logger| {
195                    let mut offset = 0;
196                    while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
197                        logger.log(MessageEvent { is_send: true, header, });
198                        offset += header.required_bytes();
199                    }
200                });
201
202                writer.write_all(&bytes[..]).unwrap_or_else(|e| tcp_panic("writing data", e));
203            }
204        }
205    }
206
207    // Write final zero-length header.
208    // Would be better with meaningful metadata, but as this stream merges many
209    // workers it isn't clear that there is anything specific to write here.
210    let header = MessageHeader {
211        channel:    0,
212        source:     0,
213        target_lower:     0,
214        target_upper:     0,
215        length:     0,
216        seqno:      0,
217    };
218    header.write_to(&mut writer).unwrap_or_else(|e| tcp_panic("writing data", e));
219    writer.flush().unwrap_or_else(|e| tcp_panic("flushing writer", e));
220    writer.get_mut().shutdown(::std::net::Shutdown::Write).unwrap_or_else(|e| tcp_panic("shutting down writer", e));
221    logger.as_mut().map(|logger| logger.log(MessageEvent { is_send: true, header }));
222
223    // Log the send thread's end.
224    logger.as_mut().map(|l| l.log(StateEvent { send: true, process, remote, start: false, }));
225}