//! Logging dataflows for events generated by compute.

use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};
use columnar::{Columnar, Index};
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use differential_dataflow::Collection;
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
use mz_timely_util::containers::{Column, ColumnBuilder, ProvidedBuilder};
use mz_timely_util::replay::MzReplay;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use timely::worker::Worker;
use timely::{Container, Data};
use tracing::error;
use uuid::Uuid;
use crate::extensions::arrange::MzArrange;
use crate::logging::{
ComputeLog, EventQueue, LogCollection, LogVariant, PermutedRowPacker, SharedLoggingState,
};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::RowRowSpine;
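/// A logger for compute events.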
pub type Logger = timely::logging_core::Logger<ComputeEventBuilder>;
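/// A builder for columnar containers of timestamped compute events.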
pub type ComputeEventBuilder = ColumnBuilder<(Duration, ComputeEvent)>;
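/// A dataflow export was created.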
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Export {
pub export_id: GlobalId,
pub dataflow_index: usize,
}
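/// A dataflow export was dropped.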
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ExportDropped {
pub export_id: GlobalId,
}
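/// A peek was installed (`installed == true`) or retired (`installed == false`).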
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct PeekEvent {
pub peek: Peek,
pub peek_type: PeekType,
pub installed: bool,
}
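/// An insertion or retraction of an export's logged output frontier.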
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Frontier {
pub export_id: GlobalId,
pub time: Timestamp,
pub diff: i8,
}
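/// An insertion or retraction of an export's logged view of an import frontier.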
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ImportFrontier {
pub import_id: GlobalId,
pub export_id: GlobalId,
pub time: Timestamp,
pub diff: i8,
}
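/// The heap size of an arrangement changed by `delta_size`.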
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSize {
pub operator_id: usize,
pub delta_size: isize,
}
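/// The heap capacity of an arrangement changed by `delta_capacity`.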
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapCapacity {
pub operator_id: usize,
pub delta_capacity: isize,
}
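/// The number of heap allocations of an arrangement changed by `delta_allocations`.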
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapAllocations {
pub operator_id: usize,
pub delta_allocations: isize,
}
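/// An operator reporting arrangement heap sizes was created at the given address.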
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperator {
pub operator_id: usize,
pub address: Vec<usize>,
}
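/// An operator reporting arrangement heap sizes was dropped.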
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ArrangementHeapSizeOperatorDrop {
pub operator_id: usize,
}
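/// A dataflow has shut down.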
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowShutdown {
pub dataflow_index: usize,
}
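/// The error count of an export changed by `diff`.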
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct ErrorCount {
pub export_id: GlobalId,
pub diff: i64,
}
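/// An export has finished hydrating.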
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Hydration {
pub export_id: GlobalId,
}
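/// A mapping from `LirId`s to LIR metadata was reported for `global_id`.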
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct LirMapping {
pub global_id: GlobalId,
pub mapping: Vec<(LirId, LirMetadata)>,
}
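/// A `GlobalId` was associated with a dataflow index.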
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct DataflowGlobal {
pub dataflow_index: usize,
pub global_id: GlobalId,
}
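/// An event logged by the compute layer, one variant per event struct above.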
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub enum ComputeEvent {
Export(Export),
ExportDropped(ExportDropped),
Peek(PeekEvent),
Frontier(Frontier),
ImportFrontier(ImportFrontier),
ArrangementHeapSize(ArrangementHeapSize),
ArrangementHeapCapacity(ArrangementHeapCapacity),
ArrangementHeapAllocations(ArrangementHeapAllocations),
ArrangementHeapSizeOperator(ArrangementHeapSizeOperator),
ArrangementHeapSizeOperatorDrop(ArrangementHeapSizeOperatorDrop),
DataflowShutdown(DataflowShutdown),
ErrorCount(ErrorCount),
Hydration(Hydration),
LirMapping(LirMapping),
DataflowGlobal(DataflowGlobal),
}
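/// Whether a peek targets an arranged index or Persist.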
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Columnar)]
pub enum PeekType {
Index,
Persist,
}
impl PeekType {
fn name(self) -> &'static str {
match self {
PeekType::Index => "index",
PeekType::Persist => "persist",
}
}
}
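/// A peek against an index or Persist, identified by a UUID.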
#[derive(Debug, Clone, PartialOrd, PartialEq, Columnar)]
pub struct Peek {
id: GlobalId,
time: Timestamp,
uuid: uuid::Bytes,
}
impl Peek {
pub fn new(id: GlobalId, time: Timestamp, uuid: Uuid) -> Self {
let uuid = uuid.into_bytes();
Self { id, time, uuid }
}
}
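/// Metadata for an LIR operator: its rendered name, optional parent, nesting depth, and the
/// span of dataflow operator ids it was rendered into.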
#[derive(Clone, Debug, PartialEq, PartialOrd, Columnar)]
pub struct LirMetadata {
operator: String,
parent_lir_id: Option<LirId>,
nesting: u8,
operator_span: (usize, usize),
}
impl LirMetadata {
pub fn new(
operator: String,
parent_lir_id: Option<LirId>,
nesting: u8,
operator_span: (usize, usize),
) -> Self {
Self {
operator,
parent_lir_id,
nesting,
operator_span,
}
}
}
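/// Constructs the dataflow that renders the compute logs.
///
/// Replays the shared compute event queue, demultiplexes the events into one collection per
/// log, and returns an arranged `LogCollection` for each log variant requested in
/// `config.index_logs`.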
pub(super) fn construct<A: Allocate + 'static>(
worker: &mut timely::worker::Worker<A>,
config: &mz_compute_client::logging::LoggingConfig,
event_queue: EventQueue<Column<(Duration, ComputeEvent)>>,
shared_state: Rc<RefCell<SharedLoggingState>>,
) -> BTreeMap<LogVariant, LogCollection> {
let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
let worker_id = worker.index();
let worker2 = worker.clone();
let dataflow_index = worker.next_dataflow_index();
worker.dataflow_named("Dataflow: compute logging", move |scope| {
let enable_logging = config.enable_logging;
let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
scope,
"compute logs",
config.interval,
event_queue.activator,
move |mut session, mut data| {
if enable_logging {
session.give_container(data.to_mut())
}
},
);
let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
let mut input = demux.new_input(&logs, Pipeline);
let (mut export_out, export) = demux.new_output();
let (mut frontier_out, frontier) = demux.new_output();
let (mut import_frontier_out, import_frontier) = demux.new_output();
let (mut peek_out, peek) = demux.new_output();
let (mut peek_duration_out, peek_duration) = demux.new_output();
let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
demux.new_output();
let (mut error_count_out, error_count) = demux.new_output();
let (mut hydration_time_out, hydration_time) = demux.new_output();
let (mut lir_mapping_out, lir_mapping) = demux.new_output();
let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();
let mut demux_state = DemuxState::new(worker2);
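// The demux operator splits the replayed compute event stream into one output per log.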
demux.build(move |_capability| {
move |_frontiers| {
let mut export = export_out.activate();
let mut frontier = frontier_out.activate();
let mut import_frontier = import_frontier_out.activate();
let mut peek = peek_out.activate();
let mut peek_duration = peek_duration_out.activate();
let mut shutdown_duration = shutdown_duration_out.activate();
let mut arrangement_heap_size = arrangement_heap_size_out.activate();
let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
let mut error_count = error_count_out.activate();
let mut hydration_time = hydration_time_out.activate();
let mut lir_mapping = lir_mapping_out.activate();
let mut dataflow_global_ids = dataflow_global_ids_out.activate();
input.for_each(|cap, data| {
let mut output_sessions = DemuxOutput {
export: export.session(&cap),
frontier: frontier.session(&cap),
import_frontier: import_frontier.session(&cap),
peek: peek.session(&cap),
peek_duration: peek_duration.session(&cap),
shutdown_duration: shutdown_duration.session(&cap),
arrangement_heap_size: arrangement_heap_size.session(&cap),
arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
error_count: error_count.session(&cap),
hydration_time: hydration_time.session(&cap),
lir_mapping: lir_mapping.session_with_builder(&cap),
dataflow_global_ids: dataflow_global_ids.session(&cap),
};
let shared_state = &mut shared_state.borrow_mut();
for (time, event) in data.drain() {
DemuxHandler {
state: &mut demux_state,
shared_state,
output: &mut output_sessions,
logging_interval_ms,
time,
}
.handle(event);
}
});
}
});
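// Encode each demuxed stream as rows, using the column permutation expected by the
// corresponding log relation.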
let mut packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent);
let dataflow_current = export.as_collection().map({
let mut scratch = String::new();
move |datum| {
packer.pack_slice_owned(&[
make_string_datum(datum.export_id, &mut scratch),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::UInt64(u64::cast_from(datum.dataflow_index)),
])
}
});
let mut packer = PermutedRowPacker::new(ComputeLog::FrontierCurrent);
let frontier_current = frontier.as_collection().map({
let mut scratch = String::new();
move |datum| {
packer.pack_slice_owned(&[
make_string_datum(datum.export_id, &mut scratch),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::MzTimestamp(datum.time),
])
}
});
let mut packer = PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent);
let import_frontier_current = import_frontier.as_collection().map({
let mut scratch1 = String::new();
let mut scratch2 = String::new();
move |datum| {
packer.pack_slice_owned(&[
make_string_datum(datum.export_id, &mut scratch1),
make_string_datum(datum.import_id, &mut scratch2),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::MzTimestamp(datum.time),
])
}
});
let mut packer = PermutedRowPacker::new(ComputeLog::PeekCurrent);
let peek_current = peek.as_collection().map({
let mut scratch = String::new();
move |PeekDatum { peek, peek_type }| {
packer.pack_slice_owned(&[
Datum::Uuid(Uuid::from_bytes(peek.uuid)),
Datum::UInt64(u64::cast_from(worker_id)),
make_string_datum(peek.id, &mut scratch),
Datum::String(peek_type.name()),
Datum::MzTimestamp(peek.time),
])
}
});
let mut packer = PermutedRowPacker::new(ComputeLog::PeekDuration);
let peek_duration =
peek_duration
.as_collection()
.map(move |PeekDurationDatum { peek_type, bucket }| {
packer.pack_slice_owned(&[
Datum::UInt64(u64::cast_from(worker_id)),
Datum::String(peek_type.name()),
Datum::UInt64(bucket.try_into().expect("bucket too big")),
])
});
let mut packer = PermutedRowPacker::new(ComputeLog::ShutdownDuration);
let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
packer.pack_slice_owned(&[
Datum::UInt64(u64::cast_from(worker_id)),
Datum::UInt64(bucket.try_into().expect("bucket too big")),
])
});
let arrangement_heap_datum_to_row =
move |packer: &mut PermutedRowPacker, ArrangementHeapDatum { operator_id }| {
packer.pack_slice_owned(&[
Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
Datum::UInt64(u64::cast_from(worker_id)),
])
};
let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize);
let arrangement_heap_size = arrangement_heap_size
.as_collection()
.map(move |d| arrangement_heap_datum_to_row(&mut packer, d));
let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapCapacity);
let arrangement_heap_capacity = arrangement_heap_capacity
.as_collection()
.map(move |d| arrangement_heap_datum_to_row(&mut packer, d));
let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapAllocations);
let arrangement_heap_allocations = arrangement_heap_allocations
.as_collection()
.map(move |d| arrangement_heap_datum_to_row(&mut packer, d));
let mut packer = PermutedRowPacker::new(ComputeLog::ErrorCount);
let error_count = error_count.as_collection().map({
let mut scratch = String::new();
move |datum| {
packer.pack_slice_owned(&[
make_string_datum(datum.export_id, &mut scratch),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::Int64(datum.count),
])
}
});
let mut packer = PermutedRowPacker::new(ComputeLog::HydrationTime);
let hydration_time = hydration_time.as_collection().map({
let mut scratch = String::new();
move |datum| {
packer.pack_slice_owned(&[
make_string_datum(datum.export_id, &mut scratch),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::from(datum.time_ns),
])
}
});
let mut scratch1 = String::new();
let mut scratch2 = String::new();
let mut packer = PermutedRowPacker::new(ComputeLog::LirMapping);
let lir_mapping = lir_mapping
.map(move |(datum, time, diff)| {
let row = packer.pack_slice_owned(&[
make_string_datum(GlobalId::into_owned(datum.global_id), &mut scratch1),
Datum::UInt64(<LirId as Columnar>::into_owned(datum.lir_id).into()),
Datum::UInt64(u64::cast_from(worker_id)),
make_string_datum(datum.operator, &mut scratch2),
datum
.parent_lir_id
.map(|lir_id| Datum::UInt64(LirId::into_owned(lir_id).into()))
.unwrap_or_else(|| Datum::Null),
Datum::UInt16(u16::cast_from(*datum.nesting)),
Datum::UInt64(u64::cast_from(datum.operator_span.0)),
Datum::UInt64(u64::cast_from(datum.operator_span.1)),
]);
(row, Timestamp::into_owned(time), *diff)
})
.as_collection();
let mut packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal);
let dataflow_global_ids = dataflow_global_ids.as_collection().map({
let mut scratch = String::new();
move |datum| {
packer.pack_slice_owned(&[
Datum::UInt64(u64::cast_from(datum.dataflow_index)),
Datum::UInt64(u64::cast_from(worker_id)),
make_string_datum(datum.global_id, &mut scratch),
])
}
});
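// Arrange the log collections requested in the config and return them keyed by variant.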
use ComputeLog::*;
let logs = [
(DataflowCurrent, dataflow_current),
(FrontierCurrent, frontier_current),
(ImportFrontierCurrent, import_frontier_current),
(PeekCurrent, peek_current),
(PeekDuration, peek_duration),
(ShutdownDuration, shutdown_duration),
(ArrangementHeapSize, arrangement_heap_size),
(ArrangementHeapCapacity, arrangement_heap_capacity),
(ArrangementHeapAllocations, arrangement_heap_allocations),
(ErrorCount, error_count),
(HydrationTime, hydration_time),
(LirMapping, lir_mapping),
(DataflowGlobal, dataflow_global_ids),
];
let mut result = BTreeMap::new();
for (variant, collection) in logs {
let variant = LogVariant::Compute(variant);
if config.index_logs.contains_key(&variant) {
let trace = collection
.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
&format!("Arrange {variant:?}"),
)
.trace;
let collection = LogCollection {
trace,
token: Rc::clone(&token),
dataflow_index,
};
result.insert(variant, collection);
}
}
result
})
}
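/// Formats `value` into the reusable `scratch` buffer and returns it as a string `Datum`.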
fn make_string_datum<V>(value: V, scratch: &mut String) -> Datum<'_>
where
V: Display,
{
scratch.clear();
write!(scratch, "{}", value).expect("writing to a `String` can't fail");
Datum::String(scratch)
}
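/// State maintained by the demux operator.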
struct DemuxState<A: Allocate> {
worker: Worker<A>,
exports: BTreeMap<GlobalId, ExportState>,
dataflow_export_counts: BTreeMap<usize, u32>,
dataflow_drop_times: BTreeMap<usize, Duration>,
shutdown_dataflows: BTreeSet<usize>,
peek_stash: BTreeMap<Uuid, Duration>,
arrangement_size: BTreeMap<usize, ArrangementSizeState>,
lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
}
impl<A: Allocate> DemuxState<A> {
fn new(worker: Worker<A>) -> Self {
Self {
worker,
exports: Default::default(),
dataflow_export_counts: Default::default(),
dataflow_drop_times: Default::default(),
shutdown_dataflows: Default::default(),
peek_stash: Default::default(),
arrangement_size: Default::default(),
lir_mapping: Default::default(),
dataflow_global_ids: Default::default(),
}
}
}
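/// State tracked for a single installed export.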
struct ExportState {
dataflow_index: usize,
error_count: i64,
created_at: Instant,
hydration_time_ns: Option<u64>,
}
impl ExportState {
fn new(dataflow_index: usize) -> Self {
Self {
dataflow_index,
error_count: 0,
created_at: Instant::now(),
hydration_time_ns: None,
}
}
}
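/// Accumulated size, capacity, and allocation count of a single arrangement.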
#[derive(Default)]
struct ArrangementSizeState {
size: isize,
capacity: isize,
count: isize,
}
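/// An update of a datum, with a timestamp and a diff.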
type Update<D> = (D, Timestamp, Diff);
type Pusher<D> = Counter<Timestamp, Vec<Update<D>>, Tee<Timestamp, Vec<Update<D>>>>;
type PusherColumnar<D> = Counter<Timestamp, Column<Update<D>>, Tee<Timestamp, Column<Update<D>>>>;
type OutputSession<'a, D> =
Session<'a, Timestamp, CapacityContainerBuilder<Vec<Update<D>>>, Pusher<D>>;
type OutputSessionColumnar<'a, D> =
Session<'a, Timestamp, ColumnBuilder<Update<D>>, PusherColumnar<D>>;
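/// Bundled output sessions of the demux operator, one per log.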
struct DemuxOutput<'a> {
export: OutputSession<'a, ExportDatum>,
frontier: OutputSession<'a, FrontierDatum>,
import_frontier: OutputSession<'a, ImportFrontierDatum>,
peek: OutputSession<'a, PeekDatum>,
peek_duration: OutputSession<'a, PeekDurationDatum>,
shutdown_duration: OutputSession<'a, u128>,
arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>,
arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>,
arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>,
hydration_time: OutputSession<'a, HydrationTimeDatum>,
error_count: OutputSession<'a, ErrorCountDatum>,
lir_mapping: OutputSessionColumnar<'a, LirMappingDatum>,
dataflow_global_ids: OutputSession<'a, DataflowGlobalDatum>,
}
#[derive(Clone)]
struct ExportDatum {
export_id: GlobalId,
dataflow_index: usize,
}
#[derive(Clone)]
struct FrontierDatum {
export_id: GlobalId,
time: Timestamp,
}
#[derive(Clone)]
struct ImportFrontierDatum {
export_id: GlobalId,
import_id: GlobalId,
time: Timestamp,
}
#[derive(Clone)]
struct PeekDatum {
peek: Peek,
peek_type: PeekType,
}
#[derive(Clone)]
struct PeekDurationDatum {
peek_type: PeekType,
bucket: u128,
}
#[derive(Clone, Copy)]
struct ArrangementHeapDatum {
operator_id: usize,
}
#[derive(Clone)]
struct HydrationTimeDatum {
export_id: GlobalId,
time_ns: Option<u64>,
}
#[derive(Clone)]
struct ErrorCountDatum {
export_id: GlobalId,
count: i64,
}
#[derive(Clone, Columnar)]
struct LirMappingDatum {
global_id: GlobalId,
lir_id: LirId,
operator: String,
parent_lir_id: Option<LirId>,
nesting: u8,
operator_span: (usize, usize),
}
#[derive(Clone)]
struct DataflowGlobalDatum {
dataflow_index: usize,
global_id: GlobalId,
}
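/// Handles a single compute event, updating the demux state and emitting differential
/// updates to the appropriate output sessions.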
struct DemuxHandler<'a, 'b, A: Allocate + 'static> {
state: &'a mut DemuxState<A>,
shared_state: &'a mut SharedLoggingState,
output: &'a mut DemuxOutput<'b>,
logging_interval_ms: u128,
time: Duration,
}
impl<A: Allocate> DemuxHandler<'_, '_, A> {
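/// Returns the timestamp at which to log the current event: the event's wall-clock time,
/// advanced to the next multiple of the logging interval (e.g., with a 1000 ms interval, an
/// event at 1500 ms is logged at time 2000).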
fn ts(&self) -> Timestamp {
let time_ms = self.time.as_millis();
let interval = self.logging_interval_ms;
let rounded = (time_ms / interval + 1) * interval;
rounded.try_into().expect("must fit")
}
fn handle(&mut self, event: <ComputeEvent as Columnar>::Ref<'_>) {
use ComputeEventReference::*;
match event {
Export(export) => self.handle_export(export),
ExportDropped(export_dropped) => self.handle_export_dropped(export_dropped),
Peek(peek) if peek.installed => self.handle_peek_install(peek),
Peek(peek) => self.handle_peek_retire(peek),
Frontier(frontier) => self.handle_frontier(frontier),
ImportFrontier(import_frontier) => self.handle_import_frontier(import_frontier),
ArrangementHeapSize(inner) => self.handle_arrangement_heap_size(inner),
ArrangementHeapCapacity(inner) => self.handle_arrangement_heap_capacity(inner),
ArrangementHeapAllocations(inner) => self.handle_arrangement_heap_allocations(inner),
ArrangementHeapSizeOperator(inner) => self.handle_arrangement_heap_size_operator(inner),
ArrangementHeapSizeOperatorDrop(inner) => {
self.handle_arrangement_heap_size_operator_dropped(inner)
}
DataflowShutdown(shutdown) => self.handle_dataflow_shutdown(shutdown),
ErrorCount(error_count) => self.handle_error_count(error_count),
Hydration(hydration) => self.handle_hydration(hydration),
LirMapping(mapping) => self.handle_lir_mapping(mapping),
DataflowGlobal(global) => self.handle_dataflow_global(global),
}
}
fn handle_export(
&mut self,
ExportReference {
export_id,
dataflow_index,
}: <Export as Columnar>::Ref<'_>,
) {
let export_id = Columnar::into_owned(export_id);
let ts = self.ts();
let datum = ExportDatum {
export_id,
dataflow_index,
};
self.output.export.give((datum, ts, 1));
let existing = self
.state
.exports
.insert(export_id, ExportState::new(dataflow_index));
if existing.is_some() {
error!(%export_id, "export already registered");
}
*self
.state
.dataflow_export_counts
.entry(dataflow_index)
.or_default() += 1;
let datum = HydrationTimeDatum {
export_id,
time_ns: None,
};
self.output.hydration_time.give((datum, ts, 1));
}
fn handle_export_dropped(
&mut self,
ExportDroppedReference { export_id }: <ExportDropped as Columnar>::Ref<'_>,
) {
let export_id = Columnar::into_owned(export_id);
let Some(export) = self.state.exports.remove(&export_id) else {
error!(%export_id, "missing exports entry at time of export drop");
return;
};
let ts = self.ts();
let dataflow_index = export.dataflow_index;
let datum = ExportDatum {
export_id,
dataflow_index,
};
self.output.export.give((datum, ts, -1));
match self.state.dataflow_export_counts.get_mut(&dataflow_index) {
entry @ Some(0) | entry @ None => {
error!(
%export_id,
%dataflow_index,
"invalid dataflow_export_counts entry at time of export drop: {entry:?}",
);
}
Some(1) => self.handle_dataflow_dropped(dataflow_index),
Some(count) => *count -= 1,
}
if export.error_count != 0 {
let datum = ErrorCountDatum {
export_id,
count: export.error_count,
};
self.output.error_count.give((datum, ts, -1));
}
let datum = HydrationTimeDatum {
export_id,
time_ns: export.hydration_time_ns,
};
self.output.hydration_time.give((datum, ts, -1));
}
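/// Handles the removal of a dataflow's last export.
///
/// If the dataflow has already reported its shutdown, logs a shutdown duration of zero;
/// otherwise remembers the drop time so the duration can be computed when the shutdown
/// event arrives.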
fn handle_dataflow_dropped(&mut self, dataflow_index: usize) {
self.state.dataflow_export_counts.remove(&dataflow_index);
if self.state.shutdown_dataflows.remove(&dataflow_index) {
self.output.shutdown_duration.give((0, self.ts(), 1));
} else {
let existing = self
.state
.dataflow_drop_times
.insert(dataflow_index, self.time);
if existing.is_some() {
error!(%dataflow_index, "dataflow already dropped");
}
}
}
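/// Handles a dataflow shutdown: logs the time since the dataflow lost its last export
/// (bucketed to the next power of two) and retracts any `DataflowGlobal` and `LirMapping`
/// records still held for it.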
fn handle_dataflow_shutdown(
&mut self,
DataflowShutdownReference { dataflow_index }: <DataflowShutdown as Columnar>::Ref<'_>,
) {
let ts = self.ts();
if let Some(start) = self.state.dataflow_drop_times.remove(&dataflow_index) {
let elapsed_ns = self.time.saturating_sub(start).as_nanos();
let elapsed_pow = elapsed_ns.next_power_of_two();
self.output.shutdown_duration.give((elapsed_pow, ts, 1));
} else {
let was_new = self.state.shutdown_dataflows.insert(dataflow_index);
if !was_new {
error!(%dataflow_index, "dataflow already shutdown");
}
}
if let Some(global_ids) = self.state.dataflow_global_ids.remove(&dataflow_index) {
for global_id in global_ids {
let datum = DataflowGlobalDatum {
dataflow_index,
global_id,
};
self.output.dataflow_global_ids.give((datum, ts, -1));
if let Some(mappings) = self.state.lir_mapping.remove(&global_id) {
for (
lir_id,
LirMetadata {
operator,
parent_lir_id,
nesting,
operator_span,
},
) in mappings
{
let datum = LirMappingDatum {
global_id,
lir_id,
operator,
parent_lir_id,
nesting,
operator_span,
};
self.output.lir_mapping.give(&(datum, ts, -1));
}
}
}
}
}
fn handle_error_count(
&mut self,
ErrorCountReference { export_id, diff }: <ErrorCount as Columnar>::Ref<'_>,
) {
let ts = self.ts();
let export_id = Columnar::into_owned(export_id);
let Some(export) = self.state.exports.get_mut(&export_id) else {
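// The export may have already been dropped; ignore error counts for unknown exports.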
return;
};
let old_count = export.error_count;
let new_count = old_count + diff;
if old_count != 0 {
let datum = ErrorCountDatum {
export_id,
count: old_count,
};
self.output.error_count.give((datum, ts, -1));
}
if new_count != 0 {
let datum = ErrorCountDatum {
export_id,
count: new_count,
};
self.output.error_count.give((datum, ts, 1));
}
export.error_count = new_count;
}
fn handle_hydration(
&mut self,
HydrationReference { export_id }: <Hydration as Columnar>::Ref<'_>,
) {
let ts = self.ts();
let export_id = Columnar::into_owned(export_id);
let Some(export) = self.state.exports.get_mut(&export_id) else {
error!(%export_id, "hydration event for unknown export");
return;
};
if export.hydration_time_ns.is_some() {
return;
}
let duration = export.created_at.elapsed();
let nanos = u64::try_from(duration.as_nanos()).expect("must fit");
let retraction = HydrationTimeDatum {
export_id,
time_ns: None,
};
let insertion = HydrationTimeDatum {
export_id,
time_ns: Some(nanos),
};
self.output.hydration_time.give((retraction, ts, -1));
self.output.hydration_time.give((insertion, ts, 1));
export.hydration_time_ns = Some(nanos);
}
fn handle_peek_install(
&mut self,
PeekEventReference {
peek,
peek_type,
installed: _,
}: <PeekEvent as Columnar>::Ref<'_>,
) {
let peek = Peek::into_owned(peek);
let uuid = Uuid::from_bytes(peek.uuid);
let ts = self.ts();
self.output
.peek
.give((PeekDatum { peek, peek_type }, ts, 1));
let existing = self.state.peek_stash.insert(uuid, self.time);
if existing.is_some() {
error!(%uuid, "peek already registered");
}
}
fn handle_peek_retire(
&mut self,
PeekEventReference {
peek,
peek_type,
installed: _,
}: <PeekEvent as Columnar>::Ref<'_>,
) {
let peek = Peek::into_owned(peek);
let uuid = Uuid::from_bytes(peek.uuid);
let ts = self.ts();
self.output
.peek
.give((PeekDatum { peek, peek_type }, ts, -1));
if let Some(start) = self.state.peek_stash.remove(&uuid) {
let elapsed_ns = self.time.saturating_sub(start).as_nanos();
let bucket = elapsed_ns.next_power_of_two();
self.output
.peek_duration
.give((PeekDurationDatum { peek_type, bucket }, ts, 1));
} else {
error!(%uuid, "peek not yet registered");
}
}
fn handle_frontier(
&mut self,
FrontierReference {
export_id,
time,
diff,
}: <Frontier as Columnar>::Ref<'_>,
) {
let export_id = Columnar::into_owned(export_id);
let diff = i64::from(*diff);
let ts = self.ts();
let time = Columnar::into_owned(time);
let datum = FrontierDatum { export_id, time };
self.output.frontier.give((datum, ts, diff));
}
fn handle_import_frontier(
&mut self,
ImportFrontierReference {
import_id,
export_id,
time,
diff,
}: <ImportFrontier as Columnar>::Ref<'_>,
) {
let import_id = Columnar::into_owned(import_id);
let export_id = Columnar::into_owned(export_id);
let ts = self.ts();
let time = Columnar::into_owned(time);
let datum = ImportFrontierDatum {
export_id,
import_id,
time,
};
self.output
.import_frontier
.give((datum, ts, (*diff).into()));
}
fn handle_arrangement_heap_size(
&mut self,
ArrangementHeapSizeReference {
operator_id,
delta_size,
}: <ArrangementHeapSize as Columnar>::Ref<'_>,
) {
let ts = self.ts();
let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
return;
};
let datum = ArrangementHeapDatum { operator_id };
self.output
.arrangement_heap_size
.give((datum, ts, Diff::cast_from(delta_size)));
state.size += delta_size;
}
fn handle_arrangement_heap_capacity(
&mut self,
ArrangementHeapCapacityReference {
operator_id,
delta_capacity,
}: <ArrangementHeapCapacity as Columnar>::Ref<'_>,
) {
let ts = self.ts();
let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
return;
};
let datum = ArrangementHeapDatum { operator_id };
self.output
.arrangement_heap_capacity
.give((datum, ts, Diff::cast_from(delta_capacity)));
state.capacity += delta_capacity;
}
fn handle_arrangement_heap_allocations(
&mut self,
ArrangementHeapAllocationsReference {
operator_id,
delta_allocations,
}: <ArrangementHeapAllocations as Columnar>::Ref<'_>,
) {
let ts = self.ts();
let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else {
return;
};
let datum = ArrangementHeapDatum { operator_id };
let diff = Diff::cast_from(delta_allocations);
self.output
.arrangement_heap_allocations
.give((datum, ts, diff));
state.count += delta_allocations;
}
fn handle_arrangement_heap_size_operator(
&mut self,
ArrangementHeapSizeOperatorReference {
operator_id,
address,
}: <ArrangementHeapSizeOperator as Columnar>::Ref<'_>,
) {
let activator = self
.state
.worker
.activator_for(address.into_iter().collect());
let existing = self
.state
.arrangement_size
.insert(operator_id, Default::default());
if existing.is_some() {
error!(%operator_id, "arrangement size operator already registered");
}
let existing = self
.shared_state
.arrangement_size_activators
.insert(operator_id, activator);
if existing.is_some() {
error!(%operator_id, "arrangement size activator already registered");
}
}
fn handle_arrangement_heap_size_operator_dropped(
&mut self,
ArrangementHeapSizeOperatorDropReference { operator_id }: <ArrangementHeapSizeOperatorDrop as Columnar>::Ref<'_>,
) {
if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
let ts = self.ts();
let datum = ArrangementHeapDatum { operator_id };
let diff = -Diff::cast_from(state.size);
self.output.arrangement_heap_size.give((datum, ts, diff));
let diff = -Diff::cast_from(state.capacity);
self.output
.arrangement_heap_capacity
.give((datum, ts, diff));
let diff = -Diff::cast_from(state.count);
self.output
.arrangement_heap_allocations
.give((datum, ts, diff));
}
self.shared_state
.arrangement_size_activators
.remove(&operator_id);
}
fn handle_lir_mapping(
&mut self,
LirMappingReference { global_id, mapping }: <LirMapping as Columnar>::Ref<'_>,
) {
let global_id = Columnar::into_owned(global_id);
let mappings = || mapping.into_iter().map(Columnar::into_owned);
self.state
.lir_mapping
.entry(global_id)
.and_modify(|existing_mapping| existing_mapping.extend(mappings()))
.or_insert_with(|| mappings().collect());
let ts = self.ts();
for (lir_id, meta) in mapping.into_iter() {
let datum = LirMappingDatumReference {
global_id,
lir_id,
operator: meta.operator,
parent_lir_id: meta.parent_lir_id,
nesting: meta.nesting,
operator_span: meta.operator_span,
};
self.output.lir_mapping.give((datum, ts, 1));
}
}
fn handle_dataflow_global(
&mut self,
DataflowGlobalReference {
dataflow_index,
global_id,
}: <DataflowGlobal as Columnar>::Ref<'_>,
) {
let global_id = Columnar::into_owned(global_id);
self.state
.dataflow_global_ids
.entry(dataflow_index)
.and_modify(|globals| {
if !globals.insert(global_id) {
error!(%dataflow_index, %global_id, "dataflow mapping already knew about this GlobalId");
}
})
.or_insert_with(|| BTreeSet::from([global_id]));
let ts = self.ts();
let datum = DataflowGlobalDatum {
dataflow_index,
global_id,
};
self.output.dataflow_global_ids.give((datum, ts, 1));
}
}
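/// Logging helper for a single exported collection.
///
/// Logs an `Export` event on creation and an `ExportDropped` event (plus retractions of all
/// still-logged frontiers) on drop; in between, `set_frontier`, `set_import_frontier`, and
/// `set_hydrated` log incremental updates.
///
/// A minimal usage sketch, assuming hypothetical `logger`, `export_id`, `import_id`, and
/// `dataflow_index` values:
///
/// ```ignore
/// let mut logging =
///     CollectionLogging::new(export_id, logger, dataflow_index, [import_id].into_iter());
/// logging.set_frontier(Some(Timestamp::from(42u64)));
/// logging.set_import_frontier(import_id, Some(Timestamp::from(42u64)));
/// logging.set_hydrated();
/// // Dropping `logging` retracts the logged frontiers and emits an `ExportDropped` event.
/// drop(logging);
/// ```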
pub struct CollectionLogging {
export_id: GlobalId,
logger: Logger,
logged_frontier: Option<Timestamp>,
logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}
impl CollectionLogging {
pub fn new(
export_id: GlobalId,
logger: Logger,
dataflow_index: usize,
import_ids: impl Iterator<Item = GlobalId>,
) -> Self {
logger.log(&ComputeEvent::Export(Export {
export_id,
dataflow_index,
}));
let mut self_ = Self {
export_id,
logger,
logged_frontier: None,
logged_import_frontiers: Default::default(),
};
let initial_frontier = Some(Timestamp::MIN);
self_.set_frontier(initial_frontier);
import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
self_
}
pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
let old_time = self.logged_frontier;
self.logged_frontier = new_time;
if old_time != new_time {
let export_id = self.export_id;
let retraction = old_time.map(|time| {
ComputeEvent::Frontier(Frontier {
export_id,
time,
diff: -1,
})
});
let insertion = new_time.map(|time| {
ComputeEvent::Frontier(Frontier {
export_id,
time,
diff: 1,
})
});
let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
self.logger.log_many(events);
}
}
pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
let old_time = self.logged_import_frontiers.remove(&import_id);
if let Some(time) = new_time {
self.logged_import_frontiers.insert(import_id, time);
}
if old_time != new_time {
let export_id = self.export_id;
let retraction = old_time.map(|time| {
ComputeEvent::ImportFrontier(ImportFrontier {
import_id,
export_id,
time,
diff: -1,
})
});
let insertion = new_time.map(|time| {
ComputeEvent::ImportFrontier(ImportFrontier {
import_id,
export_id,
time,
diff: 1,
})
});
let events = retraction.as_ref().into_iter().chain(insertion.as_ref());
self.logger.log_many(events);
}
}
pub fn set_hydrated(&self) {
self.logger.log(&ComputeEvent::Hydration(Hydration {
export_id: self.export_id,
}));
}
}
impl Drop for CollectionLogging {
fn drop(&mut self) {
self.set_frontier(None);
let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
for import_id in import_ids {
self.set_import_frontier(import_id, None);
}
self.logger.log(&ComputeEvent::ExportDropped(ExportDropped {
export_id: self.export_id,
}));
}
}
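/// Extension trait to log the errors flowing through a dataflow's error stream.
///
/// The attached operator sums the diffs of the updates it passes through and reports them as
/// `ErrorCount` events for the given export.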
pub(crate) trait LogDataflowErrors {
fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
impl<G, D> LogDataflowErrors for Collection<G, D, Diff>
where
G: Scope,
D: Data,
{
fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
self.inner
.unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
move |input, output| {
input.for_each(|cap, data| {
let diff = data.iter().map(|(_d, _t, r)| r).sum();
logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
output.session(&cap).give_container(data);
});
}
})
.as_collection()
}
}
impl<G, B> LogDataflowErrors for Stream<G, B>
where
G: Scope,
for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
move |input, output| {
input.for_each(|cap, data| {
let diff = data.iter().map(sum_batch_diffs).sum();
logger.log(&ComputeEvent::ErrorCount(ErrorCount { export_id, diff }));
output.session(&cap).give_container(data);
});
}
})
}
}
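/// Returns the sum of the diffs of all updates contained in `batch`.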
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
let mut sum = 0;
let mut cursor = batch.cursor();
while cursor.key_valid(batch) {
while cursor.val_valid(batch) {
cursor.map_times(batch, |_t, r| sum += r);
cursor.step_val(batch);
}
cursor.step_key(batch);
}
sum
}
#[cfg(test)]
mod tests {
use super::*;
#[mz_ore::test]
fn test_compute_event_size() {
assert_eq!(56, std::mem::size_of::<ComputeEvent>())
}
}