timely/worker.rs
1//! The root of each single-threaded worker.
2
3use std::rc::Rc;
4use std::cell::{RefCell, RefMut};
5use std::any::Any;
6use std::str::FromStr;
7use std::time::{Instant, Duration};
8use std::collections::HashMap;
9use std::collections::hash_map::Entry;
10use std::sync::Arc;
11
12use crate::communication::{Allocator, Exchangeable, Push, Pull};
13use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
14use crate::scheduling::{Schedule, Activations, Activator, SyncActivator};
15use crate::progress::timestamp::{Refines};
16use crate::progress::SubgraphBuilder;
17use crate::progress::operate::Operate;
18use crate::dataflow::scope::Scope;
19use crate::logging::TimelyLogger;
20
/// The strategies timely can use for propagating progress information.
///
/// These options control buffering and accumulation that timely can
/// perform to trade the volume of progress traffic against latency.
/// The longer updates are accumulated, the fewer total messages need
/// to be sent.
///
/// The `ProgressMode::Demand` variant is the most robust, and least
/// likely to lead to catastrophic performance. The `Eager` variant
/// is useful for getting the smallest latencies on systems with few
/// workers, but does risk saturating the system with progress messages
/// and should be used with care, or not at all.
///
/// When in doubt, prefer `Demand`, and perhaps observe the progress
/// messages through timely's logging infrastructure to check whether
/// their volume is surprisingly high.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum ProgressMode {
    /// Transmit every progress update a worker produces, immediately.
    ///
    /// Updates are sent without considering whether they could unblock
    /// other workers. This can produce a substantial volume of messages
    /// that never change the lower bound of outstanding work.
    Eager,
    /// Hold back progress updates until some of them could advance the
    /// global frontier of timestamps.
    ///
    /// As timely executes, progress messages inform each worker of the
    /// outstanding work remaining in the system, and working workers
    /// produce changes to that outstanding work. This option delays
    /// communicating those changes until they might possibly change
    /// the lower bound of all outstanding work.
    ///
    /// The most common case this remedies: a worker that transmits data
    /// messages to its peers holds a capability for the operator and
    /// timestamp. With this option, receiving workers do not immediately
    /// acknowledge the messages, because the held capability is strictly
    /// prior to anything the messages can affect. Once that capability is
    /// released, the pent-up progress messages are transmitted, in
    /// accumulated form.
    #[default]
    Demand,
}

impl FromStr for ProgressMode {
    type Err = String;

