Struct differential_dataflow::operators::arrange::agent::TraceAgent
pub struct TraceAgent<Tr>
where
    Tr: TraceReader,
{ /* private fields */ }
A TraceReader wrapper which can be imported into other dataflows.
The TraceAgent is the default trace type produced by arranged, and it can be extracted from the dataflow in which it was defined, and imported into other dataflows.
Implementations§
impl<Tr: TraceReader> TraceAgent<Tr>
pub fn new(
    trace: Tr,
    operator: OperatorInfo,
    logging: Option<Logger>,
) -> (Self, TraceWriter<Tr>)
Creates a new agent from a trace reader.
pub fn new_listener(
    &mut self,
    activator: Activator,
) -> Rc<(Activator, RefCell<VecDeque<TraceReplayInstruction<Tr>>>)>
Attaches a new shared queue to the trace.
The queue is immediately populated with the existing historical batches from the trace and, until the returned reference is dropped, it receives new batches as they are produced by the source arrange operator.
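The replay-then-subscribe behavior described above can be illustrated with a small standard-library model. This is only a sketch under stated assumptions, not the crate's implementation: `ToyTrace`, `Batch`, `Queue`, and `publish` are hypothetical names, the activator is left out, and batches are plain vectors rather than `TraceReplayInstruction<Tr>` values.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::{Rc, Weak};

/// Hypothetical stand-in for a batch of `(time, diff)` updates.
type Batch = Vec<(u64, i64)>;

/// Hypothetical shared queue, standing in for the value returned by `new_listener`.
type Queue = Rc<RefCell<VecDeque<Batch>>>;

/// Toy model of the publishing side (an assumption of this sketch).
#[derive(Default)]
struct ToyTrace {
    history: Vec<Batch>,
    listeners: Vec<Weak<RefCell<VecDeque<Batch>>>>,
}

impl ToyTrace {
    /// Attach a queue: replay the history first, then remember the queue weakly.
    fn new_listener(&mut self) -> Queue {
        let replay: VecDeque<Batch> = self.history.iter().cloned().collect();
        let queue: Queue = Rc::new(RefCell::new(replay));
        self.listeners.push(Rc::downgrade(&queue));
        queue
    }

    /// Record a new batch and forward it to every listener that is still alive.
    fn publish(&mut self, batch: Batch) {
        self.listeners.retain(|weak| match weak.upgrade() {
            Some(queue) => {
                queue.borrow_mut().push_back(batch.clone());
                true
            }
            // The listener dropped its reference, so stop forwarding to it.
            None => false,
        });
        self.history.push(batch);
    }
}
```

Holding the queues weakly is what lets a listener unsubscribe simply by dropping its reference: the next `publish` finds that the upgrade fails and forgets the queue.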
pub fn operator(&self) -> &OperatorInfo
The OperatorInfo of the underlying Timely operator.
pub fn trace_box_unstable(&self) -> Rc<RefCell<TraceBox<Tr>>>
Obtain a reference to the inner TraceBox. It is the caller’s obligation to maintain the trace box and this trace agent’s invariants. Specifically, it is undefined behavior to mutate the trace box. Keeping strong references can prevent resource reclamation.
This method is subject to changes and removal and should not be considered part of a stable interface.
impl<Tr> TraceAgent<Tr>
where
    Tr: TraceReader + 'static,
pub fn import<G>(&mut self, scope: &G) -> Arranged<G, TraceAgent<Tr>>
Copies an existing collection into the supplied scope.
This method creates an Arranged collection that should appear indistinguishable from applying arrange directly to the source collection brought into the local scope. The only caveat is that the initial state of the collection is its current state, and updates occur from this point forward. The historical changes the collection experienced in the past are accumulated, and the distinctions from the initial collection are no longer evident.
The current behavior is that the introduced collection accumulates updates to some times less than or equal to self.get_logical_compaction(). There is not currently a guarantee that the updates are accumulated to the frontier, and the resulting collection history may be weirdly partial until this point. In particular, the historical collection may move through configurations that did not actually occur, even if eventually arriving at the correct collection. This is probably a bug; although we get to the right place in the end, the intermediate computation could do something that the original computation did not, like diverge.
I would expect the semantics to improve to “updates are advanced to self.get_logical_compaction()”, which means the computation will run as if starting from exactly this frontier. It is not currently clear whose responsibility this should be (the trace/batch should only reveal these times, or an operator should know to advance times before using them).
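To make "updates are advanced to the frontier" concrete, the following is a minimal sketch of advancing and accumulating a history of updates. It assumes totally ordered `u64` times and a single-element frontier, whereas the crate works with antichains over partially ordered times; `advance_and_consolidate` is a hypothetical helper, not part of the crate.

```rust
use std::collections::BTreeMap;

/// Advance every update time to at least `since`, then sum the differences of
/// updates that now share the same `(data, time)`, dropping zero totals.
/// Times are plain `u64` here, an assumption made for simplicity.
fn advance_and_consolidate(
    updates: &[(u32, u64, i64)],
    since: u64,
) -> Vec<(u32, u64, i64)> {
    let mut totals: BTreeMap<(u32, u64), i64> = BTreeMap::new();
    for &(data, time, diff) in updates {
        // In a total order, advancing a time to the frontier is just a max.
        *totals.entry((data, time.max(since))).or_insert(0) += diff;
    }
    totals
        .into_iter()
        .filter(|&(_, diff)| diff != 0)
        .map(|((data, time), diff)| (data, time, diff))
        .collect()
}
```

For a history that inserts 0 at time 0, removes it at 1, inserts 1 at 2, removes it at 3, and inserts 0 again at 4, advancing to 5 leaves the single update `(0, 5, 1)`: the intermediate configurations are no longer distinguishable, which is the accumulation described above.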
§Examples
use timely::Config;
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::trace::Trace;

::timely::execute(Config::thread(), |worker| {

    // create a first dataflow
    let mut trace = worker.dataflow::<u32,_,_>(|scope| {
        // create input handle and collection.
        scope.new_collection_from(0 .. 10).1
             .arrange_by_self()
             .trace
    });

    // do some work.
    worker.step();
    worker.step();

    // create a second dataflow
    worker.dataflow(move |scope| {
        trace.import(scope)
             .reduce(move |_key, src, dst| dst.push((*src[0].0, 1)));
    });

}).unwrap();
pub fn import_named<G>(
    &mut self,
    scope: &G,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
Same as import, but allows naming the source.
pub fn import_core<G>(
    &mut self,
    scope: &G,
    name: &str,
) -> (Arranged<G, TraceAgent<Tr>>, ShutdownButton<CapabilitySet<Tr::Time>>)
Imports an arrangement into the supplied scope.
§Examples
use timely::Config;
use timely::dataflow::ProbeHandle;
use timely::dataflow::operators::Probe;
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::trace::Trace;

::timely::execute(Config::thread(), |worker| {

    let mut input = InputSession::<_,(),isize>::new();
    let mut probe = ProbeHandle::new();

    // create a first dataflow
    let mut trace = worker.dataflow::<u32,_,_>(|scope| {
        // create input handle and collection.
        input.to_collection(scope)
             .arrange_by_self()
             .trace
    });

    // do some work.
    worker.step();
    worker.step();

    // create a second dataflow
    let mut shutdown = worker.dataflow(|scope| {
        let (arrange, button) = trace.import_core(scope, "Import");
        arrange.stream.probe_with(&mut probe);
        button
    });

    worker.step();
    worker.step();
    assert!(!probe.done());

    shutdown.press();

    worker.step();
    worker.step();
    assert!(probe.done());

}).unwrap();
pub fn import_frontier<G>(
    &mut self,
    scope: &G,
    name: &str,
) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
Imports an arrangement into the supplied scope.
This variant of import uses get_logical_compaction to forcibly advance timestamps in updates.
§Examples
use timely::Config;
use timely::progress::frontier::AntichainRef;
use timely::dataflow::ProbeHandle;
use timely::dataflow::operators::Probe;
use timely::dataflow::operators::Inspect;
use differential_dataflow::input::InputSession;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::trace::Trace;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::input::Input;

::timely::execute(Config::thread(), |worker| {

    let mut probe = ProbeHandle::new();

    // create a first dataflow
    let (mut handle, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
        // create input handle and collection.
        let (handle, stream) = scope.new_collection();
        let trace = stream.arrange_by_self().trace;
        (handle, trace)
    });

    handle.insert(0); handle.advance_to(1); handle.flush(); worker.step();
    handle.remove(0); handle.advance_to(2); handle.flush(); worker.step();
    handle.insert(1); handle.advance_to(3); handle.flush(); worker.step();
    handle.remove(1); handle.advance_to(4); handle.flush(); worker.step();
    handle.insert(0); handle.advance_to(5); handle.flush(); worker.step();

    trace.set_logical_compaction(AntichainRef::new(&[5]));

    // create a second dataflow
    let mut shutdown = worker.dataflow(|scope| {
        let (arrange, button) = trace.import_frontier(scope, "Import");
        arrange
            .as_collection(|k,v| (*k,*v))
            .inner
            .inspect(|(d,t,r)| {
                assert!(t >= &5);
            })
            .probe_with(&mut probe);

        button
    });

    worker.step();
    worker.step();
    assert!(!probe.done());

    shutdown.press();

    worker.step();
    worker.step();
    assert!(probe.done());

}).unwrap();
pub fn import_frontier_core<G>(
    &mut self,
    scope: &G,
    name: &str,
    since: Antichain<Tr::Time>,
    until: Antichain<Tr::Time>,
) -> (Arranged<G, TraceFrontier<TraceAgent<Tr>>>, ShutdownButton<CapabilitySet<Tr::Time>>)
Import a trace restricted to a specific time interval [since, until).
All updates present in the input trace will be first advanced to since, and then either emitted, or if greater than or equal to until, suppressed. Once all times are certain to be greater than or equal to until, the operator capability will be dropped.
Invoking this method with an until of Antichain::new() will perform no filtering, as the empty frontier indicates the end of times.
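The advance-then-filter rule above can be sketched for the simple case of totally ordered times. This is an illustration under assumptions rather than the operator's code: `restrict` is a hypothetical helper, times are `u64`, `since` is a single time instead of an antichain, and `None` stands in for the empty `until` frontier.

```rust
/// Model of the `[since, until)` restriction for totally ordered `u64` times.
/// `until == None` plays the role of the empty antichain (an assumption of
/// this sketch), so nothing is suppressed in that case.
fn restrict(
    updates: &[(u32, u64, i64)],
    since: u64,
    until: Option<u64>,
) -> Vec<(u32, u64, i64)> {
    updates
        .iter()
        // First advance each time to at least `since`.
        .map(|&(data, time, diff)| (data, time.max(since), diff))
        // Then suppress anything at or beyond `until`.
        .filter(|&(_, time, _)| until.map_or(true, |u| time < u))
        .collect()
}
```

Because advancing happens before filtering, a `since` that is not less than `until` suppresses every update in this model.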
Trait Implementations§
impl<Tr> Clone for TraceAgent<Tr>
where
    Tr: TraceReader,
impl<Tr> Drop for TraceAgent<Tr>
where
    Tr: TraceReader,
impl<Tr> TraceReader for TraceAgent<Tr>
where
    Tr: TraceReader,
type Key<'a> = <Tr as TraceReader>::Key<'a>
type Val<'a> = <Tr as TraceReader>::Val<'a>
type Time = <Tr as TraceReader>::Time
type TimeGat<'a> = <Tr as TraceReader>::TimeGat<'a>
type Diff = <Tr as TraceReader>::Diff
type DiffGat<'a> = <Tr as TraceReader>::DiffGat<'a>
type Batch = <Tr as TraceReader>::Batch
type Storage = <Tr as TraceReader>::Storage
Storage type for Self::Cursor. Likely related to Self::Batch.
type Cursor = <Tr as TraceReader>::Cursor
fn set_logical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>)
fn get_logical_compaction(&mut self) -> AntichainRef<'_, Tr::Time>
fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, Tr::Time>)
fn get_physical_compaction(&mut self) -> AntichainRef<'_, Tr::Time>
fn cursor_through(
    &mut self,
    frontier: AntichainRef<'_, Tr::Time>,
) -> Option<(Self::Cursor, Self::Storage)>
fn map_batches<F: FnMut(&Self::Batch)>(&self, f: F)
fn cursor(&mut self) -> (Self::Cursor, Self::Storage)
fn advance_by(&mut self, frontier: AntichainRef<'_, Self::Time>)
Deprecated form of set_logical_compaction.
fn advance_frontier(&mut self) -> AntichainRef<'_, Self::Time>
Deprecated form of get_logical_compaction.
fn distinguish_since(&mut self, frontier: AntichainRef<'_, Self::Time>)
Deprecated form of set_physical_compaction.
fn distinguish_frontier(&mut self) -> AntichainRef<'_, Self::Time>
Deprecated form of get_physical_compaction.
fn read_upper(&mut self, target: &mut Antichain<Self::Time>)
Auto Trait Implementations§
impl<Tr> Freeze for TraceAgent<Tr>
impl<Tr> !RefUnwindSafe for TraceAgent<Tr>
impl<Tr> !Send for TraceAgent<Tr>
impl<Tr> !Sync for TraceAgent<Tr>
impl<Tr> Unpin for TraceAgent<Tr>
impl<Tr> !UnwindSafe for TraceAgent<Tr>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
default unsafe fn clone_to_uninit(&self, dst: *mut T)
This is a nightly-only experimental API (clone_to_uninit).
impl<'a, S, T> Semigroup<&'a S> for T
where
    T: Semigroup<S>,
fn plus_equals(&mut self, rhs: &&'a S)
The method of std::ops::AddAssign, for types that do not implement AddAssign.