timely/dataflow/operators/
capability.rs

1//! Capabilities to send data from operators
2//!
3//! Timely dataflow operators are only able to send data if they possess a "capability",
4//! a system-created object which warns the runtime that the operator may still produce
5//! output records.
6//!
7//! The timely dataflow runtime creates a capability and provides it to an operator whenever
8//! the operator receives input data. The capabilities allow the operator to respond to the
9//! received data, immediately or in the future, for as long as the capability is held.
10//!
11//! Timely dataflow's progress tracking infrastructure communicates the number of outstanding
12//! capabilities across all workers.
13//! Each operator may hold on to its capabilities, and may clone, advance, and drop them.
14//! Each of these actions informs the timely dataflow runtime of changes to the number of outstanding
15//! capabilities, so that the runtime can notice when the count for some capability reaches zero.
16//! While an operator can hold capabilities indefinitely, and create as many new copies of them
17//! as it would like, the progress tracking infrastructure will not move forward until the
18//! operators eventually release their capabilities.
19//!
20//! Note that these capabilities currently lack the property of "transferability":
21//! An operator should not hand its capabilities to some other operator. In the future, we should
22//! probably bind capabilities more strongly to a specific operator and output.
23
24use std::{borrow, error::Error, fmt::Display, ops::Deref};
25use std::rc::Rc;
26use std::cell::RefCell;
27use std::fmt::{self, Debug};
28
29use crate::order::PartialOrder;
30use crate::progress::Timestamp;
31use crate::progress::ChangeBatch;
32use crate::progress::operate::PortConnectivity;
33use crate::scheduling::Activations;
34use crate::dataflow::channels::pullers::counter::ConsumedGuard;
35
36/// An internal trait expressing the capability to send messages with a given timestamp.
37pub trait CapabilityTrait<T: Timestamp> {
38    /// The timestamp associated with the capability.
39    fn time(&self) -> &T;
40    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool;
41}
42
43impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
44    fn time(&self) -> &T { (**self).time() }
45    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
46        (**self).valid_for_output(query_buffer, port)
47    }
48}
49impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
50    fn time(&self) -> &T { (**self).time() }
51    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
52        (**self).valid_for_output(query_buffer, port)
53    }
54}
55
56/// The capability to send data with a certain timestamp on a dataflow edge.
57///
58/// Capabilities are used by timely dataflow's progress tracking machinery to restrict and track
59/// when user code retains the ability to send messages on dataflow edges. All capabilities are
60/// constructed by the system, and should eventually be dropped by the user. Failure to drop
61/// a capability (for whatever reason) will cause timely dataflow's progress tracking to stall.
62pub struct Capability<T: Timestamp> {
63    time: T,
64    internal: Rc<RefCell<ChangeBatch<T>>>,
65}
66
67impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
68    fn time(&self) -> &T { &self.time }
69    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, _port: usize) -> bool {
70        Rc::ptr_eq(&self.internal, query_buffer)
71    }
72}
73
74impl<T: Timestamp> Capability<T> {
75    /// Creates a new capability at `time` while incrementing (and keeping a reference to) the provided
76    /// [`ChangeBatch`].
77    pub(crate) fn new(time: T, internal: Rc<RefCell<ChangeBatch<T>>>) -> Self {
78        internal.borrow_mut().update(time.clone(), 1);
79
80        Self {
81            time,
82            internal,
83        }
84    }
85
86    /// The timestamp associated with this capability.
87    pub fn time(&self) -> &T {
88        &self.time
89    }
90
91    /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
92    /// the source capability (`self`).
93    ///
94    /// This method panics if `self.time` is not less or equal to `new_time`.
95    pub fn delayed(&self, new_time: &T) -> Capability<T> {
96        /// Makes the panic branch cold & outlined to decrease code bloat & give
97        /// the inner function the best chance possible of being inlined with
98        /// minimal code bloat
99        #[cold]
100        #[inline(never)]
101        fn delayed_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
102            // Formatting & panic machinery is relatively expensive in terms of code bloat, so
103            // we outline it
104            panic!(
105                "Attempted to delay {:?} to {:?}, which is not beyond the capability's time.",
106                capability,
107                invalid_time,
108            )
109        }
110
111        self.try_delayed(new_time)
112            .unwrap_or_else(|| delayed_panic(self, new_time))
113    }
114
115    /// Attempts to make a new capability for a timestamp `new_time` that is
116    /// greater or equal to the timestamp of the source capability (`self`).
117    ///
118    /// Returns [`None`] `self.time` is not less or equal to `new_time`.
119    pub fn try_delayed(&self, new_time: &T) -> Option<Capability<T>> {
120        if self.time.less_equal(new_time) {
121            Some(Self::new(new_time.clone(), Rc::clone(&self.internal)))
122        } else {
123            None
124        }
125    }
126
127    /// Downgrades the capability to one corresponding to `new_time`.
128    ///
129    /// This method panics if `self.time` is not less or equal to `new_time`.
130    pub fn downgrade(&mut self, new_time: &T) {
131        /// Makes the panic branch cold & outlined to decrease code bloat & give
132        /// the inner function the best chance possible of being inlined with
133        /// minimal code bloat
134        #[cold]
135        #[inline(never)]
136        fn downgrade_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
137            // Formatting & panic machinery is relatively expensive in terms of code bloat, so
138            // we outline it
139            panic!(
140                "Attempted to downgrade {:?} to {:?}, which is not beyond the capability's time.",
141                capability,
142                invalid_time,
143            )
144        }
145
146        self.try_downgrade(new_time)
147            .unwrap_or_else(|_| downgrade_panic(self, new_time))
148    }
149
150    /// Attempts to downgrade the capability to one corresponding to `new_time`.
151    ///
152    /// Returns a [DowngradeError] if `self.time` is not less or equal to `new_time`.
153    pub fn try_downgrade(&mut self, new_time: &T) -> Result<(), DowngradeError> {
154        if let Some(new_capability) = self.try_delayed(new_time) {
155            *self = new_capability;
156            Ok(())
157        } else {
158            Err(DowngradeError(()))
159        }
160    }
161}
162
163// Necessary for correctness. When a capability is dropped, the "internal" `ChangeBatch` needs to be
164// updated accordingly to inform the rest of the system that the operator has released its permit
165// to send data and request notification at the associated timestamp.
166impl<T: Timestamp> Drop for Capability<T> {
167    fn drop(&mut self) {
168        self.internal.borrow_mut().update(self.time.clone(), -1);
169    }
170}
171
172impl<T: Timestamp> Clone for Capability<T> {
173    fn clone(&self) -> Capability<T> {
174        Self::new(self.time.clone(), Rc::clone(&self.internal))
175    }
176}
177
178impl<T: Timestamp> Deref for Capability<T> {
179    type Target = T;
180
181    fn deref(&self) -> &T {
182        &self.time
183    }
184}
185
186impl<T: Timestamp> Debug for Capability<T> {
187    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
188        f.debug_struct("Capability")
189            .field("time", &self.time)
190            .field("internal", &"...")
191            .finish()
192    }
193}
194
195impl<T: Timestamp> PartialEq for Capability<T> {
196    fn eq(&self, other: &Self) -> bool {
197        self.time() == other.time() && Rc::ptr_eq(&self.internal, &other.internal)
198    }
199}
200impl<T: Timestamp> Eq for Capability<T> { }
201
202impl<T: Timestamp> PartialOrder for Capability<T> {
203    fn less_equal(&self, other: &Self) -> bool {
204        self.time().less_equal(other.time()) && Rc::ptr_eq(&self.internal, &other.internal)
205    }
206}
207
208impl<T: Timestamp> ::std::hash::Hash for Capability<T> {
209    fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
210        self.time.hash(state);
211    }
212}
213
/// Error returned when a capability cannot be downgraded to a requested time.
///
/// In this module it is produced when the held time is not less than or equal to the
/// requested time. The private unit field prevents construction outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DowngradeError(());

