timely/worker.rs
1//! The root of each single-threaded worker.
2
3use std::rc::Rc;
4use std::cell::{RefCell, RefMut};
5use std::any::Any;
6use std::str::FromStr;
7use std::time::{Instant, Duration};
8use std::collections::HashMap;
9use std::collections::hash_map::Entry;
10use std::sync::Arc;
11
12use crate::communication::{Allocate, Exchangeable, Push, Pull};
13use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
14use crate::scheduling::{Schedule, Scheduler, Activations};
15use crate::progress::timestamp::{Refines};
16use crate::progress::SubgraphBuilder;
17use crate::progress::operate::Operate;
18use crate::dataflow::scopes::Child;
19use crate::logging::TimelyLogger;
20
/// Different ways in which timely's progress tracking can work.
///
/// These options drive some buffering and accumulation that timely
/// can do to try and trade volume of progress traffic against latency.
/// By accumulating updates longer, a smaller total volume of messages
/// are sent.
///
/// The `ProgressMode::Demand` variant is the most robust, and least
/// likely to lead to catastrophic performance. The `Eager` variant
/// is useful for getting the smallest latencies on systems with few
/// workers, but does risk saturating the system with progress messages
/// and should be used with care, or not at all.
///
/// If you are not certain which option to use, prefer `Demand` (the
/// default), and perhaps monitor the progress messages through timely's
/// logging infrastructure to see if their volume is surprisingly high.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum ProgressMode {
    /// Eagerly transmit all progress updates produced by a worker.
    ///
    /// Progress messages are transmitted without consideration for the
    /// possibility that they may unblock other workers. This can result
    /// in a substantial volume of messages that do not result in a
    /// change to the lower bound of outstanding work.
    Eager,
    /// Delay transmission of progress updates until any could advance
    /// the global frontier of timestamps.
    ///
    /// As timely executes, the progress messages inform each worker of
    /// the outstanding work remaining in the system. As workers work,
    /// they produce changes to this outstanding work. This option
    /// delays the communication of those changes until they might
    /// possibly cause a change in the lower bound of all outstanding
    /// work.
    ///
    /// The most common case this remedies is when one worker transmits
    /// messages to other workers, that worker holds a capability for the
    /// operator and timestamp. Other workers will receive messages, and
    /// with this option will not immediately acknowledge receiving the
    /// messages, because the held capability is strictly prior to what
    /// the messages can affect. Once the capability is released, the
    /// progress messages are unblocked and transmitted, in accumulated
    /// form.
    #[default]
    Demand,
}
67
68impl FromStr for ProgressMode {
69 type Err = String;
70
71 fn from_str(s: &str) -> Result<ProgressMode, String> {
72 match s {
73 "eager" => Ok(ProgressMode::Eager),
74 "demand" => Ok(ProgressMode::Demand),
75 _ => Err(format!("unknown progress mode: {}", s)),
76 }
77 }
78}
79
/// Worker configuration.
#[derive(Debug, Default, Clone)]
pub struct Config {
    /// The progress mode to use.
    pub(crate) progress_mode: ProgressMode,
    /// A map from parameter name to typed parameter values.
    ///
    /// Values are stored type-erased; `Config::get` recovers them by
    /// downcasting, and returns `None` on a type mismatch.
    registry: HashMap<String, Arc<dyn Any + Send + Sync>>,
}
88
89impl Config {
90 /// Installs options into a [getopts::Options] struct that correspond
91 /// to the parameters in the configuration.
92 ///
93 /// It is the caller's responsibility to ensure that the installed options
94 /// do not conflict with any other options that may exist in `opts`, or
95 /// that may be installed into `opts` in the future.
96 ///
97 /// This method is only available if the `getopts` feature is enabled, which
98 /// it is by default.
99 #[cfg(feature = "getopts")]
100 pub fn install_options(opts: &mut getopts::Options) {
101 opts.optopt("", "progress-mode", "progress tracking mode (eager or demand)", "MODE");
102 }
103
104 /// Instantiates a configuration based upon the parsed options in `matches`.
105 ///
106 /// The `matches` object must have been constructed from a
107 /// [getopts::Options] which contained at least the options installed by
108 /// [Self::install_options].
109 ///
110 /// This method is only available if the `getopts` feature is enabled, which
111 /// it is by default.
112 #[cfg(feature = "getopts")]
113 pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
114 let progress_mode = matches
115 .opt_get_default("progress-mode", ProgressMode::Eager)?;
116 Ok(Config::default().progress_mode(progress_mode))
117 }
118
119 /// Sets the progress mode to `progress_mode`.
120 pub fn progress_mode(mut self, progress_mode: ProgressMode) -> Self {
121 self.progress_mode = progress_mode;
122 self
123 }
124
125 /// Sets a typed configuration parameter for the given `key`.
126 ///
127 /// It is recommended to install a single configuration struct using a key
128 /// that uniquely identifies your project, to avoid clashes. For example,
129 /// differential dataflow registers a configuration struct under the key
130 /// "differential".
131 ///
132 /// # Examples
133 /// ```rust
134 /// let mut config = timely::Config::process(3);
135 /// config.worker.set("example".to_string(), 7u64);
136 /// timely::execute(config, |worker| {
137 /// use crate::timely::worker::AsWorker;
138 /// assert_eq!(worker.config().get::<u64>("example"), Some(&7));
139 /// }).unwrap();
140 /// ```
141 pub fn set<T>(&mut self, key: String, val: T) -> &mut Self
142 where
143 T: Send + Sync + 'static,
144 {
145 self.registry.insert(key, Arc::new(val));
146 self
147 }
148
149 /// Gets the value for configured parameter `key`.
150 ///
151 /// Returns `None` if `key` has not previously been set with
152 /// [Config::set], or if the specified `T` does not match the `T`
153 /// from the call to `set`.
154 ///
155 /// # Examples
156 /// ```rust
157 /// let mut config = timely::Config::process(3);
158 /// config.worker.set("example".to_string(), 7u64);
159 /// timely::execute(config, |worker| {
160 /// use crate::timely::worker::AsWorker;
161 /// assert_eq!(worker.config().get::<u64>("example"), Some(&7));
162 /// }).unwrap();
163 /// ```
164 pub fn get<T: 'static>(&self, key: &str) -> Option<&T> {
165 self.registry.get(key).and_then(|val| val.downcast_ref())
166 }
167}
168
/// Methods provided by the root Worker.
///
/// These methods are often proxied by child scopes, and this trait provides access.
pub trait AsWorker : Scheduler {
    /// Returns the worker configuration parameters.
    fn config(&self) -> &Config;
    /// Index of the worker among its peers.
    fn index(&self) -> usize;
    /// Number of peer workers.
    fn peers(&self) -> usize;
    /// Allocates a new channel from a supplied identifier and address.
    ///
    /// The identifier is used to identify the underlying channel and route
    /// its data. It should be distinct from other identifiers used
    /// for allocation, but can otherwise be arbitrary.
    ///
    /// The address should specify a path to an operator that should be
    /// scheduled in response to the receipt of records on the channel.
    /// Most commonly, this would be the address of the *target* of the
    /// channel.
    fn allocate<T: Exchangeable>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
    /// Constructs a pipeline channel from the worker to itself.
    ///
    /// By default this method uses the native channel allocation mechanism, but the expectation is
    /// that this behavior will be overridden to be more efficient.
    fn pipeline<T: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<T>, ThreadPuller<T>);

    /// Allocates a broadcast channel, where each pushed message is received by all.
    fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>);

    /// Allocates a new worker-unique identifier.
    fn new_identifier(&mut self) -> usize;
    /// The next worker-unique identifier to be allocated.
    fn peek_identifier(&self) -> usize;
    /// Provides access to named logging streams.
    ///
    /// Returns `None` when the worker was constructed without a timer,
    /// in which case logging is unavailable.
    fn log_register(&self) -> Option<RefMut<'_, crate::logging_core::Registry>>;
    /// Acquires a logger by name, if the log register exists and the name is registered.
    ///
    /// For a more precise understanding of why a result is `None` one can use the direct functions.
    fn logger_for<CB: crate::ContainerBuilder>(&self, name: &str) -> Option<timely_logging::Logger<CB>> {
        self.log_register().and_then(|l| l.get(name))
    }
    /// Provides access to the timely logging stream.
    fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.logger_for("timely").map(Into::into) }
}
214
/// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
/// and has a list of dataflows that it manages.
pub struct Worker<A: Allocate> {
    /// Worker-wide configuration parameters.
    config: Config,
    /// An optional instant from which the start of the computation should be reckoned.
    ///
    /// If this is set to none, system time-based functionality will be unavailable or work badly.
    /// For example, logging will be unavailable, and activation after a delay will be unavailable.
    timer: Option<Instant>,
    /// Map from channel identifier to the operator address that should be
    /// activated when the channel receives data.
    paths: Rc<RefCell<HashMap<usize, Rc<[usize]>>>>,
    /// The communication fabric used to allocate channels.
    allocator: Rc<RefCell<A>>,
    /// Counter backing `new_identifier` / `peek_identifier`.
    identifiers: Rc<RefCell<usize>>,
    /// Installed dataflows, keyed by dataflow index.
    dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
    /// Counter backing `allocate_dataflow_index` / `next_dataflow_index`.
    dataflow_counter: Rc<RefCell<usize>>,
    /// Registry of named logging streams; present only when `timer` is set.
    logging: Option<Rc<RefCell<crate::logging_core::Registry>>>,

