//! Logging dataflows for events generated by various subsystems.

pub mod compute;
mod differential;
mod initialize;
mod reachability;
mod timely;

use std::any::Any;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use ::timely::container::ContainerBuilder;
use ::timely::dataflow::channels::pact::Pipeline;
use ::timely::dataflow::channels::pushers::buffer::Session;
use ::timely::dataflow::channels::pushers::{Counter, Tee};
use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
use ::timely::dataflow::operators::Operator;
use ::timely::dataflow::StreamCore;
use ::timely::progress::Timestamp as TimelyTimestamp;
use ::timely::scheduling::Activator;
use ::timely::Container;
use differential_dataflow::trace::Batcher;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::operator::consolidate_pact;

use crate::logging::compute::Logger as ComputeLogger;
use crate::typedefs::RowRowAgent;

pub use crate::logging::initialize::initialize;

/// Logs events as a timely stream, with progress statements.
struct BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Time in milliseconds of the current expressed capability.
    time_ms: Timestamp,
    /// Pushes events into the logging dataflow.
    event_pusher: P,
    /// Interval in milliseconds to which reported times are rounded up.
    interval_ms: u128,
    /// Marker for the container type `C`.
    _marker: PhantomData<C>,
}

impl<C, P> BatchLogger<C, P>
where
P: EventPusher<Timestamp, C>,
{
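    /// Creates a new batch logger, starting at the minimum timestamp.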
fn new(event_pusher: P, interval_ms: u128) -> Self {
BatchLogger {
time_ms: Timestamp::minimum(),
event_pusher,
interval_ms,
_marker: PhantomData,
}
}
}

impl<C, P> BatchLogger<C, P>
where
P: EventPusher<Timestamp, C>,
C: Container,
{
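    /// Publishes a batch of logged events at the current capability time.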
fn publish_batch(&mut self, data: C) {
self.event_pusher.push(Event::Messages(self.time_ms, data));
}
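
    /// Advances the capability to the next multiple of `interval_ms` strictly after `time`.
    ///
    /// Returns `true` iff the capability was advanced. For example, with `interval_ms = 1000`,
    /// a `time` of 1500ms advances the capability to 2000ms; a subsequent call at 1700ms is a
    /// no-op and returns `false`.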
fn report_progress(&mut self, time: Duration) -> bool {
let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms;
let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
if self.time_ms < new_time_ms {
self.event_pusher
.push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
self.time_ms = new_time_ms;
true
} else {
false
}
}
}

impl<C, P> Drop for BatchLogger<C, P>
where
P: EventPusher<Timestamp, C>,
{
fn drop(&mut self) {
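        // Retract the last reported capability, signaling that the event stream is complete.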
self.event_pusher
.push(Event::Progress(vec![(self.time_ms, -1)]));
}
}
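
/// An event queue bundling `EventLink`s with an activator for the consumer of the queued
/// events.
///
/// The `N` parameter is the number of links to create. Queues fed by multiple loggers need a
/// separate link per producer, because `EventLink` is a single-producer linked list: multiple
/// writers would blindly append and cut each other's updates off.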
#[derive(Clone)]
struct EventQueue<C, const N: usize = 1> {
links: [Rc<EventLink<Timestamp, C>>; N],
activator: RcActivator,
}

impl<C, const N: usize> EventQueue<C, N> {
fn new(name: &str) -> Self {
let activator_name = format!("{name}_activator");
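        // Let activation requests accumulate before waking the consumer, batching its work.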
let activate_after = 128;
Self {
links: [(); N].map(|_| Rc::new(EventLink::new())),
activator: RcActivator::new(activator_name, activate_after),
}
}
}

/// State shared between different logging dataflow fragments.
#[derive(Default)]
struct SharedLoggingState {
    /// Activators for arrangement heap size operators.
    arrangement_size_activators: BTreeMap<usize, Activator>,
    /// Shared compute logger.
    compute_logger: Option<ComputeLogger>,
}
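
/// Helper to pack slices of [`Datum`]s into a key row and a value row, following the key
/// columns and arrangement permutation of a [`LogVariant`].
///
/// A minimal usage sketch; the chosen variant and datums are illustrative only:
///
/// ```ignore
/// let mut packer = PermutedRowPacker::new(TimelyLog::Operates);
/// let (key, value) = packer.pack_slice(&[Datum::UInt64(42), Datum::String("operator")]);
/// ```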
pub(crate) struct PermutedRowPacker {
key: Vec<usize>,
value: Vec<usize>,
key_row: Row,
value_row: Row,
}

impl PermutedRowPacker {
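    /// Constructs a packer from the key and arrangement permutation of the log variant.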
pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
let variant = variant.into();
let key = variant.index_by();
let (_, value) = permutation_for_arrangement(
&key.iter()
.cloned()
.map(MirScalarExpr::Column)
.collect::<Vec<_>>(),
variant.desc().arity(),
);
Self {
key,
value,
key_row: Row::default(),
value_row: Row::default(),
}
}
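
    /// Packs a slice of datums, returning borrowed key and value rows.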
pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) {
self.pack_by_index(|packer, index| packer.push(datums[index]))
}
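
    /// Like [`Self::pack_slice`], but returning owned rows.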
pub(crate) fn pack_slice_owned(&mut self, datums: &[Datum]) -> (Row, Row) {
let (key, value) = self.pack_slice(datums);
(key.to_owned(), value.to_owned())
}
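
    /// Packs key and value rows by applying `logic` to each of their column indexes.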
pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(
&mut self,
logic: F,
) -> (&RowRef, &RowRef) {
let mut packer = self.key_row.packer();
for index in &self.key {
logic(&mut packer, *index);
}
let mut packer = self.value_row.packer();
for index in &self.value {
logic(&mut packer, *index);
}
(&self.key_row, &self.value_row)
}
}

/// Information about a collection exported from a logging dataflow.
struct LogCollection {
    /// Trace handle providing access to the logged records.
    trace: RowRowAgent<Timestamp, Diff>,
    /// Token that should be dropped to drop this collection.
    token: Rc<dyn Any>,
    /// Index of the dataflow exporting this collection.
    dataflow_index: usize,
}

/// A pusher for containers `C`, wrapped in progress-tracking infrastructure.
pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;

/// An output session for the container builder `CB`, writing into a [`Pusher`].
pub(super) type OutputSession<'a, CB> =
    Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
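
/// Consolidates the input stream and packs the consolidated updates into `(Row, Row)` pairs
/// using the provided `logic`.
///
/// Consolidation is worker-local, using the [`Pipeline`] pact. It is crucial that the input is
/// not exchanged between workers beforehand, as the worker-local consolidation would not
/// function as desired otherwise.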
pub(super) fn consolidate_and_pack<G, B, CB, L, F>(
input: &StreamCore<G, B::Input>,
log: L,
mut logic: F,
) -> StreamCore<G, CB::Container>
where
G: ::timely::dataflow::Scope<Timestamp = Timestamp>,
B: Batcher<Time = G::Timestamp> + 'static,
B::Input: Container + Clone + 'static,
B::Output: Container + Clone + 'static,
CB: ContainerBuilder,
L: Into<LogVariant>,
F: for<'a> FnMut(
<B::Output as Container>::ItemRef<'a>,
&mut PermutedRowPacker,
&mut OutputSession<CB>,
) + 'static,
{
let log = log.into();
let c_name = &format!("Consolidate {log:?}");
let u_name = &format!("ToRow {log:?}");
let mut packer = PermutedRowPacker::new(log);
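
    // First consolidate the updates worker-locally; `Pipeline` keeps the data on this worker.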
let consolidated = consolidate_pact::<B, _, _>(input, Pipeline, c_name);
consolidated.unary::<CB, _, _, _>(Pipeline, u_name, |_, _| {
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session_with_builder(&time);
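                // Flatten the chains of output chunks and hand each item to `logic`.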
for item in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
logic(item, &mut packer, &mut session);
}
}
}
})
}