    /// Parses `"eager"` or `"demand"` (case-sensitive); anything else is an error.
    fn from_str(s: &str) -> Result<ProgressMode, String> {
        if s == "eager" {
            Ok(ProgressMode::Eager)
        } else if s == "demand" {
            Ok(ProgressMode::Demand)
        } else {
            Err(format!("unknown progress mode: {}", s))
        }
    }
}
79
80/// Worker configuration.
81#[derive(Debug, Default, Clone)]
82pub struct Config {
83 /// The progress mode to use.
84 pub(crate) progress_mode: ProgressMode,
85 /// A map from parameter name to typed parameter values.
86 registry: HashMap<String, Arc<dyn Any + Send + Sync>>,
87}
88
89impl Config {
90 /// Installs options into a [getopts::Options] struct that correspond
91 /// to the parameters in the configuration.
92 ///
93 /// It is the caller's responsibility to ensure that the installed options
94 /// do not conflict with any other options that may exist in `opts`, or
95 /// that may be installed into `opts` in the future.
96 ///
97 /// This method is only available if the `getopts` feature is enabled, which
98 /// it is by default.
99 #[cfg(feature = "getopts")]
100 pub fn install_options(opts: &mut getopts::Options) {
101 opts.optopt("", "progress-mode", "progress tracking mode (eager or demand)", "MODE");
102 }
103
104 /// Instantiates a configuration based upon the parsed options in `matches`.
105 ///
106 /// The `matches` object must have been constructed from a
107 /// [getopts::Options] which contained at least the options installed by
108 /// [Self::install_options].
109 ///
110 /// This method is only available if the `getopts` feature is enabled, which
111 /// it is by default.
112 #[cfg(feature = "getopts")]
113 pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
114 let progress_mode = matches
115 .opt_get_default("progress-mode", ProgressMode::Demand)?;
116 Ok(Config::default().progress_mode(progress_mode))
117 }
118
119 /// Sets the progress mode to `progress_mode`.
120 pub fn progress_mode(mut self, progress_mode: ProgressMode) -> Self {
121 self.progress_mode = progress_mode;
122 self
123 }
124
125 /// Sets a typed configuration parameter for the given `key`.
126 ///
127 /// It is recommended to install a single configuration struct using a key
128 /// that uniquely identifies your project, to avoid clashes. For example,
129 /// differential dataflow registers a configuration struct under the key
130 /// "differential".
131 ///
132 /// # Examples
133 /// ```rust
134 /// let mut config = timely::Config::process(3);
135 /// config.worker.set("example".to_string(), 7u64);
136 /// timely::execute(config, |worker| {
137 /// assert_eq!(worker.config().get::<u64>("example"), Some(&7));
138 /// }).unwrap();
139 /// ```
140 pub fn set<T>(&mut self, key: String, val: T) -> &mut Self
141 where
142 T: Send + Sync + 'static,
143 {
144 self.registry.insert(key, Arc::new(val));
145 self
146 }
147
148 /// Gets the value for configured parameter `key`.
149 ///
150 /// Returns `None` if `key` has not previously been set with
151 /// [Config::set], or if the specified `T` does not match the `T`
152 /// from the call to `set`.
153 ///
154 /// # Examples
155 /// ```rust
156 /// let mut config = timely::Config::process(3);
157 /// config.worker.set("example".to_string(), 7u64);
158 /// timely::execute(config, |worker| {
159 /// assert_eq!(worker.config().get::<u64>("example"), Some(&7));
160 /// }).unwrap();
161 /// ```
162 pub fn get<T: 'static>(&self, key: &str) -> Option<&T> {
163 self.registry.get(key).and_then(|val| val.downcast_ref())
164 }
165}
166
167
/// A `Worker` is the entry point to a timely dataflow computation. It wraps an `Allocator`
/// and has a list of dataflows that it manages.
pub struct Worker {
    config: Config,
    /// An optional instant from which the start of the computation should be reckoned.
    ///
    /// If this is set to none, system time-based functionality will be unavailable or work badly.
    /// For example, logging will be unavailable, and activation after a delay will be unavailable.
    timer: Option<Instant>,
    // Maps channel identifiers to the addresses ("paths") of the operators that
    // should be activated when the channel reports an event.
    paths: Rc<RefCell<HashMap<usize, Rc<[usize]>>>>,
    // The communication fabric through which all channels are allocated.
    allocator: Rc<RefCell<Allocator>>,
    // Counter backing `new_identifier`, producing worker-unique identifiers.
    identifiers: Rc<RefCell<usize>>,
    // Installed dataflows, keyed by their dataflow index.
    dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
    // Counter backing `allocate_dataflow_index`, producing dataflow indices.
    dataflow_counter: Rc<RefCell<usize>>,
    // Logging registry; present only when a start instant (`timer`) was supplied.
    logging: Option<Rc<RefCell<crate::logging_core::Registry>>>,

    // Scheduler state: operator paths that have requested activation.
    activations: Rc<RefCell<Activations>>,
    // Scratch buffer of dataflow indices to schedule, refilled each step.
    active_dataflows: Vec<usize>,

