// timely/worker.rs
1//! The root of each single-threaded worker.
2
3use std::rc::Rc;
4use std::cell::{RefCell, RefMut};
5use std::any::Any;
6use std::str::FromStr;
7use std::time::{Instant, Duration};
8use std::collections::HashMap;
9use std::collections::hash_map::Entry;
10use std::sync::Arc;
11
12use crate::communication::{Allocator, Exchangeable, Push, Pull};
13use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
14use crate::scheduling::{Schedule, Activations, Activator, SyncActivator};
15use crate::progress::timestamp::{Refines};
16use crate::progress::SubgraphBuilder;
17use crate::progress::operate::Operate;
18use crate::dataflow::scope::Scope;
19use crate::logging::TimelyLogger;
20
/// Different ways in which timely's progress tracking can work.
///
/// These options drive some buffering and accumulation that timely
/// can do to try and trade volume of progress traffic against latency.
/// By accumulating updates longer, a smaller total volume of messages
/// are sent.
///
/// The `ProgressMode::Demand` variant is the most robust, and least
/// likely to lead to catastrophic performance. The `Eager` variant
/// is useful for getting the smallest latencies on systems with few
/// workers, but does risk saturating the system with progress messages
/// and should be used with care, or not at all.
///
/// If you are not certain which option to use, prefer `Demand`, and
/// perhaps monitor the progress messages through timely's logging
/// infrastructure to see if their volume is surprisingly high.
///
/// A mode can also be parsed from the lowercase strings `"eager"` and
/// `"demand"` via the [FromStr] implementation below.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum ProgressMode {
    /// Eagerly transmit all progress updates produced by a worker.
    ///
    /// Progress messages are transmitted without consideration for the
    /// possibility that they may unblock other workers. This can result
    /// in a substantial volume of messages that do not result in a
    /// change to the lower bound of outstanding work.
    Eager,
    /// Delay transmission of progress updates until any could advance
    /// the global frontier of timestamps.
    ///
    /// As timely executes, the progress messages inform each worker of
    /// the outstanding work remaining in the system. As workers work,
    /// they produce changes to this outstanding work. This option
    /// delays the communication of those changes until they might
    /// possibly cause a change in the lower bound of all outstanding
    /// work.
    ///
    /// The most common case this remedies is when one worker transmits
    /// messages to other workers, that worker holds a capability for the
    /// operator and timestamp. Other workers will receive messages, and
    /// with this option will not immediately acknowledge receiving the
    /// messages, because the held capability is strictly prior to what
    /// the messages can affect. Once the capability is released, the
    /// progress messages are unblocked and transmitted, in accumulated
    /// form.
    #[default]
    Demand,
}
67
68impl FromStr for ProgressMode {
69    type Err = String;
70
71    fn from_str(s: &str) -> Result<ProgressMode, String> {
72        match s {
73            "eager" => Ok(ProgressMode::Eager),
74            "demand" => Ok(ProgressMode::Demand),
75            _ => Err(format!("unknown progress mode: {}", s)),
76        }
77    }
78}
79
/// Worker configuration.
///
/// Holds the progress-tracking mode and an extensible bag of typed,
/// name-keyed parameters (see [Config::set] and [Config::get]).
#[derive(Debug, Default, Clone)]
pub struct Config {
    /// The progress mode to use.
    pub(crate) progress_mode: ProgressMode,
    /// A map from parameter name to typed parameter values.
    ///
    /// Values are stored type-erased and recovered by downcast in `get`,
    /// so a lookup with the wrong `T` returns `None` rather than failing.
    registry: HashMap<String, Arc<dyn Any + Send + Sync>>,
}
88
89impl Config {
90    /// Installs options into a [getopts::Options] struct that correspond
91    /// to the parameters in the configuration.
92    ///
93    /// It is the caller's responsibility to ensure that the installed options
94    /// do not conflict with any other options that may exist in `opts`, or
95    /// that may be installed into `opts` in the future.
96    ///
97    /// This method is only available if the `getopts` feature is enabled, which
98    /// it is by default.
99    #[cfg(feature = "getopts")]
100    pub fn install_options(opts: &mut getopts::Options) {
101        opts.optopt("", "progress-mode", "progress tracking mode (eager or demand)", "MODE");
102    }
103
104    /// Instantiates a configuration based upon the parsed options in `matches`.
105    ///
106    /// The `matches` object must have been constructed from a
107    /// [getopts::Options] which contained at least the options installed by
108    /// [Self::install_options].
109    ///
110    /// This method is only available if the `getopts` feature is enabled, which
111    /// it is by default.
112    #[cfg(feature = "getopts")]
113    pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
114        let progress_mode = matches
115            .opt_get_default("progress-mode", ProgressMode::Demand)?;
116        Ok(Config::default().progress_mode(progress_mode))
117    }
118
119    /// Sets the progress mode to `progress_mode`.
120    pub fn progress_mode(mut self, progress_mode: ProgressMode) -> Self {
121        self.progress_mode = progress_mode;
122        self
123    }
124
125    /// Sets a typed configuration parameter for the given `key`.
126    ///
127    /// It is recommended to install a single configuration struct using a key
128    /// that uniquely identifies your project, to avoid clashes. For example,
129    /// differential dataflow registers a configuration struct under the key
130    /// "differential".
131    ///
132    /// # Examples
133    /// ```rust
134    /// let mut config = timely::Config::process(3);
135    /// config.worker.set("example".to_string(), 7u64);
136    /// timely::execute(config, |worker| {
137    ///    assert_eq!(worker.config().get::<u64>("example"), Some(&7));
138    /// }).unwrap();
139    /// ```
140    pub fn set<T>(&mut self, key: String, val: T) -> &mut Self
141    where
142        T: Send + Sync + 'static,
143    {
144        self.registry.insert(key, Arc::new(val));
145        self
146    }
147
148    /// Gets the value for configured parameter `key`.
149    ///
150    /// Returns `None` if `key` has not previously been set with
151    /// [Config::set], or if the specified `T` does not match the `T`
152    /// from the call to `set`.
153    ///
154    /// # Examples
155    /// ```rust
156    /// let mut config = timely::Config::process(3);
157    /// config.worker.set("example".to_string(), 7u64);
158    /// timely::execute(config, |worker| {
159    ///    assert_eq!(worker.config().get::<u64>("example"), Some(&7));
160    /// }).unwrap();
161    /// ```
162    pub fn get<T: 'static>(&self, key: &str) -> Option<&T> {
163        self.registry.get(key).and_then(|val| val.downcast_ref())
164    }
165}
166
167
/// A `Worker` is the entry point to a timely dataflow computation. It wraps an `Allocator`
/// and has a list of dataflows that it manages.
pub struct Worker {
    /// Configuration parameters (progress mode and typed key/value registry).
    config: Config,
    /// An optional instant from which the start of the computation should be reckoned.
    ///
    /// If this is set to none, system time-based functionality will be unavailable or work badly.
    /// For example, logging will be unavailable, and activation after a delay will be unavailable.
    timer: Option<Instant>,
    /// Map from channel identifier to the address (path) of the operator to
    /// activate when the channel has events; populated by `allocate`/`pipeline`/
    /// `broadcast` and consulted in `step_or_park`.
    paths: Rc<RefCell<HashMap<usize, Rc<[usize]>>>>,
    /// The communication allocator, shared among clones of this worker handle.
    allocator: Rc<RefCell<Allocator>>,
    /// Counter backing `new_identifier` / `peek_identifier`.
    identifiers: Rc<RefCell<usize>>,
    /// Installed dataflows, keyed by dataflow index.
    dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
    /// Counter backing dataflow index allocation.
    dataflow_counter: Rc<RefCell<usize>>,
    /// Logger registry; present only when `timer` was supplied at construction.
    logging: Option<Rc<RefCell<crate::logging_core::Registry>>>,

