use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;
use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::operate::SharedProgress;
use crate::progress::frontier::{Antichain, MutableAntichain};
use crate::Container;
use crate::container::ContainerBuilder;
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::channels::pushers::Tee;
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers::buffer::Buffer as PushBuffer;
use crate::dataflow::channels::pact::ParallelizationContract;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::capability::Capability;
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle, OutputWrapper};
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
use crate::logging::TimelyLogger as Logger;
use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;
/// Builder that assembles an operator on top of the raw operator builder while
/// keeping reference-counted handles to the operator's progress bookkeeping.
///
/// Inputs and outputs are registered one at a time; the shared change batches
/// collected here are later drained into the progress structure each time the
/// built operator logic runs.
#[derive(Debug)]
pub struct OperatorBuilder<G>
where
    G: Scope,
{
    /// Raw builder that every input/output registration and the final logic
    /// are forwarded to.
    builder: OperatorBuilderRaw<G>,
    /// One mutable frontier per registered input, in registration order;
    /// refreshed from the progress structure before the user logic runs.
    frontier: Vec<MutableAntichain<G::Timestamp>>,
    /// Per input: the change batch shared with that input's pull counter.
    consumed: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
    /// Per output: a shared change batch handed to the output wrapper and used
    /// to mint the initial capability; the outer list is shared with every
    /// input handle.
    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>>>,
    /// Per input: the shared list of path-summary antichains, extended with
    /// one entry each time an output is added afterwards.
    summaries: Vec<Rc<RefCell<Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>>>>,
    /// Per output: the change batch shared with that output's push counter.
    produced: Vec<Rc<RefCell<ChangeBatch<G::Timestamp>>>>,
    /// Logger obtained from the scope at construction; cloned into each input handle.
    logging: Option<Logger>,
}
impl<G: Scope> OperatorBuilder<G> {
pub fn new(name: String, scope: G) -> Self {
let logging = scope.logging();
OperatorBuilder {
builder: OperatorBuilderRaw::new(name, scope),
frontier: Vec::new(),
consumed: Vec::new(),
internal: Rc::new(RefCell::new(Vec::new())),
summaries: Vec::new(),
produced: Vec::new(),
logging,
}
}
pub fn set_notify(&mut self, notify: bool) {
self.builder.set_notify(notify);
}
pub fn new_input<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P) -> InputHandleCore<G::Timestamp, C, P::Puller>
where
P: ParallelizationContract<G::Timestamp, C> {
let connection = (0..self.builder.shape().outputs()).map(|_| Antichain::from_elem(Default::default())).collect();
self.new_input_connection(stream, pact, connection)
}
pub fn new_input_connection<C: Container, P>(&mut self, stream: &StreamCore<G, C>, pact: P, connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>) -> InputHandleCore<G::Timestamp, C, P::Puller>
where
P: ParallelizationContract<G::Timestamp, C> {
let puller = self.builder.new_input_connection(stream, pact, connection.clone());
let input = PullCounter::new(puller);
self.frontier.push(MutableAntichain::new());
self.consumed.push(input.consumed().clone());
let shared_summary = Rc::new(RefCell::new(connection));
self.summaries.push(shared_summary.clone());
new_input_handle(input, self.internal.clone(), shared_summary, self.logging.clone())
}
pub fn new_output<CB: ContainerBuilder>(&mut self) -> (OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>, StreamCore<G, CB::Container>) {
let connection = (0..self.builder.shape().inputs()).map(|_| Antichain::from_elem(Default::default())).collect();
self.new_output_connection(connection)
}
pub fn new_output_connection<CB: ContainerBuilder>(
&mut self,
connection: Vec<Antichain<<G::Timestamp as Timestamp>::Summary>>
) -> (
OutputWrapper<G::Timestamp, CB, Tee<G::Timestamp, CB::Container>>,
StreamCore<G, CB::Container>
) {
let (tee, stream) = self.builder.new_output_connection(connection.clone());
let internal = Rc::new(RefCell::new(ChangeBatch::new()));
self.internal.borrow_mut().push(internal.clone());
let mut buffer = PushBuffer::new(PushCounter::new(tee));
self.produced.push(buffer.inner().produced().clone());
for (summary, connection) in self.summaries.iter().zip(connection.into_iter()) {
summary.borrow_mut().push(connection.clone());
}
(OutputWrapper::new(buffer, internal), stream)
}
pub fn build<B, L>(self, constructor: B)
where
B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
L: FnMut(&[MutableAntichain<G::Timestamp>])+'static
{
self.build_reschedule(|caps| {
let mut logic = constructor(caps);
move |frontier| { logic(frontier); false }
})
}
pub fn build_reschedule<B, L>(self, constructor: B)
where
B: FnOnce(Vec<Capability<G::Timestamp>>) -> L,
L: FnMut(&[MutableAntichain<G::Timestamp>])->bool+'static
{
let mut capabilities = Vec::with_capacity(self.internal.borrow().len());
for batch in self.internal.borrow().iter() {
capabilities.push(Capability::new(G::Timestamp::minimum(), batch.clone()));
batch.borrow_mut().clear();
}
let mut logic = constructor(capabilities);
let mut self_frontier = self.frontier;
let self_consumed = self.consumed;
let self_internal = self.internal;
let self_produced = self.produced;
let raw_logic =
move |progress: &mut SharedProgress<G::Timestamp>| {
for (progress, frontier) in progress.frontiers.iter_mut().zip(self_frontier.iter_mut()) {
frontier.update_iter(progress.drain());
}
let result = logic(&self_frontier[..]);
for (progress, consumed) in progress.consumeds.iter_mut().zip(self_consumed.iter()) {
consumed.borrow_mut().drain_into(progress);
}
let self_internal_borrow = self_internal.borrow_mut();
for index in 0 .. self_internal_borrow.len() {
let mut borrow = self_internal_borrow[index].borrow_mut();
progress.internals[index].extend(borrow.drain());
}
for (progress, produced) in progress.produceds.iter_mut().zip(self_produced.iter()) {
produced.borrow_mut().drain_into(progress);
}
result
};
self.builder.build(raw_logic);
}
pub fn index(&self) -> usize {
self.builder.index()
}
pub fn global(&self) -> usize {
self.builder.global()
}
pub fn shape(&self) -> &OperatorShape {
self.builder.shape()
}
pub fn operator_info(&self) -> OperatorInfo {
self.builder.operator_info()
}
}
#[cfg(test)]
mod tests {
    use crate::container::CapacityContainerBuilder;
    use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

