1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
//! A simple communication infrastructure providing typed exchange channels.
//!
//! This crate is part of the timely dataflow system, used primarily for its inter-worker communication.
//! It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
//!
//! Threads are spawned with an [`allocator::Generic`](allocator::generic::Generic), whose
//! [`allocate`](Allocate::allocate) method returns a pair of several send endpoints and one
//! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
//! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
//!
//! To be communicated, a type must implement the [`Bytesable`] trait.
//!
//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`] and [`Pull`]
//! traits), which is used for more precise control of resources.
//!
//! # Examples
//! ```
//! use timely_communication::{Allocate, Bytesable};
//! 
//! /// A message whose text payload is exchanged as raw UTF-8 bytes.
//! pub struct Message {
//!     /// Text contents.
//!     pub payload: String,
//! }
//! 
//! impl Bytesable for Message {
//!     fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
//!         Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
//!     }
//! 
//!     fn length_in_bytes(&self) -> usize {
//!         self.payload.len()
//!     }
//! 
//!     fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
//!         writer.write_all(self.payload.as_bytes()).unwrap();
//!     }
//! }
//! 
//! fn main() {
//! 
//!     // extract the configuration from user-supplied arguments, initialize the computation.
//!     let config = timely_communication::Config::from_args(std::env::args()).unwrap();
//!     let guards = timely_communication::initialize(config, |mut allocator| {
//! 
//!         println!("worker {} of {} started", allocator.index(), allocator.peers());
//! 
//!         // allocates a list of senders and a single receiver.
//!         let (mut senders, mut receiver) = allocator.allocate(0);
//! 
//!         // send typed data along each channel
//!         for i in 0 .. allocator.peers() {
//!             senders[i].send(Message { payload: format!("hello, {}", i)});
//!             senders[i].done();
//!         }
//! 
//!         // no support for termination notification,
//!         // we have to count down ourselves.
//!         let mut received = 0;
//!         while received < allocator.peers() {
//! 
//!             allocator.receive();
//! 
//!             if let Some(message) = receiver.recv() {
//!                 println!("worker {}: received: <{}>", allocator.index(), message.payload);
//!                 received += 1;
//!             }
//! 
//!             allocator.release();
//!         }
//! 
//!         allocator.index()
//!     });
//! 
//!     // computation runs until guards are joined or dropped.
//!     if let Ok(guards) = guards {
//!         for guard in guards.join() {
//!             println!("result: {:?}", guard);
//!         }
//!     }
//!     else { println!("error in computation"); }
//! }
//! ```
//!
//! This should produce output like:
//!
//! ```ignore
//! worker 0 of 2 started
//! worker 1 of 2 started
//! worker 0: received: <hello, 0>
//! worker 1: received: <hello, 1>
//! worker 0: received: <hello, 0>
//! worker 1: received: <hello, 1>
//! result: Ok(0)
//! result: Ok(1)
//! ```

#![forbid(missing_docs)]

pub mod allocator;
pub mod networking;
pub mod initialize;
pub mod logging;
pub mod buzzer;

pub use allocator::Generic as Allocator;
pub use allocator::{Allocate, Exchangeable};
pub use initialize::{initialize, initialize_from, Config, WorkerGuards};

use timely_bytes::arc::Bytes;

/// A type that can be serialized and deserialized through `Bytes`.
///
/// Implementors provide three operations: reconstruction from a `Bytes`
/// buffer, a report of the serialized size, and serialization into any
/// `std::io::Write` sink.
pub trait Bytesable {
    /// Wrap bytes as `Self`.
    ///
    /// Takes ownership of `bytes` and returns the value it represents.
    fn from_bytes(bytes: Bytes) -> Self;

    /// The number of bytes required to serialize the data.
    // NOTE(review): presumably this must equal the number of bytes that
    // `into_bytes` writes — confirm against the callers that size buffers.
    fn length_in_bytes(&self) -> usize;

    /// Writes the binary representation into `writer`.
    ///
    /// The signature returns no `Result`, so write failures cannot be
    /// reported to the caller through the return value.
    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W);
}

/// A sink for elements of type `T`.
///
/// Elements are handed over by mutable reference instead of by value, which
/// leaves room for zero-copy operation: during `push(element)` the implementor
/// may *swap* a different value into `element`, in effect handing that value
/// back to the caller.
///
/// By convention, a run of `push()` calls ends with `push(&mut None)` (or,
/// equivalently, `done()`), telling the implementor that no further `push()`
/// may be on its way.
pub trait Push<T> {
    /// Offers `element` to the implementor, which may take ownership of its contents.
    fn push(&mut self, element: &mut Option<T>);
    /// Pushes `element` by value; whatever the implementor leaves behind is dropped.
    #[inline]
    fn send(&mut self, element: T) {
        let mut slot = Some(element);
        self.push(&mut slot);
    }
    /// Pushes `None`, which conventionally signals a flush.
    #[inline]
    fn done(&mut self) {
        let mut nothing = None;
        self.push(&mut nothing);
    }
}

// A boxed pusher (including a boxed trait object) is itself a pusher:
// every call is forwarded to the value inside the box.
impl<T, P: ?Sized + Push<T>> Push<T> for Box<P> {
    #[inline]
    fn push(&mut self, element: &mut Option<T>) {
        <P as Push<T>>::push(&mut **self, element)
    }
}

/// A source of elements of type `T`.
pub trait Pull<T> {
    /// Exposes the next element, giving the caller the chance to take ownership of it.
    ///
    /// The caller is free to mutate the returned slot: it can claim the data by
    /// swapping in other data, or by leaving `None` behind. This is how a caller
    /// can hand resources back to the implementor.
    ///
    /// A result of `None` conventionally means that nothing more is available
    /// right now, and that the caller should go and do something more useful.
    fn pull(&mut self) -> &mut Option<T>;
    /// Removes and returns the pulled `Option<T>`, leaving `None` in its place.
    #[inline]
    fn recv(&mut self) -> Option<T> {
        std::mem::take(self.pull())
    }
}

// A boxed puller (including a boxed trait object) is itself a puller:
// every call is forwarded to the value inside the box.
impl<T, P: ?Sized + Pull<T>> Pull<T> for Box<P> {
    #[inline]
    fn pull(&mut self) -> &mut Option<T> {
        <P as Pull<T>>::pull(&mut **self)
    }
}


use crossbeam_channel::{Sender, Receiver};

/// Allocates a matrix of send and receive channels for exchanging items.
///
/// One unbounded channel is created for every (sender, receiver) pair, so that
/// each of `sends` threads can create and send items of type `T` to each of
/// `recvs` receiver threads. In the result, the sender at `[s][r]` of the first
/// component feeds the receiver at `[r][s]` of the second component.
fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {

    // one row of senders per sending thread, built up as we go.
    let mut outgoing: Vec<Vec<Sender<T>>> = Vec::with_capacity(sends);
    // one row of receivers per receiving thread, filled column by column.
    let mut incoming: Vec<Vec<Receiver<T>>> = (0 .. recvs).map(|_| Vec::with_capacity(sends)).collect();

    for _ in 0 .. sends {
        let mut row = Vec::with_capacity(recvs);
        for inbox in incoming.iter_mut() {
            let (tx, rx) = crossbeam_channel::unbounded();
            row.push(tx);
            inbox.push(rx);
        }
        outgoing.push(row);
    }

    (outgoing, incoming)
}