timely/dataflow/operators/
capability.rs

1//! Capabilities to send data from operators
2//!
3//! Timely dataflow operators are only able to send data if they possess a "capability",
4//! a system-created object which warns the runtime that the operator may still produce
5//! output records.
6//!
7//! The timely dataflow runtime creates a capability and provides it to an operator whenever
8//! the operator receives input data. The capabilities allow the operator to respond to the
9//! received data, immediately or in the future, for as long as the capability is held.
10//!
11//! Timely dataflow's progress tracking infrastructure communicates the number of outstanding
12//! capabilities across all workers.
13//! Each operator may hold on to its capabilities, and may clone, advance, and drop them.
14//! Each of these actions informs the timely dataflow runtime of changes to the number of outstanding
15//! capabilities, so that the runtime can notice when the count for some capability reaches zero.
16//! While an operator can hold capabilities indefinitely, and create as many new copies of them
17//! as it would like, the progress tracking infrastructure will not move forward until the
18//! operators eventually release their capabilities.
19//!
20//! Note that these capabilities currently lack the property of "transferability":
21//! An operator should not hand its capabilities to some other operator. In the future, we should
22//! probably bind capabilities more strongly to a specific operator and output.
23
24use std::{borrow, error::Error, fmt::Display, ops::Deref};
25use std::rc::Rc;
26use std::cell::RefCell;
27use std::fmt::{self, Debug};
28
29use crate::order::PartialOrder;
30use crate::progress::Timestamp;
31use crate::progress::ChangeBatch;
32use crate::progress::operate::PortConnectivity;
33use crate::scheduling::Activations;
34use crate::dataflow::channels::pullers::counter::ConsumedGuard;
35
/// An internal trait expressing the capability to send messages with a given timestamp.
///
/// Implementors expose the timestamp they are held at and can report whether they may be
/// used with a particular output buffer and port.
pub trait CapabilityTrait<T: Timestamp> {
    /// The timestamp associated with the capability.
    fn time(&self) -> &T;
    /// Validates that the capability is valid for a specific internal buffer and output port.
    ///
    /// `query_buffer` is the shared change batch to check against and `port` is the index of
    /// the output port; returns `true` when the capability may be used with them.
    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool;
}
43
44impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
45    fn time(&self) -> &T { (**self).time() }
46    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
47        (**self).valid_for_output(query_buffer, port)
48    }
49}
50impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
51    fn time(&self) -> &T { (**self).time() }
52    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
53        (**self).valid_for_output(query_buffer, port)
54    }
55}
56
/// The capability to send data with a certain timestamp on a dataflow edge.
///
/// Capabilities are used by timely dataflow's progress tracking machinery to restrict and track
/// when user code retains the ability to send messages on dataflow edges. All capabilities are
/// constructed by the system, and should eventually be dropped by the user. Failure to drop
/// a capability (for whatever reason) will cause timely dataflow's progress tracking to stall.
pub struct Capability<T: Timestamp> {
    /// The timestamp this capability is held at.
    time: T,
    /// Shared change batch that is updated with `+1` for `time` when the capability is
    /// created and with `-1` when it is dropped.
    internal: Rc<RefCell<ChangeBatch<T>>>,
}
67
68impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
69    fn time(&self) -> &T { &self.time }
70    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, _port: usize) -> bool {
71        Rc::ptr_eq(&self.internal, query_buffer)
72    }
73}
74
75impl<T: Timestamp> Capability<T> {
76    /// Creates a new capability at `time` while incrementing (and keeping a reference to) the provided
77    /// [`ChangeBatch`].
78    pub(crate) fn new(time: T, internal: Rc<RefCell<ChangeBatch<T>>>) -> Self {
79        internal.borrow_mut().update(time.clone(), 1);
80
81        Self {
82            time,
83            internal,
84        }
85    }
86
87    /// The timestamp associated with this capability.
88    pub fn time(&self) -> &T {
89        &self.time
90    }
91
92    /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
93    /// the source capability (`self`).
94    ///
95    /// This method panics if `self.time` is not less or equal to `new_time`.
96    pub fn delayed(&self, new_time: &T) -> Capability<T> {
97        /// Makes the panic branch cold & outlined to decrease code bloat & give
98        /// the inner function the best chance possible of being inlined with
99        /// minimal code bloat
100        #[cold]
101        #[inline(never)]
102        fn delayed_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
103            // Formatting & panic machinery is relatively expensive in terms of code bloat, so
104            // we outline it
105            panic!(
106                "Attempted to delay {:?} to {:?}, which is not beyond the capability's time.",
107                capability,
108                invalid_time,
109            )
110        }
111
112        self.try_delayed(new_time)
113            .unwrap_or_else(|| delayed_panic(self, new_time))
114    }
115
116    /// Attempts to make a new capability for a timestamp `new_time` that is
117    /// greater or equal to the timestamp of the source capability (`self`).
118    ///
119    /// Returns [`None`] `self.time` is not less or equal to `new_time`.
120    pub fn try_delayed(&self, new_time: &T) -> Option<Capability<T>> {
121        if self.time.less_equal(new_time) {
122            Some(Self::new(new_time.clone(), Rc::clone(&self.internal)))
123        } else {
124            None
125        }
126    }
127
128    /// Downgrades the capability to one corresponding to `new_time`.
129    ///
130    /// This method panics if `self.time` is not less or equal to `new_time`.
131    pub fn downgrade(&mut self, new_time: &T) {
132        /// Makes the panic branch cold & outlined to decrease code bloat & give
133        /// the inner function the best chance possible of being inlined with
134        /// minimal code bloat
135        #[cold]
136        #[inline(never)]
137        fn downgrade_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
138            // Formatting & panic machinery is relatively expensive in terms of code bloat, so
139            // we outline it
140            panic!(
141                "Attempted to downgrade {:?} to {:?}, which is not beyond the capability's time.",
142                capability,
143                invalid_time,
144            )
145        }
146
147        self.try_downgrade(new_time)
148            .unwrap_or_else(|_| downgrade_panic(self, new_time))
149    }
150
151    /// Attempts to downgrade the capability to one corresponding to `new_time`.
152    ///
153    /// Returns a [DowngradeError] if `self.time` is not less or equal to `new_time`.
154    pub fn try_downgrade(&mut self, new_time: &T) -> Result<(), DowngradeError> {
155        if let Some(new_capability) = self.try_delayed(new_time) {
156            *self = new_capability;
157            Ok(())
158        } else {
159            Err(DowngradeError(()))
160        }
161    }
162}
163
164// Necessary for correctness. When a capability is dropped, the "internal" `ChangeBatch` needs to be
165// updated accordingly to inform the rest of the system that the operator has released its permit
166// to send data and request notification at the associated timestamp.
167impl<T: Timestamp> Drop for Capability<T> {
168    fn drop(&mut self) {
169        self.internal.borrow_mut().update(self.time.clone(), -1);
170    }
171}
172
173impl<T: Timestamp> Clone for Capability<T> {
174    fn clone(&self) -> Capability<T> {
175        Self::new(self.time.clone(), Rc::clone(&self.internal))
176    }
177}
178
179impl<T: Timestamp> Deref for Capability<T> {
180    type Target = T;
181
182    fn deref(&self) -> &T {
183        &self.time
184    }
185}
186
187impl<T: Timestamp> Debug for Capability<T> {
188    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
189        f.debug_struct("Capability")
190            .field("time", &self.time)
191            .field("internal", &"...")
192            .finish()
193    }
194}
195
196impl<T: Timestamp> PartialEq for Capability<T> {
197    fn eq(&self, other: &Self) -> bool {
198        self.time() == other.time() && Rc::ptr_eq(&self.internal, &other.internal)
199    }
200}
201impl<T: Timestamp> Eq for Capability<T> { }
202
203impl<T: Timestamp> PartialOrder for Capability<T> {
204    fn less_equal(&self, other: &Self) -> bool {
205        self.time().less_equal(other.time()) && Rc::ptr_eq(&self.internal, &other.internal)
206    }
207}
208
209impl<T: Timestamp+::std::hash::Hash> ::std::hash::Hash for Capability<T> {
210    fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
211        self.time.hash(state);
212    }
213}
214
/// An error produced when trying to downgrade a capability to a time that is not
/// greater than or equal to the capability's current time.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DowngradeError(());

