timely/
lib.rs

1//! Timely dataflow is a framework for managing and executing data-parallel dataflow computations.
2//!
3//! The code is organized in crates and modules that are meant to depend as little as possible on each other.
4//!
5//! **Serialization**: Timely uses the `bincode` crate for serialization. Performance could be improved.
6//!
7//! **Communication**: The [`timely_communication`] crate defines several primitives for
8//! communicating between dataflow workers, and across machine boundaries.
9//!
10//! **Progress tracking**: The [`timely::progress`](progress) module defines core dataflow structures for
11//! tracking and reporting progress in a timely dataflow system, namely the number of outstanding
12//! dataflow messages and un-exercised message capabilities throughout the timely dataflow graph.
13//! It depends on `timely_communication` to exchange progress messages.
14//!
15//! **Dataflow construction**: The [`timely::dataflow`](dataflow) module defines an example dataflow system
16//! using `communication` and `progress` to both exchange data and progress information, in support
17//! of an actual data-parallel timely dataflow computation. It depends on `timely_communication` to
18//! move data, and `timely::progress` to provide correct operator notifications.
19//!
20//! # Examples
21//!
22//! The following is a hello-world dataflow program.
23//!
24//! ```
25//! use timely::*;
26//! use timely::dataflow::operators::{Input, Inspect};
27//!
28//! // construct and execute a timely dataflow
29//! timely::execute_from_args(std::env::args(), |worker| {
30//!
31//!     // add an input and base computation off of it
32//!     let mut input = worker.dataflow(|scope| {
33//!         let (input, stream) = scope.new_input();
34//!         stream.inspect(|x| println!("hello {:?}", x));
35//!         input
36//!     });
37//!
38//!     // introduce input, advance computation
39//!     for round in 0..10 {
40//!         input.send(round);
41//!         input.advance_to(round + 1);
42//!         worker.step();
43//!     }
44//! });
45//! ```
46//!
//! The program uses `timely::execute_from_args` to spin up a computation based on command line arguments
//! and a closure specifying what each worker should do, in terms of a handle to a timely dataflow
//! worker (in this case, `worker`). Inside `worker.dataflow`, a `Scope` allows you to define inputs,
//! feedback cycles, and dataflow subgraphs, as part of building the dataflow graph of your dreams.
51//!
//! In this example, we build a new dataflow scope using `worker.dataflow`, add an exogenous
//! input using `new_input`, and add a dataflow `inspect` operator to print each observed record.
//! We then introduce input at increasing rounds, indicate the advance to the system (promising
//! that we will introduce no more input at prior rounds), and step the computation.
56
57#![forbid(missing_docs)]
58
59pub use execute::{execute, execute_directly, example};
60#[cfg(feature = "getopts")]
61pub use execute::execute_from_args;
62pub use order::PartialOrder;
63
64pub use timely_communication::Config as CommunicationConfig;
65pub use worker::Config as WorkerConfig;
66pub use execute::Config as Config;
67
68pub use timely_container::Accountable;
69/// Re-export of the `timely_container` crate.
70pub mod container {
71    pub use timely_container::*;
72}
73
74/// Re-export of the `timely_communication` crate.
75pub mod communication {
76    pub use timely_communication::*;
77}
78
79/// Re-export of the `timely_bytes` crate.
80pub mod bytes {
81    pub use timely_bytes::*;
82}
83
84/// Re-export of the `timely_logging` crate.
85pub mod logging_core {
86    pub use timely_logging::*;
87}
88
89pub mod worker;
90pub mod progress;
91pub mod dataflow;
92pub mod synchronization;
93pub mod execute;
94pub mod order;
95
96pub mod logging;
97// pub mod log_events;
98
99pub mod scheduling;
100
/// Marker trait for types that can be used as data in timely dataflow.
///
/// Every type sent along a timely dataflow channel must satisfy `Data`. Nothing has to
/// be implemented by hand: the blanket implementation below covers every type that is
/// both `Clone` and `'static`.
pub trait Data: Clone + 'static {}

impl<T> Data for T where T: Clone + 'static {}
106
107/// A composite trait for types usable as containers in timely dataflow.
108///
109/// The `Container` trait is necessary for all containers in timely dataflow channels.
110pub trait Container: Accountable + Default + Clone + 'static { }
111impl<C: Accountable + Default + Clone + 'static> Container for C { }
112
113/// A composite trait for types usable as container builders in timely dataflow.
114pub trait ContainerBuilder: timely_container::ContainerBuilder<Container: Container> + Default + 'static {}
115impl<CB: timely_container::ContainerBuilder<Container: Container> + Default + 'static> ContainerBuilder for CB {}
116
117/// A composite trait for types usable on exchange channels in timely dataflow.
118///
119/// The `ExchangeData` trait extends `Data` with any requirements imposed by the `timely_communication`
120/// `Data` trait, which describes requirements for communication along channels.
121pub trait ExchangeData: Data + encoding::Data { }
122impl<T: Data + encoding::Data> ExchangeData for T { }
123
// Exists only while doctests are being built: the README is attached as this item's
// documentation so that the code examples it contains are compiled as doctests.
#[cfg(doctest)]
#[doc = include_str!("../../README.md")]
pub struct ReadmeDoctests;
127
128/// A wrapper that indicates a serialization/deserialization strategy.
129pub use encoding::Bincode;
130
131mod encoding {
132
133    use std::any::Any;
134    use serde::{Serialize, Deserialize};
135    use timely_bytes::arc::Bytes;
136    use timely_communication::Bytesable;
137
138    /// A composite trait for types that may be used with channels.
139    pub trait Data : Send+Any+Serialize+for<'a>Deserialize<'a> { }
140    impl<T: Send+Any+Serialize+for<'a>Deserialize<'a>> Data for T { }
141
142    /// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
143    #[derive(Clone)]
144    pub struct Bincode<T> {
145        /// Bincode contents.
146        pub payload: T,
147    }
148
149    impl<T> From<T> for Bincode<T> {
150        fn from(payload: T) -> Self {
151            Self { payload }
152        }
153    }
154
155    // We will pad out anything we write to make the result `u64` aligned.
156    impl<T: Data> Bytesable for Bincode<T> {
157        fn from_bytes(bytes: Bytes) -> Self {
158            let typed = ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed");
159            let typed_size = ::bincode::serialized_size(&typed).expect("bincode::serialized_size() failed") as usize;
160            assert_eq!(bytes.len(), (typed_size + 7) & !7);
161            Bincode { payload: typed }
162        }
163
164        fn length_in_bytes(&self) -> usize {
165            let typed_size = ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize;
166            (typed_size + 7) & !7
167        }
168
169        fn into_bytes<W: ::std::io::Write>(&self, mut writer: &mut W) {
170            let typed_size = ::bincode::serialized_size(&self.payload).expect("bincode::serialized_size() failed") as usize;
171            let typed_slop = ((typed_size + 7) & !7) - typed_size;
172            ::bincode::serialize_into(&mut writer, &self.payload).expect("bincode::serialize_into() failed");
173            writer.write_all(&[0u8; 8][..typed_slop]).unwrap();
174        }
175    }
176
177    impl<T> ::std::ops::Deref for Bincode<T> {
178        type Target = T;
179        fn deref(&self) -> &Self::Target {
180            &self.payload
181        }
182    }
183    impl<T> ::std::ops::DerefMut for Bincode<T> {
184        fn deref_mut(&mut self) -> &mut Self::Target {
185            &mut self.payload
186        }
187    }
188}