pub mod compute;
mod differential;
mod initialize;
mod reachability;
mod timely;
use ::timely::container::{CapacityContainerBuilder, ContainerBuilder, PushInto, SizableContainer};
use ::timely::Container;
use std::any::Any;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;
use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
use ::timely::progress::Timestamp as TimelyTimestamp;
use ::timely::scheduling::Activator;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
use mz_repr::{Datum, Diff, Row, RowPacker, SharedRow, Timestamp};
use mz_timely_util::activator::RcActivator;
use crate::logging::compute::Logger as ComputeLogger;
use crate::typedefs::RowRowAgent;
pub use crate::logging::initialize::initialize;
/// Batches logged data in a container builder and forwards it, together with
/// progress statements, to an [`EventPusher`].
struct BatchLogger<CB, P>
where
CB: ContainerBuilder,
P: EventPusher<Timestamp, CB::Container>,
{
/// Timestamp, in milliseconds, attached to outgoing messages. Its count is
/// retracted from the progress stream when the logger advances or is dropped.
time_ms: Timestamp,
/// Sink that receives the `Event::Messages` and `Event::Progress` events.
event_pusher: P,
/// Granularity, in milliseconds, of `time_ms`: the logger only ever advances
/// to multiples of this interval.
interval_ms: u128,
/// Accumulates pushed data until it is extracted or flushed.
builder: CB,
}
impl<CB, P> BatchLogger<CB, P>
where
    CB: ContainerBuilder,
    P: EventPusher<Timestamp, CB::Container>,
{
    /// Creates a logger that starts at the minimum timestamp with an empty
    /// builder and reports into `event_pusher`.
    ///
    /// `interval_ms` is the granularity used by [`Self::flush_through`]; it is
    /// used as a divisor there, so it must not be zero.
    fn new(event_pusher: P, interval_ms: u128) -> Self {
        Self {
            builder: CB::default(),
            interval_ms,
            event_pusher,
            time_ms: Timestamp::minimum(),
        }
    }

    /// Advances the logger's timestamp if `time` has moved past it.
    ///
    /// The target timestamp is the first multiple of `interval_ms` that is
    /// strictly greater than `time` rounded down to a multiple of the interval.
    /// If that target is not beyond the current timestamp, nothing happens.
    /// Otherwise everything remaining in the builder is sent at the *current*
    /// timestamp, followed by a progress statement that moves the count from
    /// the current timestamp to the target.
    ///
    /// # Panics
    ///
    /// Panics if the computed time does not fit into a `Timestamp`.
    fn flush_through(&mut self, time: &Duration) {
        let interval_end = (time.as_millis() / self.interval_ms + 1) * self.interval_ms;
        let next_ms: Timestamp = interval_end.try_into().expect("must fit");
        if self.time_ms >= next_ms {
            return;
        }

        // Drain the builder completely before giving up the current timestamp.
        while let Some(container) = self.builder.finish().map(std::mem::take) {
            self.event_pusher
                .push(Event::Messages(self.time_ms, container));
        }

        let updates = vec![(next_ms, 1), (self.time_ms, -1)];
        self.event_pusher.push(Event::Progress(updates));
        self.time_ms = next_ms;
    }

    /// Sends every container the builder currently offers through `extract`,
    /// stamped with the current timestamp.
    fn extract_and_send(&mut self) {
        while let Some(container) = self.builder.extract().map(std::mem::take) {
            self.event_pusher
                .push(Event::Messages(self.time_ms, container));
        }
    }
}
/// Lets callers push individual items of type `D` into the logger whenever the
/// underlying builder accepts them.
impl<CB, P, D> PushInto<D> for BatchLogger<CB, P>
where
    CB: ContainerBuilder + PushInto<D>,
    P: EventPusher<Timestamp, CB::Container>,
{
    /// Forwards `item` to the builder. Nothing is sent to the event pusher
    /// here.
    fn push_into(&mut self, item: D) {
        <CB as PushInto<D>>::push_into(&mut self.builder, item);
    }
}
impl<C, P> BatchLogger<CapacityContainerBuilder<C>, P>
where
C: SizableContainer + Clone + 'static,
P: EventPusher<Timestamp, C>,
{
/// Publishes a pre-built container of logged data observed at `time`.
///
/// `data` is handed to the builder by mutable reference; what remains in it
/// afterwards is decided by `push_container`.
fn publish_batch(&mut self, time: &Duration, data: &mut C) {
// The order matters: the data has to reach the builder before anything is
// extracted, and extraction happens before the timestamp may advance so
// that extracted containers are sent at the current timestamp.
self.builder.push_container(data);
self.extract_and_send();
self.flush_through(time);
}
}
impl<CB, P> Drop for BatchLogger<CB, P>
where
    CB: ContainerBuilder,
    P: EventPusher<Timestamp, CB::Container>,
{
    /// Retracts the count held at the current timestamp.
    ///
    /// Only a progress statement is pushed here; data still sitting in the
    /// builder is not sent.
    fn drop(&mut self) {
        let retraction = vec![(self.time_ms, -1)];
        self.event_pusher.push(Event::Progress(retraction));
    }
}
/// An [`EventLink`] bundled with an activator.
///
/// Cloning yields another handle to the same link, because the link is held in
/// an `Rc`.
#[derive(Clone)]
struct EventQueue<C> {
/// Shared event link carrying containers of type `C` at `Timestamp` times.
link: Rc<EventLink<Timestamp, C>>,
/// Activator belonging to this queue.
/// NOTE(review): presumably wakes the operator that reads `link` — confirm at the use sites.
activator: RcActivator,
}
impl<C> EventQueue<C> {
    /// Creates an empty queue.
    ///
    /// The activator is named `"{name}_activator"` and is constructed with a
    /// threshold argument of 128.
    fn new(name: &str) -> Self {
        let link = Rc::new(EventLink::new());
        let activator = RcActivator::new(format!("{name}_activator"), 128);
        Self { link, activator }
    }
}
/// Logging state bundled into one struct; `Default` yields an empty map and no
/// compute logger.
#[derive(Default)]
struct SharedLoggingState {
/// Activators keyed by a `usize` identifier.
/// NOTE(review): presumably the operator id of arrangement-size logging operators — confirm at the use sites.
arrangement_size_activators: BTreeMap<usize, Activator>,
/// The compute logger, if one has been set.
compute_logger: Option<ComputeLogger>,
}
/// Packs datums into a `(key, value)` pair of rows by selecting columns in a
/// fixed, precomputed order.
pub(crate) struct PermutedRowPacker {
/// Column indexes packed, in this order, into the key row.
key: Vec<usize>,
/// Column indexes packed, in this order, into the value row.
value: Vec<usize>,
}
impl PermutedRowPacker {
    /// Builds a packer for the given log variant.
    ///
    /// The key columns are the variant's `index_by()` columns. The value
    /// columns are the second component returned by
    /// `permutation_for_arrangement` for that key and the arity of the
    /// variant's description.
    pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
        let variant: LogVariant = variant.into();
        let key = variant.index_by();
        let key_exprs: Vec<_> = key.iter().copied().map(MirScalarExpr::Column).collect();
        let (_, value) = permutation_for_arrangement(&key_exprs, variant.desc().arity());
        Self { key, value }
    }

    /// Packs `datums` into a `(key, value)` pair of rows, picking each column
    /// by its position in the slice.
    ///
    /// # Panics
    ///
    /// Panics if a key or value column index is out of bounds for `datums`.
    pub(crate) fn pack_slice(&self, datums: &[Datum]) -> (Row, Row) {
        self.pack_by_index(|row, column| row.push(datums[column]))
    }

    /// Packs a `(key, value)` pair of rows, calling `logic` once per column
    /// index so the caller decides what gets pushed for each column.
    ///
    /// `logic` is invoked first for every key column, in order, then for every
    /// value column, in order. Both rows are assembled in the buffer obtained
    /// from `SharedRow::get()` and cloned out of it.
    pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(&self, logic: F) -> (Row, Row) {
        let shared = SharedRow::get();
        let mut scratch = shared.borrow_mut();

        let mut key_packer = scratch.packer();
        self.key
            .iter()
            .for_each(|&column| logic(&mut key_packer, column));
        let key_row = scratch.clone();

        // Requesting a new packer reuses the same buffer for the value row.
        let mut value_packer = scratch.packer();
        self.value
            .iter()
            .for_each(|&column| logic(&mut value_packer, column));
        let value_row = scratch.clone();

        (key_row, value_row)
    }
}
/// Bundles what is recorded for one logging collection: a trace, a token and a
/// dataflow index.
struct LogCollection {
/// Trace agent over `(Row, Row)` data at `Timestamp` times with `Diff` differences.
trace: RowRowAgent<Timestamp, Diff>,
/// Type-erased, reference-counted token.
/// NOTE(review): presumably keeps the producing dataflow alive while held — confirm.
token: Rc<dyn Any>,
/// Index of a dataflow.
/// NOTE(review): presumably the dataflow that exports `trace` — confirm.
dataflow_index: usize,
}