timely_communication/initialize.rs
1//! Initialization logic for a generic instance of the `Allocate` channel allocation trait.
2
3use std::thread;
4#[cfg(feature = "getopts")]
5use std::io::BufRead;
6use std::sync::Arc;
7use std::fmt::{Debug, Formatter};
8use std::any::Any;
9use std::ops::DerefMut;
10#[cfg(feature = "getopts")]
11use getopts;
12use timely_logging::Logger;
13
14use crate::allocator::thread::ThreadBuilder;
15use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder, PeerBuilder};
16use crate::allocator::zero_copy::allocator_process::ProcessBuilder;
17use crate::allocator::zero_copy::bytes_slab::BytesRefill;
18use crate::allocator::zero_copy::initialize::initialize_networking;
19use crate::logging::{CommunicationEventBuilder, CommunicationSetup};
20
21/// Possible configurations for the communication infrastructure.
22#[derive(Clone)]
23pub enum Config {
24 /// Use one thread.
25 Thread,
26 /// Use one process with an indicated number of threads.
27 Process(usize),
28 /// Use one process with an indicated number of threads. Use zero-copy exchange channels.
29 ProcessBinary(usize),
30 /// Expect multiple processes.
31 Cluster {
32 /// Number of per-process worker threads
33 threads: usize,
34 /// Identity of this process
35 process: usize,
36 /// Addresses of all processes
37 addresses: Vec<String>,
38 /// Verbosely report connection process
39 report: bool,
40 /// Enable intra-process zero-copy
41 zerocopy: bool,
42 /// Closure to create a new logger for a communication thread
43 log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
44 }
45}
46
47impl Debug for Config {
48 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
49 match self {
50 Config::Thread => write!(f, "Config::Thread()"),
51 Config::Process(n) => write!(f, "Config::Process({})", n),
52 Config::ProcessBinary(n) => write!(f, "Config::ProcessBinary({})", n),
53 Config::Cluster { threads, process, addresses, report, zerocopy, log_fn: _ } => f
54 .debug_struct("Config::Cluster")
55 .field("threads", threads)
56 .field("process", process)
57 .field("addresses", addresses)
58 .field("report", report)
59 .field("zerocopy", zerocopy)
60 .finish_non_exhaustive()
61 }
62 }
63}
64
65impl Config {
66 /// Installs options into a [`getopts::Options`] struct that corresponds
67 /// to the parameters in the configuration.
68 ///
69 /// It is the caller's responsibility to ensure that the installed options
70 /// do not conflict with any other options that may exist in `opts`, or
71 /// that may be installed into `opts` in the future.
72 ///
73 /// This method is only available if the `getopts` feature is enabled, which
74 /// it is by default.
75 #[cfg(feature = "getopts")]
76 pub fn install_options(opts: &mut getopts::Options) {
77 opts.optopt("w", "threads", "number of per-process worker threads", "NUM");
78 opts.optopt("p", "process", "identity of this process", "IDX");
79 opts.optopt("n", "processes", "number of processes", "NUM");
80 opts.optopt("h", "hostfile", "text file whose lines are process addresses", "FILE");
81 opts.optflag("r", "report", "reports connection progress");
82 opts.optflag("z", "zerocopy", "enable zero-copy for intra-process communication");
83 }
84
85 /// Instantiates a configuration based upon the parsed options in `matches`.
86 ///
87 /// The `matches` object must have been constructed from a
88 /// [`getopts::Options`] which contained at least the options installed by
89 /// [`Self::install_options`].
90 ///
91 /// This method is only available if the `getopts` feature is enabled, which
92 /// it is by default.
93 #[cfg(feature = "getopts")]
94 pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
95 let threads = matches.opt_get_default("w", 1_usize).map_err(|e| e.to_string())?;
96 let process = matches.opt_get_default("p", 0_usize).map_err(|e| e.to_string())?;
97 let processes = matches.opt_get_default("n", 1_usize).map_err(|e| e.to_string())?;
98 let report = matches.opt_present("report");
99 let zerocopy = matches.opt_present("zerocopy");
100
101 if processes > 1 {
102 let mut addresses = Vec::new();
103 if let Some(hosts) = matches.opt_str("h") {
104 let file = ::std::fs::File::open(hosts.clone()).map_err(|e| e.to_string())?;
105 let reader = ::std::io::BufReader::new(file);
106 for line in reader.lines().take(processes) {
107 addresses.push(line.map_err(|e| e.to_string())?);
108 }
109 if addresses.len() < processes {
110 return Err(format!("could only read {} addresses from {}, but -n: {}", addresses.len(), hosts, processes));
111 }
112 }
113 else {
114 for index in 0..processes {
115 addresses.push(format!("localhost:{}", 2101 + index));
116 }
117 }
118
119 assert_eq!(processes, addresses.len());
120 Ok(Config::Cluster {
121 threads,
122 process,
123 addresses,
124 report,
125 zerocopy,
126 log_fn: Arc::new(|_| None),
127 })
128 } else if threads > 1 {
129 if zerocopy {
130 Ok(Config::ProcessBinary(threads))
131 } else {
132 Ok(Config::Process(threads))
133 }
134 } else {
135 Ok(Config::Thread)
136 }
137 }
138
139 /// Constructs a new configuration by parsing the supplied text arguments.
140 ///
141 /// Most commonly, callers supply `std::env::args()` as the iterator.
142 ///
143 /// This method is only available if the `getopts` feature is enabled, which
144 /// it is by default.
145 #[cfg(feature = "getopts")]
146 pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<Config, String> {
147 let mut opts = getopts::Options::new();
148 Config::install_options(&mut opts);
149 let matches = opts.parse(args).map_err(|e| e.to_string())?;
150 Config::from_matches(&matches)
151 }
152
153 /// Attempts to assemble the described communication infrastructure.
154 pub fn try_build(self) -> Result<(Vec<GenericBuilder>, Box<dyn Any+Send>), String> {
155 let refill = BytesRefill {
156 logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>>),
157 limit: None,
158 };
159 self.try_build_with(refill)
160 }
161
162 /// Attempts to assemble the described communication infrastructure, using the supplied refill function.
163 pub fn try_build_with(self, refill: BytesRefill) -> Result<(Vec<GenericBuilder>, Box<dyn Any+Send>), String> {
164 match self {
165 Config::Thread => {
166 Ok((vec![GenericBuilder::Thread(ThreadBuilder)], Box::new(())))
167 },
168 Config::Process(threads) => {
169 Ok((Process::new_vector(threads, refill).into_iter().map(GenericBuilder::Process).collect(), Box::new(())))
170 },
171 Config::ProcessBinary(threads) => {
172 Ok((ProcessBuilder::new_vector(threads, refill).into_iter().map(GenericBuilder::ProcessBinary).collect(), Box::new(())))
173 },
174 Config::Cluster { threads, process, addresses, report, zerocopy: false, log_fn } => {
175 match initialize_networking::<Process>(addresses, process, threads, report, refill, log_fn) {
176 Ok((stuff, guard)) => {
177 Ok((stuff.into_iter().map(GenericBuilder::ZeroCopy).collect(), Box::new(guard)))
178 },
179 Err(err) => Err(format!("failed to initialize networking: {}", err))
180 }
181 },
182 Config::Cluster { threads, process, addresses, report, zerocopy: true, log_fn } => {
183 match initialize_networking::<ProcessBuilder>(addresses, process, threads, report, refill, log_fn) {
184 Ok((stuff, guard)) => {
185 Ok((stuff.into_iter().map(GenericBuilder::ZeroCopyBinary).collect(), Box::new(guard)))
186 },
187 Err(err) => Err(format!("failed to initialize networking: {}", err))
188 }
189 }
190 }
191 }
192}
193
194/// Initializes communication and executes a distributed computation.
195///
196/// This method allocates an `allocator::Generic` for each thread, spawns local worker threads,
197/// and invokes the supplied function with the allocator.
198/// The method returns a `WorkerGuards<T>` which can be `join`ed to retrieve the return values
199/// (or errors) of the workers.
200///
201///
202/// # Examples
203/// ```
204/// use timely_communication::{Allocate, Bytesable};
205///
206/// /// A wrapper that indicates the serialization/deserialization strategy.
207/// pub struct Message {
208/// /// Text contents.
209/// pub payload: String,
210/// }
211///
212/// impl Bytesable for Message {
213/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
214/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
215/// }
216///
217/// fn length_in_bytes(&self) -> usize {
218/// self.payload.len()
219/// }
220///
221/// fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
222/// writer.write_all(self.payload.as_bytes()).unwrap();
223/// }
224/// }
225///
226/// // extract the configuration from user-supplied arguments, initialize the computation.
227/// let config = timely_communication::Config::from_args(std::env::args()).unwrap();
228/// let guards = timely_communication::initialize(config, |mut allocator| {
229///
230/// println!("worker {} of {} started", allocator.index(), allocator.peers());
231///
232/// // allocates a pair of senders list and one receiver.
233/// let (mut senders, mut receiver) = allocator.allocate(0);
234///
235/// // send typed data along each channel
236/// for i in 0 .. allocator.peers() {
237/// senders[i].send(Message { payload: format!("hello, {}", i)});
238/// senders[i].done();
239/// }
240///
241/// // no support for termination notification,
242/// // we have to count down ourselves.
243/// let mut received = 0;
244/// while received < allocator.peers() {
245///
246/// allocator.receive();
247///
248/// if let Some(message) = receiver.recv() {
249/// println!("worker {}: received: <{}>", allocator.index(), message.payload);
250/// received += 1;
251/// }
252///
253/// allocator.release();
254/// }
255///
256/// allocator.index()
257/// });
258///
259/// // computation runs until guards are joined or dropped.
260/// if let Ok(guards) = guards {
261/// for guard in guards.join() {
262/// println!("result: {:?}", guard);
263/// }
264/// }
265/// else { println!("error in computation"); }
266/// ```
267///
268/// This should produce output like:
269///
270/// ```ignore
271/// worker 0 started
272/// worker 1 started
273/// worker 0: received: <hello, 0>
274/// worker 1: received: <hello, 1>
275/// worker 0: received: <hello, 0>
276/// worker 1: received: <hello, 1>
277/// result: Ok(0)
278/// result: Ok(1)
279/// ```
280pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
281 config: Config,
282 func: F,
283) -> Result<WorkerGuards<T>,String> {
284 let (allocators, others) = config.try_build()?;
285 initialize_from(allocators, others, func)
286}
287
288/// Initializes computation and runs a distributed computation.
289///
290/// This version of `initialize` allows you to explicitly specify the allocators that
291/// you want to use, by providing an explicit list of allocator builders. Additionally,
292/// you provide `others`, a `Box<Any>` which will be held by the resulting worker guard
293/// and dropped when it is dropped, which allows you to join communication threads.
294///
295/// # Examples
296/// ```
297/// use timely_communication::{Allocate, Bytesable};
298///
299/// /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
300/// pub struct Message {
301/// /// Text contents.
302/// pub payload: String,
303/// }
304///
305/// impl Bytesable for Message {
306/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
307/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
308/// }
309///
310/// fn length_in_bytes(&self) -> usize {
311/// self.payload.len()
312/// }
313///
314/// fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
315/// writer.write_all(self.payload.as_bytes()).unwrap();
316/// }
317/// }
318///
319/// // extract the configuration from user-supplied arguments, initialize the computation.
320/// let config = timely_communication::Config::from_args(std::env::args()).unwrap();
321/// let guards = timely_communication::initialize(config, |mut allocator| {
322///
323/// println!("worker {} of {} started", allocator.index(), allocator.peers());
324///
325/// // allocates a pair of senders list and one receiver.
326/// let (mut senders, mut receiver) = allocator.allocate(0);
327///
328/// // send typed data along each channel
329/// for i in 0 .. allocator.peers() {
330/// senders[i].send(Message { payload: format!("hello, {}", i)});
331/// senders[i].done();
332/// }
333///
334/// // no support for termination notification,
335/// // we have to count down ourselves.
336/// let mut received = 0;
337/// while received < allocator.peers() {
338///
339/// allocator.receive();
340///
341/// if let Some(message) = receiver.recv() {
342/// println!("worker {}: received: <{}>", allocator.index(), message.payload);
343/// received += 1;
344/// }
345///
346/// allocator.release();
347/// }
348///
349/// allocator.index()
350/// });
351///
352/// // computation runs until guards are joined or dropped.
353/// if let Ok(guards) = guards {
354/// for guard in guards.join() {
355/// println!("result: {:?}", guard);
356/// }
357/// }
358/// else { println!("error in computation"); }
359/// ```
360pub fn initialize_from<A, T, F>(
361 builders: Vec<A>,
362 others: Box<dyn Any+Send>,
363 func: F,
364) -> Result<WorkerGuards<T>,String>
365where
366 A: AllocateBuilder+'static,
367 T: Send+'static,
368 F: Fn(<A as AllocateBuilder>::Allocator)->T+Send+Sync+'static
369{
370 let logic = Arc::new(func);
371 let mut guards = Vec::new();
372 for (index, builder) in builders.into_iter().enumerate() {
373 let clone = Arc::clone(&logic);
374 guards.push(thread::Builder::new()
375 .name(format!("timely:work-{}", index))
376 .spawn(move || {
377 let communicator = builder.build();
378 (*clone)(communicator)
379 })
380 .map_err(|e| format!("{:?}", e))?);
381 }
382
383 Ok(WorkerGuards { guards, others })
384}
385
/// Maintains `JoinHandle`s for worker threads.
///
/// Dropping a `WorkerGuards` joins every worker thread that has not already
/// been joined, and only then releases `others`.
pub struct WorkerGuards<T:Send+'static> {
    /// Join handles of the worker threads.
    guards: Vec<thread::JoinHandle<T>>,
    /// Additional resources held for the lifetime of the guards.
    others: Box<dyn Any+Send>,
}

