differential_dataflow::operators::arrange::agent

Struct TraceAgent

Source
pub struct TraceAgent<Tr>
where Tr: TraceReader,
{ /* private fields */ }
Expand description

A TraceReader wrapper which can be imported into other dataflows.

The TraceAgent is the default trace type produced by arranged, and it can be extracted from the dataflow in which it was defined, and imported into other dataflows.

Implementations§

Source§

impl<Tr: TraceReader> TraceAgent<Tr>

Source

pub fn new( trace: Tr, operator: OperatorInfo, logging: Option<Logger>, ) -> (Self, TraceWriter<Tr>)
where Tr: Trace, Tr::Batch: Batch,

Creates a new agent from a trace reader.

Source

pub fn new_listener( &mut self, activator: Activator, ) -> Rc<(Activator, RefCell<VecDeque<TraceReplayInstruction<Tr>>>)>

Attaches a new shared queue to the trace.

The queue is first populated with existing batches from the trace, The queue will be immediately populated with existing historical batches from the trace, and until the reference is dropped the queue will receive new batches as produced by the source arrange operator.

Source

pub fn operator(&self) -> &OperatorInfo

The OperatorInfo of the underlying Timely operator

Source

pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>>

Obtain a reference to the inner TraceBox. It is the caller’s obligation to maintain the trace box and this trace agent’s invariants. Specifically, it is undefined behavior to mutate the trace box. Keeping strong references can prevent resource reclamation.

This method is subject to changes and removal and should not be considered part of a stable interface.

Source§

impl<Tr> TraceAgent<Tr>
where Tr: TraceReader + 'static,

Source

pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
where G: Scope<Timestamp = Tr::Time>,

Copies an existing collection into the supplied scope.

This method creates an Arranged collection that should appear indistinguishable from applying arrange directly to the source collection brought into the local scope. The only caveat is that the initial state of the collection is its current state, and updates occur from this point forward. The historical changes the collection experienced in the past are accumulated, and the distinctions from the initial collection are no longer evident.

The current behavior is that the introduced collection accumulates updates to some times less or equal to self.get_logical_compaction(). There is not currently a guarantee that the updates are accumulated to the frontier, and the resulting collection history may be weirdly partial until this point. In particular, the historical collection may move through configurations that did not actually occur, even if eventually arriving at the correct collection. This is probably a bug; although we get to the right place in the end, the intermediate computation could do something that the original computation did not, like diverge.

I would expect the semantics to improve to “updates are advanced to self.get_logical_compaction()”, which means the computation will run as if starting from exactly this frontier. It is not currently clear whose responsibility this should be (the trace/batch should only reveal these times, or an operator should know to advance times before using them).

§Examples
use timely::Config;
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::trace::Trace;

::timely::execute(Config::thread(), |worker| {

    // create a first dataflow
    let mut trace = worker.dataflow::<u32,_,_>(|scope| {
        // create input handle and collection.
        scope.new_collection_from(0 .. 10).1
             .arrange_by_self()
             .trace
    });

    // do some work.
    worker.step();
    worker.step();

    // create a second dataflow
    worker.dataflow(move |scope| {
        trace.import(scope)
             .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
    });

}).unwrap();
Source

pub fn import_named<G>( &mut self, scope: &G, name: &str, ) -> Arranged<G, TraceAgent<Tr>>
where G: Scope<Timestamp = Tr::Time>,

Same as import, but allows to name the source.

Source

pub fn import_core<G>( &mut self, scope: &G, name: &str, ) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where G: Scope<Timestamp = Tr::Time>,

Imports an arrangement into the supplied scope.

§Examples
use timely::Config;
use timely::dataflow::ProbeHandle;
use timely::dataflow::operators::Probe;
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::trace::Trace;

::timely::execute(Config::thread(), |worker| {

    let mut input = InputSession::<_,(),isize>::new();
    let mut probe = ProbeHandle::new();

    // create a first dataflow
    let mut trace = worker.dataflow::<u32,_,_>(|scope| {
        // create input handle and collection.
        input.to_collection(scope)
             .arrange_by_self()
             .trace
    });

    // do some work.
    worker.step();
    worker.step();

    // create a second dataflow
    let mut shutdown = worker.dataflow(|scope| {
        let (arrange, button) = trace.import_core(scope, "Import");
        arrange.stream.probe_with(&mut probe);
        button
    });

    worker.step();
    worker.step();
    assert!(!probe.done());

    shutdown.press();

    worker.step();
    worker.step();
    assert!(probe.done());

}).unwrap();
Source

pub fn import_frontier<G>( &mut self, scope: &G, name: &str, ) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where G: Scope<Timestamp = Tr::Time>, Tr: TraceReader,

Imports an arrangement into the supplied scope.

This variant of import uses the get_logical_compaction to forcibly advance timestamps in updates.

§Examples
use timely::Config;
use timely::progress::frontier::AntichainRef;
use timely::dataflow::ProbeHandle;
use timely::dataflow::operators::Probe;
use timely::dataflow::operators::Inspect;
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::trace::Trace;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::input::Input;

