Trait Capture

Source
pub trait Capture<T: Timestamp, C: Container + Data> {
    // Required method
    fn capture_into<P: EventPusher<T, C> + 'static>(&self, pusher: P);

    // Provided method
    fn capture(&self) -> Receiver<Event<T, C>> { ... }
}
Expand description

Capture a stream of timestamped data for later replay.

Required Methods§

Source

fn capture_into<P: EventPusher<T, C> + 'static>(&self, pusher: P)

Captures a stream of timestamped data for later replay.

§Examples

The type Rc<EventLink<T, C>> implements a typed linked list, and can be both captured into and replayed from.

use std::rc::Rc;
use std::sync::{Arc, Mutex};
use timely::dataflow::Scope;
use timely::dataflow::operators::{Capture, ToStream, Inspect};
use timely::dataflow::operators::capture::{EventLink, Replay, Extract};

// get send and recv endpoints, wrap send to share
let (send, recv) = ::std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

timely::execute(timely::Config::thread(), move |worker| {

    // this is only to validate the output.
    let send = send.lock().unwrap().clone();

    // these are to capture/replay the stream.
    let handle1 = Rc::new(EventLink::new());
    let handle2 = Some(handle1.clone());

    worker.dataflow::<u64,_,_>(|scope1|
        (0..10).to_stream(scope1)
               .capture_into(handle1)
    );

    worker.dataflow(|scope2| {
        handle2.replay_into(scope2)
               .capture_into(send)
    });
}).unwrap();

assert_eq!(recv.extract()[0].1, (0..10).collect::<Vec<_>>());

The type EventWriter<T, C, W> can be captured into, and the type EventReader<T, C, R> can be replayed from. They wrap a binary writer W and a binary reader R, respectively, which can be backed by files, network sockets, etc.

use std::rc::Rc;
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use timely::dataflow::Scope;
use timely::dataflow::operators::{Capture, ToStream, Inspect};
use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract};

// get send and recv endpoints, wrap send to share
let (send0, recv0) = ::std::sync::mpsc::channel();
let send0 = Arc::new(Mutex::new(send0));

timely::execute(timely::Config::thread(), move |worker| {

    // this is only to validate the output.
    let send0 = send0.lock().unwrap().clone();

    // these allow us to capture / replay a timely stream.
    let list = TcpListener::bind("127.0.0.1:8000").unwrap();
    let send = TcpStream::connect("127.0.0.1:8000").unwrap();
    let recv = list.incoming().next().unwrap().unwrap();

    recv.set_nonblocking(true).unwrap();

    worker.dataflow::<u64,_,_>(|scope1|
        (0..10u64)
            .to_stream(scope1)
            .capture_into(EventWriter::new(send))
    );

    worker.dataflow::<u64,_,_>(|scope2| {
        Some(EventReader::<_,Vec<u64>,_>::new(recv))
            .replay_into(scope2)
            .capture_into(send0)
    });
}).unwrap();

assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());

Provided Methods§

Source

fn capture(&self) -> Receiver<Event<T, C>>

Captures a stream using Rust’s MPSC channels.

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementors§

Source§

impl<S: Scope, C: Container + Data> Capture<<S as ScopeParent>::Timestamp, C> for StreamCore<S, C>