    /// Container builder used for every output in these tests.
    type UnitBuilder = CapacityContainerBuilder<Vec<()>>;

    /// Opening a session on each output with the capability created for the
    /// *other* output is expected to panic.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {
        crate::example(|scope| {
            let mut operator = OperatorBuilder::new("Failure".to_owned(), scope.clone());
            let (mut first_output, _first_stream) = operator.new_output::<UnitBuilder>();
            let (mut second_output, _second_stream) = operator.new_output::<UnitBuilder>();

            operator.build(move |caps| {
                move |_frontiers| {
                    let mut first_handle = first_output.activate();
                    let mut second_handle = second_output.activate();
                    // Deliberately crossed: capability 0 with output 2, capability 1 with output 1.
                    second_handle.session(&caps[0]);
                    first_handle.session(&caps[1]);
                }
            });
        })
    }

    /// Opening a session on each output with its own capability must succeed;
    /// the capabilities are dropped after their first use.
    #[test]
    fn correct_capabilities() {
        crate::example(|scope| {
            let mut operator = OperatorBuilder::new("Failure".to_owned(), scope.clone());
            let (mut first_output, _first_stream) = operator.new_output::<UnitBuilder>();
            let (mut second_output, _second_stream) = operator.new_output::<UnitBuilder>();

            operator.build(move |mut caps| {
                move |_frontiers| {
                    let mut first_handle = first_output.activate();
                    let mut second_handle = second_output.activate();
                    // Nothing left to do once the capabilities have been released.
                    if caps.is_empty() {
                        return;
                    }
                    first_handle.session(&caps[0]);
                    second_handle.session(&caps[1]);
                    caps.clear();
                }
            });

            "Hello".to_owned()
        });
    }
}