timely/dataflow/operators/capability.rs
1//! Capabilities to send data from operators
2//!
3//! Timely dataflow operators are only able to send data if they possess a "capability",
4//! a system-created object which warns the runtime that the operator may still produce
5//! output records.
6//!
7//! The timely dataflow runtime creates a capability and provides it to an operator whenever
8//! the operator receives input data. The capabilities allow the operator to respond to the
9//! received data, immediately or in the future, for as long as the capability is held.
10//!
11//! Timely dataflow's progress tracking infrastructure communicates the number of outstanding
12//! capabilities across all workers.
13//! Each operator may hold on to its capabilities, and may clone, advance, and drop them.
14//! Each of these actions informs the timely dataflow runtime of changes to the number of outstanding
15//! capabilities, so that the runtime can notice when the count for some capability reaches zero.
16//! While an operator can hold capabilities indefinitely, and create as many new copies of them
17//! as it would like, the progress tracking infrastructure will not move forward until the
18//! operators eventually release their capabilities.
19//!
20//! Note that these capabilities currently lack the property of "transferability":
21//! An operator should not hand its capabilities to some other operator. In the future, we should
22//! probably bind capabilities more strongly to a specific operator and output.
23
24use std::{borrow, error::Error, fmt::Display, ops::Deref};
25use std::rc::Rc;
26use std::cell::RefCell;
27use std::fmt::{self, Debug};
28
29use crate::order::PartialOrder;
30use crate::progress::Timestamp;
31use crate::progress::ChangeBatch;
32use crate::progress::operate::PortConnectivity;
33use crate::scheduling::Activations;
34use crate::dataflow::channels::pullers::counter::ConsumedGuard;
35
36/// An internal trait expressing the capability to send messages with a given timestamp.
37pub trait CapabilityTrait<T: Timestamp> {
38 /// The timestamp associated with the capability.
39 fn time(&self) -> &T;
40 /// Validates that the capability is valid for a specific internal buffer and output port.
41 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool;
42}
43
44impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
45 fn time(&self) -> &T { (**self).time() }
46 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
47 (**self).valid_for_output(query_buffer, port)
48 }
49}
50impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
51 fn time(&self) -> &T { (**self).time() }
52 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
53 (**self).valid_for_output(query_buffer, port)
54 }
55}
56
57/// The capability to send data with a certain timestamp on a dataflow edge.
58///
59/// Capabilities are used by timely dataflow's progress tracking machinery to restrict and track
60/// when user code retains the ability to send messages on dataflow edges. All capabilities are
61/// constructed by the system, and should eventually be dropped by the user. Failure to drop
62/// a capability (for whatever reason) will cause timely dataflow's progress tracking to stall.
63pub struct Capability<T: Timestamp> {
64 time: T,
65 internal: Rc<RefCell<ChangeBatch<T>>>,
66}
67
68impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
69 fn time(&self) -> &T { &self.time }
70 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, _port: usize) -> bool {
71 Rc::ptr_eq(&self.internal, query_buffer)
72 }
73}
74
75impl<T: Timestamp> Capability<T> {
76 /// Creates a new capability at `time` while incrementing (and keeping a reference to) the provided
77 /// [`ChangeBatch`].
78 pub(crate) fn new(time: T, internal: Rc<RefCell<ChangeBatch<T>>>) -> Self {
79 internal.borrow_mut().update(time.clone(), 1);
80
81 Self {
82 time,
83 internal,
84 }
85 }
86
87 /// The timestamp associated with this capability.
88 pub fn time(&self) -> &T {
89 &self.time
90 }
91
92 /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
93 /// the source capability (`self`).
94 ///
95 /// This method panics if `self.time` is not less or equal to `new_time`.
96 pub fn delayed(&self, new_time: &T) -> Capability<T> {
97 /// Makes the panic branch cold & outlined to decrease code bloat & give
98 /// the inner function the best chance possible of being inlined with
99 /// minimal code bloat
100 #[cold]
101 #[inline(never)]
102 fn delayed_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
103 // Formatting & panic machinery is relatively expensive in terms of code bloat, so
104 // we outline it
105 panic!(
106 "Attempted to delay {:?} to {:?}, which is not beyond the capability's time.",
107 capability,
108 invalid_time,
109 )
110 }
111
112 self.try_delayed(new_time)
113 .unwrap_or_else(|| delayed_panic(self, new_time))
114 }
115
116 /// Attempts to make a new capability for a timestamp `new_time` that is
117 /// greater or equal to the timestamp of the source capability (`self`).
118 ///
119 /// Returns [`None`] `self.time` is not less or equal to `new_time`.
120 pub fn try_delayed(&self, new_time: &T) -> Option<Capability<T>> {
121 if self.time.less_equal(new_time) {
122 Some(Self::new(new_time.clone(), Rc::clone(&self.internal)))
123 } else {
124 None
125 }
126 }
127
128 /// Downgrades the capability to one corresponding to `new_time`.
129 ///
130 /// This method panics if `self.time` is not less or equal to `new_time`.
131 pub fn downgrade(&mut self, new_time: &T) {
132 /// Makes the panic branch cold & outlined to decrease code bloat & give
133 /// the inner function the best chance possible of being inlined with
134 /// minimal code bloat
135 #[cold]
136 #[inline(never)]
137 fn downgrade_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
138 // Formatting & panic machinery is relatively expensive in terms of code bloat, so
139 // we outline it
140 panic!(
141 "Attempted to downgrade {:?} to {:?}, which is not beyond the capability's time.",
142 capability,
143 invalid_time,
144 )
145 }
146
147 self.try_downgrade(new_time)
148 .unwrap_or_else(|_| downgrade_panic(self, new_time))
149 }
150
151 /// Attempts to downgrade the capability to one corresponding to `new_time`.
152 ///
153 /// Returns a [DowngradeError] if `self.time` is not less or equal to `new_time`.
154 pub fn try_downgrade(&mut self, new_time: &T) -> Result<(), DowngradeError> {
155 if let Some(new_capability) = self.try_delayed(new_time) {
156 *self = new_capability;
157 Ok(())
158 } else {
159 Err(DowngradeError(()))
160 }
161 }
162}
163
164// Necessary for correctness. When a capability is dropped, the "internal" `ChangeBatch` needs to be
165// updated accordingly to inform the rest of the system that the operator has released its permit
166// to send data and request notification at the associated timestamp.
167impl<T: Timestamp> Drop for Capability<T> {
168 fn drop(&mut self) {
169 self.internal.borrow_mut().update(self.time.clone(), -1);
170 }
171}
172
173impl<T: Timestamp> Clone for Capability<T> {
174 fn clone(&self) -> Capability<T> {
175 Self::new(self.time.clone(), Rc::clone(&self.internal))
176 }
177}
178
179impl<T: Timestamp> Deref for Capability<T> {
180 type Target = T;
181
182 fn deref(&self) -> &T {
183 &self.time
184 }
185}
186
187impl<T: Timestamp> Debug for Capability<T> {
188 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
189 f.debug_struct("Capability")
190 .field("time", &self.time)
191 .field("internal", &"...")
192 .finish()
193 }
194}
195
196impl<T: Timestamp> PartialEq for Capability<T> {
197 fn eq(&self, other: &Self) -> bool {
198 self.time() == other.time() && Rc::ptr_eq(&self.internal, &other.internal)
199 }
200}
201impl<T: Timestamp> Eq for Capability<T> { }
202
203impl<T: Timestamp> PartialOrder for Capability<T> {
204 fn less_equal(&self, other: &Self) -> bool {
205 self.time().less_equal(other.time()) && Rc::ptr_eq(&self.internal, &other.internal)
206 }
207}
208
209impl<T: Timestamp+::std::hash::Hash> ::std::hash::Hash for Capability<T> {
210 fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
211 self.time.hash(state);
212 }
213}
214
/// The error reported when a capability cannot be downgraded to the requested time.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DowngradeError(());

