1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
//! A simple communication infrastructure providing typed exchange channels.
//!
//! This crate is part of the timely dataflow system, used primarily for its inter-worker communication.
//! It may be independently useful, but it is separated out mostly to make clear boundaries in the project.
//!
//! Threads are spawned with an [`allocator::Generic`](allocator::generic::Generic), whose
//! [`allocate`](Allocate::allocate) method returns a pair of several send endpoints and one
//! receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker,
//! if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.
//!
//! To be communicated, a type must implement the [`Serialize`](serde::Serialize) trait when using the
//! `bincode` feature or the [`Abomonation`](abomonation::Abomonation) trait when not.
//!
//! Channel endpoints also implement a lower-level `push` and `pull` interface (through the [`Push`](Push) and [`Pull`](Pull)
//! traits), which is used for more precise control of resources.
//!
//! # Examples
//! ```
//! use timely_communication::Allocate;
//!
//! // configure for two threads, just one process.
//! let config = timely_communication::Config::Process(2);
//!
//! // initializes communication, spawns workers
//! let guards = timely_communication::initialize(config, |mut allocator| {
//!     println!("worker {} started", allocator.index());
//!
//!     // allocates a pair: a list of senders and one receiver.
//!     let (mut senders, mut receiver) = allocator.allocate(0);
//!
//!     // send typed data along each channel
//!     use timely_communication::Message;
//!     senders[0].send(Message::from_typed(format!("hello, {}", 0)));
//!     senders[1].send(Message::from_typed(format!("hello, {}", 1)));
//!
//!     // no support for termination notification,
//!     // we have to count down ourselves.
//!     let mut expecting = 2;
//!     while expecting > 0 {
//!
//!         allocator.receive();
//!         if let Some(message) = receiver.recv() {
//!             use std::ops::Deref;
//!             println!("worker {}: received: <{}>", allocator.index(), message.deref());
//!             expecting -= 1;
//!         }
//!         allocator.release();
//!     }
//!
//!     // optionally, return something
//!     allocator.index()
//! });
//!
//! // computation runs until guards are joined or dropped.
//! if let Ok(guards) = guards {
//!     for guard in guards.join() {
//!         println!("result: {:?}", guard);
//!     }
//! }
//! else { println!("error in computation"); }
//! ```
//!
//! This should produce output like:
//!
//! ```ignore
//! worker 0 started
//! worker 1 started
//! worker 0: received: <hello, 0>
//! worker 1: received: <hello, 1>
//! worker 0: received: <hello, 0>
//! worker 1: received: <hello, 1>
//! result: Ok(0)
//! result: Ok(1)
//! ```

#![forbid(missing_docs)]

#[cfg(feature = "getopts")]
extern crate getopts;
#[cfg(feature = "bincode")]
extern crate bincode;
#[cfg(feature = "bincode")]
extern crate serde;

extern crate abomonation;
#[macro_use] extern crate abomonation_derive;

extern crate timely_bytes as bytes;
extern crate timely_logging as logging_core;

pub mod allocator;
pub mod networking;
pub mod initialize;
pub mod logging;
pub mod message;
pub mod buzzer;

use std::any::Any;

#[cfg(feature = "bincode")]
use serde::{Serialize, Deserialize};
#[cfg(not(feature = "bincode"))]
use abomonation::Abomonation;

pub use allocator::Generic as Allocator;
pub use allocator::Allocate;
pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
pub use message::Message;

/// A composite trait for types that may be used with channels.
///
/// This definition is compiled when the `bincode` feature is disabled. It bundles
/// `Send`, `Sync`, `Any`, `Abomonation` and `'static` into a single bound, and is
/// implemented automatically for every type that satisfies all of them.
#[cfg(not(feature = "bincode"))]
pub trait Data: Send + Sync + Any + Abomonation + 'static {}
// Blanket implementation: no type needs to (or can) opt in by hand.
#[cfg(not(feature = "bincode"))]
impl<T> Data for T
where
    T: Send + Sync + Any + Abomonation + 'static,
{
}

