timely/dataflow/operators/core/capture/mod.rs
1//! Operators to capture and replay timely dataflow streams.
2//!
3//! The `capture_into` and `replay_into` operators respectively capture what a unary operator
4//! sees as input (both data and progress information), and play this information back as a new
5//! input.
6//!
7//! The `capture_into` method requires a `P: EventPusher<T, D>`, which is some type accepting
8//! `Event<T, D>` inputs. This module provides several examples, including the linked list
9//! `EventLink<T, D>`, and the binary `EventWriter<T, D, W>` wrapping any `W: Write`.
10//!
11//! Streams are captured at the worker granularity, and one can replay an arbitrary subset of
12//! the captured streams on any number of workers (fewer, more, or as many as were captured).
13//! There is a protocol the captured stream uses, and implementors of new event streams should
14//! make sure to understand this (and complain if it is not clear).
15//!
16//! # Examples
17//!
18//! The type `Rc<EventLink<T,D>>` implements a typed linked list,
19//! and can be captured into and replayed from.
20//!
21//! ```rust
22//! use std::rc::Rc;
23//! use timely::dataflow::Scope;
24//! use timely::dataflow::operators::{Capture, ToStream, Inspect};
25//! use timely::dataflow::operators::capture::{EventLink, Replay};
26//!
27//! # #[cfg(miri)] fn main() {}
28//! # #[cfg(not(miri))]
29//! # fn main() {
30//! timely::execute(timely::Config::thread(), |worker| {
31//! let handle1 = Rc::new(EventLink::new());
32//! let handle2 = Some(handle1.clone());
33//!
34//! worker.dataflow::<u64,_,_>(|scope1|
35//! (0..10).to_stream(scope1)
36//! .container::<Vec<_>>()
37//! .capture_into(handle1)
38//! );
39//!
40//! worker.dataflow(|scope2| {
41//! handle2.replay_into(scope2)
42//! .inspect(|x| println!("replayed: {:?}", x));
43//! })
44//! }).unwrap();
45//! # }
46//! ```
47//!
48//! The types `EventWriter<T, D, W>` and `EventReader<T, D, R>` can be
49//! captured into and replayed from, respectively. They use binary writers
50//! and readers respectively, and can be backed by files, network sockets,
51//! etc.
52//!
53//! ```
54//! use std::rc::Rc;
55//! use std::net::{TcpListener, TcpStream};
56//! use timely::dataflow::Scope;
57//! use timely::dataflow::operators::{Capture, ToStream, Inspect};
58//! use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay};
59//!
60//! # #[cfg(miri)] fn main() {}
61//! # #[cfg(not(miri))]
62//! # fn main() {
63//! timely::execute(timely::Config::thread(), |worker| {
64//! let list = TcpListener::bind("127.0.0.1:8000").unwrap();
65//! let send = TcpStream::connect("127.0.0.1:8000").unwrap();
66//! let recv = list.incoming().next().unwrap().unwrap();
67//!
68//! recv.set_nonblocking(true).unwrap();
69//!
70//! worker.dataflow::<u64,_,_>(|scope1|
71//! (0..10u64)
72//! .to_stream(scope1)
73//! .container::<Vec<_>>()
74//! .capture_into(EventWriter::new(send))
75//! );
76//!
77//! worker.dataflow::<u64,_,_>(|scope2| {
78//! Some(EventReader::<_,Vec<u64>,_>::new(recv))
79//! .replay_into(scope2)
80//! .inspect(|x| println!("replayed: {:?}", x));
81//! })
82//! }).unwrap();
83//! # }
84//! ```
85
// Re-export the capture/replay items and event types from the submodules
// declared below, so each is reachable directly from this module's path.
86pub use self::capture::Capture;
87pub use self::replay::Replay;
88pub use self::extract::Extract;
89pub use self::event::{Event, EventPusher};
90pub use self::event::link::EventLink;
91pub use self::event::binary::EventReader;
92pub use self::event::binary::EventWriter;
93
// Public submodules that define the items re-exported above.
94pub mod capture;
95pub mod replay;
96pub mod extract;
97pub mod event;