Skip to main content

timely_communication/
initialize.rs

1//! Initialization logic for a generic instance of the `Allocate` channel allocation trait.
2
3use std::thread;
4#[cfg(feature = "getopts")]
5use std::io::BufRead;
6use std::sync::Arc;
7use std::fmt::{Debug, Formatter};
8use std::any::Any;
9use std::ops::DerefMut;
10#[cfg(feature = "getopts")]
11use getopts;
12use timely_logging::Logger;
13
14use crate::allocator::thread::ThreadBuilder;
15use crate::allocator::{AllocateBuilder, Allocator, AllocatorBuilder, ProcessBuilder};
16use crate::allocator::zero_copy::bytes_slab::BytesRefill;
17use crate::allocator::zero_copy::initialize::initialize_networking;
18use crate::logging::{CommunicationEventBuilder, CommunicationSetup};
19
/// Possible configurations for the communication infrastructure.
#[derive(Clone)]
pub enum Config {
    /// Use one thread.
    Thread,
    /// Use one process with an indicated number of threads.
    Process(usize),
    /// Use one process with an indicated number of threads. Use zero-copy exchange channels.
    ProcessBinary(usize),
    /// Expect multiple processes.
    Cluster {
        /// Number of per-process worker threads
        threads: usize,
        /// Identity of this process
        process: usize,
        /// Addresses of all processes
        addresses: Vec<String>,
        /// Verbosely report connection progress
        report: bool,
        /// Enable intra-process zero-copy
        zerocopy: bool,
    }
}

