Skip to main content

timely_communication/
initialize.rs

1//! Initialization logic for a generic instance of the `Allocate` channel allocation trait.
2
3use std::thread;
4#[cfg(feature = "getopts")]
5use std::io::BufRead;
6use std::sync::Arc;
7use std::fmt::{Debug, Formatter};
8use std::any::Any;
9use std::ops::DerefMut;
10#[cfg(feature = "getopts")]
11use getopts;
12use timely_logging::Logger;
13
14use crate::allocator::thread::ThreadBuilder;
15use crate::allocator::{AllocateBuilder, Allocator, AllocatorBuilder, ProcessBuilder};
16use crate::allocator::zero_copy::bytes_slab::BytesRefill;
17use crate::allocator::zero_copy::initialize::initialize_networking;
18use crate::logging::{CommunicationEventBuilder, CommunicationSetup};
19
20/// Possible configurations for the communication infrastructure.
21#[derive(Clone)]
22pub enum Config {
23    /// Use one thread.
24    Thread,
25    /// Use one process with an indicated number of threads.
26    Process(usize),
27    /// Use one process with an indicated number of threads. Use zero-copy exchange channels.
28    ProcessBinary(usize),
29    /// Expect multiple processes.
30    Cluster {
31        /// Number of per-process worker threads
32        threads: usize,
33        /// Identity of this process
34        process: usize,
35        /// Addresses of all processes
36        addresses: Vec<String>,
37        /// Verbosely report connection process
38        report: bool,
39        /// Enable intra-process zero-copy
40        zerocopy: bool,
41        /// Closure to create a new logger for a communication thread
42        log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
43    }
44}
45
46impl Debug for Config {
47    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
48        match self {
49            Config::Thread => write!(f, "Config::Thread()"),
50            Config::Process(n) => write!(f, "Config::Process({})", n),
51            Config::ProcessBinary(n) => write!(f, "Config::ProcessBinary({})", n),
52            Config::Cluster { threads, process, addresses, report, zerocopy, log_fn: _ } => f
53                .debug_struct("Config::Cluster")
54                .field("threads", threads)
55                .field("process", process)
56                .field("addresses", addresses)
57                .field("report", report)
58                .field("zerocopy", zerocopy)
59                .finish_non_exhaustive()
60        }
61    }
62}
63
64impl Config {
65    /// Installs options into a [`getopts::Options`] struct that corresponds
66    /// to the parameters in the configuration.
67    ///
68    /// It is the caller's responsibility to ensure that the installed options
69    /// do not conflict with any other options that may exist in `opts`, or
70    /// that may be installed into `opts` in the future.
71    ///
72    /// This method is only available if the `getopts` feature is enabled, which
73    /// it is by default.
74    #[cfg(feature = "getopts")]
75    pub fn install_options(opts: &mut getopts::Options) {
76        opts.optopt("w", "threads", "number of per-process worker threads", "NUM");
77        opts.optopt("p", "process", "identity of this process", "IDX");
78        opts.optopt("n", "processes", "number of processes", "NUM");
79        opts.optopt("h", "hostfile", "text file whose lines are process addresses", "FILE");
80        opts.optflag("r", "report", "reports connection progress");
81        opts.optflag("z", "zerocopy", "enable zero-copy for intra-process communication");
82    }
83
84    /// Instantiates a configuration based upon the parsed options in `matches`.
85    ///
86    /// The `matches` object must have been constructed from a
87    /// [`getopts::Options`] which contained at least the options installed by
88    /// [`Self::install_options`].
89    ///
90    /// This method is only available if the `getopts` feature is enabled, which
91    /// it is by default.
92    #[cfg(feature = "getopts")]
93    pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
94        let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?;
95        let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?;
96        let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?;
97        let report = matches.opt_present("report");
98        let zerocopy = matches.opt_present("zerocopy");
99
100        if processes > 1 {
101            let mut addresses = Vec::new();
102            if let Some(hosts) = matches.opt_str("h") {
103                let file = ::std::fs::File::open(hosts.clone()).map_err(|e| e.to_string())?;
104                let reader = ::std::io::BufReader::new(file);
105                for line in reader.lines().take(processes) {
106                    addresses.push(line.map_err(|e| e.to_string())?);
107                }
108                if addresses.len() < processes {
109                    return Err(format!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes));
110                }
111            }
112            else {
113                for index in 0..processes {
114                    addresses.push(format!("localhost:{}", 2101 + index));
115                }
116            }
117
118            assert_eq!(processes, addresses.len());
119            Ok(Config::Cluster {
120                threads,
121                process,
122                addresses,
123                report,
124                zerocopy,
125                log_fn: Arc::new(|_| None),
126            })
127        } else if threads > 1 {
128            if zerocopy {
129                Ok(Config::ProcessBinary(threads))
130            } else {
131                Ok(Config::Process(threads))
132            }
133        } else {
134            Ok(Config::Thread)
135        }
136    }
137
138    /// Constructs a new configuration by parsing the supplied text arguments.
139    ///
140    /// Most commonly, callers supply `std::env::args()` as the iterator.
141    ///
142    /// This method is only available if the `getopts` feature is enabled, which
143    /// it is by default.
144    /// The `Ok` variant returns the free command-line arguments as well as the config.
145    #[cfg(feature = "getopts")]
146    pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<(Config, Vec<String>), String> {
147        let mut opts = getopts::Options::new();
148        Config::install_options(&mut opts);
149        let matches = opts.parse(args).map_err(|e| e.to_string())?;
150        Config::from_matches(&matches).map(|c| (c, matches.free))
151    }
152
153    /// Attempts to assemble the described communication infrastructure.
154    pub fn try_build(self) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
155        let refill = BytesRefill {
156            logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>>),
157            limit: None,
158        };
159        self.try_build_with(refill)
160    }
161
162    /// Attempts to assemble the described communication infrastructure, using the supplied refill function.
163    pub fn try_build_with(self, refill: BytesRefill) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
164        match self {
165            Config::Thread => {
166                Ok((vec![AllocatorBuilder::Thread(ThreadBuilder)], Box::new(())))
167            },
168            Config::Process(threads) => {
169                let builders = ProcessBuilder::new_typed_vector(threads, refill)
170                    .into_iter()
171                    .map(AllocatorBuilder::Process)
172                    .collect();
173                Ok((builders, Box::new(())))
174            },
175            Config::ProcessBinary(threads) => {
176                let builders = ProcessBuilder::new_bytes_vector(threads, refill)
177                    .into_iter()
178                    .map(AllocatorBuilder::Process)
179                    .collect();
180                Ok((builders, Box::new(())))
181            },
182            Config::Cluster { threads, process, addresses, report, zerocopy: false, log_fn } => {
183                let process_allocators = ProcessBuilder::new_typed_vector(threads, refill.clone());
184                match initialize_networking(process_allocators, addresses, process, threads, report, refill, log_fn) {
185                    Ok((stuff, guard)) => {
186                        Ok((stuff.into_iter().map(AllocatorBuilder::Tcp).collect(), Box::new(guard)))
187                    },
188                    Err(err) => Err(format!("failed to initialize networking: {}", err))
189                }
190            },
191            Config::Cluster { threads, process, addresses, report, zerocopy: true, log_fn } => {
192                let process_allocators = ProcessBuilder::new_bytes_vector(threads, refill.clone());
193                match initialize_networking(process_allocators, addresses, process, threads, report, refill, log_fn) {
194                    Ok((stuff, guard)) => {
195                        Ok((stuff.into_iter().map(AllocatorBuilder::Tcp).collect(), Box::new(guard)))
196                    },
197                    Err(err) => Err(format!("failed to initialize networking: {}", err))
198                }
199            }
200        }
201    }
202}
203
204/// Initializes communication and executes a distributed computation.
205///
206/// This method allocates an `allocator::Allocator` for each thread, spawns local worker threads,
207/// and invokes the supplied function with the allocator.
208/// The method returns a `WorkerGuards<T>` which can be `join`ed to retrieve the return values
209/// (or errors) of the workers.
210///
211///
212/// # Examples
213/// ```
214/// use timely_communication::Bytesable;
215///
216/// /// A wrapper that indicates the serialization/deserialization strategy.
217/// pub struct Message {
218///     /// Text contents.
219///     pub payload: String,
220/// }
221///
222/// impl Bytesable for Message {
223///     fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
224///         Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
225///     }
226///
227///     fn length_in_bytes(&self) -> usize {
228///         self.payload.len()
229///     }
230///
231///     fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
232///         writer.write_all(self.payload.as_bytes()).unwrap();
233///     }
234/// }
235///
236/// // extract the configuration from user-supplied arguments, initialize the computation.
237/// let (config, _free) = timely_communication::Config::from_args(std::env::args()).unwrap();
238/// let guards = timely_communication::initialize(config, |mut allocator| {
239///
240///     println!("worker {} of {} started", allocator.index(), allocator.peers());
241///
242///     // allocates a pair of senders list and one receiver.
243///     let (mut senders, mut receiver) = allocator.allocate(0);
244///
245///     // send typed data along each channel
246///     for i in 0 .. allocator.peers() {
247///         senders[i].send(Message { payload: format!("hello, {}", i)});
248///         senders[i].done();
249///     }
250///
251///     // no support for termination notification,
252///     // we have to count down ourselves.
253///     let mut received = 0;
254///     while received < allocator.peers() {
255///
256///         allocator.receive();
257///
258///         if let Some(message) = receiver.recv() {
259///             println!("worker {}: received: <{}>", allocator.index(), message.payload);
260///             received += 1;
261///         }
262///
263///         allocator.release();
264///     }
265///
266///     allocator.index()
267/// });
268///
269/// // computation runs until guards are joined or dropped.
270/// if let Ok(guards) = guards {
271///     for guard in guards.join() {
272///         println!("result: {:?}", guard);
273///     }
274/// }
275/// else { println!("error in computation"); }
276/// ```
277///
278/// This should produce output like:
279///
280/// ```ignore
281/// worker 0 started
282/// worker 1 started
283/// worker 0: received: <hello, 0>
284/// worker 1: received: <hello, 1>
285/// worker 0: received: <hello, 0>
286/// worker 1: received: <hello, 1>
287/// result: Ok(0)
288/// result: Ok(1)
289/// ```
290pub fn initialize<T:Send+'static, F: Fn(Allocator)->T+Send+Sync+'static>(
291    config: Config,
292    func: F,
293) -> Result<WorkerGuards<T>,String> {
294    let (allocators, others) = config.try_build()?;
295    initialize_from(allocators, others, func)
296}
297
298/// Initializes computation and runs a distributed computation.
299///
300/// This version of `initialize` allows you to explicitly specify the allocators that
301/// you want to use, by providing an explicit list of allocator builders. Additionally,
302/// you provide `others`, a `Box<Any>` which will be held by the resulting worker guard
303/// and dropped when it is dropped, which allows you to join communication threads.
304///
305/// # Examples
306/// ```
307/// use timely_communication::Bytesable;
308///
309/// /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
310/// pub struct Message {
311///     /// Text contents.
312///     pub payload: String,
313/// }
314///
315/// impl Bytesable for Message {
316///     fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
317///         Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
318///     }
319///
320///     fn length_in_bytes(&self) -> usize {
321///         self.payload.len()
322///     }
323///
324///     fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
325///         writer.write_all(self.payload.as_bytes()).unwrap();
326///     }
327/// }
328///
329/// // extract the configuration from user-supplied arguments, initialize the computation.
330/// let (config, _free) = timely_communication::Config::from_args(std::env::args()).unwrap();
331/// let guards = timely_communication::initialize(config, |mut allocator| {
332///
333///     println!("worker {} of {} started", allocator.index(), allocator.peers());
334///
335///     // allocates a pair of senders list and one receiver.
336///     let (mut senders, mut receiver) = allocator.allocate(0);
337///
338///     // send typed data along each channel
339///     for i in 0 .. allocator.peers() {
340///         senders[i].send(Message { payload: format!("hello, {}", i)});
341///         senders[i].done();
342///     }
343///
344///     // no support for termination notification,
345///     // we have to count down ourselves.
346///     let mut received = 0;
347///     while received < allocator.peers() {
348///
349///         allocator.receive();
350///
351///         if let Some(message) = receiver.recv() {
352///             println!("worker {}: received: <{}>", allocator.index(), message.payload);
353///             received += 1;
354///         }
355///
356///         allocator.release();
357///     }
358///
359///     allocator.index()
360/// });
361///
362/// // computation runs until guards are joined or dropped.
363/// if let Ok(guards) = guards {
364///     for guard in guards.join() {
365///         println!("result: {:?}", guard);
366///     }
367/// }
368/// else { println!("error in computation"); }
369/// ```
370pub fn initialize_from<T, F>(
371    builders: Vec<AllocatorBuilder>,
372    others: Box<dyn Any+Send>,
373    func: F,
374) -> Result<WorkerGuards<T>,String>
375where
376    T: Send+'static,
377    F: Fn(Allocator)->T+Send+Sync+'static
378{
379    let logic = Arc::new(func);
380    let mut guards = Vec::new();
381    for (index, builder) in builders.into_iter().enumerate() {
382        let clone = Arc::clone(&logic);
383        guards.push(thread::Builder::new()
384                            .name(format!("timely:work-{}", index))
385                            .spawn(move || {
386                                let communicator = builder.build();
387                                (*clone)(communicator)
388                            })
389                            .map_err(|e| format!("{:?}", e))?);
390    }
391
392    Ok(WorkerGuards { guards, others })
393}
394
/// Holds the `JoinHandle`s of spawned worker threads, plus any extra state
/// (`others`) that must be kept alive alongside them.
pub struct WorkerGuards<T>
where
    T: Send + 'static,
{
    /// One join handle per worker thread.
    guards: Vec<std::thread::JoinHandle<T>>,
    /// Non-worker state (for example, communication thread guards), held until
    /// this value is dropped.
    others: Box<dyn Any + Send>,
}
400
401impl<T:Send+'static> WorkerGuards<T> {
402
403    /// Returns a reference to the indexed guard.
404    pub fn guards(&self) -> &[std::thread::JoinHandle<T>] {
405        &self.guards[..]
406    }
407
408    /// Provides access to handles that are not worker threads.
409    pub fn others(&self) -> &Box<dyn Any+Send> {
410        &self.others
411    }
412
413    /// Waits on the worker threads and returns the results they produce.
414    pub fn join(mut self) -> Vec<Result<T, String>> {
415        self.guards
416            .drain(..)
417            .map(|guard| guard.join().map_err(|e| format!("{:?}", e)))
418            .collect()
419    }
420}
421
422impl<T:Send+'static> Drop for WorkerGuards<T> {
423    fn drop(&mut self) {
424        for guard in self.guards.drain(..) {
425            guard.join().expect("Worker panic");
426        }
427        // println!("WORKER THREADS JOINED");
428    }
429}