    /// Shared tracker of operator activations.
    activations: Rc<RefCell<Activations>>,
    /// Scratch list of dataflow indices to schedule, reused across steps.
    active_dataflows: Vec<usize>,

    // Temporary storage for channel identifiers during dataflow construction.
    // These are then associated with a dataflow once constructed.
    temp_channel_ids: Rc<RefCell<Vec<usize>>>,
}
239
240impl<A: Allocate> AsWorker for Worker<A> {
241 fn config(&self) -> &Config { &self.config }
242 fn index(&self) -> usize { self.allocator.borrow().index() }
243 fn peers(&self) -> usize { self.allocator.borrow().peers() }
244 fn allocate<D: Exchangeable>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>) {
245 if address.is_empty() { panic!("Unacceptable address: Length zero"); }
246 let mut paths = self.paths.borrow_mut();
247 paths.insert(identifier, address);
248 self.temp_channel_ids.borrow_mut().push(identifier);
249 self.allocator.borrow_mut().allocate(identifier)
250 }
251 fn pipeline<T: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<T>, ThreadPuller<T>) {
252 if address.is_empty() { panic!("Unacceptable address: Length zero"); }
253 let mut paths = self.paths.borrow_mut();
254 paths.insert(identifier, address);
255 self.temp_channel_ids.borrow_mut().push(identifier);
256 self.allocator.borrow_mut().pipeline(identifier)
257 }
258 fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
259 if address.is_empty() { panic!("Unacceptable address: Length zero"); }
260 let mut paths = self.paths.borrow_mut();
261 paths.insert(identifier, address);
262 self.temp_channel_ids.borrow_mut().push(identifier);
263 self.allocator.borrow_mut().broadcast(identifier)
264 }
265
266 fn new_identifier(&mut self) -> usize { self.new_identifier() }
267 fn peek_identifier(&self) -> usize { self.peek_identifier() }
268 fn log_register(&self) -> Option<RefMut<'_, crate::logging_core::Registry>> {
269 self.log_register()
270 }
271}
272
273impl<A: Allocate> Scheduler for Worker<A> {
274 fn activations(&self) -> Rc<RefCell<Activations>> {
275 Rc::clone(&self.activations)
276 }
277}
278
impl<A: Allocate> Worker<A> {
    /// Allocates a new `Worker` bound to a channel allocator.
    ///
    /// When `now` is `None`, no logging registry is created and time-based
    /// functionality (logging, delayed activation) is unavailable.
    pub fn new(config: Config, c: A, now: Option<std::time::Instant>) -> Worker<A> {
        Worker {
            config,
            timer: now,
            paths: Default::default(),
            allocator: Rc::new(RefCell::new(c)),
            identifiers: Default::default(),
            dataflows: Default::default(),
            dataflow_counter: Default::default(),
            // Logging requires a reference instant; without one it is absent.
            logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now)))),
            activations: Rc::new(RefCell::new(Activations::new(now))),
            active_dataflows: Default::default(),
            temp_channel_ids: Default::default(),
        }
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step();
    /// });
    /// ```
    pub fn step(&mut self) -> bool {
        // A zero-duration timeout means "never park".
        self.step_or_park(Some(Duration::from_secs(0)))
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// This method takes an optional timeout and may park the thread until
    /// there is work to perform or until this timeout expires. A value of
    /// `None` allows the worker to park indefinitely, whereas a value of
    /// `Some(Duration::new(0, 0))` will return without parking the thread.
    ///
    /// Returns `true` as long as any dataflow remains installed.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use std::time::Duration;
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step_or_park(Some(Duration::from_secs(1)));
    /// });
    /// ```
    pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {

        { // Process channel events. Activate responders.
            let mut allocator = self.allocator.borrow_mut();
            allocator.receive();
            let events = allocator.events();
            let mut borrow = events.borrow_mut();
            let paths = self.paths.borrow();
            // Deduplicate channel identifiers so each responder is
            // activated at most once per step.
            borrow.sort_unstable();
            borrow.dedup();
            for channel in borrow.drain(..) {
                // Consider tracking whether a channel
                // is non-empty, and only activating
                // on the basis of non-empty channels.
                // TODO: This is a sloppy way to deal
                // with channels that may not be alloc'd.
                if let Some(path) = paths.get(&channel) {
                    self.activations
                        .borrow_mut()
                        .activate(&path[..]);
                }
            }
        }

        // Organize activations.
        self.activations
            .borrow_mut()
            .advance();

        // Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
        let empty_for = self.activations.borrow().empty_for();
        // Determine the minimum park duration, where `None` are an absence of a constraint.
        let delay = match (duration, empty_for) {
            (Some(x), Some(y)) => Some(std::cmp::min(x,y)),
            (x, y) => x.or(y),
        };

        if delay != Some(Duration::new(0,0)) {

            // Log parking and flush log.
            if let Some(l) = self.logging().as_mut() {
                l.log(crate::logging::ParkEvent::park(delay));
                l.flush();
            }

            self.allocator
                .borrow()
                .await_events(delay);

            // Log return from unpark.
            self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
        }
        else { // Schedule active dataflows.

            let active_dataflows = &mut self.active_dataflows;
            self.activations
                .borrow_mut()
                .for_extensions(&[], |index| active_dataflows.push(index));

            let mut dataflows = self.dataflows.borrow_mut();
            for index in active_dataflows.drain(..) {
                // Step the dataflow if it exists; remove it once complete.
                if let Entry::Occupied(mut entry) = dataflows.entry(index) {
                    // TODO: This is a moment at which a scheduling decision is being made.
                    let incomplete = entry.get_mut().step();
                    if !incomplete {
                        // Garbage-collect the completed dataflow's channel paths.
                        let mut paths = self.paths.borrow_mut();
                        for channel in entry.get_mut().channel_ids.drain(..) {
                            paths.remove(&channel);
                        }
                        entry.remove_entry();
                    }
                }
            }
        }

        // Clean up, indicate if dataflows remain.
        self.logging.as_ref().map(|l| l.borrow_mut().flush());
        self.allocator.borrow_mut().release();
        !self.dataflows.borrow().is_empty()
    }

    /// Calls `self.step()` as long as `func` evaluates to `true`.
    ///
    /// This method will continually execute even if there is not work
    /// for the worker to perform. Consider using the similar method
    /// `Self::step_or_park_while(duration)` to allow the worker to yield
    /// control if that is appropriate.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///             .0
    ///     });
    ///
    ///     worker.step_while(|| probe.less_than(&0));
    /// });
    /// ```
    pub fn step_while<F: FnMut()->bool>(&mut self, func: F) {
        self.step_or_park_while(Some(Duration::from_secs(0)), func)
    }

    /// Calls `self.step_or_park(duration)` as long as `func` evaluates to `true`.
    ///
    /// This method may yield whenever there is no work to perform, as performed
    /// by `Self::step_or_park()`. Please consult the documentation for further
    /// information about that method and its behavior. In particular, the method
    /// can park the worker indefinitely, if no new work re-awakens the worker.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///             .0
    ///     });
    ///
    ///     worker.step_or_park_while(None, || probe.less_than(&0));
    /// });
    /// ```
    pub fn step_or_park_while<F: FnMut()->bool>(&mut self, duration: Option<Duration>, mut func: F) {
        while func() { self.step_or_park(duration); }
    }

    /// The index of the worker out of its peers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn index(&self) -> usize { self.allocator.borrow().index() }
    /// The total number of peer workers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn peers(&self) -> usize { self.allocator.borrow().peers() }

    /// A timer started at the initiation of the timely computation.
    ///
    /// Returns `None` when the worker was constructed without a timer.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn timer(&self) -> Option<Instant> { self.timer }

    /// Allocate a new worker-unique identifier.
    ///
    /// This method is public, though it is not expected to be widely used outside
    /// of the timely dataflow system.
    pub fn new_identifier(&mut self) -> usize {
        // Post-increment: returns the value prior to the increment.
        *self.identifiers.borrow_mut() += 1;
        *self.identifiers.borrow() - 1
    }

    /// The next worker-unique identifier to be allocated.
    pub fn peek_identifier(&self) -> usize {
        *self.identifiers.borrow()
    }

    /// Access to named loggers.
    ///
    /// Returns `None` when the worker was constructed without a timer,
    /// in which case no logging registry exists.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     worker.log_register()
    ///           .unwrap()
    ///           .insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
    ///               println!("{:?}\t{:?}", time, data)
    ///           );
    /// });
    /// ```
    pub fn log_register(&self) -> Option<RefMut<'_, crate::logging_core::Registry>> {
        self.logging.as_ref().map(|l| l.borrow_mut())
    }

    /// Construct a new dataflow.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow<T, R, F>(&mut self, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut Child<Self, T>)->R,
    {
        self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with a (purely cosmetic) name.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut Child<Self, T>)->R,
    {
        self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with specific configurations.
    ///
    /// This method constructs a new dataflow, using a name, logger, and additional
    /// resources specified as argument. The name is cosmetic, the logger is used to
    /// handle events generated by the dataflow, and the additional resources are kept
    /// alive for as long as the dataflow is alive (use case: shared library bindings).
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_core::<usize,_,_,_>(
    ///         "dataflow X",           // Dataflow name
    ///         None,                   // Optional logger
    ///         37,                     // Any resources
    ///         |resources, scope| {    // Closure
    ///
    ///             // uses of `resources`, `scope`to build dataflow
    ///
    ///         }
    ///     );
    /// });
    /// ```
    pub fn dataflow_core<T, R, F, V>(&mut self, name: &str, mut logging: Option<TimelyLogger>, mut resources: V, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut V, &mut Child<Self, T>)->R,
        V: Any+'static,
    {
        let dataflow_index = self.allocate_dataflow_index();
        let addr = vec![dataflow_index].into();
        let identifier = self.new_identifier();

        // Per-timestamp-type logging streams for progress and summaries.
        let type_name = std::any::type_name::<T>();
        let progress_logging = self.logger_for(&format!("timely/progress/{}", type_name));
        let summary_logging = self.logger_for(&format!("timely/summary/{}", type_name));
        let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name);
        let subscope = RefCell::new(subscope);

        // Run the user's construction closure against a child scope.
        let result = {
            let mut builder = Child {
                subgraph: &subscope,
                parent: self.clone(),
                logging: logging.clone(),
                progress_logging,
            };
            func(&mut resources, &mut builder)
        };

        let mut operator = subscope.into_inner().build(self);

        if let Some(l) = logging.as_mut() {
            l.log(crate::logging::OperatesEvent {
                id: identifier,
                addr: operator.path().to_vec(),
                name: operator.name().to_string(),
            });
            l.flush();
        }

        // Initialize the operator's progress tracking.
        operator.get_internal_summary();
        operator.set_external_summary();

        // Claim the channels allocated during this construction, so they
        // can be cleaned up when the dataflow is dropped or completes.
        let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();
        let channel_ids = temp_channel_ids.drain(..).collect::<Vec<_>>();

        let wrapper = Wrapper {
            logging,
            identifier,
            operate: Some(Box::new(operator)),
            resources: Some(Box::new(resources)),
            channel_ids,
        };
        self.dataflows.borrow_mut().insert(dataflow_index, wrapper);

        result

    }

    /// Drops an identified dataflow.
    ///
    /// This method removes the identified dataflow, which will no longer be scheduled.
    /// Various other resources will be cleaned up, though the method is currently in
    /// public beta rather than expected to work. Please report all crashes and unmet
    /// expectations!
    pub fn drop_dataflow(&mut self, dataflow_identifier: usize) {
        if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) {
            // Garbage collect channel_id to path information.
            let mut paths = self.paths.borrow_mut();
            for channel in entry.channel_ids.drain(..) {
                paths.remove(&channel);
            }
        }
    }

    /// Returns the next index to be used for dataflow construction.
    ///
    /// This identifier will appear in the address of contained operators, and can
    /// be used to drop the dataflow using `self.drop_dataflow()`.
    pub fn next_dataflow_index(&self) -> usize {
        *self.dataflow_counter.borrow()
    }

    /// List the current dataflow indices.
    pub fn installed_dataflows(&self) -> Vec<usize> {
        self.dataflows.borrow().keys().cloned().collect()
    }

    /// Returns `true` if there is at least one dataflow under management.
    pub fn has_dataflows(&self) -> bool {
        !self.dataflows.borrow().is_empty()
    }

    // Acquire a new distinct dataflow identifier.
    // Post-increment: returns the value prior to the increment.
    fn allocate_dataflow_index(&self) -> usize {
        *self.dataflow_counter.borrow_mut() += 1;
        *self.dataflow_counter.borrow() - 1
    }
}
741
742impl<A: Allocate> Clone for Worker<A> {
743 fn clone(&self) -> Self {
744 Worker {
745 config: self.config.clone(),
746 timer: self.timer,
747 paths: Rc::clone(&self.paths),
748 allocator: Rc::clone(&self.allocator),
749 identifiers: Rc::clone(&self.identifiers),
750 dataflows: Rc::clone(&self.dataflows),
751 dataflow_counter: Rc::clone(&self.dataflow_counter),
752 logging: self.logging.clone(),
753 activations: Rc::clone(&self.activations),
754 active_dataflows: Vec::new(),
755 temp_channel_ids: Rc::clone(&self.temp_channel_ids),
756 }
757 }
758}
759
/// An installed dataflow, together with the resources it keeps alive.
struct Wrapper {
    /// Optional logger for schedule/shutdown events.
    logging: Option<TimelyLogger>,
    /// Worker-unique identifier, used in logged events.
    identifier: usize,
    /// The dataflow itself; `None` once it has completed.
    operate: Option<Box<dyn Schedule>>,
    /// Caller-supplied resources kept alive for the dataflow's lifetime,
    /// dropped only after `operate` (e.g. shared library bindings).
    resources: Option<Box<dyn Any>>,
    /// Identifiers of channels allocated during construction, used to
    /// garbage-collect channel-to-path information on removal.
    channel_ids: Vec<usize>,
}
767
impl Wrapper {
    /// Steps the dataflow, indicates if it remains incomplete.
    ///
    /// If the dataflow has completed, this call will drop it and its resources,
    /// dropping the dataflow first and then the resources (so that, e.g., shared
    /// library bindings will outlive the dataflow).
    fn step(&mut self) -> bool {

        // Perhaps log information about the start of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::start(self.identifier));
        }

        // A missing dataflow (already dropped) reports as complete.
        let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(false);
        if !incomplete {
            // Drop the dataflow before its resources.
            self.operate = None;
            self.resources = None;
        }

        // Perhaps log information about the stop of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::stop(self.identifier));
        }

        incomplete
    }
}
795
796impl Drop for Wrapper {
797 fn drop(&mut self) {
798 if let Some(l) = self.logging.as_mut() {
799 l.log(crate::logging::ShutdownEvent { id: self.identifier });
800 }
801 // ensure drop order
802 self.operate = None;
803 self.resources = None;
804 }
805}