timely_communication/lib.rs
1//! A simple communication infrastructure providing typed exchange channels.
2//!
3//! This crate is part of the timely dataflow system, used primarily for its inter-worker communication.
4//! It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
5//!
6//! Threads are spawned with an [`allocator::Generic`](allocator::generic::Generic), whose
7//! [`allocate`](Allocate::allocate) method returns a pair of several send endpoints and one
8//! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
9//! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
10//!
11//! To be communicated, a type must implement the [`Bytesable`] trait.
12//!
13//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`] and [`Pull`]
14//! traits), which is used for more precise control of resources.
15//!
16//! # Examples
17//! ```
18//! use timely_communication::{Allocate, Bytesable};
19//!
20//! /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
21//! pub struct Message {
22//! /// Text contents.
23//! pub payload: String,
24//! }
25//!
26//! impl Bytesable for Message {
27//! fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
28//! Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
29//! }
30//!
31//! fn length_in_bytes(&self) -> usize {
32//! self.payload.len()
33//! }
34//!
35//! fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
36//! writer.write_all(self.payload.as_bytes()).unwrap();
37//! }
38//! }
39//!
40//! // extract the configuration from user-supplied arguments, initialize the computation.
41//! let config = timely_communication::Config::from_args(std::env::args()).unwrap();
42//! let guards = timely_communication::initialize(config, |mut allocator| {
43//!
44//! println!("worker {} of {} started", allocator.index(), allocator.peers());
45//!
46//! // allocates a pair of senders list and one receiver.
47//! let (mut senders, mut receiver) = allocator.allocate(0);
48//!
49//! // send typed data along each channel
50//! for i in 0 .. allocator.peers() {
51//! senders[i].send(Message { payload: format!("hello, {}", i)});
52//! senders[i].done();
53//! }
54//!
55//! // no support for termination notification,
56//! // we have to count down ourselves.
57//! let mut received = 0;
58//! while received < allocator.peers() {
59//!
60//! allocator.receive();
61//!
62//! if let Some(message) = receiver.recv() {
63//! println!("worker {}: received: <{}>", allocator.index(), message.payload);
64//! received += 1;
65//! }
66//!
67//! allocator.release();
68//! }
69//!
70//! allocator.index()
71//! });
72//!
73//! // computation runs until guards are joined or dropped.
74//! if let Ok(guards) = guards {
75//! for guard in guards.join() {
76//! println!("result: {:?}", guard);
77//! }
78//! }
79//! else { println!("error in computation"); }
80//! ```
81//!
82//! This should produce output like:
83//!
84//! ```ignore
85//! worker 0 started
86//! worker 1 started
87//! worker 0: received: <hello, 0>
88//! worker 1: received: <hello, 1>
89//! worker 0: received: <hello, 0>
90//! worker 1: received: <hello, 1>
91//! result: Ok(0)
92//! result: Ok(1)
93//! ```
94
95#![forbid(missing_docs)]
96
97pub mod allocator;
98pub mod networking;
99pub mod initialize;
100pub mod logging;
101pub mod buzzer;
102
103pub use allocator::Generic as Allocator;
104pub use allocator::{Allocate, Exchangeable};
105pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
106
107use std::sync::mpsc::{Sender, Receiver};
108
109use timely_bytes::arc::Bytes;
110
111/// A type that can be serialized and deserialized through `Bytes`.
112pub trait Bytesable {
113 /// Wrap bytes as `Self`.
114 fn from_bytes(bytes: Bytes) -> Self;
115
116 /// The number of bytes required to serialize the data.
117 fn length_in_bytes(&self) -> usize;
118
119 /// Writes the binary representation into `writer`.
120 fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
121}
122
123/// Pushing elements of type `T`.
124///
125/// This trait moves data around using references rather than ownership,
126/// which provides the opportunity for zero-copy operation. In the call
127/// to `push(element)` the implementor can *swap* some other value to
128/// replace `element`, effectively returning the value to the caller.
129///
130/// Conventionally, a sequence of calls to `push()` should conclude with
131/// a call of `push(&mut None)` or `done()` to signal to implementors that
132/// another call to `push()` may not be coming.
133pub trait Push<T> {
134 /// Pushes `element` with the opportunity to take ownership.
135 fn push(&mut self, element: &mut Option<T>);
136 /// Pushes `element` and drops any resulting resources.
137 #[inline]
138 fn send(&mut self, element: T) { self.push(&mut Some(element)); }
139 /// Pushes `None`, conventionally signalling a flush.
140 #[inline]
141 fn done(&mut self) { self.push(&mut None); }
142}
143
144impl<T, P: ?Sized + Push<T>> Push<T> for Box<P> {
145 #[inline]
146 fn push(&mut self, element: &mut Option<T>) { (**self).push(element) }
147}
148
149/// Pulling elements of type `T`.
150pub trait Pull<T> {
151 /// Pulls an element and provides the opportunity to take ownership.
152 ///
153 /// The puller may mutate the result, in particular take ownership of the data by
154 /// replacing it with other data or even `None`. This allows the puller to return
155 /// resources to the implementor.
156 ///
157 /// If `pull` returns `None` this conventionally signals that no more data is available
158 /// at the moment, and the puller should find something better to do.
159 fn pull(&mut self) -> &mut Option<T>;
160 /// Takes an `Option<T>` and leaves `None` behind.
161 #[inline]
162 fn recv(&mut self) -> Option<T> { self.pull().take() }
163}
164
165impl<T, P: ?Sized + Pull<T>> Pull<T> for Box<P> {
166 #[inline]
167 fn pull(&mut self) -> &mut Option<T> { (**self).pull() }
168}
169
170
171/// Allocate a matrix of send and receive changes to exchange items.
172///
173/// This method constructs channels for `sends` threads to create and send
174/// items of type `T` to `recvs` receiver threads.
175fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {
176
177 // each pair of workers has a sender and a receiver.
178 let mut senders: Vec<_> = (0 .. sends).map(|_| Vec::with_capacity(recvs)).collect();
179 let mut recvers: Vec<_> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect();
180
181 for sender in senders.iter_mut() {
182 for recver in recvers.iter_mut() {
183 let (send, recv) = std::sync::mpsc::channel();
184 sender.push(send);
185 recver.push(recv);
186 }
187 }
188
189 (senders, recvers)
190}