//! The root of each single-threaded worker.
3use std::rc::Rc;
4use std::cell::{RefCell, RefMut};
5use std::any::Any;
6use std::str::FromStr;
7use std::time::{Instant, Duration};
8use std::collections::HashMap;
9use std::collections::hash_map::Entry;
10use std::sync::Arc;
11
12use crate::communication::{Allocate, Exchangeable, Push, Pull};
13use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
14use crate::scheduling::{Schedule, Scheduler, Activations};
15use crate::progress::timestamp::{Refines};
16use crate::progress::SubgraphBuilder;
17use crate::progress::operate::Operate;
18use crate::dataflow::scopes::Child;
19use crate::logging::TimelyLogger;
20
/// Different ways in which timely's progress tracking can work.
///
/// These options drive some buffering and accumulation that timely
/// can do to try and trade volume of progress traffic against latency.
/// By accumulating updates longer, a smaller total volume of messages
/// are sent.
///
/// The `ProgressMode::Demand` variant is the most robust, and least
/// likely to lead to catastrophic performance. The `Eager` variant
/// is useful for getting the smallest latencies on systems with few
/// workers, but does risk saturating the system with progress messages
/// and should be used with care, or not at all.
///
/// If you are not certain which option to use, prefer `Demand`, and
/// perhaps monitor the progress messages through timely's logging
/// infrastructure to see if their volume is surprisingly high.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum ProgressMode {
    /// Eagerly transmit all progress updates produced by a worker.
    ///
    /// Progress messages are transmitted without consideration for the
    /// possibility that they may unblock other workers. This can result
    /// in a substantial volume of messages that do not result in a
    /// change to the lower bound of outstanding work.
    Eager,
    /// Delay transmission of progress updates until any could advance
    /// the global frontier of timestamps.
    ///
    /// As timely executes, the progress messages inform each worker of
    /// the outstanding work remaining in the system. As workers work,
    /// they produce changes to this outstanding work. This option
    /// delays the communication of those changes until they might
    /// possibly cause a change in the lower bound of all outstanding
    /// work.
    ///
    /// The most common case this remedies is when one worker transmits
    /// messages to other workers, that worker holds a capability for the
    /// operator and timestamp. Other workers will receive messages, and
    /// with this option will not immediately acknowledge receiving the
    /// messages, because the held capability is strictly prior to what
    /// the messages can affect. Once the capability is released, the
    /// progress messages are unblocked and transmitted, in accumulated
    /// form.
    #[default]
    Demand,
}

impl FromStr for ProgressMode {
    type Err = String;