impl Debug for Config {
    /// Formats the configuration for diagnostics.
    ///
    /// `Config::Cluster` prints every one of its fields, so the struct is closed
    /// with `finish()`; `finish_non_exhaustive()` would append a `..` marker that
    /// wrongly suggests some fields were omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            Config::Thread => write!(f, "Config::Thread()"),
            Config::Process(n) => write!(f, "Config::Process({})", n),
            Config::ProcessBinary(n) => write!(f, "Config::ProcessBinary({})", n),
            Config::Cluster { threads, process, addresses, report, zerocopy } => f
                .debug_struct("Config::Cluster")
                .field("threads", threads)
                .field("process", process)
                .field("addresses", addresses)
                .field("report", report)
                .field("zerocopy", zerocopy)
                .finish()
        }
    }
}
61
62/// Configuration hooks that (currently) live outside the configuration.
63///
64/// Fields are public so callers can mutate `Hooks::default()` before
65/// passing it to `try_build_with`.
66pub struct Hooks {
67    /// A mechanism to set up loggers for each communication thread.
68    pub log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
69    /// A strategy for refreshing bytes for `BytesSlab`.
70    pub refill: BytesRefill,
71    /// A mechanism to get a matched pair of spill policies (writer, reader) per queue.
72    pub spill: Option<crate::allocator::zero_copy::spill::SpillPolicyFn>,
73}
74
75impl Default for Hooks {
76    fn default() -> Self {
77        Self {
78            log_fn: Arc::new(|_| None),
79            refill:  BytesRefill {
80                logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>+Send>),
81                limit: None,
82            },
83            spill: None,
84        }
85    }
86}
87
88impl Config {
89    /// Installs options into a [`getopts::Options`] struct that corresponds
90    /// to the parameters in the configuration.
91    ///
92    /// It is the caller's responsibility to ensure that the installed options
93    /// do not conflict with any other options that may exist in `opts`, or
94    /// that may be installed into `opts` in the future.
95    ///
96    /// This method is only available if the `getopts` feature is enabled, which
97    /// it is by default.
98    #[cfg(feature = "getopts")]
99    pub fn install_options(opts: &mut getopts::Options) {
100        opts.optopt("w", "threads", "number of per-process worker threads", "NUM");
101        opts.optopt("p", "process", "identity of this process", "IDX");
102        opts.optopt("n", "processes", "number of processes", "NUM");
103        opts.optopt("h", "hostfile", "text file whose lines are process addresses", "FILE");
104        opts.optflag("r", "report", "reports connection progress");
105        opts.optflag("z", "zerocopy", "enable zero-copy for intra-process communication");
106    }
107
108    /// Instantiates a configuration based upon the parsed options in `matches`.
109    ///
110    /// The `matches` object must have been constructed from a
111    /// [`getopts::Options`] which contained at least the options installed by
112    /// [`Self::install_options`].
113    ///
114    /// This method is only available if the `getopts` feature is enabled, which
115    /// it is by default.
116    #[cfg(feature = "getopts")]
117    pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
118        let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?;
119        let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?;
120        let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?;
121        let report = matches.opt_present("report");
122        let zerocopy = matches.opt_present("zerocopy");
123
124        if processes > 1 {
125            let mut addresses = Vec::new();
126            if let Some(hosts) = matches.opt_str("h") {
127                let file = ::std::fs::File::open(hosts.clone()).map_err(|e| e.to_string())?;
128                let reader = ::std::io::BufReader::new(file);
129                for line in reader.lines().take(processes) {
130                    addresses.push(line.map_err(|e| e.to_string())?);
131                }
132                if addresses.len() < processes {
133                    return Err(format!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes));
134                }
135            }
136            else {
137                for index in 0..processes {
138                    addresses.push(format!("localhost:{}", 2101 + index));
139                }
140            }
141
142            assert_eq!(processes, addresses.len());
143            Ok(Config::Cluster {
144                threads,
145                process,
146                addresses,
147                report,
148                zerocopy,
149            })
150        } else if threads > 1 {
151            if zerocopy {
152                Ok(Config::ProcessBinary(threads))
153            } else {
154                Ok(Config::Process(threads))
155            }
156        } else {
157            Ok(Config::Thread)
158        }
159    }
160
161    /// Constructs a new configuration by parsing the supplied text arguments.
162    ///
163    /// Most commonly, callers supply `std::env::args()` as the iterator.
164    ///
165    /// This method is only available if the `getopts` feature is enabled, which
166    /// it is by default.
167    /// The `Ok` variant returns the free command-line arguments as well as the config.
168    #[cfg(feature = "getopts")]
169    pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<(Config, Vec<String>), String> {
170        let mut opts = getopts::Options::new();
171        Config::install_options(&mut opts);
172        let matches = opts.parse(args).map_err(|e| e.to_string())?;
173        Config::from_matches(&matches).map(|c| (c, matches.free))
174    }
175
176    /// Attempts to assemble the described communication infrastructure.
177    pub fn try_build(self) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
178        self.try_build_with(Hooks::default())
179    }
180
181    /// Attempts to assemble the described communication infrastructure, using the supplied refill function.
182    pub fn try_build_with(self, hooks: Hooks) -> Result<(Vec<AllocatorBuilder>, Box<dyn Any+Send>), String> {
183        match self {
184            Config::Thread => {
185                Ok((vec![AllocatorBuilder::Thread(ThreadBuilder)], Box::new(())))
186            },
187            Config::Process(threads) => {
188                let builders = ProcessBuilder::new_typed_vector(threads, hooks.refill, hooks.spill)
189                    .into_iter()
190                    .map(AllocatorBuilder::Process)
191                    .collect();
192                Ok((builders, Box::new(())))
193            },
194            Config::ProcessBinary(threads) => {
195                let builders = ProcessBuilder::new_bytes_vector(threads, hooks.refill, hooks.spill)
196                    .into_iter()
197                    .map(AllocatorBuilder::Process)
198                    .collect();
199                Ok((builders, Box::new(())))
200            },
201            Config::Cluster { threads, process, addresses, report, zerocopy: false } => {
202                let process_allocators = ProcessBuilder::new_typed_vector(threads, hooks.refill.clone(), hooks.spill.clone());
203                match initialize_networking(process_allocators, addresses, process, threads, report, hooks) {
204                    Ok((stuff, guard)) => {
205                        Ok((stuff.into_iter().map(AllocatorBuilder::Tcp).collect(), Box::new(guard)))
206                    },
207                    Err(err) => Err(format!("failed to initialize networking: {}", err))
208                }
209            },
210            Config::Cluster { threads, process, addresses, report, zerocopy: true } => {
211                let process_allocators = ProcessBuilder::new_bytes_vector(threads, hooks.refill.clone(), hooks.spill.clone());
212                match initialize_networking(process_allocators, addresses, process, threads, report, hooks) {
213                    Ok((stuff, guard)) => {
214                        Ok((stuff.into_iter().map(AllocatorBuilder::Tcp).collect(), Box::new(guard)))
215                    },
216                    Err(err) => Err(format!("failed to initialize networking: {}", err))
217                }
218            }
219        }
220    }
221}
222
223/// Initializes communication and executes a distributed computation.
224///
225/// This method allocates an `allocator::Allocator` for each thread, spawns local worker threads,
226/// and invokes the supplied function with the allocator.
227/// The method returns a `WorkerGuards<T>` which can be `join`ed to retrieve the return values
228/// (or errors) of the workers.
229///
230///
231/// # Examples
232/// ```
233/// use timely_communication::Bytesable;
234///
235/// /// A wrapper that indicates the serialization/deserialization strategy.
236/// pub struct Message {
237///     /// Text contents.
238///     pub payload: String,
239/// }
240///
241/// impl Bytesable for Message {
242///     fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
243///         Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
244///     }
245///
246///     fn length_in_bytes(&self) -> usize {
247///         self.payload.len()
248///     }
249///
250///     fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
251///         writer.write_all(self.payload.as_bytes()).unwrap();
252///     }
253/// }
254///
255/// // extract the configuration from user-supplied arguments, initialize the computation.
256/// let (config, _free) = timely_communication::Config::from_args(std::env::args()).unwrap();
257/// let guards = timely_communication::initialize(config, |mut allocator| {
258///
259///     println!("worker {} of {} started", allocator.index(), allocator.peers());
260///
261///     // allocates a pair of senders list and one receiver.
262///     let (mut senders, mut receiver) = allocator.allocate(0);
263///
264///     // send typed data along each channel
265///     for i in 0 .. allocator.peers() {
266///         senders[i].send(Message { payload: format!("hello, {}", i)});
267///         senders[i].done();
268///     }
269///
270///     // no support for termination notification,
271///     // we have to count down ourselves.
272///     let mut received = 0;
273///     while received < allocator.peers() {
274///
275///         allocator.receive();
276///
277///         if let Some(message) = receiver.recv() {
278///             println!("worker {}: received: <{}>", allocator.index(), message.payload);
279///             received += 1;
280///         }
281///
282///         allocator.release();
283///     }
284///
285///     allocator.index()
286/// });
287///
288/// // computation runs until guards are joined or dropped.
289/// if let Ok(guards) = guards {
290///     for guard in guards.join() {
291///         println!("result: {:?}", guard);
292///     }
293/// }
294/// else { println!("error in computation"); }
295/// ```
296///
297/// This should produce output like:
298///
299/// ```ignore
300/// worker 0 started
301/// worker 1 started
302/// worker 0: received: <hello, 0>
303/// worker 1: received: <hello, 1>
304/// worker 0: received: <hello, 0>
305/// worker 1: received: <hello, 1>
306/// result: Ok(0)
307/// result: Ok(1)
308/// ```
309pub fn initialize<T:Send+'static, F: Fn(Allocator)->T+Send+Sync+'static>(
310    config: Config,
311    func: F,
312) -> Result<WorkerGuards<T>,String> {
313    let (allocators, others) = config.try_build()?;
314    initialize_from(allocators, others, func)
315}
316
317/// Initializes computation and runs a distributed computation.
318///
319/// This version of `initialize` allows you to explicitly specify the allocators that
320/// you want to use, by providing an explicit list of allocator builders. Additionally,
321/// you provide `others`, a `Box<Any>` which will be held by the resulting worker guard
322/// and dropped when it is dropped, which allows you to join communication threads.
323///
324/// # Examples
325/// ```
326/// use timely_communication::Bytesable;
327///
328/// /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
329/// pub struct Message {
330///     /// Text contents.
331///     pub payload: String,
332/// }
333///
334/// impl Bytesable for Message {
335///     fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
336///         Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
337///     }
338///
339///     fn length_in_bytes(&self) -> usize {
340///         self.payload.len()
341///     }
342///
343///     fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
344///         writer.write_all(self.payload.as_bytes()).unwrap();
345///     }
346/// }
347///
348/// // extract the configuration from user-supplied arguments, initialize the computation.
349/// let (config, _free) = timely_communication::Config::from_args(std::env::args()).unwrap();
350/// let guards = timely_communication::initialize(config, |mut allocator| {
351///
352///     println!("worker {} of {} started", allocator.index(), allocator.peers());
353///
354///     // allocates a pair of senders list and one receiver.
355///     let (mut senders, mut receiver) = allocator.allocate(0);
356///
357///     // send typed data along each channel
358///     for i in 0 .. allocator.peers() {
359///         senders[i].send(Message { payload: format!("hello, {}", i)});
360///         senders[i].done();
361///     }
362///
363///     // no support for termination notification,
364///     // we have to count down ourselves.
365///     let mut received = 0;
366///     while received < allocator.peers() {
367///
368///         allocator.receive();
369///
370///         if let Some(message) = receiver.recv() {
371///             println!("worker {}: received: <{}>", allocator.index(), message.payload);
372///             received += 1;
373///         }
374///
375///         allocator.release();
376///     }
377///
378///     allocator.index()
379/// });
380///
381/// // computation runs until guards are joined or dropped.
382/// if let Ok(guards) = guards {
383///     for guard in guards.join() {
384///         println!("result: {:?}", guard);
385///     }
386/// }
387/// else { println!("error in computation"); }
388/// ```
389pub fn initialize_from<T, F>(
390    builders: Vec<AllocatorBuilder>,
391    others: Box<dyn Any+Send>,
392    func: F,
393) -> Result<WorkerGuards<T>,String>
394where
395    T: Send+'static,
396    F: Fn(Allocator)->T+Send+Sync+'static
397{
398    let logic = Arc::new(func);
399    let mut guards = Vec::new();
400    for (index, builder) in builders.into_iter().enumerate() {
401        let clone = Arc::clone(&logic);
402        guards.push(thread::Builder::new()
403                            .name(format!("timely:work-{}", index))
404                            .spawn(move || {
405                                let communicator = builder.build();
406                                (*clone)(communicator)
407                            })
408                            .map_err(|e| format!("{:?}", e))?);
409    }
410
411    Ok(WorkerGuards { guards, others })
412}
413
/// Maintains `JoinHandle`s for worker threads.
///
/// Dropping a `WorkerGuards` joins every worker thread that has not already
/// been joined. Because `Drop::drop` runs before the fields are dropped, the
/// workers are joined before `others` is released.
pub struct WorkerGuards<T:Send+'static> {
    guards: Vec<::std::thread::JoinHandle<T>>,
    others: Box<dyn Any+Send>,
}