/// A composite trait for types that may be used with channels.
///
/// This definition is compiled when the `bincode` feature is enabled. It bundles
/// `Send`, `Sync`, `Any`, `Serialize`, `Deserialize` (for every lifetime) and
/// `'static` into a single bound, and is implemented automatically for every type
/// that satisfies all of them.
#[cfg(feature = "bincode")]
pub trait Data: Send + Sync + Any + Serialize + for<'de> Deserialize<'de> + 'static {}
// Blanket implementation: no type needs to (or can) opt in by hand.
#[cfg(feature = "bincode")]
impl<T> Data for T
where
    T: Send + Sync + Any + Serialize + for<'de> Deserialize<'de> + 'static,
{
}

/// A sink accepting elements of type `T`.
///
/// Elements are handed over through a mutable reference to an `Option<T>`
/// rather than by value, which leaves room for zero-copy operation: during
/// `push(element)` the implementor may *swap* a different value into
/// `element`, effectively handing that value back to the caller.
///
/// By convention, a run of `push()` calls is finished with `push(&mut None)`
/// (or, equivalently, `done()`), telling the implementor that no further
/// `push()` call is necessarily on its way.
pub trait Push<T> {
    /// Offers `element` to the implementor, which may take ownership of its contents
    /// or replace them.
    fn push(&mut self, element: &mut Option<T>);
    /// Pushes an owned `element`; whatever the implementor leaves behind is dropped.
    #[inline]
    fn send(&mut self, element: T) {
        let mut slot = Some(element);
        self.push(&mut slot);
    }
    /// Pushes `None`, which conventionally signals a flush.
    #[inline]
    fn done(&mut self) {
        let mut nothing: Option<T> = None;
        self.push(&mut nothing);
    }
}

// A boxed pusher (including a boxed trait object) is itself a pusher:
// every `push` is forwarded to the value inside the box.
impl<T, P> Push<T> for Box<P>
where
    P: ?Sized + Push<T>,
{
    #[inline]
    fn push(&mut self, element: &mut Option<T>) {
        // Reborrow the boxed value explicitly so the call dispatches to `P`'s
        // implementation rather than recursing into this one.
        let inner: &mut P = &mut **self;
        inner.push(element)
    }
}

/// A source yielding elements of type `T`.
pub trait Pull<T> {
    /// Exposes the next element, giving the caller the chance to take ownership of it.
    ///
    /// The caller is free to mutate the returned slot: it may take the data out,
    /// swap in other data, or leave `None` behind. This is how a puller can hand
    /// resources back to the implementor.
    ///
    /// A `None` result conventionally means that nothing is available right now,
    /// and the puller should go and do something more useful.
    fn pull(&mut self) -> &mut Option<T>;
    /// Moves the pulled `Option<T>` out, leaving `None` in its place.
    #[inline]
    fn recv(&mut self) -> Option<T> {
        let slot = self.pull();
        slot.take()
    }
}

// A boxed puller (including a boxed trait object) is itself a puller:
// every `pull` is forwarded to the value inside the box.
impl<T, P> Pull<T> for Box<P>
where
    P: ?Sized + Pull<T>,
{
    #[inline]
    fn pull(&mut self) -> &mut Option<T> {
        // Reborrow the boxed value explicitly so the call dispatches to `P`'s
        // implementation rather than recursing into this one.
        let inner: &mut P = &mut **self;
        inner.pull()
    }
}


use crossbeam_channel::{Sender, Receiver};

/// Allocates a matrix of send and receive channels to exchange items.
///
/// One unbounded channel is created for every (sender, receiver) pair, so that
/// each of `sends` sending threads can deliver items of type `T` to each of
/// `recvs` receiving threads.
///
/// In the returned pair `(senders, recvers)`, `senders[s][r]` is the sending
/// half and `recvers[r][s]` the receiving half of the channel from sender `s`
/// to receiver `r`.
fn promise_futures<T>(sends: usize, recvs: usize) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {

    // one (initially empty) list of receive endpoints per receiver.
    let mut recvers: Vec<Vec<Receiver<T>>> = Vec::with_capacity(recvs);
    recvers.resize_with(recvs, || Vec::with_capacity(sends));

    // build each sender's row in turn, handing the matching receive
    // endpoint to the corresponding receiver as we go.
    let mut senders: Vec<Vec<Sender<T>>> = Vec::with_capacity(sends);
    for _ in 0 .. sends {
        let mut row = Vec::with_capacity(recvs);
        for inbox in recvers.iter_mut() {
            let (tx, rx) = crossbeam_channel::unbounded();
            row.push(tx);
            inbox.push(rx);
        }
        senders.push(row);
    }

    (senders, recvers)
}