    /// Shared activation tracker driving operator scheduling.
    activations: Rc<RefCell<Activations>>,
    /// Scratch buffer of dataflow indices to schedule; reused across steps
    /// and deliberately not shared between clones.
    active_dataflows: Vec<usize>,

    // Temporary storage for channel identifiers during dataflow construction.
    // These are then associated with a dataflow once constructed.
    temp_channel_ids: Rc<RefCell<Vec<usize>>>,
}
192
193
194
195impl Worker {
196    /// Allocates a new `Worker` bound to a channel allocator.
197    pub fn new(config: Config, c: Allocator, now: Option<std::time::Instant>) -> Worker {
198        Worker {
199            config,
200            timer: now,
201            paths:  Default::default(),
202            allocator: Rc::new(RefCell::new(c)),
203            identifiers:  Default::default(),
204            dataflows: Default::default(),
205            dataflow_counter:  Default::default(),
206            logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now)))),
207            activations: Rc::new(RefCell::new(Activations::new(now))),
208            active_dataflows: Default::default(),
209            temp_channel_ids:  Default::default(),
210        }
211    }
212
213    /// Performs one step of the computation.
214    ///
215    /// A step gives each dataflow operator a chance to run, and is the
216    /// main way to ensure that a computation proceeds.
217    ///
218    /// # Examples
219    ///
220    /// ```
221    /// timely::execute_from_args(::std::env::args(), |worker| {
222    ///
223    ///     use timely::dataflow::operators::{ToStream, Inspect};
224    ///
225    ///     worker.dataflow::<usize,_,_>(|scope| {
226    ///         (0 .. 10)
227    ///             .to_stream(scope)
228    ///             .container::<Vec<_>>()
229    ///             .inspect(|x| println!("{:?}", x));
230    ///     });
231    ///
232    ///     worker.step();
233    /// });
234    /// ```
235    pub fn step(&mut self) -> bool {
236        self.step_or_park(Some(Duration::from_secs(0)))
237    }
238
    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// This method takes an optional timeout and may park the thread until
    /// there is work to perform or until this timeout expires. A value of
    /// `None` allows the worker to park indefinitely, whereas a value of
    /// `Some(Duration::new(0, 0))` will return without parking the thread.
    ///
    /// Returns `true` iff dataflows remain installed after this step.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use std::time::Duration;
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step_or_park(Some(Duration::from_secs(1)));
    /// });
    /// ```
    pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {

        {   // Process channel events. Activate responders.
            // Scoped so the allocator/events/paths borrows are released before
            // the scheduling phase below re-borrows these cells.
            let mut allocator = self.allocator.borrow_mut();
            allocator.receive();
            let events = allocator.events();
            let mut borrow = events.borrow_mut();
            let paths = self.paths.borrow();
            // Sort and dedup so each channel is activated at most once.
            borrow.sort_unstable();
            borrow.dedup();
            for channel in borrow.drain(..) {
                // Consider tracking whether a channel
                // is non-empty, and only activating
                // on the basis of non-empty channels.
                // TODO: This is a sloppy way to deal
                // with channels that may not be alloc'd.
                if let Some(path) = paths.get(&channel) {
                    self.activations
                        .borrow_mut()
                        .activate(&path[..]);
                }
            }
        }

        // Organize activations.
        self.activations
            .borrow_mut()
            .advance();

        // Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
        let empty_for = self.activations.borrow().empty_for();
        // Determine the minimum park duration, where `None` are an absence of a constraint.
        let delay = match (duration, empty_for) {
            (Some(x), Some(y)) => Some(std::cmp::min(x,y)),
            (x, y) => x.or(y),
        };

        // A zero delay means work is pending: schedule instead of parking.
        if delay != Some(Duration::new(0,0)) {

            // Log parking and flush log.
            if let Some(l) = self.logging().as_mut() {
                l.log(crate::logging::ParkEvent::park(delay));
                l.flush();
            }

            self.allocator
                .borrow()
                .await_events(delay);

            // Log return from unpark.
            self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
        }
        else {   // Schedule active dataflows.

            let active_dataflows = &mut self.active_dataflows;
            self.activations
                .borrow_mut()
                .for_extensions(&[], |index| active_dataflows.push(index));

            let mut dataflows = self.dataflows.borrow_mut();
            for index in active_dataflows.drain(..) {
                // Step dataflow if it exists, remove if not incomplete.
                if let Entry::Occupied(mut entry) = dataflows.entry(index) {
                    // TODO: This is a moment at which a scheduling decision is being made.
                    let incomplete = entry.get_mut().step();
                    if !incomplete {
                        // Completed dataflow: drop its channel-to-path entries
                        // and remove it from the managed set.
                        let mut paths = self.paths.borrow_mut();
                        for channel in entry.get_mut().channel_ids.drain(..) {
                            paths.remove(&channel);
                        }
                        entry.remove_entry();
                    }
                }
            }
        }

        // Clean up, indicate if dataflows remain.
        self.logging.as_ref().map(|l| l.borrow_mut().flush());
        self.allocator.borrow_mut().release();
        !self.dataflows.borrow().is_empty()
    }
