Skip to main content

timely_communication/
initialize.rs

1//! Initialization logic for a generic instance of the `Allocate` channel allocation trait.
2
3use std::thread;
4#[cfg(feature = "getopts")]
5use std::io::BufRead;
6use std::sync::Arc;
7use std::fmt::{Debug, Formatter};
8use std::any::Any;
9use std::ops::DerefMut;
10#[cfg(feature = "getopts")]
11use getopts;
12use timely_logging::Logger;
13
14use crate::allocator::thread::ThreadBuilder;
15use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder, PeerBuilder};
16use crate::allocator::zero_copy::allocator_process::ProcessBuilder;
17use crate::allocator::zero_copy::bytes_slab::BytesRefill;
18use crate::allocator::zero_copy::initialize::initialize_networking;
19use crate::logging::{CommunicationEventBuilder, CommunicationSetup};
20
21/// Possible configurations for the communication infrastructure.
22#[derive(Clone)]
23pub enum Config {
24    /// Use one thread.
25    Thread,
26    /// Use one process with an indicated number of threads.
27    Process(usize),
28    /// Use one process with an indicated number of threads. Use zero-copy exchange channels.
29    ProcessBinary(usize),
30    /// Expect multiple processes.
31    Cluster {
32        /// Number of per-process worker threads
33        threads: usize,
34        /// Identity of this process
35        process: usize,
36        /// Addresses of all processes
37        addresses: Vec<String>,
38        /// Verbosely report connection process
39        report: bool,
40        /// Enable intra-process zero-copy
41        zerocopy: bool,
42        /// Closure to create a new logger for a communication thread
43        log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
44    }
45}
46
47impl Debug for Config {
48    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49        match self {
50            Config::Thread => write!(f, "Config::Thread()"),
51            Config::Process(n) => write!(f, "Config::Process({})", n),
52            Config::ProcessBinary(n) => write!(f, "Config::ProcessBinary({})", n),
53            Config::Cluster { threads, process, addresses, report, zerocopy, log_fn: _ } => f
54                .debug_struct("Config::Cluster")
55                .field("threads", threads)
56                .field("process", process)
57                .field("addresses", addresses)
58                .field("report", report)
59                .field("zerocopy", zerocopy)
60                .finish_non_exhaustive()
61        }
62    }
63}
64
65impl Config {
66    /// Installs options into a [`getopts::Options`] struct that corresponds
67    /// to the parameters in the configuration.
68    ///
69    /// It is the caller's responsibility to ensure that the installed options
70    /// do not conflict with any other options that may exist in `opts`, or
71    /// that may be installed into `opts` in the future.
72    ///
73    /// This method is only available if the `getopts` feature is enabled, which
74    /// it is by default.
75    #[cfg(feature = "getopts")]
76    pub fn install_options(opts: &mut getopts::Options) {
77        opts.optopt("w", "threads", "number of per-process worker threads", "NUM");
78        opts.optopt("p", "process", "identity of this process", "IDX");
79        opts.optopt("n", "processes", "number of processes", "NUM");
80        opts.optopt("h", "hostfile", "text file whose lines are process addresses", "FILE");
81        opts.optflag("r", "report", "reports connection progress");
82        opts.optflag("z", "zerocopy", "enable zero-copy for intra-process communication");
83    }
84
85    /// Instantiates a configuration based upon the parsed options in `matches`.
86    ///
87    /// The `matches` object must have been constructed from a
88    /// [`getopts::Options`] which contained at least the options installed by
89    /// [`Self::install_options`].
90    ///
91    /// This method is only available if the `getopts` feature is enabled, which
92    /// it is by default.
93    #[cfg(feature = "getopts")]
94    pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
95        let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?;
96        let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?;
97        let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?;
98        let report = matches.opt_present("report");
99        let zerocopy = matches.opt_present("zerocopy");
100
101        if processes > 1 {
102            let mut addresses = Vec::new();
103            if let Some(hosts) = matches.opt_str("h") {
104                let file = ::std::fs::File::open(hosts.clone()).map_err(|e| e.to_string())?;
105                let reader = ::std::io::BufReader::new(file);
106                for line in reader.lines().take(processes) {
107                    addresses.push(line.map_err(|e| e.to_string())?);
108                }
109                if addresses.len() < processes {
110                    return Err(format!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes));
111                }
112            }
113            else {
114                for index in 0..processes {
115                    addresses.push(format!("localhost:{}", 2101 + index));
116                }
117            }
118
119            assert_eq!(processes, addresses.len());
120            Ok(Config::Cluster {
121                threads,
122                process,
123                addresses,
124                report,
125                zerocopy,
126                log_fn: Arc::new(|_| None),
127            })
128        } else if threads > 1 {
129            if zerocopy {
130                Ok(Config::ProcessBinary(threads))
131            } else {
132                Ok(Config::Process(threads))
133            }
134        } else {
135            Ok(Config::Thread)
136        }
137    }
138
139    /// Constructs a new configuration by parsing the supplied text arguments.
140    ///
141    /// Most commonly, callers supply `std::env::args()` as the iterator.
142    ///
143    /// This method is only available if the `getopts` feature is enabled, which
144    /// it is by default.
145    /// The `Ok` variant returns the free command-line arguments as well as the config.
146    #[cfg(feature = "getopts")]
147    pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<(Config, Vec<String>), String> {
148        let mut opts = getopts::Options::new();
149        Config::install_options(&mut opts);
150        let matches = opts.parse(args).map_err(|e| e.to_string())?;
151        Config::from_matches(&matches).map(|c| (c, matches.free))
152    }
153
154    /// Attempts to assemble the described communication infrastructure.
155    pub fn try_build(self) -> Result<(Vec<GenericBuilder>, Box<dyn Any+Send>), String> {
156        let refill = BytesRefill {
157            logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>>),
158            limit: None,
159        };
160        self.try_build_with(refill)
161    }
162
163    /// Attempts to assemble the described communication infrastructure, using the supplied refill function.
164    pub fn try_build_with(self, refill: BytesRefill) -> Result<(Vec<GenericBuilder>, Box<dyn Any+Send>), String> {
165        match self {
166            Config::Thread => {
167                Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(())))
168            },
169            Config::Process(threads) => {
170                Ok((Process::new_vector(threads, refill).into_iter().map(GenericBuilder::Process).collect(), Box::new(())))
171            },
172            Config::ProcessBinary(threads) => {
173                Ok((ProcessBuilder::new_vector(threads, refill).into_iter().map(GenericBuilder::ProcessBinary).collect(), Box::new(())))
174            },
175            Config::Cluster { threads, process, addresses, report, zerocopy: false, log_fn } => {
176                match initialize_networking::<Process>(addresses, process, threads, report, refill, log_fn) {
177                    Ok((stuff, guard)) => {
178                        Ok((stuff.into_iter().map(GenericBuilder::ZeroCopy).collect(), Box::new(guard)))
179                    },
180                    Err(err) => Err(format!("failed to initialize networking: {}", err))
181                }
182            },
183            Config::Cluster { threads, process, addresses, report, zerocopy: true, log_fn } => {
184                match initialize_networking::<ProcessBuilder>(addresses, process, threads, report, refill, log_fn) {
185                    Ok((stuff, guard)) => {
186                        Ok((stuff.into_iter().map(GenericBuilder::ZeroCopyBinary).collect(), Box::new(guard)))
187                    },
188                    Err(err) => Err(format!("failed to initialize networking: {}", err))
189                }
190            }
191        }
192    }
193}
194
195/// Initializes communication and executes a distributed computation.
196///
197/// This method allocates an `allocator::Generic` for each thread, spawns local worker threads,
198/// and invokes the supplied function with the allocator.
199/// The method returns a `WorkerGuards<T>` which can be `join`ed to retrieve the return values
200/// (or errors) of the workers.
201///
202///
203/// # Examples
204/// ```
205/// use timely_communication::{Allocate, Bytesable};
206///
207/// /// A wrapper that indicates the serialization/deserialization strategy.
208/// pub struct Message {
209///     /// Text contents.
210///     pub payload: String,
211/// }
212///
213/// impl Bytesable for Message {
214///     fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
215///         Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
216///     }
217///
218///     fn length_in_bytes(&self) -> usize {
219///         self.payload.len()
220///     }
221///
222///     fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
223///         writer.write_all(self.payload.as_bytes()).unwrap();
224///     }
225/// }
226///
227/// // extract the configuration from user-supplied arguments, initialize the computation.
228/// let (config, _free) = timely_communication::Config::from_args(std::env::args()).unwrap();
229/// let guards = timely_communication::initialize(config, |mut allocator| {
230///
231///     println!("worker {} of {} started", allocator.index(), allocator.peers());
232///
233///     // allocates a pair of senders list and one receiver.
234///     let (mut senders, mut receiver) = allocator.allocate(0);
235///
236///     // send typed data along each channel
237///     for i in 0 .. allocator.peers() {
238///         senders[i].send(Message { payload: format!("hello, {}", i)});
239///         senders[i].done();
240///     }
241///
242///     // no support for termination notification,
243///     // we have to count down ourselves.
244///     let mut received = 0;
245///     while received < allocator.peers() {
246///
247///         allocator.receive();
248///
249///         if let Some(message) = receiver.recv() {
250///             println!("worker {}: received: <{}>", allocator.index(), message.payload);
251///             received += 1;
252///         }
253///
254///         allocator.release();
255///     }
256///
257///     allocator.index()
258/// });
259///
260/// // computation runs until guards are joined or dropped.
261/// if let Ok(guards) = guards {
262///     for guard in guards.join() {
263///         println!("result: {:?}", guard);
264///     }
265/// }
266/// else { println!("error in computation"); }
267/// ```
268///
269/// This should produce output like:
270///
271/// ```ignore
272/// worker 0 started
273/// worker 1 started
274/// worker 0: received: <hello, 0>
275/// worker 1: received: <hello, 1>
276/// worker 0: received: <hello, 0>
277/// worker 1: received: <hello, 1>
278/// result: Ok(0)
279/// result: Ok(1)
280/// ```
281pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
282    config: Config,
283    func: F,
284) -> Result<WorkerGuards<T>,String> {
285    let (allocators, others) = config.try_build()?;
286    initialize_from(allocators, others, func)
287}
288
289/// Initializes computation and runs a distributed computation.
290///
291/// This version of `initialize` allows you to explicitly specify the allocators that
292/// you want to use, by providing an explicit list of allocator builders. Additionally,
293/// you provide `others`, a `Box<Any>` which will be held by the resulting worker guard
294/// and dropped when it is dropped, which allows you to join communication threads.
295///
296/// # Examples
297/// ```
298/// use timely_communication::{Allocate, Bytesable};
299///
300/// /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
301/// pub struct Message {
302///     /// Text contents.
303///     pub payload: String,
304/// }
305///
306/// impl Bytesable for Message {
307///     fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
308///         Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
309///     }
310///
311///     fn length_in_bytes(&self) -> usize {
312///         self.payload.len()
313///     }
314///
315///     fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
316///         writer.write_all(self.payload.as_bytes()).unwrap();
317///     }
318/// }
319///
320/// // extract the configuration from user-supplied arguments, initialize the computation.
321/// let (config, _free) = timely_communication::Config::from_args(std::env::args()).unwrap();
322/// let guards = timely_communication::initialize(config, |mut allocator| {
323///
324///     println!("worker {} of {} started", allocator.index(), allocator.peers());
325///
326///     // allocates a pair of senders list and one receiver.
327///     let (mut senders, mut receiver) = allocator.allocate(0);
328///
329///     // send typed data along each channel
330///     for i in 0 .. allocator.peers() {
331///         senders[i].send(Message { payload: format!("hello, {}", i)});
332///         senders[i].done();
333///     }
334///
335///     // no support for termination notification,
336///     // we have to count down ourselves.
337///     let mut received = 0;
338///     while received < allocator.peers() {
339///
340///         allocator.receive();
341///
342///         if let Some(message) = receiver.recv() {
343///             println!("worker {}: received: <{}>", allocator.index(), message.payload);
344///             received += 1;
345///         }
346///
347///         allocator.release();
348///     }
349///
350///     allocator.index()
351/// });
352///
353/// // computation runs until guards are joined or dropped.
354/// if let Ok(guards) = guards {
355///     for guard in guards.join() {
356///         println!("result: {:?}", guard);
357///     }
358/// }
359/// else { println!("error in computation"); }
360/// ```
361pub fn initialize_from<A, T, F>(
362    builders: Vec<A>,
363    others: Box<dyn Any+Send>,
364    func: F,
365) -> Result<WorkerGuards<T>,String>
366where
367    A: AllocateBuilder+'static,
368    T: Send+'static,
369    F: Fn(<A as AllocateBuilder>::Allocator)->T+Send+Sync+'static
370{
371    let logic = Arc::new(func);
372    let mut guards = Vec::new();
373    for (index, builder) in builders.into_iter().enumerate() {
374        let clone = Arc::clone(&logic);
375        guards.push(thread::Builder::new()
376                            .name(format!("timely:work-{}", index))
377                            .spawn(move || {
378                                let communicator = builder.build();
379                                (*clone)(communicator)
380                            })
381                            .map_err(|e| format!("{:?}", e))?);
382    }
383
384    Ok(WorkerGuards { guards, others })
385}
386
/// Owns the `JoinHandle`s of the worker threads, plus any auxiliary state
/// that must stay alive as long as they do.
pub struct WorkerGuards<T:Send+'static> {
    /// Join handles of the worker threads.
    guards: Vec<thread::JoinHandle<T>>,
    /// Opaque additional state, dropped together with the guards.
    others: Box<dyn Any+Send>,
}

impl<T:Send+'static> WorkerGuards<T> {

    /// Returns the join handles of all worker threads as a slice.
    pub fn guards(&self) -> &[thread::JoinHandle<T>] {
        self.guards.as_slice()
    }

    /// Provides access to handles that are not worker threads.
    pub fn others(&self) -> &Box<dyn Any+Send> {
        &self.others
    }

    /// Waits on the worker threads and returns the results they produce.
    ///
    /// Each entry is the worker's return value, or a debug rendering of its
    /// panic payload if the worker panicked.
    pub fn join(mut self) -> Vec<Result<T, String>> {
        // Take the handles out so the `Drop` impl finds nothing left to join.
        let handles = std::mem::take(&mut self.guards);
        let mut results = Vec::with_capacity(handles.len());
        for handle in handles {
            results.push(handle.join().map_err(|e| format!("{:?}", e)));
        }
        results
    }
}

impl<T:Send+'static> Drop for WorkerGuards<T> {
    /// Joins any worker threads that have not been joined yet.
    ///
    /// # Panics
    ///
    /// Panics with "Worker panic" on the first worker found to have panicked.
    fn drop(&mut self) {
        self.guards.drain(..).for_each(|guard| {
            guard.join().expect("Worker panic");
        });
    }
}
421}