/// Capture a stream of timestamped data for later replay.
///
/// `T` is the timestamp type and `D` the container type of the captured stream,
/// as required by the `Timestamp` and `Container` bounds.
pub trait Capture<T: Timestamp, D: Container> {
// Required method
/// Captures a stream of timestamped data for later replay.
///
/// `pusher` is the destination the stream is captured into; it must implement
/// `EventPusherCore<T, D>` and be `'static`.
fn capture_into<P: EventPusherCore<T, D> + 'static>(&self, pusher: P);
// Provided method
/// Provided method returning a `Receiver` of `EventCore<T, D>` values.
///
/// NOTE(review): the default body is elided in this listing; presumably it
/// forwards to `capture_into` with a channel sender — confirm against the source.
fn capture(&self) -> Receiver<EventCore<T, D>> { ... }
}
Expand description
Capture a stream of timestamped data for later replay.
Required Methods§
fn capture_into<P: EventPusherCore<T, D> + 'static>(&self, pusher: P)
Captures a stream of timestamped data for later replay.
Examples
The type `Rc<EventLink<T, D>>` implements a typed linked list, and can be both captured into and replayed from. The example below builds its list with `EventLinkCore::new()`.
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use timely::dataflow::Scope;
use timely::dataflow::operators::{Capture, ToStream, Inspect};
use timely::dataflow::operators::capture::{EventLinkCore, Replay, Extract};

// Create a channel; the sending half is wrapped in `Arc<Mutex<_>>` so the
// worker closure can take its own clone of it.
let (tx, rx) = ::std::sync::mpsc::channel();
let tx = Arc::new(Mutex::new(tx));

timely::execute(timely::Config::thread(), move |worker| {
    // This sender exists only so the replayed output can be checked afterwards.
    let tx = tx.lock().unwrap().clone();

    // Two handles onto the same linked list: one is captured into,
    // the other is replayed from.
    let capture_link = Rc::new(EventLinkCore::new());
    let replay_link = Some(Rc::clone(&capture_link));

    // First dataflow: capture the numbers 0..10 into the linked list.
    worker.dataflow::<u64, _, _>(|capture_scope| {
        (0..10).to_stream(capture_scope).capture_into(capture_link)
    });

    // Second dataflow: replay the list and capture the result into the channel.
    worker.dataflow(|replay_scope| {
        replay_link.replay_into(replay_scope).capture_into(tx)
    });
})
.unwrap();

// The data extracted from the channel must match what was originally captured.
assert_eq!(rx.extract()[0].1, (0..10).collect::<Vec<_>>());
The types `EventWriter<T, D, W>` and `EventReader<T, D, R>` can be captured into and replayed from, respectively. They are built on binary writers and readers, and so can be backed by files, network sockets, etc.
use std::rc::Rc;
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use timely::dataflow::Scope;
use timely::dataflow::operators::{Capture, ToStream, Inspect};
use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract};

// Create a channel; the sending half is wrapped in `Arc<Mutex<_>>` so the
// worker closure can take its own clone of it.
let (check_tx, check_rx) = ::std::sync::mpsc::channel();
let check_tx = Arc::new(Mutex::new(check_tx));

timely::execute(timely::Config::thread(), move |worker| {
    // This sender exists only so the replayed output can be checked afterwards.
    let check_tx = check_tx.lock().unwrap().clone();

    // Set up a TCP connection to capture into and replay from: bind a listener,
    // connect to it, then accept the connection.
    let listener = TcpListener::bind("127.0.0.1:8000").unwrap();
    let writer_stream = TcpStream::connect("127.0.0.1:8000").unwrap();
    let reader_stream = listener.incoming().next().unwrap().unwrap();
    // Put the accepted (reading) end into non-blocking mode.
    reader_stream.set_nonblocking(true).unwrap();

    // First dataflow: capture the numbers 0..10 by writing them to the socket.
    worker.dataflow::<u64, _, _>(|capture_scope| {
        (0..10u64)
            .to_stream(capture_scope)
            .capture_into(EventWriter::new(writer_stream))
    });

    // Second dataflow: replay from the socket and capture into the channel.
    worker.dataflow::<u64, _, _>(|replay_scope| {
        Some(EventReader::<_, u64, _>::new(reader_stream))
            .replay_into(replay_scope)
            .capture_into(check_tx)
    });
})
.unwrap();

// The data extracted from the channel must match what was originally captured.
assert_eq!(check_rx.extract()[0].1, (0..10).collect::<Vec<_>>());