1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
//! Timely dataflow is a framework for managing and executing data-parallel dataflow computations.
//!
//! The code is organized in crates and modules that are meant to depend as little as possible on each other.
//!
//! **Serialization**: Timely uses the `bincode` crate for serialization. Performance could be improved.
//!
//! **Communication**: The [`timely_communication`] crate defines several primitives for
//! communicating between dataflow workers, and across machine boundaries.
//!
//! **Progress tracking**: The [`timely::progress`](progress) module defines core dataflow structures for
//! tracking and reporting progress in a timely dataflow system, namely the number of outstanding
//! dataflow messages and un-exercised message capabilities throughout the timely dataflow graph.
//! It depends on `timely_communication` to exchange progress messages.
//!
//! **Dataflow construction**: The [`timely::dataflow`](dataflow) module defines an example dataflow system
//! using `communication` and `progress` to both exchange data and progress information, in support
//! of an actual data-parallel timely dataflow computation. It depends on `timely_communication` to
//! move data, and `timely::progress` to provide correct operator notifications.
//!
//! # Examples
//!
//! The following is a hello-world dataflow program.
//!
//! ```
//! use timely::*;
//! use timely::dataflow::operators::{Input, Inspect};
//!
//! // construct and execute a timely dataflow
//! timely::execute_from_args(std::env::args(), |worker| {
//!
//!     // add an input and base computation off of it
//!     let mut input = worker.dataflow(|scope| {
//!         let (input, stream) = scope.new_input();
//!         stream.inspect(|x| println!("hello {:?}", x));
//!         input
//!     });
//!
//!     // introduce input, advance computation
//!     for round in 0..10 {
//!         input.send(round);
//!         input.advance_to(round + 1);
//!         worker.step();
//!     }
//! });
//! ```
//!
//! The program uses `timely::execute_from_args` (available when the `getopts` feature is enabled)
//! to spin up a computation based on command line arguments and a closure specifying what each
//! worker should do, in terms of a handle to that worker (in this case, `worker`). Inside
//! `worker.dataflow`, a `Scope` allows you to define inputs, feedback cycles, and dataflow
//! subgraphs, as part of building the dataflow graph of your dreams.
//!
//! In this example, we build a new dataflow using `worker.dataflow`, add an exogenous
//! input using `new_input`, and add a dataflow `inspect` operator to print each observed record.
//! We then introduce input at increasing rounds, indicate the advance to the system (promising
//! that we will introduce no more input at prior rounds), and step the computation.

#![forbid(missing_docs)]

pub use execute::{execute, execute_directly, example};
#[cfg(feature = "getopts")]
pub use execute::execute_from_args;
pub use order::PartialOrder;

pub use timely_communication::Config as CommunicationConfig;
pub use worker::Config as WorkerConfig;
pub use execute::Config as Config;

pub use timely_container::Container;
/// Re-export of the `timely_container` crate.
///
/// Every public item of `timely_container` is glob re-exported here, so it can be
/// reached through the `timely::container` path.
pub mod container {
    pub use timely_container::*;
}

/// Re-export of the `timely_communication` crate.
///
/// Every public item of `timely_communication` is glob re-exported here, so it can be
/// reached through the `timely::communication` path.
pub mod communication {
    pub use timely_communication::*;
}

/// Re-export of the `timely_bytes` crate.
///
/// Every public item of `timely_bytes` is glob re-exported here, so it can be
/// reached through the `timely::bytes` path.
pub mod bytes {
    pub use timely_bytes::*;
}

/// Re-export of the `timely_logging` crate.
///
/// Every public item of `timely_logging` is glob re-exported here, so it can be
/// reached through the `timely::logging_core` path. This is distinct from the crate's
/// own `logging` module, which is declared separately.
pub mod logging_core {
    pub use timely_logging::*;
}

pub mod worker;
pub mod progress;
pub mod dataflow;
pub mod synchronization;
pub mod execute;
pub mod order;

pub mod logging;
// pub mod log_events;

pub mod scheduling;

/// Marker trait for values that can travel along timely dataflow channels.
///
/// There is nothing to implement by hand: the blanket implementation below covers
/// every type that is both `Clone` and `'static`.
pub trait Data: Clone + 'static {}
impl<T> Data for T where T: Clone + 'static {}

/// Marker trait for values that may be sent over exchange channels in timely dataflow.
///
/// It combines `Data` with the bounds of the `Data` trait defined in this crate's private
/// `encoding` module (the requirements for serializing a value onto a channel). The blanket
/// implementation below covers every type that satisfies both.
pub trait ExchangeData: Data + encoding::Data {}
impl<T> ExchangeData for T where T: Data + encoding::Data {}

// Doctest-only item: the repository README is attached as this struct's documentation so
// that rustdoc picks up the README's code examples as doctests. The `cfg(doctest)` gate
// keeps the struct out of ordinary builds.
#[doc = include_str!("../../README.md")]
#[cfg(doctest)]
pub struct ReadmeDoctests;

/// A wrapper that indicates a serialization/deserialization strategy.
pub use encoding::Bincode;

mod encoding {

    use std::any::Any;
    use std::ops::{Deref, DerefMut};

    use serde::{Deserialize, Serialize};
    use timely_bytes::arc::Bytes;
    use timely_communication::Bytesable;

    /// Marker trait for types that may be used with channels.
    ///
    /// Implemented automatically for every type that is `Send`, `Any`, serializable, and
    /// deserializable for any lifetime.
    pub trait Data: Send + Any + Serialize + for<'de> Deserialize<'de> {}
    impl<T> Data for T where T: Send + Any + Serialize + for<'de> Deserialize<'de> {}

    /// A wrapper whose `Bytesable` implementation encodes and decodes its contents with `bincode`.
    pub struct Bincode<T> {
        /// The wrapped value.
        pub payload: T,
    }

    /// Wraps any value, so that `value.into()` yields a `Bincode`.
    impl<T> From<T> for Bincode<T> {
        fn from(payload: T) -> Self {
            Bincode { payload }
        }
    }

    /// Byte-level encoding of the payload, delegated to `bincode`.
    ///
    /// Every method panics if the underlying `bincode` call reports an error.
    impl<T> Bytesable for Bincode<T>
    where
        T: Data,
    {
        /// Decodes a payload from the full contents of `bytes`.
        fn from_bytes(bytes: Bytes) -> Self {
            let payload: T =
                ::bincode::deserialize(&bytes[..]).expect("bincode::deserialize() failed");
            Self { payload }
        }

        /// Reports the size `bincode` computes for the serialized payload.
        fn length_in_bytes(&self) -> usize {
            let size = ::bincode::serialized_size(&self.payload)
                .expect("bincode::serialized_size() failed");
            size as usize
        }

        /// Serializes the payload into `writer`.
        fn into_bytes<W>(&self, writer: &mut W)
        where
            W: ::std::io::Write,
        {
            ::bincode::serialize_into(writer, &self.payload)
                .expect("bincode::serialize_into() failed");
        }
    }

    /// Shared access to the wrapped payload.
    impl<T> Deref for Bincode<T> {
        type Target = T;

        fn deref(&self) -> &T {
            &self.payload
        }
    }

    /// Exclusive access to the wrapped payload.
    impl<T> DerefMut for Bincode<T> {
        fn deref_mut(&mut self) -> &mut T {
            &mut self.payload
        }
    }
}