impl Display for DowngradeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Emitted verbatim; formatter width/fill flags are not applied.
        write!(f, "could not downgrade the given capability")
    }
}

impl Error for DowngradeError {}
226
/// A shared, mutable list of shared output capability buffers.
///
/// [`InputCapability`] indexes this list by output port to find the [`ChangeBatch`] that
/// backs capabilities minted for that output.
type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
229
230/// An capability of an input port.
231///
232/// Holding onto this capability will implicitly holds onto a capability for all the outputs
233/// ports this input is connected to, after the connection summaries have been applied.
234///
235/// This input capability supplies a `retain_for_output(self)` method which consumes the input
236/// capability and turns it into a [Capability] for a specific output port.
237pub struct InputCapability<T: Timestamp> {
238    /// Output capability buffers, for use in minting capabilities.
239    internal: CapabilityUpdates<T>,
240    /// Timestamp summaries for each output.
241    summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
242    /// A drop guard that updates the consumed capability this InputCapability refers to on drop
243    consumed_guard: ConsumedGuard<T>,
244}
245
246impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247    fn time(&self) -> &T { self.time() }
248    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
249        let summaries_borrow = self.summaries.borrow();
250        let internal_borrow = self.internal.borrow();
251        // To be valid, the output buffer must match and the timestamp summary needs to be the default.
252        Rc::ptr_eq(&internal_borrow[port], query_buffer) &&
253        summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()])
254    }
255}
256
257impl<T: Timestamp> InputCapability<T> {
258    /// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
259    /// the provided [`ChangeBatch`].
260    pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
261        InputCapability {
262            internal,
263            summaries,
264            consumed_guard: guard,
265        }
266    }
267
268    /// The timestamp associated with this capability.
269    pub fn time(&self) -> &T {
270        self.consumed_guard.time()
271    }
272
273    /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
274    /// the source capability (`self`).
275    ///
276    /// This method panics if `self.time` is not less or equal to `new_time`.
277    pub fn delayed(&self, new_time: &T) -> Capability<T> {
278        self.delayed_for_output(new_time, 0)
279    }
280
281    /// Delays capability for a specific output port.
282    pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
283        use crate::progress::timestamp::PathSummary;
284        if let Some(path) = self.summaries.borrow().get(output_port) {
285            if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
286                Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
287            } else {
288                panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
289            }
290        }
291        else {
292            panic!("Attempted to delay a capability for a disconnected output");
293        }
294    }
295
296    /// Transform to an owned capability.
297    ///
298    /// This method produces an owned capability which must be dropped to release the
299    /// capability. Users should take care that these capabilities are only stored for
300    /// as long as they are required, as failing to drop them may result in livelock.
301    ///
302    /// This method panics if the timestamp summary to output zero strictly advances the time.
303    pub fn retain(self) -> Capability<T> {
304        self.retain_for_output(0)
305    }
306
307    /// Transforms to an owned capability for a specific output port.
308    ///
309    /// This method panics if the timestamp summary to `output_port` strictly advances the time.
310    pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
311        use crate::progress::timestamp::PathSummary;
312        let self_time = self.time().clone();
313        if let Some(path) = self.summaries.borrow().get(output_port) {
314            if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
315                Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
316            }
317            else {
318                panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
319            }
320        }
321        else {
322            panic!("Attempted to retain a capability for a disconnected output");
323        }
324    }
325}
326
327impl<T: Timestamp> Deref for InputCapability<T> {
328    type Target = T;
329
330    fn deref(&self) -> &T {
331        self.time()
332    }
333}
334
335impl<T: Timestamp> Debug for InputCapability<T> {
336    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
337        f.debug_struct("InputCapability")
338            .field("time", self.time())
339            .field("internal", &"...")
340            .finish()
341    }
342}
343
344/// Capability that activates on drop.
345#[derive(Clone, Debug)]
346pub struct ActivateCapability<T: Timestamp> {
347    pub(crate) capability: Capability<T>,
348    pub(crate) address: Rc<[usize]>,
349    pub(crate) activations: Rc<RefCell<Activations>>,
350}
351
352impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
353    fn time(&self) -> &T { self.capability.time() }
354    fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
355        self.capability.valid_for_output(query_buffer, port)
356    }
357}
358
359impl<T: Timestamp> ActivateCapability<T> {
360    /// Creates a new activating capability.
361    pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
362        Self {
363            capability,
364            address,
365            activations,
366        }
367    }
368
369    /// The timestamp associated with this capability.
370    pub fn time(&self) -> &T {
371        self.capability.time()
372    }
373
374    /// Creates a new delayed capability.
375    pub fn delayed(&self, time: &T) -> Self {
376        ActivateCapability {
377            capability: self.capability.delayed(time),
378            address: Rc::clone(&self.address),
379            activations: Rc::clone(&self.activations),
380        }
381    }
382
383    /// Downgrades this capability.
384    pub fn downgrade(&mut self, time: &T) {
385        self.capability.downgrade(time);
386        self.activations.borrow_mut().activate(&self.address);
387    }
388}
389
390impl<T: Timestamp> Drop for ActivateCapability<T> {
391    fn drop(&mut self) {
392        self.activations.borrow_mut().activate(&self.address);
393    }
394}
395
396/// A set of capabilities, for possibly incomparable times.
397#[derive(Clone, Debug)]
398pub struct CapabilitySet<T: Timestamp> {
399    elements: Vec<Capability<T>>,
400}
401
402impl<T: Timestamp> CapabilitySet<T> {
403
404    /// Allocates an empty capability set.
405    pub fn new() -> Self {
406        Self { elements: Vec::new() }
407    }
408
409    /// Allocates an empty capability set with space for `capacity` elements
410    pub fn with_capacity(capacity: usize) -> Self {
411        Self { elements: Vec::with_capacity(capacity) }
412    }
413
414    /// Allocates a capability set containing a single capability.
415    ///
416    /// # Examples
417    /// ```
418    /// use std::collections::HashMap;
419    /// use timely::dataflow::{
420    ///     operators::{ToStream, generic::Operator},
421    ///     channels::pact::Pipeline,
422    /// };
423    /// use timely::dataflow::operators::CapabilitySet;
424    ///
425    /// timely::example(|scope| {
426    ///     vec![()].into_iter().to_stream(scope)
427    ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
428    ///             let mut cap = CapabilitySet::from_elem(default_cap);
429    ///             move |input, output| {
430    ///                 cap.downgrade(&input.frontier().frontier());
431    ///                 while let Some((time, data)) = input.next() {
432    ///                 }
433    ///                 let a_cap = cap.first();
434    ///                 if let Some(a_cap) = a_cap.as_ref() {
435    ///                     output.session(a_cap).give(());
436    ///                 }
437    ///             }
438    ///         })
439    ///         .container::<Vec<_>>();
440    /// });
441    /// ```
442    pub fn from_elem(cap: Capability<T>) -> Self {
443        Self { elements: vec![cap] }
444    }
445
446    /// Inserts `capability` into the set, discarding redundant capabilities.
447    pub fn insert(&mut self, capability: Capability<T>) {
448        if !self.elements.iter().any(|c| c.less_equal(&capability)) {
449            self.elements.retain(|c| !capability.less_equal(c));
450            self.elements.push(capability);
451        }
452    }
453
454    /// Creates a new capability to send data at `time`.
455    ///
456    /// This method panics if there does not exist a capability in `self.elements` less or equal to `time`.
457    pub fn delayed(&self, time: &T) -> Capability<T> {
458        /// Makes the panic branch cold & outlined to decrease code bloat & give
459        /// the inner function the best chance possible of being inlined with
460        /// minimal code bloat
461        #[cold]
462        #[inline(never)]
463        fn delayed_panic(invalid_time: &dyn Debug) -> ! {
464            // Formatting & panic machinery is relatively expensive in terms of code bloat, so
465            // we outline it
466            panic!(
467                "failed to create a delayed capability, the current set does not \
468                have an element less than or equal to {:?}",
469                invalid_time,
470            )
471        }
472
473        self.try_delayed(time)
474            .unwrap_or_else(|| delayed_panic(time))
475    }
476
477    /// Attempts to create a new capability to send data at `time`.
478    ///
479    /// Returns [`None`] if there does not exist a capability in `self.elements` less or equal to `time`.
480    pub fn try_delayed(&self, time: &T) -> Option<Capability<T>> {
481        self.elements
482            .iter()
483            .find(|capability| capability.time().less_equal(time))
484            .and_then(|capability| capability.try_delayed(time))
485    }
486
487    /// Downgrades the set of capabilities to correspond with the times in `frontier`.
488    ///
489    /// This method panics if any element of `frontier` is not greater or equal to some element of `self.elements`.
490    pub fn downgrade<B, F>(&mut self, frontier: F)
491    where
492        B: borrow::Borrow<T>,
493        F: IntoIterator<Item = B>,
494    {
495        /// Makes the panic branch cold & outlined to decrease code bloat & give
496        /// the inner function the best chance possible of being inlined with
497        /// minimal code bloat
498        #[cold]
499        #[inline(never)]
500        fn downgrade_panic() -> ! {
501            // Formatting & panic machinery is relatively expensive in terms of code bloat, so
502            // we outline it
503            panic!(
504                "Attempted to downgrade a CapabilitySet with a frontier containing an element \
505                that was not beyond an element within the set"
506            )
507        }
508
509        self.try_downgrade(frontier)
510            .unwrap_or_else(|_| downgrade_panic())
511    }
512
513    /// Attempts to downgrade the set of capabilities to correspond with the times in `frontier`.
514    ///
515    /// Returns [`None`] if any element of `frontier` is not greater or equal to some element of `self.elements`.
516    ///
517    /// **Warning**: If an error is returned the capability set may be in an inconsistent state and can easily
518    /// cause logic errors within the program if not properly handled.
519    ///
520    pub fn try_downgrade<B, F>(&mut self, frontier: F) -> Result<(), DowngradeError>
521    where
522        B: borrow::Borrow<T>,
523        F: IntoIterator<Item = B>,
524    {
525        let count = self.elements.len();
526        for time in frontier.into_iter() {
527            let capability = self.try_delayed(time.borrow()).ok_or(DowngradeError(()))?;
528            self.elements.push(capability);
529        }
530        self.elements.drain(..count);
531
532        Ok(())
533    }
534}
535
536impl<T> From<Vec<Capability<T>>> for CapabilitySet<T>
537where
538    T: Timestamp,
539{
540    fn from(capabilities: Vec<Capability<T>>) -> Self {
541        let mut this = Self::with_capacity(capabilities.len());
542        for capability in capabilities {
543            this.insert(capability);
544        }
545
546        this
547    }
548}
549
550impl<T: Timestamp> Default for CapabilitySet<T> {
551    fn default() -> Self {
552        Self::new()
553    }
554}
555
556impl<T: Timestamp> Deref for CapabilitySet<T> {
557    type Target=[Capability<T>];
558
559    fn deref(&self) -> &[Capability<T>] {
560        &self.elements
561    }
562}