1//! Capabilities to send data from operators
2//!
3//! Timely dataflow operators are only able to send data if they possess a "capability",
4//! a system-created object which warns the runtime that the operator may still produce
5//! output records.
6//!
7//! The timely dataflow runtime creates a capability and provides it to an operator whenever
8//! the operator receives input data. The capabilities allow the operator to respond to the
9//! received data, immediately or in the future, for as long as the capability is held.
10//!
11//! Timely dataflow's progress tracking infrastructure communicates the number of outstanding
12//! capabilities across all workers.
13//! Each operator may hold on to its capabilities, and may clone, advance, and drop them.
14//! Each of these actions informs the timely dataflow runtime of changes to the number of outstanding
15//! capabilities, so that the runtime can notice when the count for some capability reaches zero.
16//! While an operator can hold capabilities indefinitely, and create as many new copies of them
17//! as it would like, the progress tracking infrastructure will not move forward until the
18//! operators eventually release their capabilities.
19//!
20//! Note that these capabilities currently lack the property of "transferability":
21//! An operator should not hand its capabilities to some other operator. In the future, we should
22//! probably bind capabilities more strongly to a specific operator and output.
2324use std::{borrow, error::Error, fmt::Display, ops::Deref};
25use std::rc::Rc;
26use std::cell::RefCell;
27use std::fmt::{self, Debug};
2829use crate::order::PartialOrder;
30use crate::progress::Timestamp;
31use crate::progress::ChangeBatch;
32use crate::progress::operate::PortConnectivity;
33use crate::scheduling::Activations;
34use crate::dataflow::channels::pullers::counter::ConsumedGuard;
3536/// An internal trait expressing the capability to send messages with a given timestamp.
37pub trait CapabilityTrait<T: Timestamp> {
38/// The timestamp associated with the capability.
39fn time(&self) -> &T;
40fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool;
41}
4243impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
44fn time(&self) -> &T { (**self).time() }
45fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
46 (**self).valid_for_output(query_buffer)
47 }
48}
49impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
50fn time(&self) -> &T { (**self).time() }
51fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
52 (**self).valid_for_output(query_buffer)
53 }
54}
5556/// The capability to send data with a certain timestamp on a dataflow edge.
57///
58/// Capabilities are used by timely dataflow's progress tracking machinery to restrict and track
59/// when user code retains the ability to send messages on dataflow edges. All capabilities are
60/// constructed by the system, and should eventually be dropped by the user. Failure to drop
61/// a capability (for whatever reason) will cause timely dataflow's progress tracking to stall.
62pub struct Capability<T: Timestamp> {
63 time: T,
64 internal: Rc<RefCell<ChangeBatch<T>>>,
65}
6667impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
68fn time(&self) -> &T { &self.time }
69fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
70 Rc::ptr_eq(&self.internal, query_buffer)
71 }
72}
7374impl<T: Timestamp> Capability<T> {
75/// Creates a new capability at `time` while incrementing (and keeping a reference to) the provided
76 /// [`ChangeBatch`].
77pub(crate) fn new(time: T, internal: Rc<RefCell<ChangeBatch<T>>>) -> Self {
78 internal.borrow_mut().update(time.clone(), 1);
7980Self {
81 time,
82 internal,
83 }
84 }
8586/// The timestamp associated with this capability.
87pub fn time(&self) -> &T {
88&self.time
89 }
9091/// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
92 /// the source capability (`self`).
93 ///
94 /// This method panics if `self.time` is not less or equal to `new_time`.
95pub fn delayed(&self, new_time: &T) -> Capability<T> {
96/// Makes the panic branch cold & outlined to decrease code bloat & give
97 /// the inner function the best chance possible of being inlined with
98 /// minimal code bloat
99#[cold]
100 #[inline(never)]
101fn delayed_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
102// Formatting & panic machinery is relatively expensive in terms of code bloat, so
103 // we outline it
104panic!(
105"Attempted to delay {:?} to {:?}, which is not beyond the capability's time.",
106 capability,
107 invalid_time,
108 )
109 }
110111self.try_delayed(new_time)
112 .unwrap_or_else(|| delayed_panic(self, new_time))
113 }
114115/// Attempts to make a new capability for a timestamp `new_time` that is
116 /// greater or equal to the timestamp of the source capability (`self`).
117 ///
118 /// Returns [`None`] `self.time` is not less or equal to `new_time`.
119pub fn try_delayed(&self, new_time: &T) -> Option<Capability<T>> {
120if self.time.less_equal(new_time) {
121Some(Self::new(new_time.clone(), Rc::clone(&self.internal)))
122 } else {
123None
124}
125 }
126127/// Downgrades the capability to one corresponding to `new_time`.
128 ///
129 /// This method panics if `self.time` is not less or equal to `new_time`.
130pub fn downgrade(&mut self, new_time: &T) {
131/// Makes the panic branch cold & outlined to decrease code bloat & give
132 /// the inner function the best chance possible of being inlined with
133 /// minimal code bloat
134#[cold]
135 #[inline(never)]
136fn downgrade_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
137// Formatting & panic machinery is relatively expensive in terms of code bloat, so
138 // we outline it
139panic!(
140"Attempted to downgrade {:?} to {:?}, which is not beyond the capability's time.",
141 capability,
142 invalid_time,
143 )
144 }
145146self.try_downgrade(new_time)
147 .unwrap_or_else(|_| downgrade_panic(self, new_time))
148 }
149150/// Attempts to downgrade the capability to one corresponding to `new_time`.
151 ///
152 /// Returns a [DowngradeError] if `self.time` is not less or equal to `new_time`.
153pub fn try_downgrade(&mut self, new_time: &T) -> Result<(), DowngradeError> {
154if let Some(new_capability) = self.try_delayed(new_time) {
155*self = new_capability;
156Ok(())
157 } else {
158Err(DowngradeError(()))
159 }
160 }
161}
162163// Necessary for correctness. When a capability is dropped, the "internal" `ChangeBatch` needs to be
164// updated accordingly to inform the rest of the system that the operator has released its permit
165// to send data and request notification at the associated timestamp.
166impl<T: Timestamp> Drop for Capability<T> {
167fn drop(&mut self) {
168self.internal.borrow_mut().update(self.time.clone(), -1);
169 }
170}
171172impl<T: Timestamp> Clone for Capability<T> {
173fn clone(&self) -> Capability<T> {
174Self::new(self.time.clone(), Rc::clone(&self.internal))
175 }
176}
177178impl<T: Timestamp> Deref for Capability<T> {
179type Target = T;
180181fn deref(&self) -> &T {
182&self.time
183 }
184}
185186impl<T: Timestamp> Debug for Capability<T> {
187fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
188 f.debug_struct("Capability")
189 .field("time", &self.time)
190 .field("internal", &"...")
191 .finish()
192 }
193}
194195impl<T: Timestamp> PartialEq for Capability<T> {
196fn eq(&self, other: &Self) -> bool {
197self.time() == other.time() && Rc::ptr_eq(&self.internal, &other.internal)
198 }
199}
200impl<T: Timestamp> Eq for Capability<T> { }
201202impl<T: Timestamp> PartialOrder for Capability<T> {
203fn less_equal(&self, other: &Self) -> bool {
204self.time().less_equal(other.time()) && Rc::ptr_eq(&self.internal, &other.internal)
205 }
206}
207208impl<T: Timestamp> ::std::hash::Hash for Capability<T> {
209fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
210self.time.hash(state);
211 }
212}
/// An error produced when trying to downgrade a capability, or a capability set, to a time that
/// is not greater than or equal to the time it currently holds.
///
/// Returned by `Capability::try_downgrade` and `CapabilitySet::try_downgrade`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DowngradeError(());
218219impl Display for DowngradeError {
220fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 f.write_str("could not downgrade the given capability")
222 }
223}
224225impl Error for DowngradeError {}
226227/// A shared list of shared output capability buffers.
228type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
229230/// An capability of an input port.
231///
232/// Holding onto this capability will implicitly holds onto a capability for all the outputs
233/// ports this input is connected to, after the connection summaries have been applied.
234///
235/// This input capability supplies a `retain_for_output(self)` method which consumes the input
236/// capability and turns it into a [Capability] for a specific output port.
237pub struct InputCapability<T: Timestamp> {
238/// Output capability buffers, for use in minting capabilities.
239internal: CapabilityUpdates<T>,
240/// Timestamp summaries for each output.
241summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
242/// A drop guard that updates the consumed capability this InputCapability refers to on drop
243consumed_guard: ConsumedGuard<T>,
244}
245246impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247fn time(&self) -> &T { self.time() }
248fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
249let summaries_borrow = self.summaries.borrow();
250let internal_borrow = self.internal.borrow();
251// To be valid, the output buffer must match and the timestamp summary needs to be the default.
252let result = summaries_borrow.iter_ports().any(|(port, path)| {
253 Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default()
254 });
255 result
256 }
257}
258259impl<T: Timestamp> InputCapability<T> {
260/// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
261 /// the provided [`ChangeBatch`].
262pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
263 InputCapability {
264 internal,
265 summaries,
266 consumed_guard: guard,
267 }
268 }
269270/// The timestamp associated with this capability.
271pub fn time(&self) -> &T {
272self.consumed_guard.time()
273 }
274275/// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
276 /// the source capability (`self`).
277 ///
278 /// This method panics if `self.time` is not less or equal to `new_time`.
279pub fn delayed(&self, new_time: &T) -> Capability<T> {
280self.delayed_for_output(new_time, 0)
281 }
282283/// Delays capability for a specific output port.
284pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
285use crate::progress::timestamp::PathSummary;
286if let Some(path) = self.summaries.borrow().get(output_port) {
287if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
288 Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
289 } else {
290panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
291 }
292 }
293else {
294panic!("Attempted to delay a capability for a disconnected output");
295 }
296 }
297298/// Transform to an owned capability.
299 ///
300 /// This method produces an owned capability which must be dropped to release the
301 /// capability. Users should take care that these capabilities are only stored for
302 /// as long as they are required, as failing to drop them may result in livelock.
303 ///
304 /// This method panics if the timestamp summary to output zero strictly advances the time.
305pub fn retain(self) -> Capability<T> {
306self.retain_for_output(0)
307 }
308309/// Transforms to an owned capability for a specific output port.
310 ///
311 /// This method panics if the timestamp summary to `output_port` strictly advances the time.
312pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
313use crate::progress::timestamp::PathSummary;
314let self_time = self.time().clone();
315if let Some(path) = self.summaries.borrow().get(output_port) {
316if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
317 Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
318 }
319else {
320panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
321 }
322 }
323else {
324panic!("Attempted to retain a capability for a disconnected output");
325 }
326 }
327}
328329impl<T: Timestamp> Deref for InputCapability<T> {
330type Target = T;
331332fn deref(&self) -> &T {
333self.time()
334 }
335}
336337impl<T: Timestamp> Debug for InputCapability<T> {
338fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
339 f.debug_struct("InputCapability")
340 .field("time", self.time())
341 .field("internal", &"...")
342 .finish()
343 }
344}
345346/// Capability that activates on drop.
347#[derive(Clone, Debug)]
348pub struct ActivateCapability<T: Timestamp> {
349pub(crate) capability: Capability<T>,
350pub(crate) address: Rc<[usize]>,
351pub(crate) activations: Rc<RefCell<Activations>>,
352}
353354impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
355fn time(&self) -> &T { self.capability.time() }
356fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
357self.capability.valid_for_output(query_buffer)
358 }
359}
360361impl<T: Timestamp> ActivateCapability<T> {
362/// Creates a new activating capability.
363pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
364Self {
365 capability,
366 address,
367 activations,
368 }
369 }
370371/// The timestamp associated with this capability.
372pub fn time(&self) -> &T {
373self.capability.time()
374 }
375376/// Creates a new delayed capability.
377pub fn delayed(&self, time: &T) -> Self {
378 ActivateCapability {
379 capability: self.capability.delayed(time),
380 address: Rc::clone(&self.address),
381 activations: Rc::clone(&self.activations),
382 }
383 }
384385/// Downgrades this capability.
386pub fn downgrade(&mut self, time: &T) {
387self.capability.downgrade(time);
388self.activations.borrow_mut().activate(&self.address);
389 }
390}
391392impl<T: Timestamp> Drop for ActivateCapability<T> {
393fn drop(&mut self) {
394self.activations.borrow_mut().activate(&self.address);
395 }
396}
397398/// A set of capabilities, for possibly incomparable times.
399#[derive(Clone, Debug)]
400pub struct CapabilitySet<T: Timestamp> {
401 elements: Vec<Capability<T>>,
402}
403404impl<T: Timestamp> CapabilitySet<T> {
405406/// Allocates an empty capability set.
407pub fn new() -> Self {
408Self { elements: Vec::new() }
409 }
410411/// Allocates an empty capability set with space for `capacity` elements
412pub fn with_capacity(capacity: usize) -> Self {
413Self { elements: Vec::with_capacity(capacity) }
414 }
415416/// Allocates a capability set containing a single capability.
417 ///
418 /// # Examples
419 /// ```
420 /// use std::collections::HashMap;
421 /// use timely::dataflow::{
422 /// operators::{ToStream, generic::Operator},
423 /// channels::pact::Pipeline,
424 /// };
425 /// use timely::dataflow::operators::CapabilitySet;
426 ///
427 /// timely::example(|scope| {
428 /// vec![()].into_iter().to_stream(scope)
429 /// .unary_frontier(Pipeline, "example", |default_cap, _info| {
430 /// let mut cap = CapabilitySet::from_elem(default_cap);
431 /// move |input, output| {
432 /// cap.downgrade(&input.frontier().frontier());
433 /// while let Some((time, data)) = input.next() {
434 /// }
435 /// let a_cap = cap.first();
436 /// if let Some(a_cap) = a_cap.as_ref() {
437 /// output.session(a_cap).give(());
438 /// }
439 /// }
440 /// })
441 /// .container::<Vec<_>>();
442 /// });
443 /// ```
444pub fn from_elem(cap: Capability<T>) -> Self {
445Self { elements: vec![cap] }
446 }
447448/// Inserts `capability` into the set, discarding redundant capabilities.
449pub fn insert(&mut self, capability: Capability<T>) {
450if !self.elements.iter().any(|c| c.less_equal(&capability)) {
451self.elements.retain(|c| !capability.less_equal(c));
452self.elements.push(capability);
453 }
454 }
455456/// Creates a new capability to send data at `time`.
457 ///
458 /// This method panics if there does not exist a capability in `self.elements` less or equal to `time`.
459pub fn delayed(&self, time: &T) -> Capability<T> {
460/// Makes the panic branch cold & outlined to decrease code bloat & give
461 /// the inner function the best chance possible of being inlined with
462 /// minimal code bloat
463#[cold]
464 #[inline(never)]
465fn delayed_panic(invalid_time: &dyn Debug) -> ! {
466// Formatting & panic machinery is relatively expensive in terms of code bloat, so
467 // we outline it
468panic!(
469"failed to create a delayed capability, the current set does not \
470 have an element less than or equal to {:?}",
471 invalid_time,
472 )
473 }
474475self.try_delayed(time)
476 .unwrap_or_else(|| delayed_panic(time))
477 }
478479/// Attempts to create a new capability to send data at `time`.
480 ///
481 /// Returns [`None`] if there does not exist a capability in `self.elements` less or equal to `time`.
482pub fn try_delayed(&self, time: &T) -> Option<Capability<T>> {
483self.elements
484 .iter()
485 .find(|capability| capability.time().less_equal(time))
486 .and_then(|capability| capability.try_delayed(time))
487 }
488489/// Downgrades the set of capabilities to correspond with the times in `frontier`.
490 ///
491 /// This method panics if any element of `frontier` is not greater or equal to some element of `self.elements`.
492pub fn downgrade<B, F>(&mut self, frontier: F)
493where
494B: borrow::Borrow<T>,
495 F: IntoIterator<Item = B>,
496 {
497/// Makes the panic branch cold & outlined to decrease code bloat & give
498 /// the inner function the best chance possible of being inlined with
499 /// minimal code bloat
500#[cold]
501 #[inline(never)]
502fn downgrade_panic() -> ! {
503// Formatting & panic machinery is relatively expensive in terms of code bloat, so
504 // we outline it
505panic!(
506"Attempted to downgrade a CapabilitySet with a frontier containing an element \
507 that was not beyond an element within the set"
508)
509 }
510511self.try_downgrade(frontier)
512 .unwrap_or_else(|_| downgrade_panic())
513 }
514515/// Attempts to downgrade the set of capabilities to correspond with the times in `frontier`.
516 ///
517 /// Returns [`None`] if any element of `frontier` is not greater or equal to some element of `self.elements`.
518 ///
519 /// **Warning**: If an error is returned the capability set may be in an inconsistent state and can easily
520 /// cause logic errors within the program if not properly handled.
521 ///
522pub fn try_downgrade<B, F>(&mut self, frontier: F) -> Result<(), DowngradeError>
523where
524B: borrow::Borrow<T>,
525 F: IntoIterator<Item = B>,
526 {
527let count = self.elements.len();
528for time in frontier.into_iter() {
529let capability = self.try_delayed(time.borrow()).ok_or(DowngradeError(()))?;
530self.elements.push(capability);
531 }
532self.elements.drain(..count);
533534Ok(())
535 }
536}
537538impl<T> From<Vec<Capability<T>>> for CapabilitySet<T>
539where
540T: Timestamp,
541{
542fn from(capabilities: Vec<Capability<T>>) -> Self {
543let mut this = Self::with_capacity(capabilities.len());
544for capability in capabilities {
545 this.insert(capability);
546 }
547548 this
549 }
550}
551552impl<T: Timestamp> Default for CapabilitySet<T> {
553fn default() -> Self {
554Self::new()
555 }
556}
557558impl<T: Timestamp> Deref for CapabilitySet<T> {
559type Target=[Capability<T>];
560561fn deref(&self) -> &[Capability<T>] {
562&self.elements
563 }
564}