timely/dataflow/operators/core/capture/mod.rs
1//! Operators to capture and replay timely dataflow streams.
2//!
3//! The `capture_into` and `replay_into` operators respectively capture what a unary operator
4//! sees as input (both data and progress information), and play this information back as a new
5//! input.
6//!
7//! The `capture_into` method requires a `P: EventPusher<T, D>`, which is some type accepting
8//! `Event<T, D>` inputs. This module provides several examples, including the linked list
9//! `EventLink<T, D>`, and the binary `EventWriter<T, D, W>` wrapping any `W: Write`.
10//!
11//! Streams are captured at the worker granularity, and one can replay an arbitrary subset of
12//! the captured streams on any number of workers (fewer, more, or as many as were captured).
13//! There is a protocol the captured stream uses, and implementors of new event streams should
14//! make sure to understand this (and complain if it is not clear).
15//!
16//! # Examples
17//!
18//! The type `Rc<EventLink<T,D>>` implements a typed linked list,
19//! and can be captured into and replayed from.
20//!
21//! ```rust
22//! use std::rc::Rc;
23//! use timely::dataflow::Scope;
24//! use timely::dataflow::operators::{Capture, ToStream, Inspect};
25//! use timely::dataflow::operators::capture::{EventLink, Replay};
26//!
27//! # #[cfg(miri)] fn main() {}
28//! # #[cfg(not(miri))]
29//! # fn main() {
30//! timely::execute(timely::Config::thread(), |worker| {
31//! let handle1 = Rc::new(EventLink::new());
32//! let handle2 = Some(handle1.clone());
33//!
34//! worker.dataflow::<u64,_,_>(|scope1|
35//! (0..10).to_stream(scope1)
36//! .capture_into(handle1)
37//! );
38//!
39//! worker.dataflow(|scope2| {
40//! handle2.replay_into(scope2)
41//! .inspect(|x| println!("replayed: {:?}", x));
42//! })
43//! }).unwrap();
44//! # }
45//! ```
46//!
47//! The types `EventWriter<T, D, W>` and `EventReader<T, D, R>` can be
48//! captured into and replayed from, respectively. The use binary writers
49//! and readers respectively, and can be backed by files, network sockets,
50//! etc.
51//!
52//! ```
53//! use std::rc::Rc;
54//! use std::net::{TcpListener, TcpStream};
55//! use timely::dataflow::Scope;
56//! use timely::dataflow::operators::{Capture, ToStream, Inspect};
57//! use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay};
58//!
59//! # #[cfg(miri)] fn main() {}
60//! # #[cfg(not(miri))]
61//! # fn main() {
62//! timely::execute(timely::Config::thread(), |worker| {
63//! let list = TcpListener::bind("127.0.0.1:8000").unwrap();
64//! let send = TcpStream::connect("127.0.0.1:8000").unwrap();
65//! let recv = list.incoming().next().unwrap().unwrap();
66//!
67//! recv.set_nonblocking(true).unwrap();
68//!
69//! worker.dataflow::<u64,_,_>(|scope1|
70//! (0..10u64)
71//! .to_stream(scope1)
72//! .capture_into(EventWriter::new(send))
73//! );
74//!
75//! worker.dataflow::<u64,_,_>(|scope2| {
76//! Some(EventReader::<_,Vec<u64>,_>::new(recv))
77//! .replay_into(scope2)
78//! .inspect(|x| println!("replayed: {:?}", x));
79//! })
80//! }).unwrap();
81//! # }
82//! ```
83
84pub use self::capture::Capture;
85pub use self::replay::Replay;
86pub use self::extract::Extract;
87pub use self::event::{Event, EventPusher};
88pub use self::event::link::EventLink;
89pub use self::event::binary::EventReader;
90pub use self::event::binary::EventWriter;
91
92pub mod capture;
93pub mod replay;
94pub mod extract;
95pub mod event;