pub mod compute;
mod differential;
mod initialize;
mod reachability;
mod timely;
use std::any::Any;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;
use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
use ::timely::logging::WorkerIdentifier;
use ::timely::progress::Timestamp as TimelyTimestamp;
use ::timely::scheduling::Activator;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
use mz_repr::{Datum, Diff, Row, RowPacker, SharedRow, Timestamp};
use mz_timely_util::activator::RcActivator;
use crate::logging::compute::Logger as ComputeLogger;
use crate::typedefs::RowRowAgent;
pub use crate::logging::initialize::initialize;
/// Buffers log events and forwards them to a timely [`EventPusher`] in
/// batches, with event times quantized to multiples of `interval_ms`.
struct BatchLogger<T, E, P>
where
    P: EventPusher<Timestamp, (Duration, E, T)>,
{
    /// The current quantized time (in ms): the time at which buffered events
    /// are published, and for which this logger holds a capability (see the
    /// `Progress` statements in `publish_batch` and `Drop`).
    time_ms: Timestamp,
    /// Destination for batched events and progress statements.
    event_pusher: P,
    _phantom: ::std::marker::PhantomData<(E, T)>,
    /// Width of a time quantum in milliseconds; `publish_batch` rounds event
    /// times up to the next multiple of this interval.
    interval_ms: u64,
    /// Staging buffer for events; flushed to `event_pusher` when it would
    /// overflow or when time advances to a new quantum.
    buffer: Vec<(Duration, E, T)>,
}
impl<T, E, P> BatchLogger<T, E, P>
where
    P: EventPusher<Timestamp, (Duration, E, T)>,
{
    /// Target size of a single batch, in bytes (8 KiB).
    const BATCH_SIZE_BYTES: usize = 1 << 13;

    /// Number of `(Duration, E, T)` entries that fit within
    /// [`Self::BATCH_SIZE_BYTES`], clamped to at least 1; zero-sized entries
    /// get the full byte budget as a count.
    fn buffer_capacity() -> usize {
        let size = ::std::mem::size_of::<(Duration, E, T)>();
        if size == 0 {
            Self::BATCH_SIZE_BYTES
        } else if size <= Self::BATCH_SIZE_BYTES {
            Self::BATCH_SIZE_BYTES / size
        } else {
            1
        }
    }

    /// Creates a new batch logger writing to `event_pusher`, starting at the
    /// minimum timestamp, with times quantized to multiples of `interval_ms`.
    fn new(event_pusher: P, interval_ms: u64) -> Self {
        BatchLogger {
            time_ms: Timestamp::minimum(),
            event_pusher,
            _phantom: ::std::marker::PhantomData,
            interval_ms,
            buffer: Vec::with_capacity(Self::buffer_capacity()),
        }
    }

    /// Stages the events in `data` (draining it) and advances this logger's
    /// time to the quantum containing `time`.
    ///
    /// Buffered events are pushed as `Event::Messages` at the current
    /// quantized time, either when the buffer would overflow or when time
    /// advances; in the latter case a `Progress` statement then moves the
    /// capability from the old quantized time to the new one.
    fn publish_batch(&mut self, time: &Duration, data: &mut Vec<(Duration, E, T)>) {
        // Round `time` *up* to the next multiple of `interval_ms`.
        // NOTE(review): divides by `interval_ms`, so this panics if the
        // interval is 0 — presumably callers guarantee a nonzero interval;
        // confirm at the construction site.
        #[allow(clippy::as_conversions)]
        let new_time_ms = Timestamp::try_from(
            (((time.as_millis() as u64) / self.interval_ms) + 1) * self.interval_ms,
        )
        .expect("must fit");
        if !data.is_empty() {
            // Flush first if appending `data` would exceed the buffer's
            // current capacity. (If `data` alone is larger than the capacity,
            // the buffer grows on append and is shrunk back below.)
            if data.len() > self.buffer.capacity() - self.buffer.len() {
                self.event_pusher.push(Event::Messages(
                    self.time_ms,
                    self.buffer.drain(..).collect(),
                ));
            }
            self.buffer.append(data);
        }
        if self.time_ms < new_time_ms {
            // Time advanced to a later quantum: publish everything buffered
            // at the old time...
            self.event_pusher.push(Event::Messages(
                self.time_ms,
                self.buffer.drain(..).collect(),
            ));
            // ...shed any excess capacity a large append may have caused...
            if self.buffer.capacity() > Self::buffer_capacity() {
                self.buffer = Vec::with_capacity(Self::buffer_capacity())
            }
            // ...and swap the capability from the old time to the new one.
            self.event_pusher
                .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
        }
        self.time_ms = new_time_ms;
    }
}
impl<T, E, P> Drop for BatchLogger<T, E, P>
where
    P: EventPusher<Timestamp, (Duration, E, T)>,
{
    fn drop(&mut self) {
        // Retract the capability held at the current time so downstream
        // consumers learn that no further events will arrive.
        self.event_pusher
            .push(Event::Progress(vec![(self.time_ms, -1)]));
    }
}
/// A shared event queue: an event link carrying logged events plus an
/// activator to wake whatever is consuming from the link.
#[derive(Clone)]
struct EventQueue<E> {
    /// Shared link of `(event time, worker id, event)` records.
    link: Rc<EventLink<Timestamp, (Duration, WorkerIdentifier, E)>>,
    /// Activator used to schedule the consumer of `link`.
    activator: RcActivator,
}
impl<E> EventQueue<E> {
fn new(name: &str) -> Self {
let activator_name = format!("{name}_activator");
let activate_after = 128;
Self {
link: Rc::new(EventLink::new()),
activator: RcActivator::new(activator_name, activate_after),
}
}
}
/// Logging state shared across the logging dataflows of a worker.
#[derive(Default)]
struct SharedLoggingState {
    /// Activators for arrangement-size logging operators.
    /// NOTE(review): the meaning of the `usize` key is not visible in this
    /// file — presumably an operator or trace identifier; confirm against
    /// the code that inserts entries.
    arrangement_size_activators: BTreeMap<usize, Activator>,
    /// Handle to the compute logger, if one has been installed.
    compute_logger: Option<ComputeLogger>,
}
/// Packs datums into `(key, value)` rows according to the arrangement
/// permutation derived from a [`LogVariant`].
pub(crate) struct PermutedRowPacker {
    /// Column indices forming the key (the variant's `index_by`).
    key: Vec<usize>,
    /// Column indices forming the value, as computed by
    /// `permutation_for_arrangement` for the key columns.
    value: Vec<usize>,
}
impl PermutedRowPacker {
pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
let variant = variant.into();
let key = variant.index_by();
let (_, value) = permutation_for_arrangement(
&key.iter()
.cloned()
.map(MirScalarExpr::Column)
.collect::<Vec<_>>(),
variant.desc().arity(),
);
Self { key, value }
}
pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (Row, Row) {
self.pack_by_index(|packer, index| packer.push(datums[index]))
}
pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(&mut self, logic: F) -> (Row, Row) {
let binding = SharedRow::get();
let mut row_builder = binding.borrow_mut();
let mut packer = row_builder.packer();
for index in &self.key {
logic(&mut packer, *index);
}
let key_row = row_builder.clone();
let mut packer = row_builder.packer();
for index in &self.value {
logic(&mut packer, *index);
}
let value_row = row_builder.clone();
(key_row, value_row)
}
}
/// State bundle for a rendered logging collection.
struct LogCollection {
    /// Trace handle for the arranged collection.
    trace: RowRowAgent<Timestamp, Diff>,
    /// Opaque token; presumably keeps the backing dataflow alive while
    /// held — NOTE(review): confirm against the code that creates it.
    token: Rc<dyn Any>,
    /// Index of the dataflow this collection belongs to.
    dataflow_index: usize,
}