    // Temporary storage for channel identifiers during dataflow construction.
    // These are then associated with a dataflow once constructed.
    temp_channel_ids: Rc<RefCell<Vec<usize>>>,
}
192
193
194
impl Worker {
    /// Allocates a new `Worker` bound to a channel allocator.
    ///
    /// If `now` is `None`, time-based services are unavailable: no logging
    /// registry is created, and delayed activation will work badly.
    pub fn new(config: Config, c: Allocator, now: Option<std::time::Instant>) -> Worker {
        Worker {
            config,
            timer: now,
            paths: Default::default(),
            allocator: Rc::new(RefCell::new(c)),
            identifiers: Default::default(),
            dataflows: Default::default(),
            dataflow_counter: Default::default(),
            // Logging requires a start instant against which events are timestamped.
            logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now)))),
            activations: Rc::new(RefCell::new(Activations::new(now))),
            active_dataflows: Default::default(),
            temp_channel_ids: Default::default(),
        }
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step();
    /// });
    /// ```
    pub fn step(&mut self) -> bool {
        // A zero-duration park request returns immediately without parking.
        self.step_or_park(Some(Duration::from_secs(0)))
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// This method takes an optional timeout and may park the thread until
    /// there is work to perform or until this timeout expires. A value of
    /// `None` allows the worker to park indefinitely, whereas a value of
    /// `Some(Duration::new(0, 0))` will return without parking the thread.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use std::time::Duration;
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step_or_park(Some(Duration::from_secs(1)));
    /// });
    /// ```
    pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {

        { // Process channel events. Activate responders.
            let mut allocator = self.allocator.borrow_mut();
            allocator.receive();
            let events = allocator.events();
            let mut borrow = events.borrow_mut();
            let paths = self.paths.borrow();
            // Deduplicate so each channel triggers at most one activation.
            borrow.sort_unstable();
            borrow.dedup();
            for channel in borrow.drain(..) {
                // Consider tracking whether a channel
                // in non-empty, and only activating
                // on the basis of non-empty channels.
                // TODO: This is a sloppy way to deal
                // with channels that may not be alloc'd.
                if let Some(path) = paths.get(&channel) {
                    self.activations
                        .borrow_mut()
                        .activate(&path[..]);
                }
            }
        }

        // Organize activations.
        self.activations
            .borrow_mut()
            .advance();

        // Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
        let empty_for = self.activations.borrow().empty_for();
        // Determine the minimum park duration, where `None` are an absence of a constraint.
        let delay = match (duration, empty_for) {
            (Some(x), Some(y)) => Some(std::cmp::min(x,y)),
            (x, y) => x.or(y),
        };

        // A `None` delay parks indefinitely; `Some(0)` means work is pending now.
        if delay != Some(Duration::new(0,0)) {

            // Log parking and flush log.
            if let Some(l) = self.logging().as_mut() {
                l.log(crate::logging::ParkEvent::park(delay));
                l.flush();
            }

            self.allocator
                .borrow()
                .await_events(delay);

            // Log return from unpark.
            self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
        }
        else {   // Schedule active dataflows.

            let active_dataflows = &mut self.active_dataflows;
            // Collect the dataflow indices (children of the empty root path)
            // that have pending activations.
            self.activations
                .borrow_mut()
                .for_extensions(&[], |index| active_dataflows.push(index));

            let mut dataflows = self.dataflows.borrow_mut();
            for index in active_dataflows.drain(..) {
                // Step dataflow if it exists, remove if not incomplete.
                if let Entry::Occupied(mut entry) = dataflows.entry(index) {
                    // TODO: This is a moment at which a scheduling decision is being made.
                    let incomplete = entry.get_mut().step();
                    if !incomplete {
                        // Completed: unregister its channels and drop the dataflow.
                        let mut paths = self.paths.borrow_mut();
                        for channel in entry.get_mut().channel_ids.drain(..) {
                            paths.remove(&channel);
                        }
                        entry.remove_entry();
                    }
                }
            }
        }

        // Clean up, indicate if dataflows remain.
        self.logging.as_ref().map(|l| l.borrow_mut().flush());
        self.allocator.borrow_mut().release();
        !self.dataflows.borrow().is_empty()
    }

    /// Calls `self.step()` as long as `func` evaluates to `true`.
    ///
    /// This method will continually execute even if there is not work
    /// for the worker to perform. Consider using the similar method
    /// `Self::step_or_park_while(duration)` to allow the worker to yield
    /// control if that is appropriate.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///             .0
    ///     });
    ///
    ///     worker.step_while(|| probe.less_than(&0));
    /// });
    /// ```
    pub fn step_while<F: FnMut()->bool>(&mut self, func: F) {
        self.step_or_park_while(Some(Duration::from_secs(0)), func)
    }

    /// Calls `self.step_or_park(duration)` as long as `func` evaluates to `true`.
    ///
    /// This method may yield whenever there is no work to perform, as performed
    /// by `Self::step_or_park()`. Please consult the documentation for further
    /// information about that method and its behavior. In particular, the method
    /// can park the worker indefinitely, if no new work re-awakens the worker.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///             .0
    ///     });
    ///
    ///     worker.step_or_park_while(None, || probe.less_than(&0));
    /// });
    /// ```
    pub fn step_or_park_while<F: FnMut()->bool>(&mut self, duration: Option<Duration>, mut func: F) {
        while func() { self.step_or_park(duration); }
    }

    /// The index of the worker out of its peers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn index(&self) -> usize { self.allocator.borrow().index() }
    /// The total number of peer workers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn peers(&self) -> usize { self.allocator.borrow().peers() }

    /// A timer started at the initiation of the timely computation.
    ///
    /// Returns `None` if no start instant was supplied at construction.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn timer(&self) -> Option<Instant> { self.timer }

    /// Allocate a new worker-unique identifier.
    ///
    /// This method is public, though it is not expected to be widely used outside
    /// of the timely dataflow system.
    pub fn new_identifier(&self) -> usize {
        // Post-increment: returns the value prior to the increment.
        *self.identifiers.borrow_mut() += 1;
        *self.identifiers.borrow() - 1
    }

    /// The next worker-unique identifier to be allocated.
    pub fn peek_identifier(&self) -> usize {
        *self.identifiers.borrow()
    }

    /// Access to named loggers.
    ///
    /// Returns `None` when logging is disabled (no start instant supplied).
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     worker.log_register()
    ///           .unwrap()
    ///           .insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
    ///               println!("{:?}\t{:?}", time, data)
    ///           );
    /// });
    /// ```
    pub fn log_register(&self) -> Option<RefMut<'_, crate::logging_core::Registry>> {
        self.logging.as_ref().map(|l| l.borrow_mut())
    }

    /// Returns the worker configuration parameters.
    pub fn config(&self) -> &Config { &self.config }

    /// Provides a shared handle to the activation scheduler.
    pub fn activations(&self) -> Rc<RefCell<Activations>> {
        Rc::clone(&self.activations)
    }

    /// Constructs an `Activator` tied to the specified operator address.
    pub fn activator_for(&self, path: Rc<[usize]>) -> Activator {
        Activator::new(path, self.activations())
    }

    /// Constructs a `SyncActivator` tied to the specified operator address.
    pub fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator {
        SyncActivator::new(path, self.activations().borrow().sync())
    }

    /// Acquires a logger by name, if the log register exists and the name is registered.
    pub fn logger_for<CB: crate::ContainerBuilder>(&self, name: &str) -> Option<timely_logging::Logger<CB>> {
        self.log_register().and_then(|l| l.get(name))
    }

    /// Provides access to the timely logging stream.
    pub fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.logger_for("timely").map(Into::into) }

    /// Allocates a new channel from a supplied identifier and address.
    ///
    /// Registers the channel's address so that channel events can activate
    /// the responsible operator, and records the identifier for association
    /// with the dataflow under construction.
    pub fn allocate<D: Exchangeable>(&self, identifier: usize, address: Rc<[usize]>) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>) {
        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
        let mut paths = self.paths.borrow_mut();
        paths.insert(identifier, address);
        self.temp_channel_ids.borrow_mut().push(identifier);
        self.allocator.borrow_mut().allocate(identifier)
    }

    /// Constructs a pipeline channel from the worker to itself.
    ///
    /// Registration side effects are the same as for `allocate`.
    pub fn pipeline<T: 'static>(&self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<T>, ThreadPuller<T>) {
        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
        let mut paths = self.paths.borrow_mut();
        paths.insert(identifier, address);
        self.temp_channel_ids.borrow_mut().push(identifier);
        self.allocator.borrow_mut().pipeline(identifier)
    }

    /// Allocates a broadcast channel, where each pushed message is received by all.
    ///
    /// Registration side effects are the same as for `allocate`.
    pub fn broadcast<T: Exchangeable + Clone>(&self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
        let mut paths = self.paths.borrow_mut();
        paths.insert(identifier, address);
        self.temp_channel_ids.borrow_mut().push(identifier);
        self.allocator.borrow_mut().broadcast(identifier)
    }

    /// Construct a new dataflow.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow<T, R, F>(&mut self, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(Scope<T>)->R,
    {
        self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with a (purely cosmetic) name.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(Scope<T>)->R,
    {
        self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with specific configurations.
    ///
    /// This method constructs a new dataflow, using a name, logger, and additional
    /// resources specified as argument. The name is cosmetic, the logger is used to
    /// handle events generated by the dataflow, and the additional resources are kept
    /// alive for as long as the dataflow is alive (use case: shared library bindings).
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_core::<usize,_,_,_>(
    ///         "dataflow X",           // Dataflow name
    ///         None,                   // Optional logger
    ///         37,                     // Any resources
    ///         |resources, scope| {    // Closure
    ///
    ///             // uses of `resources`, `scope`to build dataflow
    ///
    ///         }
    ///     );
    /// });
    /// ```
    pub fn dataflow_core<T, R, F, V>(&mut self, name: &str, mut logging: Option<TimelyLogger>, mut resources: V, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut V, Scope<T>)->R,
        V: Any+'static,
    {
        // The dataflow index roots the address of every contained operator.
        let dataflow_index = self.allocate_dataflow_index();
        let addr = vec![dataflow_index].into();
        let identifier = self.new_identifier();

        let subscope = SubgraphBuilder::new_from(addr, identifier, name);
        let subscope = RefCell::new(subscope);

        // Run the user's construction closure against a scope for the new subgraph.
        let result = {
            let builder = Scope {
                subgraph: &subscope,
                worker: self,
            };
            func(&mut resources, builder)
        };

        let operator = subscope.into_inner().build(self);

        if let Some(l) = logging.as_mut() {
            l.log(crate::logging::OperatesEvent {
                id: identifier,
                addr: operator.path().to_vec(),
                name: operator.name().to_string(),
            });
            l.flush();
        }

        let (_, _, operator) = Box::new(operator).initialize();

        // Claim the channel identifiers allocated during construction for this dataflow.
        let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();
        let channel_ids = temp_channel_ids.drain(..).collect::<Vec<_>>();

        let wrapper = Wrapper {
            logging,
            identifier,
            operate: Some(operator),
            resources: Some(Box::new(resources)),
            channel_ids,
        };
        self.dataflows.borrow_mut().insert(dataflow_index, wrapper);

        result

    }

    /// Drops an identified dataflow.
    ///
    /// This method removes the identified dataflow, which will no longer be scheduled.
    /// Various other resources will be cleaned up, though the method is currently in
    /// public beta rather than expected to work. Please report all crashes and unmet
    /// expectations!
    pub fn drop_dataflow(&mut self, dataflow_identifier: usize) {
        if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) {
            // Garbage collect channel_id to path information.
            let mut paths = self.paths.borrow_mut();
            for channel in entry.channel_ids.drain(..) {
                paths.remove(&channel);
            }
        }
    }

    /// Returns the next index to be used for dataflow construction.
    ///
    /// This identifier will appear in the address of contained operators, and can
    /// be used to drop the dataflow using `self.drop_dataflow()`.
    pub fn next_dataflow_index(&self) -> usize {
        *self.dataflow_counter.borrow()
    }

    /// List the current dataflow indices.
    pub fn installed_dataflows(&self) -> Vec<usize> {
        self.dataflows.borrow().keys().cloned().collect()
    }

    /// Returns `true` if there is at least one dataflow under management.
    pub fn has_dataflows(&self) -> bool {
        !self.dataflows.borrow().is_empty()
    }

    // Acquire a new distinct dataflow identifier.
    fn allocate_dataflow_index(&self) -> usize {
        // Post-increment: returns the value prior to the increment.
        *self.dataflow_counter.borrow_mut() += 1;
        *self.dataflow_counter.borrow() - 1
    }
}
704
705impl Clone for Worker {
706 fn clone(&self) -> Self {
707 Worker {
708 config: self.config.clone(),
709 timer: self.timer,
710 paths: Rc::clone(&self.paths),
711 allocator: Rc::clone(&self.allocator),
712 identifiers: Rc::clone(&self.identifiers),
713 dataflows: Rc::clone(&self.dataflows),
714 dataflow_counter: Rc::clone(&self.dataflow_counter),
715 logging: self.logging.clone(),
716 activations: Rc::clone(&self.activations),
717 active_dataflows: Vec::new(),
718 temp_channel_ids: Rc::clone(&self.temp_channel_ids),
719 }
720 }
721}
722
// An installed dataflow: its operator tree plus the bookkeeping needed to
// schedule it and to clean up once it completes.
struct Wrapper {
    // Logger for schedule/shutdown events, when logging is enabled.
    logging: Option<TimelyLogger>,
    // Worker-unique identifier, used to correlate logged events.
    identifier: usize,
    // The dataflow's root operator; set to `None` once the dataflow completes.
    operate: Option<Box<dyn Schedule>>,
    // User-supplied resources kept alive while the dataflow runs; dropped
    // after `operate` so that (e.g.) shared library bindings outlive it.
    resources: Option<Box<dyn Any>>,
    // Identifiers of channels allocated during construction, for cleanup.
    channel_ids: Vec<usize>,
}
730
731impl Wrapper {
732 /// Steps the dataflow, indicates if it remains incomplete.
733 ///
734 /// If the dataflow is incomplete, this call will drop it and its resources,
735 /// dropping the dataflow first and then the resources (so that, e.g., shared
736 /// library bindings will outlive the dataflow).
737 fn step(&mut self) -> bool {
738
739 // Perhaps log information about the start of the schedule call.
740 if let Some(l) = self.logging.as_mut() {
741 l.log(crate::logging::ScheduleEvent::start(self.identifier));
742 }
743
744 let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(false);
745 if !incomplete {
746 self.operate = None;
747 self.resources = None;
748 }
749
750 // Perhaps log information about the stop of the schedule call.
751 if let Some(l) = self.logging.as_mut() {
752 l.log(crate::logging::ScheduleEvent::stop(self.identifier));
753 }
754
755 incomplete
756 }
757}
758
759impl Drop for Wrapper {
760 fn drop(&mut self) {
761 if let Some(l) = self.logging.as_mut() {
762 l.log(crate::logging::ShutdownEvent { id: self.identifier });
763 }
764 // ensure drop order
765 self.operate = None;
766 self.resources = None;
767 }
768}