    /// Parses `"eager"` or `"demand"` (case-sensitive). Any other input
    /// produces a descriptive error string.
    fn from_str(s: &str) -> Result<ProgressMode, String> {
        match s {
            "demand" => Ok(ProgressMode::Demand),
            "eager" => Ok(ProgressMode::Eager),
            other => Err(format!("unknown progress mode: {}", other)),
        }
    }
}
79
/// Worker configuration.
///
/// Pairs the progress-tracking mode with a registry of arbitrary typed
/// parameters, keyed by name; see [Config::set] and [Config::get].
#[derive(Debug, Default, Clone)]
pub struct Config {
    /// The progress mode to use.
    pub(crate) progress_mode: ProgressMode,
    /// A map from parameter name to typed parameter values.
    ///
    /// Values are type-erased; retrieval requires naming the same `T`
    /// that was stored (see [Config::get]).
    registry: HashMap<String, Arc<dyn Any + Send + Sync>>,
}
88
89impl Config {
90    /// Installs options into a [getopts::Options] struct that correspond
91    /// to the parameters in the configuration.
92    ///
93    /// It is the caller's responsibility to ensure that the installed options
94    /// do not conflict with any other options that may exist in `opts`, or
95    /// that may be installed into `opts` in the future.
96    ///
97    /// This method is only available if the `getopts` feature is enabled, which
98    /// it is by default.
99    #[cfg(feature = "getopts")]
100    pub fn install_options(opts: &mut getopts::Options) {
101        opts.optopt("", "progress-mode", "progress tracking mode (eager or demand)", "MODE");
102    }
103
104    /// Instantiates a configuration based upon the parsed options in `matches`.
105    ///
106    /// The `matches` object must have been constructed from a
107    /// [getopts::Options] which contained at least the options installed by
108    /// [Self::install_options].
109    ///
110    /// This method is only available if the `getopts` feature is enabled, which
111    /// it is by default.
112    #[cfg(feature = "getopts")]
113    pub fn from_matches(matches: &getopts::Matches) -> Result<Config, String> {
114        let progress_mode = matches
115            .opt_get_default("progress-mode", ProgressMode::Eager)?;
116        Ok(Config::default().progress_mode(progress_mode))
117    }
118
119    /// Sets the progress mode to `progress_mode`.
120    pub fn progress_mode(mut self, progress_mode: ProgressMode) -> Self {
121        self.progress_mode = progress_mode;
122        self
123    }
124
125    /// Sets a typed configuration parameter for the given `key`.
126    ///
127    /// It is recommended to install a single configuration struct using a key
128    /// that uniquely identifies your project, to avoid clashes. For example,
129    /// differential dataflow registers a configuration struct under the key
130    /// "differential".
131    ///
132    /// # Examples
133    /// ```rust
134    /// let mut config = timely::Config::process(3);
135    /// config.worker.set("example".to_string(), 7u64);
136    /// timely::execute(config, |worker| {
137    ///    use crate::timely::worker::AsWorker;
138    ///    assert_eq!(worker.config().get::<u64>("example"), Some(&7));
139    /// }).unwrap();
140    /// ```
141    pub fn set<T>(&mut self, key: String, val: T) -> &mut Self
142    where
143        T: Send + Sync + 'static,
144    {
145        self.registry.insert(key, Arc::new(val));
146        self
147    }
148
149    /// Gets the value for configured parameter `key`.
150    ///
151    /// Returns `None` if `key` has not previously been set with
152    /// [Config::set], or if the specified `T` does not match the `T`
153    /// from the call to `set`.
154    ///
155    /// # Examples
156    /// ```rust
157    /// let mut config = timely::Config::process(3);
158    /// config.worker.set("example".to_string(), 7u64);
159    /// timely::execute(config, |worker| {
160    ///    use crate::timely::worker::AsWorker;
161    ///    assert_eq!(worker.config().get::<u64>("example"), Some(&7));
162    /// }).unwrap();
163    /// ```
164    pub fn get<T: 'static>(&self, key: &str) -> Option<&T> {
165        self.registry.get(key).and_then(|val| val.downcast_ref())
166    }
167}
168
/// Methods provided by the root Worker.
///
/// These methods are often proxied by child scopes, and this trait provides access.
pub trait AsWorker : Scheduler {
    /// Returns the worker configuration parameters.
    fn config(&self) -> &Config;
    /// Index of the worker among its peers.
    fn index(&self) -> usize;
    /// Number of peer workers.
    fn peers(&self) -> usize;
    /// Allocates a new channel from a supplied identifier and address.
    ///
    /// The identifier is used to identify the underlying channel and route
    /// its data. It should be distinct from other identifiers passed used
    /// for allocation, but can otherwise be arbitrary.
    ///
    /// The address should specify a path to an operator that should be
    /// scheduled in response to the receipt of records on the channel.
    /// Most commonly, this would be the address of the *target* of the
    /// channel.
    fn allocate<T: Exchangeable>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>);
    /// Constructs a pipeline channel from the worker to itself.
    ///
    /// By default this method uses the native channel allocation mechanism, but the expectation is
    /// that this behavior will be overridden to be more efficient.
    fn pipeline<T: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<T>, ThreadPuller<T>);

    /// Allocates a broadcast channel, where each pushed message is received by all.
    fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>);

    /// Allocates a new worker-unique identifier.
    fn new_identifier(&mut self) -> usize;
    /// The next worker-unique identifier to be allocated.
    ///
    /// Unlike [AsWorker::new_identifier], this does not advance the counter.
    fn peek_identifier(&self) -> usize;
    /// Provides access to named logging streams.
    ///
    /// Returns `None` when logging was not enabled (no start instant supplied).
    fn log_register(&self) -> Option<RefMut<'_, crate::logging_core::Registry>>;
    /// Acquires a logger by name, if the log register exists and the name is registered.
    ///
    /// For a more precise understanding of why a result is `None` one can use the direct functions.
    fn logger_for<CB: crate::ContainerBuilder>(&self, name: &str) -> Option<timely_logging::Logger<CB>> {
        self.log_register().and_then(|l| l.get(name))
    }
    /// Provides access to the timely logging stream.
    ///
    /// Convenience wrapper around `logger_for("timely")`.
    fn logging(&self) -> Option<crate::logging::TimelyLogger> { self.logger_for("timely").map(Into::into) }
}
214
/// A `Worker` is the entry point to a timely dataflow computation. It wraps a `Allocate`,
/// and has a list of dataflows that it manages.
pub struct Worker<A: Allocate> {
    // Worker-wide configuration parameters.
    config: Config,
    /// An optional instant from which the start of the computation should be reckoned.
    ///
    /// If this is set to none, system time-based functionality will be unavailable or work badly.
    /// For example, logging will be unavailable, and activation after a delay will be unavailable.
    timer: Option<Instant>,
    // Maps channel identifiers to the operator address to activate when
    // messages arrive on that channel (see `step_or_park`).
    paths: Rc<RefCell<HashMap<usize, Rc<[usize]>>>>,
    // The communication fabric; shared by all clones of this worker handle.
    allocator: Rc<RefCell<A>>,
    // Counter backing `new_identifier` / `peek_identifier`.
    identifiers: Rc<RefCell<usize>>,
    // Installed dataflows, keyed by the index allocated at construction.
    dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
    // Counter backing `allocate_dataflow_index` / `next_dataflow_index`.
    dataflow_counter: Rc<RefCell<usize>>,
    // Present only when a start instant was supplied at construction.
    logging: Option<Rc<RefCell<crate::logging_core::Registry>>>,