348
349    /// Calls `self.step()` as long as `func` evaluates to `true`.
350    ///
351    /// This method will continually execute even if there is not work
352    /// for the worker to perform. Consider using the similar method
353    /// `Self::step_or_park_while(duration)` to allow the worker to yield
354    /// control if that is appropriate.
355    ///
356    /// # Examples
357    ///
358    /// ```
359    /// timely::execute_from_args(::std::env::args(), |worker| {
360    ///
361    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
362    ///
363    ///     let probe =
364    ///     worker.dataflow::<usize,_,_>(|scope| {
365    ///         (0 .. 10)
366    ///             .to_stream(scope)
367    ///             .container::<Vec<_>>()
368    ///             .inspect(|x| println!("{:?}", x))
369    ///             .probe()
370    ///             .0
371    ///     });
372    ///
373    ///     worker.step_while(|| probe.less_than(&0));
374    /// });
375    /// ```
376    pub fn step_while<F: FnMut()->bool>(&mut self, func: F) {
377        self.step_or_park_while(Some(Duration::from_secs(0)), func)
378    }
379
380    /// Calls `self.step_or_park(duration)` as long as `func` evaluates to `true`.
381    ///
382    /// This method may yield whenever there is no work to perform, as performed
383    /// by `Self::step_or_park()`. Please consult the documentation for further
384    /// information about that method and its behavior. In particular, the method
385    /// can park the worker indefinitely, if no new work re-awakens the worker.
386    ///
387    /// # Examples
388    ///
389    /// ```
390    /// timely::execute_from_args(::std::env::args(), |worker| {
391    ///
392    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
393    ///
394    ///     let probe =
395    ///     worker.dataflow::<usize,_,_>(|scope| {
396    ///         (0 .. 10)
397    ///             .to_stream(scope)
398    ///             .container::<Vec<_>>()
399    ///             .inspect(|x| println!("{:?}", x))
400    ///             .probe()
401    ///             .0
402    ///     });
403    ///
404    ///     worker.step_or_park_while(None, || probe.less_than(&0));
405    /// });
406    /// ```
407    pub fn step_or_park_while<F: FnMut()->bool>(&mut self, duration: Option<Duration>, mut func: F) {
408        while func() { self.step_or_park(duration); }
409    }
410
411    /// The index of the worker out of its peers.
412    ///
413    /// # Examples
414    /// ```
415    /// timely::execute_from_args(::std::env::args(), |worker| {
416    ///
417    ///     let index = worker.index();
418    ///     let peers = worker.peers();
419    ///     let timer = worker.timer().unwrap();
420    ///
421    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
422    ///
423    /// });
424    /// ```
425    pub fn index(&self) -> usize { self.allocator.borrow().index() }
426    /// The total number of peer workers.
427    ///
428    /// # Examples
429    /// ```
430    /// timely::execute_from_args(::std::env::args(), |worker| {
431    ///
432    ///     let index = worker.index();
433    ///     let peers = worker.peers();
434    ///     let timer = worker.timer().unwrap();
435    ///
436    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
437    ///
438    /// });
439    /// ```
440    pub fn peers(&self) -> usize { self.allocator.borrow().peers() }
441
442    /// A timer started at the initiation of the timely computation.
443    ///
444    /// # Examples
445    /// ```
446    /// timely::execute_from_args(::std::env::args(), |worker| {
447    ///
448    ///     let index = worker.index();
449    ///     let peers = worker.peers();
450    ///     let timer = worker.timer().unwrap();
451    ///
452    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
453    ///
454    /// });
455    /// ```
456    pub fn timer(&self) -> Option<Instant> { self.timer }
457
458    /// Allocate a new worker-unique identifier.
459    ///
460    /// This method is public, though it is not expected to be widely used outside
461    /// of the timely dataflow system.
462    pub fn new_identifier(&self) -> usize {
463        *self.identifiers.borrow_mut() += 1;
464        *self.identifiers.borrow() - 1
465    }
466
467    /// The next worker-unique identifier to be allocated.
468    pub fn peek_identifier(&self) -> usize {
469        *self.identifiers.borrow()
470    }
471
472    /// Access to named loggers.
473    ///
474    /// # Examples
475    ///
476    /// ```
477    /// timely::execute_from_args(::std::env::args(), |worker| {
478    ///
479    ///     worker.log_register()
480    ///           .unwrap()
481    ///           .insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
482    ///               println!("{:?}\t{:?}", time, data)
483    ///           );
484    /// });
485    /// ```
486    pub fn log_register(&self) -> Option<RefMut<'_, crate::logging_core::Registry>> {
487        self.logging.as_ref().map(|l| l.borrow_mut())
488    }
489
490    /// Returns the worker configuration parameters.
491    pub fn config(&self) -> &Config { &self.config }
492
493    /// Provides a shared handle to the activation scheduler.
494    pub fn activations(&self) -> Rc<RefCell<Activations>> {
495        Rc::clone(&self.activations)
496    }
497
498    /// Constructs an `Activator` tied to the specified operator address.
499    pub fn activator_for(&self, path: Rc<[usize]>) -> Activator {
500        Activator::new(path, self.activations())
501    }
502
503    /// Constructs a `SyncActivator` tied to the specified operator address.
504    pub fn sync_activator_for(&self, path: Vec<usize>) -> SyncActivator {
505        SyncActivator::new(path, self.activations().borrow().sync())
506    }
507
508    /// Acquires a logger by name, if the log register exists and the name is registered.
509    pub fn logger_for<CB: crate::ContainerBuilder>(&self, name: &str) -> Option<timely_logging::Logger<CB>> {
510        self.log_register().and_then(|l| l.get(name))
511    }
512
513    /// Provides access to the timely logging stream.
514    pub fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.logger_for("timely").map(Into::into) }
515
516    /// Allocates a new channel from a supplied identifier and address.
517    pub fn allocate<D: Exchangeable>(&self, identifier: usize, address: Rc<[usize]>) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>) {
518        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
519        let mut paths = self.paths.borrow_mut();
520        paths.insert(identifier, address);
521        self.temp_channel_ids.borrow_mut().push(identifier);
522        self.allocator.borrow_mut().allocate(identifier)
523    }
524
525    /// Constructs a pipeline channel from the worker to itself.
526    pub fn pipeline<T: 'static>(&self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<T>, ThreadPuller<T>) {
527        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
528        let mut paths = self.paths.borrow_mut();
529        paths.insert(identifier, address);
530        self.temp_channel_ids.borrow_mut().push(identifier);
531        self.allocator.borrow_mut().pipeline(identifier)
532    }
533
534    /// Allocates a broadcast channel, where each pushed message is received by all.
535    pub fn broadcast<T: Exchangeable + Clone>(&self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
536        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
537        let mut paths = self.paths.borrow_mut();
538        paths.insert(identifier, address);
539        self.temp_channel_ids.borrow_mut().push(identifier);
540        self.allocator.borrow_mut().broadcast(identifier)
541    }
542
543    /// Construct a new dataflow.
544    ///
545    /// # Examples
546    /// ```
547    /// timely::execute_from_args(::std::env::args(), |worker| {
548    ///
549    ///     // We must supply the timestamp type here, although
550    ///     // it would generally be determined by type inference.
551    ///     worker.dataflow::<usize,_,_>(|scope| {
552    ///
553    ///         // uses of `scope` to build dataflow
554    ///
555    ///     });
556    /// });
557    /// ```
558    pub fn dataflow<T, R, F>(&mut self, func: F) -> R
559    where
560        T: Refines<()>,
561        F: FnOnce(Scope<T>)->R,
562    {
563        self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child))
564    }
565
566    /// Construct a new dataflow with a (purely cosmetic) name.
567    ///
568    /// # Examples
569    /// ```
570    /// timely::execute_from_args(::std::env::args(), |worker| {
571    ///
572    ///     // We must supply the timestamp type here, although
573    ///     // it would generally be determined by type inference.
574    ///     worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {
575    ///
576    ///         // uses of `scope` to build dataflow
577    ///
578    ///     });
579    /// });
580    /// ```
581    pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
582    where
583        T: Refines<()>,
584        F: FnOnce(Scope<T>)->R,
585    {
586        self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child))
587    }
588
    /// Construct a new dataflow with specific configurations.
    ///
    /// This method constructs a new dataflow, using a name, logger, and additional
    /// resources specified as argument. The name is cosmetic, the logger is used to
    /// handle events generated by the dataflow, and the additional resources are kept
    /// alive for as long as the dataflow is alive (use case: shared library bindings).
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_core::<usize,_,_,_>(
    ///         "dataflow X",           // Dataflow name
    ///         None,                   // Optional logger
    ///         37,                     // Any resources
    ///         |resources, scope| {    // Closure
    ///
    ///             // uses of `resources`, `scope`to build dataflow
    ///
    ///         }
    ///     );
    /// });
    /// ```
    pub fn dataflow_core<T, R, F, V>(&mut self, name: &str, mut logging: Option<TimelyLogger>, mut resources: V, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut V, Scope<T>)->R,
        V: Any+'static,
    {
        // The dataflow index roots the addresses of all contained operators.
        let dataflow_index = self.allocate_dataflow_index();
        let addr = vec![dataflow_index].into();
        let identifier = self.new_identifier();

        let subscope = SubgraphBuilder::new_from(addr, identifier, name);
        let subscope = RefCell::new(subscope);

        // Run the user's construction closure with a scope borrowing the builder.
        // The scope must be dropped before `into_inner` below can take ownership.
        let result = {
            let builder = Scope {
                subgraph: &subscope,
                worker: self,
            };
            func(&mut resources, builder)
        };

        let operator = subscope.into_inner().build(self);

        if let Some(l) = logging.as_mut() {
            l.log(crate::logging::OperatesEvent {
                id: identifier,
                addr: operator.path().to_vec(),
                name: operator.name().to_string(),
            });
            l.flush();
        }

        let (_, _, operator) = Box::new(operator).initialize();

        // Claim the channel ids staged during construction; they are garbage
        // collected when the dataflow completes or is dropped.
        let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();
        let channel_ids = temp_channel_ids.drain(..).collect::<Vec<_>>();

        let wrapper = Wrapper {
            logging,
            identifier,
            operate: Some(operator),
            resources: Some(Box::new(resources)),
            channel_ids,
        };
        self.dataflows.borrow_mut().insert(dataflow_index, wrapper);

        result

    }
663
664    /// Drops an identified dataflow.
665    ///
666    /// This method removes the identified dataflow, which will no longer be scheduled.
667    /// Various other resources will be cleaned up, though the method is currently in
668    /// public beta rather than expected to work. Please report all crashes and unmet
669    /// expectations!
670    pub fn drop_dataflow(&mut self, dataflow_identifier: usize) {
671        if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) {
672            // Garbage collect channel_id to path information.
673            let mut paths = self.paths.borrow_mut();
674            for channel in entry.channel_ids.drain(..) {
675                paths.remove(&channel);
676            }
677        }
678    }
679
680    /// Returns the next index to be used for dataflow construction.
681    ///
682    /// This identifier will appear in the address of contained operators, and can
683    /// be used to drop the dataflow using `self.drop_dataflow()`.
684    pub fn next_dataflow_index(&self) -> usize {
685        *self.dataflow_counter.borrow()
686    }
687
688    /// List the current dataflow indices.
689    pub fn installed_dataflows(&self) -> Vec<usize> {
690        self.dataflows.borrow().keys().cloned().collect()
691    }
692
693    /// Returns `true` if there is at least one dataflow under management.
694    pub fn has_dataflows(&self) -> bool {
695        !self.dataflows.borrow().is_empty()
696    }
697
698    // Acquire a new distinct dataflow identifier.
699    fn allocate_dataflow_index(&self) -> usize {
700        *self.dataflow_counter.borrow_mut() += 1;
701        *self.dataflow_counter.borrow() - 1
702    }
703}
704
impl Clone for Worker {
    /// Clones the worker handle.
    ///
    /// Shared state (paths, allocator, identifiers, dataflows, counters,
    /// activations, staged channel ids) is shared via `Rc`, so clones operate
    /// on the same underlying worker. Only `active_dataflows` is fresh: it is
    /// per-handle scratch space used during scheduling.
    fn clone(&self) -> Self {
        Worker {
            config: self.config.clone(),
            timer: self.timer,
            paths: Rc::clone(&self.paths),
            allocator: Rc::clone(&self.allocator),
            identifiers: Rc::clone(&self.identifiers),
            dataflows: Rc::clone(&self.dataflows),
            dataflow_counter: Rc::clone(&self.dataflow_counter),
            logging: self.logging.clone(),
            activations: Rc::clone(&self.activations),
            // Deliberately not cloned: scratch buffer, starts empty.
            active_dataflows: Vec::new(),
            temp_channel_ids: Rc::clone(&self.temp_channel_ids),
        }
    }
}
722
/// A wrapper around a built dataflow, pairing its schedulable operator with
/// the resources that must stay alive for as long as the dataflow does.
struct Wrapper {
    /// Logger for schedule/shutdown events concerning this dataflow.
    logging: Option<TimelyLogger>,
    /// The worker-unique identifier assigned at construction.
    identifier: usize,
    /// The schedulable root operator; `None` once the dataflow completes.
    operate: Option<Box<dyn Schedule>>,
    /// User resources kept alive alongside the dataflow; dropped after `operate`.
    resources: Option<Box<dyn Any>>,
    /// Channel identifiers owned by this dataflow, used to garbage collect
    /// channel-to-path entries on completion or removal.
    channel_ids: Vec<usize>,
}
730
impl Wrapper {
    /// Steps the dataflow, indicates if it remains incomplete.
    ///
    /// If the dataflow has completed (is no longer incomplete), this call will
    /// drop it and its resources, dropping the dataflow first and then the
    /// resources (so that, e.g., shared library bindings will outlive the
    /// dataflow).
    fn step(&mut self) -> bool {

        // Perhaps log information about the start of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::start(self.identifier));
        }

        // An already-dropped operator reports as complete (`false`).
        let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(false);
        if !incomplete {
            // Field order matters: drop the dataflow before its resources.
            self.operate = None;
            self.resources = None;
        }

        // Perhaps log information about the stop of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::stop(self.identifier));
        }

        incomplete
    }
}
758
impl Drop for Wrapper {
    /// Logs a shutdown event and tears down the dataflow in a fixed order.
    fn drop(&mut self) {
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ShutdownEvent { id: self.identifier });
        }
        // ensure drop order: the dataflow is dropped before the resources it
        // may depend on (e.g. shared library bindings).
        self.operate = None;
        self.resources = None;
    }
}