::timely::execute(Config::thread(), |worker| {

    let mut probe = ProbeHandle::new();

    // create a first dataflow
    let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
        // create input handle and collection.
        let (handle, stream) = scope.new_collection();
        let trace = stream.arrange_by_self().trace;
        (handle, trace)
    });

    handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
    handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
    handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
    handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
    handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();

    trace.set_logical_compaction(AntichainRef::new(&[5]));

    // create a second dataflow
    let mut shutdown = worker.dataflow(|scope| {
        let (arrange, button) = trace.import_frontier(scope, "Import");
        arrange
            .as_collection(|k,v| (*k,*v))
            .inner
            .inspect(|(d,t,r)| {
                assert!(t >= &5);
            })
            .probe_with(&mut probe);

        button
    });

    worker.step();
    worker.step();
    assert!(!probe.done());

    shutdown.press();

    worker.step();
    worker.step();
    assert!(probe.done());

}).unwrap();
Source

pub fn import_frontier_core<G>( &mut self, scope: &G, name: &str, since: Antichain<Tr::Time>, until: Antichain<Tr::Time>, ) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
where G: Scope<Timestamp = Tr::Time>, Tr: TraceReader,

Import a trace restricted to a specific time interval [since, until).

All updates present in the input trace will be first advanced to since, and then either emitted, or if greater or equal to until, suppressed. Once all times are certain to be greater or equal to until the operator capability will be dropped.

Invoking this method with an until of Antichain::new() will perform no filtering, as the empty frontier indicates the end of times.

Trait Implementations§

Source§

impl<Tr> Clone for TraceAgent<Tr>
where Tr: TraceReader,

Source§

fn clone(&self) -> Self

Returns a copy of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl<Tr> Drop for TraceAgent<Tr>
where Tr: TraceReader,

Source§

fn drop(&mut self)

Executes the destructor for this type. Read more
Source§

impl<Tr> TraceReader for TraceAgent<Tr>
where Tr: TraceReader,

Source§

type Key<'a> = <Tr as TraceReader>::Key<'a>

Key by which updates are indexed.
Source§

type Val<'a> = <Tr as TraceReader>::Val<'a>

Values associated with keys.
Source§

type Time = <Tr as TraceReader>::Time

Timestamps associated with updates
Source§

type TimeGat<'a> = <Tr as TraceReader>::TimeGat<'a>

Borrowed form of timestamp.
Source§

type Diff = <Tr as TraceReader>::Diff

Owned form of update difference.
Source§

type DiffGat<'a> = <Tr as TraceReader>::DiffGat<'a>

Borrowed form of update difference.
Source§

type Batch = <Tr as TraceReader>::Batch

The type of an immutable collection of updates.
Source§

type Storage = <Tr as TraceReader>::Storage

Storage type for Self::Cursor. Likely related to Self::Batch.
Source§

type Cursor = <Tr as TraceReader>::Cursor

The type used to enumerate the collections contents.
Source§

fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>)

Advances the frontier that constrains logical compaction. Read more
Source§

fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time>

Reports the logical compaction frontier. Read more
Source§

fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>)

Advances the frontier that constrains physical compaction. Read more
Source§

fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time>

Reports the physical compaction frontier. Read more
Source§

fn cursor_through( &mut self, frontier: AntichainRef<'_, Tr::Time>, ) -> Option<(Self::Cursor, Self::Storage)>

Acquires a cursor to the restriction of the collection’s contents to updates at times not greater or equal to an element of upper. Read more
Source§

fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F)

Maps logic across the non-empty sequence of batches in the trace. Read more
Source§

fn cursor(&mut self) -> (Self::Cursor, Self::Storage)

Provides a cursor over updates contained in the trace.
Source§

fn advance_by(&mut self, frontier: AntichainRef<'_, Self::Time>)

👎Deprecated since 0.11: please use set_logical_compaction
Deprecated form of set_logical_compaction.
Source§

fn advance_frontier(&mut self) -> AntichainRef<'_, Self::Time>

👎Deprecated since 0.11: please use get_logical_compaction
Deprecated form of get_logical_compaction.
Source§

fn distinguish_since(&mut self, frontier: AntichainRef<'_, Self::Time>)

👎Deprecated since 0.11: please use set_physical_compaction
Deprecated form of set_physical_compaction.
Source§

fn distinguish_frontier(&mut self) -> AntichainRef<'_, Self::Time>

👎Deprecated since 0.11: please use get_physical_compaction
Deprecated form of get_physical_compaction.
Source§

fn read_upper(&mut self, target: &mut Antichain<Self::Time>)

Reads the upper frontier of committed times.
Source§

fn advance_upper(&mut self, upper: &mut Antichain<Self::Time>)

Advances upper by any empty batches. Read more

Auto Trait Implementations§

§

impl<Tr> Freeze for TraceAgent<Tr>
where <Tr as TraceReader>::Time: Freeze,

§

impl<Tr> !RefUnwindSafe for TraceAgent<Tr>

§

impl<Tr> !Send for TraceAgent<Tr>

§

impl<Tr> !Sync for TraceAgent<Tr>

§

impl<Tr> Unpin for TraceAgent<Tr>
where <Tr as TraceReader>::Time: Unpin,

§

impl<Tr> !UnwindSafe for TraceAgent<Tr>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dst: *mut T)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<'a, S, T> Semigroup<&'a S> for T
where T: Semigroup<S>,

Source§

fn plus_equals(&mut self, rhs: &&'a S)

The method of std::ops::AddAssign, for types that do not implement AddAssign.
Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> Data for T
where T: Clone + 'static,