    // Shared record of operator activations, handed out via `Scheduler`.
    activations: Rc<RefCell<Activations>>,
    // Scratch list of dataflow indices to schedule in the current step;
    // per-handle (not shared among clones).
    active_dataflows: Vec<usize>,

    // Temporary storage for channel identifiers during dataflow construction.
    // These are then associated with a dataflow once constructed.
    temp_channel_ids: Rc<RefCell<Vec<usize>>>,
}
239
240impl<A: Allocate> AsWorker for Worker<A> {
241    fn config(&self) -> &Config { &self.config }
242    fn index(&self) -> usize { self.allocator.borrow().index() }
243    fn peers(&self) -> usize { self.allocator.borrow().peers() }
244    fn allocate<D: Exchangeable>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Vec<Box<dyn Push<D>>>, Box<dyn Pull<D>>) {
245        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
246        let mut paths = self.paths.borrow_mut();
247        paths.insert(identifier, address);
248        self.temp_channel_ids.borrow_mut().push(identifier);
249        self.allocator.borrow_mut().allocate(identifier)
250    }
251    fn pipeline<T: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<T>, ThreadPuller<T>) {
252        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
253        let mut paths = self.paths.borrow_mut();
254        paths.insert(identifier, address);
255        self.temp_channel_ids.borrow_mut().push(identifier);
256        self.allocator.borrow_mut().pipeline(identifier)
257    }
258    fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
259        if address.is_empty() { panic!("Unacceptable address: Length zero"); }
260        let mut paths = self.paths.borrow_mut();
261        paths.insert(identifier, address);
262        self.temp_channel_ids.borrow_mut().push(identifier);
263        self.allocator.borrow_mut().broadcast(identifier)
264    }
265
266    fn new_identifier(&mut self) -> usize { self.new_identifier() }
267    fn peek_identifier(&self) -> usize { self.peek_identifier() }
268    fn log_register(&self) -> Option<RefMut<'_, crate::logging_core::Registry>> {
269        self.log_register()
270    }
271}
272
273impl<A: Allocate> Scheduler for Worker<A> {
274    fn activations(&self) -> Rc<RefCell<Activations>> {
275        Rc::clone(&self.activations)
276    }
277}
278
impl<A: Allocate> Worker<A> {
    /// Allocates a new `Worker` bound to a channel allocator.
    ///
    /// The optional `now` instant seeds both the logging registry and the
    /// activation tracker; when `None`, logging is unavailable and delayed
    /// activation will work badly (see the `timer` field documentation).
    pub fn new(config: Config, c: A, now: Option<std::time::Instant>) -> Worker<A> {
        Worker {
            config,
            timer: now,
            paths:  Default::default(),
            allocator: Rc::new(RefCell::new(c)),
            identifiers:  Default::default(),
            dataflows: Default::default(),
            dataflow_counter:  Default::default(),
            // Logging exists only when a start instant is supplied.
            logging: now.map(|now| Rc::new(RefCell::new(crate::logging_core::Registry::new(now)))),
            activations: Rc::new(RefCell::new(Activations::new(now))),
            active_dataflows: Default::default(),
            temp_channel_ids:  Default::default(),
        }
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// Equivalent to `step_or_park` with a zero duration: never parks.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step();
    /// });
    /// ```
    pub fn step(&mut self) -> bool {
        self.step_or_park(Some(Duration::from_secs(0)))
    }

    /// Performs one step of the computation.
    ///
    /// A step gives each dataflow operator a chance to run, and is the
    /// main way to ensure that a computation proceeds.
    ///
    /// This method takes an optional timeout and may park the thread until
    /// there is work to perform or until this timeout expires. A value of
    /// `None` allows the worker to park indefinitely, whereas a value of
    /// `Some(Duration::new(0, 0))` will return without parking the thread.
    ///
    /// Returns `true` while any dataflow remains installed.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use std::time::Duration;
    ///     use timely::dataflow::operators::{ToStream, Inspect};
    ///
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x));
    ///     });
    ///
    ///     worker.step_or_park(Some(Duration::from_secs(1)));
    /// });
    /// ```
    pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {

        {   // Process channel events. Activate responders.
            let mut allocator = self.allocator.borrow_mut();
            allocator.receive();
            let events = allocator.events();
            let mut borrow = events.borrow_mut();
            let paths = self.paths.borrow();
            // Deduplicate channel identifiers so each is activated at most once.
            borrow.sort_unstable();
            borrow.dedup();
            for channel in borrow.drain(..) {
                // Consider tracking whether a channel
                // is non-empty, and only activating
                // on the basis of non-empty channels.
                // TODO: This is a sloppy way to deal
                // with channels that may not be alloc'd.
                if let Some(path) = paths.get(&channel) {
                    self.activations
                        .borrow_mut()
                        .activate(&path[..]);
                }
            }
        }

        // Organize activations.
        self.activations
            .borrow_mut()
            .advance();

        // Consider parking only if we have no pending events, some dataflows, and a non-zero duration.
        let empty_for = self.activations.borrow().empty_for();
        // Determine the minimum park duration, where `None` are an absence of a constraint.
        let delay = match (duration, empty_for) {
            (Some(x), Some(y)) => Some(std::cmp::min(x,y)),
            (x, y) => x.or(y),
        };

        // A zero delay means there is work to do now; anything else parks.
        if delay != Some(Duration::new(0,0)) {

            // Log parking and flush log.
            if let Some(l) = self.logging().as_mut() {
                l.log(crate::logging::ParkEvent::park(delay));
                l.flush();
            }

            // Block until a communication event arrives or `delay` elapses.
            self.allocator
                .borrow()
                .await_events(delay);

            // Log return from unpark.
            self.logging().as_mut().map(|l| l.log(crate::logging::ParkEvent::unpark()));
        }
        else {   // Schedule active dataflows.

            let active_dataflows = &mut self.active_dataflows;
            self.activations
                .borrow_mut()
                .for_extensions(&[], |index| active_dataflows.push(index));

            let mut dataflows = self.dataflows.borrow_mut();
            for index in active_dataflows.drain(..) {
                // Step the dataflow if it exists; remove it once complete.
                if let Entry::Occupied(mut entry) = dataflows.entry(index) {
                    // TODO: This is a moment at which a scheduling decision is being made.
                    let incomplete = entry.get_mut().step();
                    if !incomplete {
                        // Garbage collect the dataflow's channel-to-path entries.
                        let mut paths = self.paths.borrow_mut();
                        for channel in entry.get_mut().channel_ids.drain(..) {
                            paths.remove(&channel);
                        }
                        entry.remove_entry();
                    }
                }
            }
        }

        // Clean up, indicate if dataflows remain.
        self.logging.as_ref().map(|l| l.borrow_mut().flush());
        self.allocator.borrow_mut().release();
        !self.dataflows.borrow().is_empty()
    }

    /// Calls `self.step()` as long as `func` evaluates to `true`.
    ///
    /// This method will continually execute even if there is not work
    /// for the worker to perform. Consider using the similar method
    /// `Self::step_or_park_while(duration)` to allow the worker to yield
    /// control if that is appropriate.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///             .0
    ///     });
    ///
    ///     worker.step_while(|| probe.less_than(&0));
    /// });
    /// ```
    pub fn step_while<F: FnMut()->bool>(&mut self, func: F) {
        self.step_or_park_while(Some(Duration::from_secs(0)), func)
    }

    /// Calls `self.step_or_park(duration)` as long as `func` evaluates to `true`.
    ///
    /// This method may yield whenever there is no work to perform, as performed
    /// by `Self::step_or_park()`. Please consult the documentation for further
    /// information about that method and its behavior. In particular, the method
    /// can park the worker indefinitely, if no new work re-awakens the worker.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     use timely::dataflow::operators::{ToStream, Inspect, Probe};
    ///
    ///     let probe =
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///         (0 .. 10)
    ///             .to_stream(scope)
    ///             .container::<Vec<_>>()
    ///             .inspect(|x| println!("{:?}", x))
    ///             .probe()
    ///             .0
    ///     });
    ///
    ///     worker.step_or_park_while(None, || probe.less_than(&0));
    /// });
    /// ```
    pub fn step_or_park_while<F: FnMut()->bool>(&mut self, duration: Option<Duration>, mut func: F) {
        while func() { self.step_or_park(duration); }
    }

    /// The index of the worker out of its peers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn index(&self) -> usize { self.allocator.borrow().index() }
    /// The total number of peer workers.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn peers(&self) -> usize { self.allocator.borrow().peers() }

    /// A timer started at the initiation of the timely computation.
    ///
    /// Returns `None` when the worker was constructed without a start instant.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     let index = worker.index();
    ///     let peers = worker.peers();
    ///     let timer = worker.timer().unwrap();
    ///
    ///     println!("{:?}\tWorker {} of {}", timer.elapsed(), index, peers);
    ///
    /// });
    /// ```
    pub fn timer(&self) -> Option<Instant> { self.timer }

    /// Allocate a new worker-unique identifier.
    ///
    /// This method is public, though it is not expected to be widely used outside
    /// of the timely dataflow system.
    pub fn new_identifier(&mut self) -> usize {
        // Post-increment: advance the counter and return its prior value.
        *self.identifiers.borrow_mut() += 1;
        *self.identifiers.borrow() - 1
    }

    /// The next worker-unique identifier to be allocated.
    ///
    /// Does not advance the counter.
    pub fn peek_identifier(&self) -> usize {
        *self.identifiers.borrow()
    }

    /// Access to named loggers.
    ///
    /// Returns `None` when logging was not enabled at construction.
    ///
    /// # Examples
    ///
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     worker.log_register()
    ///           .unwrap()
    ///           .insert::<timely::logging::TimelyEventBuilder,_>("timely", |time, data|
    ///               println!("{:?}\t{:?}", time, data)
    ///           );
    /// });
    /// ```
    pub fn log_register(&self) -> Option<RefMut<'_, crate::logging_core::Registry>> {
        self.logging.as_ref().map(|l| l.borrow_mut())
    }

    /// Construct a new dataflow.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow::<usize,_,_>(|scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow<T, R, F>(&mut self, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut Child<Self, T>)->R,
    {
        self.dataflow_core("Dataflow", self.logging(), Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with a (purely cosmetic) name.
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_named::<usize,_,_>("Some Dataflow", |scope| {
    ///
    ///         // uses of `scope` to build dataflow
    ///
    ///     });
    /// });
    /// ```
    pub fn dataflow_named<T, R, F>(&mut self, name: &str, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut Child<Self, T>)->R,
    {
        self.dataflow_core(name, self.logging(), Box::new(()), |_, child| func(child))
    }

    /// Construct a new dataflow with specific configurations.
    ///
    /// This method constructs a new dataflow, using a name, logger, and additional
    /// resources specified as argument. The name is cosmetic, the logger is used to
    /// handle events generated by the dataflow, and the additional resources are kept
    /// alive for as long as the dataflow is alive (use case: shared library bindings).
    ///
    /// # Examples
    /// ```
    /// timely::execute_from_args(::std::env::args(), |worker| {
    ///
    ///     // We must supply the timestamp type here, although
    ///     // it would generally be determined by type inference.
    ///     worker.dataflow_core::<usize,_,_,_>(
    ///         "dataflow X",           // Dataflow name
    ///         None,                   // Optional logger
    ///         37,                     // Any resources
    ///         |resources, scope| {    // Closure
    ///
    ///             // uses of `resources`, `scope`to build dataflow
    ///
    ///         }
    ///     );
    /// });
    /// ```
    pub fn dataflow_core<T, R, F, V>(&mut self, name: &str, mut logging: Option<TimelyLogger>, mut resources: V, func: F) -> R
    where
        T: Refines<()>,
        F: FnOnce(&mut V, &mut Child<Self, T>)->R,
        V: Any+'static,
    {
        // Reserve an index and identifier; the index becomes the root of
        // the addresses of all operators in this dataflow.
        let dataflow_index = self.allocate_dataflow_index();
        let addr = vec![dataflow_index].into();
        let identifier = self.new_identifier();

        // Loggers are named per timestamp type, so progress events of
        // different dataflow timestamp types can be routed separately.
        let type_name = std::any::type_name::<T>();
        let progress_logging = self.logger_for(&format!("timely/progress/{}", type_name));
        let summary_logging  = self.logger_for(&format!("timely/summary/{}", type_name));
        let subscope = SubgraphBuilder::new_from(addr, identifier, logging.clone(), summary_logging, name);
        let subscope = RefCell::new(subscope);

        // Run the user's construction closure against a child scope.
        let result = {
            let mut builder = Child {
                subgraph: &subscope,
                parent: self.clone(),
                logging: logging.clone(),
                progress_logging,
            };
            func(&mut resources, &mut builder)
        };

        // Finalize the subgraph into a schedulable operator.
        let mut operator = subscope.into_inner().build(self);

        if let Some(l) = logging.as_mut() {
            l.log(crate::logging::OperatesEvent {
                id: identifier,
                addr: operator.path().to_vec(),
                name: operator.name().to_string(),
            });
            l.flush();
        }

        // Initialize progress tracking for the new dataflow.
        operator.get_internal_summary();
        operator.set_external_summary();

        // Claim the channels allocated during construction, so they can be
        // garbage collected when this dataflow is dropped or completes.
        let mut temp_channel_ids = self.temp_channel_ids.borrow_mut();
        let channel_ids = temp_channel_ids.drain(..).collect::<Vec<_>>();

        let wrapper = Wrapper {
            logging,
            identifier,
            operate: Some(Box::new(operator)),
            resources: Some(Box::new(resources)),
            channel_ids,
        };
        self.dataflows.borrow_mut().insert(dataflow_index, wrapper);

        result

    }

    /// Drops an identified dataflow.
    ///
    /// This method removes the identified dataflow, which will no longer be scheduled.
    /// Various other resources will be cleaned up, though the method is currently in
    /// public beta rather than expected to work. Please report all crashes and unmet
    /// expectations!
    pub fn drop_dataflow(&mut self, dataflow_identifier: usize) {
        if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) {
            // Garbage collect channel_id to path information.
            let mut paths = self.paths.borrow_mut();
            for channel in entry.channel_ids.drain(..) {
                paths.remove(&channel);
            }
        }
    }

    /// Returns the next index to be used for dataflow construction.
    ///
    /// This identifier will appear in the address of contained operators, and can
    /// be used to drop the dataflow using `self.drop_dataflow()`.
    pub fn next_dataflow_index(&self) -> usize {
        *self.dataflow_counter.borrow()
    }

    /// List the current dataflow indices.
    pub fn installed_dataflows(&self) -> Vec<usize> {
        self.dataflows.borrow().keys().cloned().collect()
    }

    /// Returns `true` if there is at least one dataflow under management.
    pub fn has_dataflows(&self) -> bool {
        !self.dataflows.borrow().is_empty()
    }

    // Acquire a new distinct dataflow identifier.
    //
    // Post-increment, mirroring `new_identifier`.
    fn allocate_dataflow_index(&self) -> usize {
        *self.dataflow_counter.borrow_mut() += 1;
        *self.dataflow_counter.borrow() - 1
    }
}
741
742impl<A: Allocate> Clone for Worker<A> {
743    fn clone(&self) -> Self {
744        Worker {
745            config: self.config.clone(),
746            timer: self.timer,
747            paths: Rc::clone(&self.paths),
748            allocator: Rc::clone(&self.allocator),
749            identifiers: Rc::clone(&self.identifiers),
750            dataflows: Rc::clone(&self.dataflows),
751            dataflow_counter: Rc::clone(&self.dataflow_counter),
752            logging: self.logging.clone(),
753            activations: Rc::clone(&self.activations),
754            active_dataflows: Vec::new(),
755            temp_channel_ids: Rc::clone(&self.temp_channel_ids),
756        }
757    }
758}
759
/// A scheduled dataflow together with the resources it keeps alive.
struct Wrapper {
    // Logger for schedule/shutdown events of this dataflow.
    logging: Option<TimelyLogger>,
    // Worker-unique identifier, used in logged events.
    identifier: usize,
    // The schedulable dataflow; `None` once the dataflow has completed.
    operate: Option<Box<dyn Schedule>>,
    // Caller-supplied resources kept alive as long as the dataflow;
    // dropped after `operate` (see `step` and `Drop`).
    resources: Option<Box<dyn Any>>,
    // Identifiers of channels allocated during this dataflow's
    // construction, for path cleanup on removal.
    channel_ids: Vec<usize>,
}
767
impl Wrapper {
    /// Steps the dataflow, and indicates whether it remains incomplete.
    ///
    /// If the dataflow has completed (the call returns `false`), this call
    /// drops it and its resources, dropping the dataflow first and then the
    /// resources (so that, e.g., shared library bindings will outlive the
    /// dataflow).
    fn step(&mut self) -> bool {

        // Perhaps log information about the start of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::start(self.identifier));
        }

        // A dataflow already dropped (`operate == None`) reports complete.
        let incomplete = self.operate.as_mut().map(|op| op.schedule()).unwrap_or(false);
        if !incomplete {
            // Drop order matters: dataflow first, then its resources.
            self.operate = None;
            self.resources = None;
        }

        // Perhaps log information about the stop of the schedule call.
        if let Some(l) = self.logging.as_mut() {
            l.log(crate::logging::ScheduleEvent::stop(self.identifier));
        }

        incomplete
    }
}
795
796impl Drop for Wrapper {
797    fn drop(&mut self) {
798        if let Some(l) = self.logging.as_mut() {
799            l.log(crate::logging::ShutdownEvent { id: self.identifier });
800        }
801        // ensure drop order
802        self.operate = None;
803        self.resources = None;
804    }
805}