impl<T:Send+'static> WorkerGuards<T> {

    /// Returns the join handles of all worker threads, in the order they were stored.
    pub fn guards(&self) -> &[std::thread::JoinHandle<T>] {
        &self.guards[..]
    }

    /// Provides access to handles that are not worker threads.
    ///
    /// The `&Box` return type is kept for API compatibility.
    pub fn others(&self) -> &Box<dyn Any+Send> {
        &self.others
    }

    /// Waits on the worker threads and returns the results they produce.
    ///
    /// A worker that panicked yields `Err` containing the `Debug` rendering of
    /// its panic payload.
    pub fn join(mut self) -> Vec<Result<T, String>> {
        self.guards
            .drain(..)
            .map(|guard| guard.join().map_err(|e| format!("{:?}", e)))
            .collect()
    }
}

impl<T:Send+'static> Drop for WorkerGuards<T> {
    /// Joins any worker threads that were not already consumed by `join`.
    ///
    /// # Panics
    ///
    /// Panics with `Worker panic: ...` if a worker thread panicked, unless the
    /// current thread is already unwinding. Panicking again from a destructor
    /// during unwinding would abort the process, so in that case the worker's
    /// failure is ignored and the remaining workers are still joined.
    fn drop(&mut self) {
        for guard in self.guards.drain(..) {
            if let Err(payload) = guard.join() {
                if !std::thread::panicking() {
                    panic!("Worker panic: {:?}", payload);
                }
            }
        }
    }
}