timely_communication/
lib.rs

1//! A simple communication infrastructure providing typed exchange channels.
2//!
3//! This crate is part of the timely dataflow system, used primarily for its inter-worker communication.
4//! It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
5//!
6//! Threads are spawned with an [`allocator::Generic`](allocator::generic::Generic), whose
7//! [`allocate`](Allocate::allocate) method returns a pair of several send endpoints and one
8//! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
9//! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
10//!
11//! To be communicated, a type must implement the [`Bytesable`] trait.
12//!
13//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`] and [`Pull`]
14//! traits), which is used for more precise control of resources.
15//!
16//! # Examples
17//! ```
18//! use timely_communication::{Allocate, Bytesable};
19//!
//! /// A message whose `String` payload is exchanged as raw UTF-8 bytes.
21//! pub struct Message {
22//!     /// Text contents.
23//!     pub payload: String,
24//! }
25//!
26//! impl Bytesable for Message {
27//!     fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
28//!         Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
29//!     }
30//!
31//!     fn length_in_bytes(&self) -> usize {
32//!         self.payload.len()
33//!     }
34//!
35//!     fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
36//!         writer.write_all(self.payload.as_bytes()).unwrap();
37//!     }
38//! }
39//!
40//! // extract the configuration from user-supplied arguments, initialize the computation.
41//! let config = timely_communication::Config::from_args(std::env::args()).unwrap();
42//! let guards = timely_communication::initialize(config, |mut allocator| {
43//!
44//!     println!("worker {} of {} started", allocator.index(), allocator.peers());
45//!
//!     // allocates a list of senders (one per worker) and one receiver.
47//!     let (mut senders, mut receiver) = allocator.allocate(0);
48//!
49//!     // send typed data along each channel
50//!     for i in 0 .. allocator.peers() {
51//!         senders[i].send(Message { payload: format!("hello, {}", i)});
52//!         senders[i].done();
53//!     }
54//!
55//!     // no support for termination notification,
56//!     // we have to count down ourselves.
57//!     let mut received = 0;
58//!     while received < allocator.peers() {
59//!
60//!         allocator.receive();
61//!
62//!         if let Some(message) = receiver.recv() {
63//!             println!("worker {}: received: <{}>", allocator.index(), message.payload);
64//!             received += 1;
65//!         }
66//!
67//!         allocator.release();
68//!     }
69//!
70//!     allocator.index()
71//! });
72//!
73//! // computation runs until guards are joined or dropped.
74//! if let Ok(guards) = guards {
75//!     for guard in guards.join() {
76//!         println!("result: {:?}", guard);
77//!     }
78//! }
79//! else { println!("error in computation"); }
80//! ```
81//!
82//! This should produce output like:
83//!
84//! ```ignore
//! worker 0 of 2 started
//! worker 1 of 2 started
87//! worker 0: received: <hello, 0>
88//! worker 1: received: <hello, 1>
89//! worker 0: received: <hello, 0>
90//! worker 1: received: <hello, 1>
91//! result: Ok(0)
92//! result: Ok(1)
93//! ```
94
95#![forbid(missing_docs)]
96
97pub mod allocator;
98pub mod networking;
99pub mod initialize;
100pub mod logging;
101pub mod buzzer;
102
103pub use allocator::Generic as Allocator;
104pub use allocator::{Allocate, Exchangeable};
105pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
106
107use std::sync::mpsc::{Sender, Receiver};
108
109use timely_bytes::arc::Bytes;
110
111/// A type that can be serialized and deserialized through `Bytes`.
112pub trait Bytesable {
113    /// Wrap bytes as `Self`.
114    fn from_bytes(bytes: Bytes) -> Self;
115
116    /// The number of bytes required to serialize the data.
117    fn length_in_bytes(&self) -> usize;
118
119    /// Writes the binary representation into `writer`.
120    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
121}
122
/// A destination into which elements of type `T` can be pushed.
///
/// Elements are handed over by mutable reference rather than by value,
/// which leaves room for zero-copy operation: during `push(element)` the
/// implementor may *swap* a different value into `element`, in effect
/// handing that value back to the caller.
///
/// By convention, a run of `push()` calls ends with `push(&mut None)`
/// (or equivalently `done()`), telling the implementor that no further
/// `push()` call may follow.
pub trait Push<T> {
    /// Pushes `element`, giving the implementor the chance to take ownership of it.
    fn push(&mut self, element: &mut Option<T>);

    /// Pushes `element` by value; whatever the implementor leaves behind is dropped.
    #[inline]
    fn send(&mut self, element: T) {
        let mut slot = Some(element);
        self.push(&mut slot);
    }

    /// Pushes `None`, which conventionally signals a flush.
    #[inline]
    fn done(&mut self) {
        let mut flush: Option<T> = None;
        self.push(&mut flush);
    }
}
143
144impl<T, P: ?Sized + Push<T>> Push<T> for Box<P> {
145    #[inline]
146    fn push(&mut self, element: &mut Option<T>) { (**self).push(element) }
147}
148
/// A source from which elements of type `T` can be pulled.
pub trait Pull<T> {
    /// Pulls an element, giving the caller the chance to take ownership of it.
    ///
    /// The caller is free to mutate the returned slot: it may take the data out,
    /// replace it with other data, or leave `None` behind. This is how the caller
    /// can hand resources back to the implementor.
    ///
    /// A result of `None` conventionally means that no data is available right now,
    /// and that the caller should go and do something else.
    fn pull(&mut self) -> &mut Option<T>;

    /// Takes the pulled `Option<T>` by value, leaving `None` in its place.
    #[inline]
    fn recv(&mut self) -> Option<T> {
        let slot = self.pull();
        slot.take()
    }
}
164
165impl<T, P: ?Sized + Pull<T>> Pull<T> for Box<P> {
166    #[inline]
167    fn pull(&mut self) -> &mut Option<T> { (**self).pull() }
168}
169
170
/// Allocates a matrix of send and receive channels for exchanging items.
///
/// Builds one `std::sync::mpsc` channel for every (sender, receiver) pair, so that
/// each of `sends` sending threads can deliver items of type `T` to each of `recvs`
/// receiving threads.
///
/// In the returned pair, the first component holds one row per sender with `recvs`
/// send endpoints each, and the second holds one row per receiver with `sends`
/// receive endpoints each: the endpoint at `[s][r]` of the first component is
/// connected to the endpoint at `[r][s]` of the second.
fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {

    // One (initially empty) row of receive endpoints per receiving thread.
    let mut inboxes: Vec<Vec<Receiver<T>>> = std::iter::repeat_with(|| Vec::with_capacity(sends))
        .take(recvs)
        .collect();

    // For every sending thread, open a channel to each receiving thread: the send half
    // goes into that sender's row, the receive half is appended to the receiver's row.
    let outboxes: Vec<Vec<Sender<T>>> = (0 .. sends)
        .map(|_| {
            inboxes
                .iter_mut()
                .map(|inbox| {
                    let (tx, rx) = std::sync::mpsc::channel();
                    inbox.push(rx);
                    tx
                })
                .collect()
        })
        .collect();

    (outboxes, inboxes)
}