impl Display for DowngradeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not downgrade the given capability")
    }
}

// No underlying cause is exposed: the default `Error` methods are used unchanged.
impl Error for DowngradeError {}
227
228/// A shared list of shared output capability buffers.
229type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
230
231/// An capability of an input port.
232///
233/// Holding onto this capability will implicitly hold on to a capability for all the outputs
234/// ports this input is connected to, after the connection summaries have been applied.
235///
236/// This input capability supplies a `retain_for_output(self)` method which consumes the input
237/// capability and turns it into a [Capability] for a specific output port.
238pub struct InputCapability<T: Timestamp> {
239 /// Output capability buffers, for use in minting capabilities.
240 internal: CapabilityUpdates<T>,
241 /// Timestamp summaries for each output.
242 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
243 /// A drop guard that updates the consumed capability this InputCapability refers to on drop
244 consumed_guard: ConsumedGuard<T>,
245}
246
247impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
248 fn time(&self) -> &T { self.time() }
249 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
250 let summaries_borrow = self.summaries.borrow();
251 let internal_borrow = self.internal.borrow();
252 // To be valid, the output buffer must match and the timestamp summary needs to be the default.
253 Rc::ptr_eq(&internal_borrow[port], query_buffer) &&
254 summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()])
255 }
256}
257
258impl<T: Timestamp> InputCapability<T> {
259 /// Creates a new capability reference at `time` while incrementing (and keeping a reference to)
260 /// the provided [`ChangeBatch`].
261 pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
262 InputCapability {
263 internal,
264 summaries,
265 consumed_guard: guard,
266 }
267 }
268
269 /// The timestamp associated with this capability.
270 #[inline]
271 pub fn time(&self) -> &T {
272 self.consumed_guard.time()
273 }
274
275 /// Delays capability for a specific output port.
276 ///
277 /// Makes a new capability for a timestamp `new_time` greater or equal to the timestamp of
278 /// the source capability (`self`).
279 ///
280 /// This method panics if `self.time` is not less or equal to `new_time`.
281 pub fn delayed(&self, new_time: &T, output_port: usize) -> Capability<T> {
282 use crate::progress::timestamp::PathSummary;
283 if let Some(path) = self.summaries.borrow().get(output_port) {
284 if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
285 Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
286 } else {
287 panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
288 }
289 }
290 else {
291 panic!("Attempted to delay a capability for a disconnected output");
292 }
293 }
294
295 /// Transforms to an owned capability for a specific output port.
296 ///
297 /// This method produces an owned capability which must be dropped to release the
298 /// capability. Users should take care that these capabilities are only stored for
299 /// as long as they are required, as failing to drop them may result in livelock.
300 ///
301 /// This method panics if the timestamp summary to `output_port` strictly advances the time.
302 #[inline]
303 pub fn retain(&self, output_port: usize) -> Capability<T> {
304 self.delayed(self.time(), output_port)
305 }
306}
307
308impl<T: Timestamp> Deref for InputCapability<T> {
309 type Target = T;
310
311 fn deref(&self) -> &T {
312 self.time()
313 }
314}
315
316impl<T: Timestamp> Debug for InputCapability<T> {
317 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
318 f.debug_struct("InputCapability")
319 .field("time", self.time())
320 .field("internal", &"...")
321 .finish()
322 }
323}
324
325/// Capability that activates on drop or downgrade.
326#[derive(Clone, Debug)]
327pub struct ActivateCapability<T: Timestamp> {
328 pub(crate) capability: Capability<T>,
329 pub(crate) address: Rc<[usize]>,
330 pub(crate) activations: Rc<RefCell<Activations>>,
331}
332
333impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
334 fn time(&self) -> &T { self.capability.time() }
335 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
336 self.capability.valid_for_output(query_buffer, port)
337 }
338}
339
340impl<T: Timestamp> ActivateCapability<T> {
341 /// Creates a new activating capability.
342 pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
343 Self {
344 capability,
345 address,
346 activations,
347 }
348 }
349
350 /// The timestamp associated with this capability.
351 pub fn time(&self) -> &T {
352 self.capability.time()
353 }
354
355 /// Creates a new delayed capability.
356 pub fn delayed(&self, time: &T) -> Self {
357 ActivateCapability {
358 capability: self.capability.delayed(time),
359 address: Rc::clone(&self.address),
360 activations: Rc::clone(&self.activations),
361 }
362 }
363
364 /// Downgrades this capability.
365 pub fn downgrade(&mut self, time: &T) {
366 self.capability.downgrade(time);
367 self.activations.borrow_mut().activate(&self.address);
368 }
369}
370
371impl<T: Timestamp> Drop for ActivateCapability<T> {
372 fn drop(&mut self) {
373 self.activations.borrow_mut().activate(&self.address);
374 }
375}
376
377/// A set of capabilities, for possibly incomparable times.
378#[derive(Clone, Debug)]
379pub struct CapabilitySet<T: Timestamp> {
380 elements: Vec<Capability<T>>,
381}
382
383impl<T: Timestamp> CapabilitySet<T> {
384
385 /// Allocates an empty capability set.
386 pub fn new() -> Self {
387 Self { elements: Vec::new() }
388 }
389
390 /// Allocates an empty capability set with space for `capacity` elements
391 pub fn with_capacity(capacity: usize) -> Self {
392 Self { elements: Vec::with_capacity(capacity) }
393 }
394
395 /// Allocates a capability set containing a single capability.
396 ///
397 /// # Examples
398 /// ```
399 /// use std::collections::HashMap;
400 /// use timely::dataflow::{
401 /// operators::{ToStream, generic::Operator},
402 /// channels::pact::Pipeline,
403 /// };
404 /// use timely::dataflow::operators::CapabilitySet;
405 ///
406 /// timely::example(|scope| {
407 /// vec![()]
408 /// .into_iter()
409 /// .to_stream(scope)
410 /// .container::<Vec<_>>()
411 /// .unary_frontier(Pipeline, "example", |default_cap, _info| {
412 /// let mut cap = CapabilitySet::from_elem(default_cap);
413 /// move |(input, frontier), output| {
414 /// cap.downgrade(&frontier.frontier());
415 /// input.for_each_time(|time, data| {});
416 /// let a_cap = cap.first();
417 /// if let Some(a_cap) = a_cap.as_ref() {
418 /// output.session(a_cap).give(());
419 /// }
420 /// }
421 /// })
422 /// .container::<Vec<_>>();
423 /// });
424 /// ```
425 pub fn from_elem(cap: Capability<T>) -> Self {
426 Self { elements: vec![cap] }
427 }
428
429 /// Inserts `capability` into the set, discarding redundant capabilities.
430 pub fn insert(&mut self, capability: Capability<T>) {
431 if !self.elements.iter().any(|c| c.less_equal(&capability)) {
432 self.elements.retain(|c| !capability.less_equal(c));
433 self.elements.push(capability);
434 }
435 }
436
437 /// Creates a new capability to send data at `time`.
438 ///
439 /// This method panics if there does not exist a capability in `self.elements` less or equal to `time`.
440 pub fn delayed(&self, time: &T) -> Capability<T> {
441 /// Makes the panic branch cold & outlined to decrease code bloat & give
442 /// the inner function the best chance possible of being inlined with
443 /// minimal code bloat
444 #[cold]
445 #[inline(never)]
446 fn delayed_panic(invalid_time: &dyn Debug) -> ! {
447 // Formatting & panic machinery is relatively expensive in terms of code bloat, so
448 // we outline it
449 panic!(
450 "failed to create a delayed capability, the current set does not \
451 have an element less than or equal to {:?}",
452 invalid_time,
453 )
454 }
455
456 self.try_delayed(time)
457 .unwrap_or_else(|| delayed_panic(time))
458 }
459
460 /// Attempts to create a new capability to send data at `time`.
461 ///
462 /// Returns [`None`] if there does not exist a capability in `self.elements` less or equal to `time`.
463 pub fn try_delayed(&self, time: &T) -> Option<Capability<T>> {
464 self.elements
465 .iter()
466 .find(|capability| capability.time().less_equal(time))
467 .and_then(|capability| capability.try_delayed(time))
468 }
469
470 /// Downgrades the set of capabilities to correspond with the times in `frontier`.
471 ///
472 /// This method panics if any element of `frontier` is not greater or equal to some element of `self.elements`.
473 pub fn downgrade<B, F>(&mut self, frontier: F)
474 where
475 B: borrow::Borrow<T>,
476 F: IntoIterator<Item = B>,
477 {
478 /// Makes the panic branch cold & outlined to decrease code bloat & give
479 /// the inner function the best chance possible of being inlined with
480 /// minimal code bloat
481 #[cold]
482 #[inline(never)]
483 fn downgrade_panic() -> ! {
484 // Formatting & panic machinery is relatively expensive in terms of code bloat, so
485 // we outline it
486 panic!(
487 "Attempted to downgrade a CapabilitySet with a frontier containing an element \
488 that was not beyond an element within the set"
489 )
490 }
491
492 self.try_downgrade(frontier)
493 .unwrap_or_else(|_| downgrade_panic())
494 }
495
496 /// Attempts to downgrade the set of capabilities to correspond with the times in `frontier`.
497 ///
498 /// Returns [`None`] if any element of `frontier` is not greater or equal to some element of `self.elements`.
499 ///
500 /// **Warning**: If an error is returned the capability set may be in an inconsistent state and can easily
501 /// cause logic errors within the program if not properly handled.
502 ///
503 pub fn try_downgrade<B, F>(&mut self, frontier: F) -> Result<(), DowngradeError>
504 where
505 B: borrow::Borrow<T>,
506 F: IntoIterator<Item = B>,
507 {
508 let count = self.elements.len();
509 for time in frontier.into_iter() {
510 let capability = self.try_delayed(time.borrow()).ok_or(DowngradeError(()))?;
511 self.elements.push(capability);
512 }
513 self.elements.drain(..count);
514
515 Ok(())
516 }
517}
518
519impl<T> From<Vec<Capability<T>>> for CapabilitySet<T>
520where
521 T: Timestamp,
522{
523 fn from(capabilities: Vec<Capability<T>>) -> Self {
524 let mut this = Self::with_capacity(capabilities.len());
525 for capability in capabilities {
526 this.insert(capability);
527 }
528
529 this
530 }
531}
532
533impl<T: Timestamp> Default for CapabilitySet<T> {
534 fn default() -> Self {
535 Self::new()
536 }
537}
538
539impl<T: Timestamp> Deref for CapabilitySet<T> {
540 type Target=[Capability<T>];
541
542 fn deref(&self) -> &[Capability<T>] {
543 &self.elements
544 }
545}