timely/execute.rs
1//! Starts a timely dataflow execution from configuration information and per-worker logic.
2
3use crate::communication::{initialize_from, Allocator, allocator::AllocateBuilder, WorkerGuards};
4use crate::dataflow::scopes::Child;
5use crate::worker::Worker;
6use crate::{CommunicationConfig, WorkerConfig};
7
8/// Configures the execution of a timely dataflow computation.
9#[derive(Clone, Debug)]
10pub struct Config {
11 /// Configuration for the communication infrastructure.
12 pub communication: CommunicationConfig,
13 /// Configuration for the worker threads.
14 pub worker: WorkerConfig,
15}
16
17impl Config {
18 /// Installs options into a [getopts::Options] struct that correspond
19 /// to the parameters in the configuration.
20 ///
21 /// It is the caller's responsibility to ensure that the installed options
22 /// do not conflict with any other options that may exist in `opts`, or
23 /// that may be installed into `opts` in the future.
24 ///
25 /// This method is only available if the `getopts` feature is enabled, which
26 /// it is by default.
27 #[cfg(feature = "getopts")]
28 pub fn install_options(opts: &mut getopts::Options) {
29 CommunicationConfig::install_options(opts);
30 WorkerConfig::install_options(opts);
31 }
32
33 /// Instantiates a configuration based upon the parsed options in `matches`.
34 ///
35 /// The `matches` object must have been constructed from a
36 /// [getopts::Options] which contained at least the options installed by
37 /// [Self::install_options].
38 ///
39 /// This method is only available if the `getopts` feature is enabled, which
40 /// it is by default.
41 #[cfg(feature = "getopts")]
42 pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
43 Ok(Config {
44 communication: CommunicationConfig::from_matches(matches)?,
45 worker: WorkerConfig::from_matches(matches)?,
46 })
47 }
48
49 /// Constructs a new configuration by parsing the supplied text arguments.
50 ///
51 /// Most commonly, callers supply `std::env::args()` as the iterator.
52 #[cfg(feature = "getopts")]
53 pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<Config, String> {
54 let mut opts = getopts::Options::new();
55 Config::install_options(&mut opts);
56 let matches = opts.parse(args).map_err(|e| e.to_string())?;
57 Config::from_matches(&matches)
58 }
59
60 /// Constructs a `Config` that uses one worker thread and the
61 /// defaults for all other parameters.
62 pub fn thread() -> Config {
63 Config {
64 communication: CommunicationConfig::Thread,
65 worker: WorkerConfig::default(),
66 }
67 }
68
69 /// Constructs an `Config` that uses `n` worker threads and the
70 /// defaults for all other parameters.
71 pub fn process(n: usize) -> Config {
72 Config {
73 communication: CommunicationConfig::Process(n),
74 worker: WorkerConfig::default(),
75 }
76 }
77}
78
79/// Executes a single-threaded timely dataflow computation.
80///
81/// The `example` method takes a closure on a `Scope` which it executes to initialize and run a
82/// timely dataflow computation on a single thread. This method is intended for use in examples,
83/// rather than programs that may need to run across multiple workers.
84///
85/// The `example` method returns whatever the single worker returns from its closure.
86/// This is often nothing, but the worker can return something about the data it saw in order to
87/// test computations.
88///
89/// The method aggressively unwraps returned `Result<_>` types.
90///
91/// # Examples
92///
93/// The simplest example creates a stream of data and inspects it.
94///
95/// ```rust
96/// use timely::dataflow::operators::{ToStream, Inspect};
97///
98/// timely::example(|scope| {
99/// (0..10).to_stream(scope)
100/// .inspect(|x| println!("seen: {:?}", x));
101/// });
102/// ```
103///
104/// This next example captures the data and displays them once the computation is complete.
105///
106/// More precisely, the example captures a stream of events (receiving batches of data,
107/// updates to input capabilities) and displays these events.
108///
109/// ```rust
110/// use timely::dataflow::operators::{ToStream, Inspect, Capture};
111/// use timely::dataflow::operators::capture::Extract;
112///
113/// let data = timely::example(|scope| {
114/// (0..10).to_stream(scope)
115/// .inspect(|x| println!("seen: {:?}", x))
116/// .capture()
117/// });
118///
119/// // the extracted data should have data (0..10) at timestamp 0.
120/// assert_eq!(data.extract()[0].1, (0..10).collect::<Vec<_>>());
121/// ```
122pub fn example<T, F>(func: F) -> T
123where
124 T: Send+'static,
125 F: FnOnce(&mut Child<Worker<crate::communication::allocator::thread::Thread>,u64>)->T+Send+Sync+'static
126{
127 crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope)))
128}
129
130
131/// Executes a single-threaded timely dataflow computation.
132///
133/// The `execute_directly` constructs a `Worker` and directly executes the supplied
134/// closure to construct and run a timely dataflow computation. It does not create any
135/// worker threads, and simply uses the current thread of control.
136///
137/// The closure may return a result, which will be returned from the computation.
138///
139/// # Examples
140/// ```rust
141/// use timely::dataflow::operators::{ToStream, Inspect};
142///
143/// // execute a timely dataflow using three worker threads.
144/// timely::execute_directly(|worker| {
145/// worker.dataflow::<(),_,_>(|scope| {
146/// (0..10).to_stream(scope)
147/// .inspect(|x| println!("seen: {:?}", x));
148/// })
149/// });
150/// ```
151pub fn execute_directly<T, F>(func: F) -> T
152where
153 T: Send+'static,
154 F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
155{
156 let alloc = crate::communication::allocator::thread::Thread::default();
157 let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc);
158 let result = func(&mut worker);
159 while worker.has_dataflows() {
160 worker.step_or_park(None);
161 }
162 result
163}
164
165/// Executes a timely dataflow from a configuration and per-communicator logic.
166///
167/// The `execute` method takes a `Configuration` and spins up some number of
168/// workers threads, each of which execute the supplied closure to construct
169/// and run a timely dataflow computation.
170///
171/// The closure may return a `T: Send+'static`. The `execute` method returns
172/// immediately after initializing the timely computation with a result
173/// containing a `WorkerGuards<T>` (or error information), which can be joined
174/// to recover the result `T` values from the local workers.
175///
176/// *Note*: if the caller drops the result of `execute`, the drop code will
177/// block awaiting the completion of the timely computation. If the result
178/// of the method is not captured it will be dropped, which gives the experience
179/// of `execute` blocking; to regain control after `execute` be sure to
180/// capture the results and drop them only when the calling thread has no
181/// other work to perform.
182///
183/// # Examples
184/// ```rust
185/// use timely::dataflow::operators::{ToStream, Inspect};
186///
187/// // execute a timely dataflow using three worker threads.
188/// timely::execute(timely::Config::process(3), |worker| {
189/// worker.dataflow::<(),_,_>(|scope| {
190/// (0..10).to_stream(scope)
191/// .inspect(|x| println!("seen: {:?}", x));
192/// })
193/// }).unwrap();
194/// ```
195///
196/// The following example demonstrates how one can extract data from a multi-worker execution.
197/// In a multi-process setting, each process will only receive those records present at workers
198/// in the process.
199///
200/// ```rust
201/// use std::sync::{Arc, Mutex};
202/// use timely::dataflow::operators::{ToStream, Inspect, Capture};
203/// use timely::dataflow::operators::capture::Extract;
204///
205/// // get send and recv endpoints, wrap send to share
206/// let (send, recv) = ::std::sync::mpsc::channel();
207/// let send = Arc::new(Mutex::new(send));
208///
209/// // execute a timely dataflow using three worker threads.
210/// timely::execute(timely::Config::process(3), move |worker| {
211/// let send = send.lock().unwrap().clone();
212/// worker.dataflow::<(),_,_>(move |scope| {
213/// (0..10).to_stream(scope)
214/// .inspect(|x| println!("seen: {:?}", x))
215/// .capture_into(send);
216/// });
217/// }).unwrap();
218///
219/// // the extracted data should have data (0..10) thrice at timestamp 0.
220/// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
221/// ```
222pub fn execute<T, F>(config: Config, func: F) -> Result<WorkerGuards<T>,String>
223where
224 T:Send+'static,
225 F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static,
226{
227 let (allocators, other) = config.communication.try_build()?;
228 execute_from(allocators, other, config.worker, func)
229}
230
/// Executes a timely dataflow from supplied arguments and per-communicator logic.
///
/// The `execute_from_args` method takes arguments (typically `std::env::args()`) and spins up
/// some number of worker threads, each of which executes the supplied closure to construct and
/// run a timely dataflow computation.
///
/// The closure may return a `T: Send+'static`. The `execute_from_args` method
/// returns immediately after initializing the timely computation with a result
/// containing a `WorkerGuards<T>` (or error information), which can be joined
/// to recover the result `T` values from the local workers.
///
/// *Note*: if the caller drops the result of `execute_from_args`, the drop code
/// will block awaiting the completion of the timely computation.
///
/// The arguments are parsed with [`Config::from_args`], so every option installed by
/// [`Config::install_options`] is accepted (this includes the worker options as well as
/// the communication options). The communication arguments it currently understands are:
///
/// `-w, --threads`: number of per-process worker threads.
///
/// `-n, --processes`: number of processes involved in the computation.
///
/// `-p, --process`: identity of this process; from 0 to n-1.
///
/// `-h, --hostfile`: a text file whose lines are "hostname:port" in order of process identity.
/// If not specified, `localhost` will be used, with port numbers increasing from 2101 (chosen
/// arbitrarily).
///
/// This method is only available if the `getopts` feature is enabled, which
/// it is by default.
///
/// # Errors
///
/// Returns an error if the arguments cannot be turned into a [`Config`], or if
/// [`execute`](execute()) fails for the resulting configuration.
///
/// # Examples
///
/// ```rust
/// use timely::dataflow::operators::{ToStream, Inspect};
///
/// // execute a timely dataflow using command line parameters
/// timely::execute_from_args(std::env::args(), |worker| {
///     worker.dataflow::<(),_,_>(|scope| {
///         (0..10).to_stream(scope)
///                .inspect(|x| println!("seen: {:?}", x));
///     })
/// }).unwrap();
/// ```
/// ```ignore
/// host0% cargo run -- -w 2 -n 4 -h hosts.txt -p 0
/// host1% cargo run -- -w 2 -n 4 -h hosts.txt -p 1
/// host2% cargo run -- -w 2 -n 4 -h hosts.txt -p 2
/// host3% cargo run -- -w 2 -n 4 -h hosts.txt -p 3
/// ```
/// ```ignore
/// % cat hosts.txt
/// host0:port
/// host1:port
/// host2:port
/// host3:port
/// ```
#[cfg(feature = "getopts")]
pub fn execute_from_args<I, T, F>(iter: I, func: F) -> Result<WorkerGuards<T>,String>
where
    I: Iterator<Item=String>,
    T: Send+'static,
    F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static,
{
    let config = Config::from_args(iter)?;
    execute(config, func)
}
294
295/// Executes a timely dataflow from supplied allocators and logging.
296///
297/// Refer to [`execute`](execute()) for more details.
298///
299/// ```rust
300/// use timely::dataflow::operators::{ToStream, Inspect};
301/// use timely::WorkerConfig;
302///
303/// // execute a timely dataflow using command line parameters
304/// let (builders, other) = timely::CommunicationConfig::Process(3).try_build().unwrap();
305/// timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
306/// worker.dataflow::<(),_,_>(|scope| {
307/// (0..10).to_stream(scope)
308/// .inspect(|x| println!("seen: {:?}", x));
309/// })
310/// }).unwrap();
311/// ```
312pub fn execute_from<A, T, F>(
313 builders: Vec<A>,
314 others: Box<dyn ::std::any::Any+Send>,
315 worker_config: WorkerConfig,
316 func: F,
317) -> Result<WorkerGuards<T>, String>
318where
319 A: AllocateBuilder+'static,
320 T: Send+'static,
321 F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
322 initialize_from(builders, others, move |allocator| {
323 let mut worker = Worker::new(worker_config.clone(), allocator);
324 let result = func(&mut worker);
325 while worker.has_dataflows() {
326 worker.step_or_park(None);
327 }
328 result
329 })
330}