/// Renders the payload of a panicked worker thread as text.
///
/// `String` and `&'static str` payloads (what `panic!` produces for messages)
/// are returned verbatim; any other payload falls back to its `Debug` form.
fn worker_panic_message(payload: Box<dyn Any+Send>) -> String {
    match payload.downcast::<String>() {
        Ok(text) => *text,
        Err(payload) => match payload.downcast_ref::<&'static str>() {
            Some(text) => (*text).to_owned(),
            None => format!("{:?}", payload),
        },
    }
}

impl<T:Send+'static> WorkerGuards<T> {

    /// Returns the join handles of the worker threads.
    pub fn guards(&self) -> &[thread::JoinHandle<T>] {
        &self.guards
    }

    /// Provides access to handles that are not worker threads.
    pub fn others(&self) -> &Box<dyn Any+Send> {
        &self.others
    }

    /// Waits on the worker threads and returns the results they produce.
    ///
    /// Results are returned in the order of the stored handles. A worker that
    /// panicked yields `Err` carrying its panic message when the payload is
    /// textual, and the payload's `Debug` rendering otherwise.
    pub fn join(mut self) -> Vec<Result<T, String>> {
        // Draining leaves nothing for `Drop` to join afterwards.
        self.guards
            .drain(..)
            .map(|guard| guard.join().map_err(worker_panic_message))
            .collect()
    }
}

impl<T:Send+'static> Drop for WorkerGuards<T> {
    /// Joins any worker threads that have not been joined yet.
    ///
    /// # Panics
    ///
    /// Panics with "Worker panic" if one of those workers panicked.
    /// NOTE(review): a panic raised here while the current thread is already
    /// unwinding aborts the process.
    fn drop(&mut self) {
        for guard in self.guards.drain(..) {
            guard.join().expect("Worker panic");
        }
    }
}