Skip to main content

timely/
execute.rs

1//! Starts a timely dataflow execution from configuration information and per-worker logic.
2
3use crate::communication::{initialize_from, Allocator, allocator::AllocateBuilder, WorkerGuards};
4use crate::dataflow::scopes::Child;
5use crate::worker::Worker;
6use crate::{CommunicationConfig, WorkerConfig};
7
8/// Configures the execution of a timely dataflow computation.
9#[derive(Clone, Debug)]
10pub struct Config {
11    /// Configuration for the communication infrastructure.
12    pub communication: CommunicationConfig,
13    /// Configuration for the worker threads.
14    pub worker: WorkerConfig,
15}
16
17impl Config {
18    /// Installs options into a [getopts::Options] struct that correspond
19    /// to the parameters in the configuration.
20    ///
21    /// It is the caller's responsibility to ensure that the installed options
22    /// do not conflict with any other options that may exist in `opts`, or
23    /// that may be installed into `opts` in the future.
24    ///
25    /// This method is only available if the `getopts` feature is enabled, which
26    /// it is by default.
27    #[cfg(feature = "getopts")]
28    pub fn install_options(opts: &mut getopts::Options) {
29        CommunicationConfig::install_options(opts);
30        WorkerConfig::install_options(opts);
31    }
32
33    /// Instantiates a configuration based upon the parsed options in `matches`.
34    ///
35    /// The `matches` object must have been constructed from a
36    /// [getopts::Options] which contained at least the options installed by
37    /// [Self::install_options].
38    ///
39    /// This method is only available if the `getopts` feature is enabled, which
40    /// it is by default.
41    #[cfg(feature = "getopts")]
42    pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
43        Ok(Config {
44            communication: CommunicationConfig::from_matches(matches)?,
45            worker: WorkerConfig::from_matches(matches)?,
46        })
47    }
48
49    /// Constructs a new configuration by parsing the supplied text arguments.
50    ///
51    /// Most commonly, callers supply `std::env::args()` as the iterator.
52    #[cfg(feature = "getopts")]
53    pub fn from_args<I: Iterator<Item=String>>(args: I) -> Result<Config, String> {
54        let mut opts = getopts::Options::new();
55        Config::install_options(&mut opts);
56        let matches = opts.parse(args).map_err(|e| e.to_string())?;
57        Config::from_matches(&matches)
58    }
59
60    /// Constructs a `Config` that uses one worker thread and the
61    /// defaults for all other parameters.
62    pub fn thread() -> Config {
63        Config {
64            communication: CommunicationConfig::Thread,
65            worker: WorkerConfig::default(),
66        }
67    }
68
69    /// Constructs an `Config` that uses `n` worker threads and the
70    /// defaults for all other parameters.
71    pub fn process(n: usize) -> Config {
72        Config {
73            communication: CommunicationConfig::Process(n),
74            worker: WorkerConfig::default(),
75        }
76    }
77}
78
79/// Executes a single-threaded timely dataflow computation.
80///
81/// The `example` method takes a closure on a `Scope` which it executes to initialize and run a
82/// timely dataflow computation on a single thread. This method is intended for use in examples,
83/// rather than programs that may need to run across multiple workers.
84///
85/// The `example` method returns whatever the single worker returns from its closure.
86/// This is often nothing, but the worker can return something about the data it saw in order to
87/// test computations.
88///
89/// The method aggressively unwraps returned `Result<_>` types.
90///
91/// # Examples
92///
93/// The simplest example creates a stream of data and inspects it.
94///
95/// ```rust
96/// use timely::dataflow::operators::{ToStream, Inspect};
97///
98/// timely::example(|scope| {
99///     (0..10).to_stream(scope)
100///            .container::<Vec<_>>()
101///            .inspect(|x| println!("seen: {:?}", x));
102/// });
103/// ```
104///
105/// This next example captures the data and displays them once the computation is complete.
106///
107/// More precisely, the example captures a stream of events (receiving batches of data,
108/// updates to input capabilities) and displays these events.
109///
110/// ```rust
111/// use timely::dataflow::operators::{ToStream, Inspect, Capture};
112/// use timely::dataflow::operators::capture::Extract;
113///
114/// let data = timely::example(|scope| {
115///     (0..10).to_stream(scope)
116///            .container::<Vec<_>>()
117///            .inspect(|x| println!("seen: {:?}", x))
118///            .capture()
119/// });
120///
121/// // the extracted data should have data (0..10) at timestamp 0.
122/// assert_eq!(data.extract()[0].1, (0..10).collect::<Vec<_>>());
123/// ```
124pub fn example<T, F>(func: F) -> T
125where
126    T: Send+'static,
127    F: FnOnce(&mut Child<Worker<crate::communication::allocator::thread::Thread>,u64>)->T+Send+Sync+'static
128{
129    crate::execute::execute_directly(|worker| worker.dataflow(|scope| func(scope)))
130}
131
132
133/// Executes a single-threaded timely dataflow computation.
134///
135/// The `execute_directly` constructs a `Worker` and directly executes the supplied
136/// closure to construct and run a timely dataflow computation. It does not create any
137/// worker threads, and simply uses the current thread of control.
138///
139/// The closure may return a result, which will be returned from the computation.
140///
141/// # Examples
142/// ```rust
143/// use timely::dataflow::operators::{ToStream, Inspect};
144///
145/// // execute a timely dataflow using three worker threads.
146/// timely::execute_directly(|worker| {
147///     worker.dataflow::<(),_,_>(|scope| {
148///         (0..10).to_stream(scope)
149///                .container::<Vec<_>>()
150///                .inspect(|x| println!("seen: {:?}", x));
151///     })
152/// });
153/// ```
154pub fn execute_directly<T, F>(func: F) -> T
155where
156    T: Send+'static,
157    F: FnOnce(&mut Worker<crate::communication::allocator::thread::Thread>)->T+Send+Sync+'static
158{
159    let alloc = crate::communication::allocator::thread::Thread::default();
160    let mut worker = crate::worker::Worker::new(WorkerConfig::default(), alloc, Some(std::time::Instant::now()));
161    let result = func(&mut worker);
162    while worker.has_dataflows() {
163        worker.step_or_park(None);
164    }
165    result
166}
167
168/// Executes a timely dataflow from a configuration and per-communicator logic.
169///
170/// The `execute` method takes a `Configuration` and spins up some number of
171/// workers threads, each of which execute the supplied closure to construct
172/// and run a timely dataflow computation.
173///
174/// The closure may return a `T: Send+'static`.  The `execute` method returns
175/// immediately after initializing the timely computation with a result
176/// containing a `WorkerGuards<T>` (or error information), which can be joined
177/// to recover the result `T` values from the local workers.
178///
179/// *Note*: if the caller drops the result of `execute`, the drop code will
180/// block awaiting the completion of the timely computation. If the result
181/// of the method is not captured it will be dropped, which gives the experience
182/// of `execute` blocking; to regain control after `execute` be sure to
183/// capture the results and drop them only when the calling thread has no
184/// other work to perform.
185///
186/// # Examples
187/// ```rust
188/// use timely::dataflow::operators::{ToStream, Inspect};
189///
190/// // execute a timely dataflow using three worker threads.
191/// timely::execute(timely::Config::process(3), |worker| {
192///     worker.dataflow::<(),_,_>(|scope| {
193///         (0..10).to_stream(scope)
194///                .container::<Vec<_>>()
195///                .inspect(|x| println!("seen: {:?}", x));
196///     })
197/// }).unwrap();
198/// ```
199///
200/// The following example demonstrates how one can extract data from a multi-worker execution.
201/// In a multi-process setting, each process will only receive those records present at workers
202/// in the process.
203///
204/// ```rust
205/// use std::sync::{Arc, Mutex};
206/// use timely::dataflow::operators::{ToStream, Inspect, Capture};
207/// use timely::dataflow::operators::capture::Extract;
208///
209/// // get send and recv endpoints, wrap send to share
210/// let (send, recv) = ::std::sync::mpsc::channel();
211/// let send = Arc::new(Mutex::new(send));
212///
213/// // execute a timely dataflow using three worker threads.
214/// timely::execute(timely::Config::process(3), move |worker| {
215///     let send = send.lock().unwrap().clone();
216///     worker.dataflow::<(),_,_>(move |scope| {
217///         (0..10).to_stream(scope)
218///                .container::<Vec<_>>()
219///                .inspect(|x| println!("seen: {:?}", x))
220///                .capture_into(send);
221///     });
222/// }).unwrap();
223///
224/// // the extracted data should have data (0..10) thrice at timestamp 0.
225/// assert_eq!(recv.extract()[0].1, (0..30).map(|x| x / 3).collect::<Vec<_>>());
226/// ```
227pub fn execute<T, F>(config: Config, func: F) -> Result<WorkerGuards<T>,String>
228where
229    T:Send+'static,
230    F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static,
231{
232    let (allocators, other) = config.communication.try_build()?;
233    execute_from(allocators, other, config.worker, func)
234}
235
/// Executes a timely dataflow from supplied arguments and per-communicator logic.
///
/// The `execute_from_args` method takes arguments (typically `std::env::args()`) and spins up
/// some number of worker threads, each of which executes the supplied closure to construct and
/// run a timely dataflow computation.
///
/// The closure may return a `T: Send+'static`.  The `execute_from_args` method
/// returns immediately after initializing the timely computation with a result
/// containing a `WorkerGuards<T>` (or error information), which can be joined
/// to recover the result `T` values from the local workers.
///
/// *Note*: if the caller drops the result of `execute_from_args`, the drop code
/// will block awaiting the completion of the timely computation.
///
/// The arguments `execute_from_args` currently understands include:
///
/// `-w, --threads`: number of per-process worker threads.
///
/// `-n, --processes`: number of processes involved in the computation.
///
/// `-p, --process`: identity of this process; from 0 to n-1.
///
/// `-h, --hostfile`: a text file whose lines are "hostname:port" in order of process identity.
/// If not specified, `localhost` will be used, with port numbers increasing from 2101 (chosen
/// arbitrarily).
///
/// This method is only available if the `getopts` feature is enabled, which
/// it is by default.
///
/// # Errors
///
/// Returns an error message if the arguments cannot be parsed into a [`Config`],
/// or any error returned by [`execute`](execute()).
///
/// # Examples
///
/// ```rust
/// use timely::dataflow::operators::{ToStream, Inspect};
///
/// // execute a timely dataflow using command line parameters
/// timely::execute_from_args(std::env::args(), |worker| {
///     worker.dataflow::<(),_,_>(|scope| {
///         (0..10).to_stream(scope)
///                .container::<Vec<_>>()
///                .inspect(|x| println!("seen: {:?}", x));
///     })
/// }).unwrap();
/// ```
/// ```text
/// host0% cargo run -- -w 2 -n 4 -h hosts.txt -p 0
/// host1% cargo run -- -w 2 -n 4 -h hosts.txt -p 1
/// host2% cargo run -- -w 2 -n 4 -h hosts.txt -p 2
/// host3% cargo run -- -w 2 -n 4 -h hosts.txt -p 3
/// ```
/// ```text
/// % cat hosts.txt
/// host0:port
/// host1:port
/// host2:port
/// host3:port
/// ```
#[cfg(feature = "getopts")]
pub fn execute_from_args<I, T, F>(iter: I, func: F) -> Result<WorkerGuards<T>,String>
where
    I: Iterator<Item=String>,
    T: Send+'static,
    F: Fn(&mut Worker<Allocator>)->T+Send+Sync+'static,
{
    let config = Config::from_args(iter)?;
    execute(config, func)
}
300
301/// Executes a timely dataflow from supplied allocators and logging.
302///
303/// Refer to [`execute`](execute()) for more details.
304///
305/// ```rust
306/// use timely::dataflow::operators::{ToStream, Inspect};
307/// use timely::WorkerConfig;
308///
309/// // execute a timely dataflow using command line parameters
310/// let (builders, other) = timely::CommunicationConfig::Process(3).try_build().unwrap();
311/// timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
312///     worker.dataflow::<(),_,_>(|scope| {
313///         (0..10).to_stream(scope)
314///                .container::<Vec<_>>()
315///                .inspect(|x| println!("seen: {:?}", x));
316///     })
317/// }).unwrap();
318/// ```
319pub fn execute_from<A, T, F>(
320    builders: Vec<A>,
321    others: Box<dyn ::std::any::Any+Send>,
322    worker_config: WorkerConfig,
323    func: F,
324) -> Result<WorkerGuards<T>, String>
325where
326    A: AllocateBuilder+'static,
327    T: Send+'static,
328    F: Fn(&mut Worker<<A as AllocateBuilder>::Allocator>)->T+Send+Sync+'static {
329    initialize_from(builders, others, move |allocator| {
330        let mut worker = Worker::new(worker_config.clone(), allocator, Some(std::time::Instant::now()));
331        let result = func(&mut worker);
332        while worker.has_dataflows() {
333            worker.step_or_park(None);
334        }
335        result
336    })
337}