impl Display for DowngradeError {
    /// Writes a fixed, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not downgrade the given capability")
    }
}

impl Error for DowngradeError {}
227
/// A shared list of shared output capability buffers.
///
/// `InputCapability` indexes this list by output port to find the change batch of an output.
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
230
/// A capability of an input port.
///
/// Holding onto this capability will implicitly hold on to a capability for all the output
/// ports this input is connected to, after the connection summaries have been applied.
///
/// This input capability supplies a `retain_for_output(self)` method which consumes the input
/// capability and turns it into a [Capability] for a specific output port.
pub struct InputCapability<T: Timestamp> {
    /// Output capability buffers, for use in minting capabilities.
    internal: CapabilityUpdates<T>,
    /// Timestamp summaries for each output.
    summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
    /// A drop guard that updates the consumed capability this InputCapability refers to on drop.
    /// It also supplies the timestamp reported by `time()`.
    consumed_guard: ConsumedGuard<T>,
}
246
247impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
248    fn time(&self) -> &T { self.time() }
249    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
250        let summaries_borrow = self.summaries.borrow();
251        let internal_borrow = self.internal.borrow();
252        // To be valid, the output buffer must match and the timestamp summary needs to be the default.
253        Rc::ptr_eq(&internal_borrow[port], query_buffer) &&
254        summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()])
255    }
256}
257
258impl<T: Timestamp> InputCapability<T> {
259    /// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
260    /// the provided [`ChangeBatch`].
261    pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
262        InputCapability {
263            internal,
264            summaries,
265            consumed_guard: guard,
266        }
267    }
268
269    /// The timestamp associated with this capability.
270    pub fn time(&self) -> &T {
271        self.consumed_guard.time()
272    }
273
274    /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
275    /// the source capability (`self`).
276    ///
277    /// This method panics if `self.time` is not less or equal to `new_time`.
278    pub fn delayed(&self, new_time: &T) -> Capability<T> {
279        self.delayed_for_output(new_time, 0)
280    }
281
282    /// Delays capability for a specific output port.
283    pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
284        use crate::progress::timestamp::PathSummary;
285        if let Some(path) = self.summaries.borrow().get(output_port) {
286            if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
287                Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
288            } else {
289                panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
290            }
291        }
292        else {
293            panic!("Attempted to delay a capability for a disconnected output");
294        }
295    }
296
297    /// Transform to an owned capability.
298    ///
299    /// This method produces an owned capability which must be dropped to release the
300    /// capability. Users should take care that these capabilities are only stored for
301    /// as long as they are required, as failing to drop them may result in livelock.
302    ///
303    /// This method panics if the timestamp summary to output zero strictly advances the time.
304    pub fn retain(self) -> Capability<T> {
305        self.retain_for_output(0)
306    }
307
308    /// Transforms to an owned capability for a specific output port.
309    ///
310    /// This method panics if the timestamp summary to `output_port` strictly advances the time.
311    pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
312        use crate::progress::timestamp::PathSummary;
313        let self_time = self.time().clone();
314        if let Some(path) = self.summaries.borrow().get(output_port) {
315            if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
316                Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
317            }
318            else {
319                panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
320            }
321        }
322        else {
323            panic!("Attempted to retain a capability for a disconnected output");
324        }
325    }
326}
327
328impl<T: Timestamp> Deref for InputCapability<T> {
329    type Target = T;
330
331    fn deref(&self) -> &T {
332        self.time()
333    }
334}
335
336impl<T: Timestamp> Debug for InputCapability<T> {
337    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
338        f.debug_struct("InputCapability")
339            .field("time", self.time())
340            .field("internal", &"...")
341            .finish()
342    }
343}
344
/// Capability that activates on drop or downgrade.
#[derive(Clone, Debug)]
pub struct ActivateCapability<T: Timestamp> {
    /// The wrapped capability.
    pub(crate) capability: Capability<T>,
    /// The address passed to `activate` when this capability is downgraded or dropped.
    pub(crate) address: Rc<[usize]>,
    /// Shared activations handle on which `activate` is called.
    pub(crate) activations: Rc<RefCell<Activations>>,
}
352
353impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
354    fn time(&self) -> &T { self.capability.time() }
355    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
356        self.capability.valid_for_output(query_buffer, port)
357    }
358}
359
360impl<T: Timestamp> ActivateCapability<T> {
361    /// Creates a new activating capability.
362    pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
363        Self {
364            capability,
365            address,
366            activations,
367        }
368    }
369
370    /// The timestamp associated with this capability.
371    pub fn time(&self) -> &T {
372        self.capability.time()
373    }
374
375    /// Creates a new delayed capability.
376    pub fn delayed(&self, time: &T) -> Self {
377        ActivateCapability {
378            capability: self.capability.delayed(time),
379            address: Rc::clone(&self.address),
380            activations: Rc::clone(&self.activations),
381        }
382    }
383
384    /// Downgrades this capability.
385    pub fn downgrade(&mut self, time: &T) {
386        self.capability.downgrade(time);
387        self.activations.borrow_mut().activate(&self.address);
388    }
389}
390
391impl<T: Timestamp> Drop for ActivateCapability<T> {
392    fn drop(&mut self) {
393        self.activations.borrow_mut().activate(&self.address);
394    }
395}
396
/// A set of capabilities, for possibly incomparable times.
///
/// The set dereferences to a slice of the capabilities it currently holds.
#[derive(Clone, Debug)]
pub struct CapabilitySet<T: Timestamp> {
    /// The capabilities currently held by the set.
    elements: Vec<Capability<T>>,
}
402
403impl<T: Timestamp> CapabilitySet<T> {
404
405    /// Allocates an empty capability set.
406    pub fn new() -> Self {
407        Self { elements: Vec::new() }
408    }
409
410    /// Allocates an empty capability set with space for `capacity` elements
411    pub fn with_capacity(capacity: usize) -> Self {
412        Self { elements: Vec::with_capacity(capacity) }
413    }
414
415    /// Allocates a capability set containing a single capability.
416    ///
417    /// # Examples
418    /// ```
419    /// use std::collections::HashMap;
420    /// use timely::dataflow::{
421    ///     operators::{ToStream, generic::Operator},
422    ///     channels::pact::Pipeline,
423    /// };
424    /// use timely::dataflow::operators::CapabilitySet;
425    ///
426    /// timely::example(|scope| {
427    ///     vec![()].into_iter().to_stream(scope)
428    ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
429    ///             let mut cap = CapabilitySet::from_elem(default_cap);
430    ///             move |(input, frontier), output| {
431    ///                 cap.downgrade(&frontier.frontier());
432    ///                 input.for_each_time(|time, data| {});
433    ///                 let a_cap = cap.first();
434    ///                 if let Some(a_cap) = a_cap.as_ref() {
435    ///                     output.session(a_cap).give(());
436    ///                 }
437    ///             }
438    ///         })
439    ///         .container::<Vec<_>>();
440    /// });
441    /// ```
442    pub fn from_elem(cap: Capability<T>) -> Self {
443        Self { elements: vec![cap] }
444    }
445
446    /// Inserts `capability` into the set, discarding redundant capabilities.
447    pub fn insert(&mut self, capability: Capability<T>) {
448        if !self.elements.iter().any(|c| c.less_equal(&capability)) {
449            self.elements.retain(|c| !capability.less_equal(c));
450            self.elements.push(capability);
451        }
452    }
453
454    /// Creates a new capability to send data at `time`.
455    ///
456    /// This method panics if there does not exist a capability in `self.elements` less or equal to `time`.
457    pub fn delayed(&self, time: &T) -> Capability<T> {
458        /// Makes the panic branch cold & outlined to decrease code bloat & give
459        /// the inner function the best chance possible of being inlined with
460        /// minimal code bloat
461        #[cold]
462        #[inline(never)]
463        fn delayed_panic(invalid_time: &dyn Debug) -> ! {
464            // Formatting & panic machinery is relatively expensive in terms of code bloat, so
465            // we outline it
466            panic!(
467                "failed to create a delayed capability, the current set does not \
468                have an element less than or equal to {:?}",
469                invalid_time,
470            )
471        }
472
473        self.try_delayed(time)
474            .unwrap_or_else(|| delayed_panic(time))
475    }
476
477    /// Attempts to create a new capability to send data at `time`.
478    ///
479    /// Returns [`None`] if there does not exist a capability in `self.elements` less or equal to `time`.
480    pub fn try_delayed(&self, time: &T) -> Option<Capability<T>> {
481        self.elements
482            .iter()
483            .find(|capability| capability.time().less_equal(time))
484            .and_then(|capability| capability.try_delayed(time))
485    }
486
487    /// Downgrades the set of capabilities to correspond with the times in `frontier`.
488    ///
489    /// This method panics if any element of `frontier` is not greater or equal to some element of `self.elements`.
490    pub fn downgrade<B, F>(&mut self, frontier: F)
491    where
492        B: borrow::Borrow<T>,
493        F: IntoIterator<Item = B>,
494    {
495        /// Makes the panic branch cold & outlined to decrease code bloat & give
496        /// the inner function the best chance possible of being inlined with
497        /// minimal code bloat
498        #[cold]
499        #[inline(never)]
500        fn downgrade_panic() -> ! {
501            // Formatting & panic machinery is relatively expensive in terms of code bloat, so
502            // we outline it
503            panic!(
504                "Attempted to downgrade a CapabilitySet with a frontier containing an element \
505                that was not beyond an element within the set"
506            )
507        }
508
509        self.try_downgrade(frontier)
510            .unwrap_or_else(|_| downgrade_panic())
511    }
512
513    /// Attempts to downgrade the set of capabilities to correspond with the times in `frontier`.
514    ///
515    /// Returns [`None`] if any element of `frontier` is not greater or equal to some element of `self.elements`.
516    ///
517    /// **Warning**: If an error is returned the capability set may be in an inconsistent state and can easily
518    /// cause logic errors within the program if not properly handled.
519    ///
520    pub fn try_downgrade<B, F>(&mut self, frontier: F) -> Result<(), DowngradeError>
521    where
522        B: borrow::Borrow<T>,
523        F: IntoIterator<Item = B>,
524    {
525        let count = self.elements.len();
526        for time in frontier.into_iter() {
527            let capability = self.try_delayed(time.borrow()).ok_or(DowngradeError(()))?;
528            self.elements.push(capability);
529        }
530        self.elements.drain(..count);
531
532        Ok(())
533    }
534}
535
536impl<T> From<Vec<Capability<T>>> for CapabilitySet<T>
537where
538    T: Timestamp,
539{
540    fn from(capabilities: Vec<Capability<T>>) -> Self {
541        let mut this = Self::with_capacity(capabilities.len());
542        for capability in capabilities {
543            this.insert(capability);
544        }
545
546        this
547    }
548}
549
550impl<T: Timestamp> Default for CapabilitySet<T> {
551    fn default() -> Self {
552        Self::new()
553    }
554}
555
556impl<T: Timestamp> Deref for CapabilitySet<T> {
557    type Target=[Capability<T>];
558
559    fn deref(&self) -> &[Capability<T>] {
560        &self.elements
561    }
562}