use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Display, Write};
use std::rc::Rc;
use std::time::{Duration, Instant};
use differential_dataflow::collection::AsCollection;
use differential_dataflow::trace::{BatchReader, Cursor};
use differential_dataflow::Collection;
use mz_compute_types::plan::LirId;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, GlobalId, Timestamp};
use mz_timely_util::replay::MzReplay;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::{Filter, Operator};
use timely::dataflow::{Scope, Stream};
use timely::scheduling::Scheduler;
use timely::worker::Worker;
use timely::{Container, Data};
use tracing::error;
use uuid::Uuid;
use crate::extensions::arrange::MzArrange;
use crate::logging::{
ComputeLog, EventQueue, LogCollection, LogVariant, PermutedRowPacker, SharedLoggingState,
};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::RowRowSpine;
/// Logger handle through which [`ComputeEvent`]s are reported.
pub type Logger = timely::logging_core::Logger<ComputeEvent>;
/// An event reported through the compute [`Logger`].
///
/// Each variant is dispatched by `DemuxHandler::handle` to a dedicated `handle_*` method.
#[derive(Debug, Clone, PartialOrd, PartialEq)]
pub enum ComputeEvent {
/// A dataflow export was created (logged by `CollectionLogging::new`).
Export {
/// Identifier of the export.
id: GlobalId,
/// Index of the dataflow the export belongs to.
dataflow_index: usize,
},
/// A dataflow export was dropped (logged when a `CollectionLogging` is dropped).
ExportDropped {
/// Identifier of the export.
id: GlobalId,
},
/// A peek was installed or retired.
Peek {
/// The peek the event refers to.
peek: Peek,
/// The kind of the peek.
peek_type: PeekType,
/// `true` if the peek was installed, `false` if it was retired.
installed: bool,
},
/// A change of the logged frontier of an export.
Frontier {
/// Identifier of the export.
id: GlobalId,
/// The frontier time being inserted or retracted.
time: Timestamp,
/// `1` for an insertion, `-1` for a retraction (see `CollectionLogging::set_frontier`).
diff: i8,
},
/// A change of the logged frontier of an import of an export.
ImportFrontier {
/// Identifier of the import.
import_id: GlobalId,
/// Identifier of the export.
export_id: GlobalId,
/// The frontier time being inserted or retracted.
time: Timestamp,
/// `1` for an insertion, `-1` for a retraction (see
/// `CollectionLogging::set_import_frontier`).
diff: i8,
},
/// A change of the heap size tracked for an arrangement operator.
ArrangementHeapSize {
/// Identifier of the operator.
operator: usize,
/// Amount added to the tracked size; may be negative.
delta_size: isize,
},
/// A change of the heap capacity tracked for an arrangement operator.
ArrangementHeapCapacity {
/// Identifier of the operator.
operator: usize,
/// Amount added to the tracked capacity; may be negative.
delta_capacity: isize,
},
/// A change of the heap allocation count tracked for an arrangement operator.
ArrangementHeapAllocations {
/// Identifier of the operator.
operator: usize,
/// Amount added to the tracked allocation count; may be negative.
delta_allocations: isize,
},
/// Registers an arrangement operator for heap size tracking.
ArrangementHeapSizeOperator {
/// Identifier of the operator.
operator: usize,
/// Address of the operator; used to obtain an activator for it.
address: Rc<[usize]>,
},
/// Unregisters an arrangement operator from heap size tracking.
ArrangementHeapSizeOperatorDrop {
/// Identifier of the operator.
operator: usize,
},
/// A dataflow was shut down.
DataflowShutdown {
/// Index of the dataflow.
dataflow_index: usize,
},
/// A change of the number of errors in a dataflow export.
ErrorCount {
/// Identifier of the export.
export_id: GlobalId,
/// Amount added to the tracked error count; may be negative.
diff: i64,
},
/// A dataflow export was hydrated (logged by `CollectionLogging::set_hydrated`).
Hydration { export_id: GlobalId },
/// LIR operator metadata to record for a `GlobalId`.
LirMapping {
/// The `GlobalId` the mapping belongs to.
global_id: GlobalId,
/// Pairs of LIR id and the metadata recorded for it.
mapping: Box<[(LirId, LirMetadata)]>,
},
/// Associates a dataflow with a `GlobalId`.
DataflowGlobal {
/// Identifier of the dataflow.
id: usize,
/// The `GlobalId` associated with the dataflow.
global_id: GlobalId,
},
}
/// The kind of a peek, as reported in peek log rows.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)]
pub enum PeekType {
    /// Rendered as `"index"` in log output.
    Index,
    /// Rendered as `"persist"` in log output.
    Persist,
}

impl PeekType {
    /// Returns the static, lowercase name written into log rows for this peek type.
    fn name(self) -> &'static str {
        match self {
            Self::Index => "index",
            Self::Persist => "persist",
        }
    }
}
/// Identifying information about a peek, as carried by [`ComputeEvent::Peek`].
#[derive(Debug, Clone, PartialOrd, PartialEq)]
pub struct Peek {
    /// Identifier of the collection being peeked.
    id: GlobalId,
    /// The time of the peek.
    time: Timestamp,
    /// The UUID of the peek; used as the key when tracking in-flight peeks.
    uuid: Uuid,
}

impl Peek {
    /// Creates a new peek description from its parts.
    pub fn new(id: GlobalId, time: Timestamp, uuid: Uuid) -> Self {
        Peek { id, time, uuid }
    }
}
/// Metadata recorded for a single LIR id in the LIR mapping log.
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub struct LirMetadata {
    /// The operator, as a string.
    operator: Box<str>,
    /// The LIR id of the parent, if any.
    parent_lir_id: Option<LirId>,
    /// The nesting level.
    nesting: u8,
    /// A pair of values describing the operator span; logged as two separate columns.
    operator_span: (usize, usize),
}

impl LirMetadata {
    /// Creates a new metadata record from its parts.
    pub fn new(
        operator: Box<str>,
        parent_lir_id: Option<LirId>,
        nesting: u8,
        operator_span: (usize, usize),
    ) -> Self {
        LirMetadata {
            operator,
            parent_lir_id,
            nesting,
            operator_span,
        }
    }
}
/// Constructs the logging dataflow for compute logs.
///
/// # Parameters
///
/// * `worker`: the Timely worker on which the logging dataflow is built.
/// * `config`: the logging configuration; provides the logging interval, whether logging is
///   enabled, and the set of log variants to index.
/// * `event_queue`: the source of `(Duration, ComputeEvent)` batches replayed into the dataflow.
/// * `shared_state`: shared logging state; the demux operator registers and removes
///   arrangement size activators in it.
///
/// # Returns
///
/// A map from each compute log variant listed in `config.index_logs` to the [`LogCollection`]
/// holding its arranged trace.
pub(super) fn construct<A: Allocate + 'static>(
    worker: &mut timely::worker::Worker<A>,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, ComputeEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> BTreeMap<LogVariant, LogCollection> {
    // Clamp to at least 1ms: the interval is used as a divisor when rounding timestamps.
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
    let worker_id = worker.index();
    // The demux state needs its own worker handle to create operator activators.
    let worker2 = worker.clone();
    let dataflow_index = worker.next_dataflow_index();

    worker.dataflow_named("Dataflow: compute logging", move |scope| {
        let (mut logs, token) = Some(event_queue.link)
            .mz_replay::<_, CapacityContainerBuilder<_>, _>(
                scope,
                "compute logs",
                config.interval,
                event_queue.activator,
                |mut session, data| session.give_iterator(data.iter()),
            );

        // If logging is disabled, we still need to install the indexes, but we can leave them
        // empty. We do so by immediately filtering all logs events.
        if !config.enable_logging {
            logs = logs.filter(|_| false);
        }

        // Build a demux operator that splits the replayed event stream up into the separate
        // logging streams.
        let mut demux = OperatorBuilder::new("Compute Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut export_out, export) = demux.new_output();
        let (mut frontier_out, frontier) = demux.new_output();
        let (mut import_frontier_out, import_frontier) = demux.new_output();
        let (mut peek_out, peek) = demux.new_output();
        let (mut peek_duration_out, peek_duration) = demux.new_output();
        let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
        let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
        let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
        let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
            demux.new_output();
        let (mut error_count_out, error_count) = demux.new_output();
        let (mut hydration_time_out, hydration_time) = demux.new_output();
        let (mut lir_mapping_out, lir_mapping) = demux.new_output();
        let (mut dataflow_global_ids_out, dataflow_global_ids) = demux.new_output();

        let mut demux_state = DemuxState::new(worker2);
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut export = export_out.activate();
                let mut frontier = frontier_out.activate();
                let mut import_frontier = import_frontier_out.activate();
                let mut peek = peek_out.activate();
                let mut peek_duration = peek_duration_out.activate();
                let mut shutdown_duration = shutdown_duration_out.activate();
                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
                let mut error_count = error_count_out.activate();
                let mut hydration_time = hydration_time_out.activate();
                let mut lir_mapping = lir_mapping_out.activate();
                let mut dataflow_global_ids = dataflow_global_ids_out.activate();

                input.for_each(|cap, data| {
                    let mut output_sessions = DemuxOutput {
                        export: export.session(&cap),
                        frontier: frontier.session(&cap),
                        import_frontier: import_frontier.session(&cap),
                        peek: peek.session(&cap),
                        peek_duration: peek_duration.session(&cap),
                        shutdown_duration: shutdown_duration.session(&cap),
                        arrangement_heap_size: arrangement_heap_size.session(&cap),
                        arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
                        arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
                        error_count: error_count.session(&cap),
                        hydration_time: hydration_time.session(&cap),
                        lir_mapping: lir_mapping.session(&cap),
                        dataflow_global_ids: dataflow_global_ids.session(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        DemuxHandler {
                            state: &mut demux_state,
                            shared_state: &mut shared_state.borrow_mut(),
                            output: &mut output_sessions,
                            logging_interval_ms,
                            time,
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Encode the contents of each logging stream into its expected `Row` format.
        let packer = PermutedRowPacker::new(ComputeLog::DataflowCurrent);
        let dataflow_current = export.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice(&[
                    make_string_datum(datum.id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::UInt64(u64::cast_from(datum.dataflow_id)),
                ])
            }
        });

        let packer = PermutedRowPacker::new(ComputeLog::FrontierCurrent);
        let frontier_current = frontier.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::MzTimestamp(datum.frontier),
                ])
            }
        });

        let packer = PermutedRowPacker::new(ComputeLog::ImportFrontierCurrent);
        let import_frontier_current = import_frontier.as_collection().map({
            let mut scratch1 = String::new();
            let mut scratch2 = String::new();
            move |datum| {
                packer.pack_slice(&[
                    make_string_datum(datum.export_id, &mut scratch1),
                    make_string_datum(datum.import_id, &mut scratch2),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::MzTimestamp(datum.frontier),
                ])
            }
        });

        let packer = PermutedRowPacker::new(ComputeLog::PeekCurrent);
        let peek_current = peek.as_collection().map({
            let mut scratch = String::new();
            move |PeekDatum { peek, peek_type }| {
                packer.pack_slice(&[
                    Datum::Uuid(peek.uuid),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(peek.id, &mut scratch),
                    Datum::String(peek_type.name()),
                    Datum::MzTimestamp(peek.time),
                ])
            }
        });

        let packer = PermutedRowPacker::new(ComputeLog::PeekDuration);
        let peek_duration =
            peek_duration
                .as_collection()
                .map(move |PeekDurationDatum { peek_type, bucket }| {
                    packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(worker_id)),
                        Datum::String(peek_type.name()),
                        Datum::UInt64(bucket.try_into().expect("bucket too big")),
                    ])
                });

        let packer = PermutedRowPacker::new(ComputeLog::ShutdownDuration);
        let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
            packer.pack_slice(&[
                Datum::UInt64(u64::cast_from(worker_id)),
                Datum::UInt64(bucket.try_into().expect("bucket too big")),
            ])
        });

        // The three arrangement heap logs share one row shape; only the packer differs.
        let arrangement_heap_datum_to_row =
            move |packer: &mut PermutedRowPacker, ArrangementHeapDatum { operator_id }| {
                packer.pack_slice(&[
                    Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
                    Datum::UInt64(u64::cast_from(worker_id)),
                ])
            };

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapSize);
        let arrangement_heap_size = arrangement_heap_size
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapCapacity);
        let arrangement_heap_capacity = arrangement_heap_capacity
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        // Each log must be packed with the packer of its own variant.
        let mut packer = PermutedRowPacker::new(ComputeLog::ArrangementHeapAllocations);
        let arrangement_heap_allocations = arrangement_heap_allocations
            .as_collection()
            .map(move |d| arrangement_heap_datum_to_row(&mut packer, d));

        let packer = PermutedRowPacker::new(ComputeLog::ErrorCount);
        let error_count = error_count.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::Int64(datum.count),
                ])
            }
        });

        let packer = PermutedRowPacker::new(ComputeLog::HydrationTime);
        let hydration_time = hydration_time.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice(&[
                    make_string_datum(datum.export_id, &mut scratch),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    Datum::from(datum.time_ns),
                ])
            }
        });

        let packer = PermutedRowPacker::new(ComputeLog::LirMapping);
        let lir_mapping = lir_mapping.as_collection().map({
            let mut scratch1 = String::new();
            let mut scratch2 = String::new();
            move |datum| {
                packer.pack_slice(&[
                    make_string_datum(datum.global_id, &mut scratch1),
                    Datum::UInt64(datum.lir_id.into()),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(&datum.operator, &mut scratch2),
                    datum
                        .parent_lir_id
                        .map(|lir_id| Datum::UInt64(lir_id.into()))
                        .unwrap_or_else(|| Datum::Null),
                    Datum::UInt16(u16::cast_from(datum.nesting)),
                    Datum::UInt64(u64::cast_from(datum.operator_span.0)),
                    Datum::UInt64(u64::cast_from(datum.operator_span.1)),
                ])
            }
        });

        let packer = PermutedRowPacker::new(ComputeLog::DataflowGlobal);
        let dataflow_global_ids = dataflow_global_ids.as_collection().map({
            let mut scratch = String::new();
            move |datum| {
                packer.pack_slice(&[
                    Datum::UInt64(u64::cast_from(datum.id)),
                    Datum::UInt64(u64::cast_from(worker_id)),
                    make_string_datum(datum.global_id, &mut scratch),
                ])
            }
        });

        use ComputeLog::*;
        let logs = [
            (DataflowCurrent, dataflow_current),
            (FrontierCurrent, frontier_current),
            (ImportFrontierCurrent, import_frontier_current),
            (PeekCurrent, peek_current),
            (PeekDuration, peek_duration),
            (ShutdownDuration, shutdown_duration),
            (ArrangementHeapSize, arrangement_heap_size),
            (ArrangementHeapCapacity, arrangement_heap_capacity),
            (ArrangementHeapAllocations, arrangement_heap_allocations),
            (ErrorCount, error_count),
            (HydrationTime, hydration_time),
            (LirMapping, lir_mapping),
            (DataflowGlobal, dataflow_global_ids),
        ];

        // Arrange only the logs that the configuration asks to be indexed.
        let mut result = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Compute(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                    dataflow_index,
                };
                result.insert(variant, collection);
            }
        }

        result
    })
}
/// Renders `value` via its `Display` implementation into `scratch` and returns a string datum
/// that borrows from `scratch`.
///
/// Any previous contents of `scratch` are discarded; its allocation is reused.
fn make_string_datum<V: Display>(value: V, scratch: &mut String) -> Datum<'_> {
    scratch.clear();
    write!(scratch, "{value}").expect("writing to a `String` can't fail");
    Datum::String(scratch)
}
/// State maintained by the demux operator across events.
struct DemuxState<A: Allocate> {
    /// Worker handle, used to create activators for arrangement size operators.
    worker: Worker<A>,
    /// State tracked per live dataflow export.
    exports: BTreeMap<GlobalId, ExportState>,
    /// Number of live exports per dataflow.
    dataflow_export_counts: BTreeMap<usize, u32>,
    /// Event time at which a dataflow lost its last export, for dataflows that have not shut
    /// down yet.
    dataflow_drop_times: BTreeMap<usize, Duration>,
    /// Dataflows that have shut down before their last export was dropped.
    shutdown_dataflows: BTreeSet<usize>,
    /// Event time at which each in-flight peek was installed, keyed by peek UUID.
    peek_stash: BTreeMap<Uuid, Duration>,
    /// Accumulated heap metrics per registered arrangement operator.
    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
    /// LIR metadata recorded per `GlobalId`.
    lir_mapping: BTreeMap<GlobalId, BTreeMap<LirId, LirMetadata>>,
    /// `GlobalId`s associated with each dataflow.
    dataflow_global_ids: BTreeMap<usize, BTreeSet<GlobalId>>,
}

impl<A: Allocate> DemuxState<A> {
    /// Creates an empty state around the given worker handle.
    fn new(worker: Worker<A>) -> Self {
        DemuxState {
            worker,
            exports: BTreeMap::new(),
            dataflow_export_counts: BTreeMap::new(),
            dataflow_drop_times: BTreeMap::new(),
            shutdown_dataflows: BTreeSet::new(),
            peek_stash: BTreeMap::new(),
            arrangement_size: BTreeMap::new(),
            lir_mapping: BTreeMap::new(),
            dataflow_global_ids: BTreeMap::new(),
        }
    }
}
/// State tracked for each dataflow export.
struct ExportState {
    /// Identifier of the dataflow the export belongs to.
    dataflow_id: usize,
    /// Current number of errors logged for the export.
    error_count: i64,
    /// Instant at which this state was created.
    created_at: Instant,
    /// Nanoseconds between creation and hydration, or `None` while not hydrated.
    hydration_time_ns: Option<u64>,
}

impl ExportState {
    /// Creates the state for a freshly registered export: no errors, not hydrated, and with
    /// the creation instant set to now.
    fn new(dataflow_id: usize) -> Self {
        let created_at = Instant::now();
        ExportState {
            dataflow_id,
            error_count: 0,
            created_at,
            hydration_time_ns: None,
        }
    }
}
/// Heap metrics accumulated for one arrangement operator.
///
/// Each field is the running sum of the deltas reported for the operator; the totals are
/// retracted from the logs when the operator is dropped.
#[derive(Default)]
struct ArrangementSizeState {
/// Sum of reported size deltas.
size: isize,
/// Sum of reported capacity deltas.
capacity: isize,
/// Sum of reported allocation count deltas.
count: isize,
}
/// An update of data `D` at a logging timestamp with a difference.
type Update<D> = (D, Timestamp, Diff);
/// Pusher type of the demux operator outputs carrying updates of `D`.
type Pusher<D> = Counter<Timestamp, Vec<Update<D>>, Tee<Timestamp, Vec<Update<D>>>>;
/// Session type used to give updates of `D` to a demux operator output.
type OutputSession<'a, D> =
Session<'a, Timestamp, CapacityContainerBuilder<Vec<Update<D>>>, Pusher<D>>;
/// Bundled output sessions of the demux operator, one per logging output.
struct DemuxOutput<'a> {
/// Export registrations and drops.
export: OutputSession<'a, ExportDatum>,
/// Export frontier updates.
frontier: OutputSession<'a, FrontierDatum>,
/// Import frontier updates.
import_frontier: OutputSession<'a, ImportFrontierDatum>,
/// Peek installations and retirements.
peek: OutputSession<'a, PeekDatum>,
/// Peek duration buckets.
peek_duration: OutputSession<'a, PeekDurationDatum>,
/// Dataflow shutdown duration buckets.
shutdown_duration: OutputSession<'a, u128>,
/// Arrangement heap size changes; the amount is carried in the update's diff.
arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>,
/// Arrangement heap capacity changes; the amount is carried in the update's diff.
arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>,
/// Arrangement heap allocation count changes; the amount is carried in the update's diff.
arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>,
/// Hydration time updates.
hydration_time: OutputSession<'a, HydrationTimeDatum>,
/// Error count updates.
error_count: OutputSession<'a, ErrorCountDatum>,
/// LIR mapping updates.
lir_mapping: OutputSession<'a, LirMappingDatum>,
/// Dataflow to `GlobalId` association updates.
dataflow_global_ids: OutputSession<'a, DataflowGlobalDatum>,
}
/// Datum emitted on the export output: an export and the dataflow it belongs to.
#[derive(Clone)]
struct ExportDatum {
/// Identifier of the export.
id: GlobalId,
/// Identifier of the dataflow.
dataflow_id: usize,
}
/// Datum emitted on the frontier output: an export and a frontier time.
#[derive(Clone)]
struct FrontierDatum {
/// Identifier of the export.
export_id: GlobalId,
/// The frontier time.
frontier: Timestamp,
}
/// Datum emitted on the import frontier output: an export, one of its imports, and a
/// frontier time.
#[derive(Clone)]
struct ImportFrontierDatum {
/// Identifier of the export.
export_id: GlobalId,
/// Identifier of the import.
import_id: GlobalId,
/// The frontier time.
frontier: Timestamp,
}
/// Datum emitted on the peek output when a peek is installed (diff `1`) or retired
/// (diff `-1`).
#[derive(Clone)]
struct PeekDatum {
/// The peek.
peek: Peek,
/// The kind of the peek.
peek_type: PeekType,
}
/// Datum emitted on the peek duration output when a peek is retired.
#[derive(Clone)]
struct PeekDurationDatum {
/// The kind of the peek.
peek_type: PeekType,
/// The peek's elapsed nanoseconds, rounded up to a power of two.
bucket: u128,
}
/// Datum shared by the three arrangement heap outputs; the measured amount is carried in the
/// diff of the update, not in the datum.
#[derive(Clone)]
struct ArrangementHeapDatum {
/// Identifier of the arrangement operator.
operator_id: usize,
}
/// Datum emitted on the hydration time output.
#[derive(Clone)]
struct HydrationTimeDatum {
/// Identifier of the export.
export_id: GlobalId,
/// Hydration time in nanoseconds, or `None` while the export is not hydrated.
time_ns: Option<u64>,
}
/// Datum emitted on the error count output.
#[derive(Clone)]
struct ErrorCountDatum {
/// Identifier of the export.
export_id: GlobalId,
/// The export's error count; zero counts are never emitted.
count: i64,
}
/// Datum emitted on the LIR mapping output: one LIR id of a `GlobalId` together with the
/// fields of its `LirMetadata`.
#[derive(Clone)]
struct LirMappingDatum {
/// The `GlobalId` the mapping belongs to.
global_id: GlobalId,
/// The LIR id.
lir_id: LirId,
/// The operator, as a string.
operator: Box<str>,
/// The LIR id of the parent, if any.
parent_lir_id: Option<LirId>,
/// The nesting level.
nesting: u8,
/// The operator span pair.
operator_span: (usize, usize),
}
/// Datum emitted on the dataflow global ids output: a dataflow and a `GlobalId` associated
/// with it.
#[derive(Clone)]
struct DataflowGlobalDatum {
/// Identifier of the dataflow.
id: usize,
/// The associated `GlobalId`.
global_id: GlobalId,
}
/// Event handler of the demux operator, constructed anew for every event.
struct DemuxHandler<'a, 'b, A: Allocate + 'static> {
/// State kept by the demux operator across events.
state: &'a mut DemuxState<A>,
/// Shared logging state; holds the arrangement size activators.
shared_state: &'a mut SharedLoggingState,
/// Demux output sessions.
output: &'a mut DemuxOutput<'b>,
/// The logging interval in milliseconds, specifying the time granularity of updates.
logging_interval_ms: u128,
/// The time of the event being handled.
time: Duration,
}
impl<A: Allocate> DemuxHandler<'_, '_, A> {
/// Returns the timestamp at which updates for the current event are logged: the first
/// multiple of the logging interval (in milliseconds) strictly after the event time.
///
/// Panics if the rounded value does not fit into a `Timestamp`.
fn ts(&self) -> Timestamp {
    let interval = self.logging_interval_ms;
    let completed_intervals = self.time.as_millis() / interval;
    let rounded = (completed_intervals + 1) * interval;
    rounded.try_into().expect("must fit")
}
fn handle(&mut self, event: ComputeEvent) {
use ComputeEvent::*;
match event {
Export { id, dataflow_index } => self.handle_export(id, dataflow_index),
ExportDropped { id } => self.handle_export_dropped(id),
Peek {
peek,
peek_type,
installed: true,
} => self.handle_peek_install(peek, peek_type),
Peek {
peek,
peek_type,
installed: false,
} => self.handle_peek_retire(peek, peek_type),
Frontier { id, time, diff } => self.handle_frontier(id, time, diff),
ImportFrontier {
import_id,
export_id,
time,
diff,
} => self.handle_import_frontier(import_id, export_id, time, diff),
ArrangementHeapSize {
operator,
delta_size: size,
} => self.handle_arrangement_heap_size(operator, size),
ArrangementHeapCapacity {
operator,
delta_capacity: capacity,
} => self.handle_arrangement_heap_capacity(operator, capacity),
ArrangementHeapAllocations {
operator,
delta_allocations: allocations,
} => self.handle_arrangement_heap_allocations(operator, allocations),
ArrangementHeapSizeOperator { operator, address } => {
self.handle_arrangement_heap_size_operator(operator, address)
}
ArrangementHeapSizeOperatorDrop { operator } => {
self.handle_arrangement_heap_size_operator_dropped(operator)
}
DataflowShutdown { dataflow_index } => self.handle_dataflow_shutdown(dataflow_index),
ErrorCount { export_id, diff } => self.handle_error_count(export_id, diff),
Hydration { export_id } => self.handle_hydration(export_id),
LirMapping { global_id, mapping } => self.handle_lir_mapping(global_id, mapping),
DataflowGlobal { id, global_id } => self.handle_dataflow_global(id, global_id),
}
}
/// Registers a new export: logs it, starts tracking its state, bumps the export count of its
/// dataflow, and logs an initial "not hydrated" hydration time.
///
/// Logs an error if the export was already registered; the old state is replaced.
fn handle_export(&mut self, id: GlobalId, dataflow_id: usize) {
    let ts = self.ts();
    self.output
        .export
        .give((ExportDatum { id, dataflow_id }, ts, 1));

    let fresh_state = ExportState::new(dataflow_id);
    if self.state.exports.insert(id, fresh_state).is_some() {
        error!(export = %id, "export already registered");
    }

    let export_count = self
        .state
        .dataflow_export_counts
        .entry(dataflow_id)
        .or_default();
    *export_count += 1;

    // Hydration time starts out as unknown and is replaced on the hydration event.
    let not_hydrated = HydrationTimeDatum {
        export_id: id,
        time_ns: None,
    };
    self.output.hydration_time.give((not_hydrated, ts, 1));
}
fn handle_export_dropped(&mut self, id: GlobalId) {
let Some(export) = self.state.exports.remove(&id) else {
error!(export = %id, "missing exports entry at time of export drop");
return;
};
let ts = self.ts();
let dataflow_id = export.dataflow_id;
let datum = ExportDatum { id, dataflow_id };
self.output.export.give((datum, ts, -1));
match self.state.dataflow_export_counts.get_mut(&dataflow_id) {
entry @ Some(0) | entry @ None => {
error!(
export = %id,
dataflow = %dataflow_id,
"invalid dataflow_export_counts entry at time of export drop: {entry:?}",
);
}
Some(1) => self.handle_dataflow_dropped(dataflow_id),
Some(count) => *count -= 1,
}
if export.error_count != 0 {
let datum = ErrorCountDatum {
export_id: id,
count: export.error_count,
};
self.output.error_count.give((datum, ts, -1));
}
let datum = HydrationTimeDatum {
export_id: id,
time_ns: export.hydration_time_ns,
};
self.output.hydration_time.give((datum, ts, -1));
}
/// Handles a dataflow losing its last export.
///
/// If the dataflow has already shut down, a shutdown duration of `0` is logged. Otherwise the
/// current event time is remembered so the duration can be computed on shutdown.
fn handle_dataflow_dropped(&mut self, id: usize) {
    self.state.dataflow_export_counts.remove(&id);

    let already_shut_down = self.state.shutdown_dataflows.remove(&id);
    if already_shut_down {
        let ts = self.ts();
        self.output.shutdown_duration.give((0, ts, 1));
        return;
    }

    let previous_drop = self.state.dataflow_drop_times.insert(id, self.time);
    if previous_drop.is_some() {
        error!(dataflow = %id, "dataflow already dropped");
    }
}
/// Handles the shutdown of a dataflow.
///
/// If the dataflow was dropped before, the time since the drop (in nanoseconds, rounded up to
/// a power of two) is logged as its shutdown duration. Otherwise the dataflow is remembered as
/// shut down. In both cases, all `GlobalId` associations of the dataflow and the LIR mappings
/// of those `GlobalId`s are retracted.
fn handle_dataflow_shutdown(&mut self, id: usize) {
    let ts = self.ts();

    match self.state.dataflow_drop_times.remove(&id) {
        Some(dropped_at) => {
            let elapsed_ns = self.time.saturating_sub(dropped_at).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            self.output.shutdown_duration.give((bucket, ts, 1));
        }
        None => {
            // `insert` returns `false` if the dataflow was already in the set.
            if !self.state.shutdown_dataflows.insert(id) {
                error!(dataflow = %id, "dataflow already shutdown");
            }
        }
    }

    let Some(global_ids) = self.state.dataflow_global_ids.remove(&id) else {
        return;
    };
    for global_id in global_ids {
        let association = DataflowGlobalDatum { id, global_id };
        self.output.dataflow_global_ids.give((association, ts, -1));

        let Some(mappings) = self.state.lir_mapping.remove(&global_id) else {
            continue;
        };
        for (lir_id, metadata) in mappings {
            let datum = LirMappingDatum {
                global_id,
                lir_id,
                operator: metadata.operator,
                parent_lir_id: metadata.parent_lir_id,
                nesting: metadata.nesting,
                operator_span: metadata.operator_span,
            };
            self.output.lir_mapping.give((datum, ts, -1));
        }
    }
}
/// Applies `diff` to the error count of an export, retracting the previously logged count and
/// logging the new one. Counts of zero are never logged.
///
/// Events for exports that are not (or no longer) registered are ignored.
fn handle_error_count(&mut self, export_id: GlobalId, diff: i64) {
    let ts = self.ts();

    let Some(export) = self.state.exports.get_mut(&export_id) else {
        return;
    };

    let previous = export.error_count;
    let updated = previous + diff;

    if previous != 0 {
        let retraction = ErrorCountDatum {
            export_id,
            count: previous,
        };
        self.output.error_count.give((retraction, ts, -1));
    }
    if updated != 0 {
        let insertion = ErrorCountDatum {
            export_id,
            count: updated,
        };
        self.output.error_count.give((insertion, ts, 1));
    }

    export.error_count = updated;
}
/// Records the hydration of an export, replacing its "not hydrated" row with one carrying the
/// time elapsed since the export state was created.
///
/// Logs an error if the export is unknown. Repeated hydration events for an export that
/// already has a hydration time are ignored.
fn handle_hydration(&mut self, export_id: GlobalId) {
    let ts = self.ts();

    let Some(export) = self.state.exports.get_mut(&export_id) else {
        error!(export = %export_id, "hydration event for unknown export");
        return;
    };
    if export.hydration_time_ns.is_some() {
        return;
    }

    let elapsed = export.created_at.elapsed();
    let nanos = u64::try_from(elapsed.as_nanos()).expect("must fit");

    let not_hydrated = HydrationTimeDatum {
        export_id,
        time_ns: None,
    };
    self.output.hydration_time.give((not_hydrated, ts, -1));

    let hydrated = HydrationTimeDatum {
        export_id,
        time_ns: Some(nanos),
    };
    self.output.hydration_time.give((hydrated, ts, 1));

    export.hydration_time_ns = Some(nanos);
}
/// Logs the installation of a peek and remembers the event time under the peek's UUID, so the
/// peek's duration can be computed when it is retired.
///
/// Logs an error if a peek with the same UUID is already registered.
fn handle_peek_install(&mut self, peek: Peek, peek_type: PeekType) {
    let uuid = peek.uuid;
    let ts = self.ts();

    let datum = PeekDatum { peek, peek_type };
    self.output.peek.give((datum, ts, 1));

    let previous_install = self.state.peek_stash.insert(uuid, self.time);
    if previous_install.is_some() {
        error!(
            uuid = %uuid,
            "peek already registered",
        );
    }
}
/// Logs the retirement of a peek and, if its installation was seen, its duration bucket
/// (elapsed nanoseconds rounded up to a power of two).
///
/// Logs an error if no installation is registered for the peek's UUID.
fn handle_peek_retire(&mut self, peek: Peek, peek_type: PeekType) {
    let uuid = peek.uuid;
    let ts = self.ts();

    let datum = PeekDatum { peek, peek_type };
    self.output.peek.give((datum, ts, -1));

    match self.state.peek_stash.remove(&uuid) {
        Some(installed_at) => {
            let elapsed_ns = self.time.saturating_sub(installed_at).as_nanos();
            let bucket = elapsed_ns.next_power_of_two();
            let duration = PeekDurationDatum { peek_type, bucket };
            self.output.peek_duration.give((duration, ts, 1));
        }
        None => {
            error!(
                uuid = %uuid,
                "peek not yet registered",
            );
        }
    }
}
/// Logs a frontier update for an export, using `diff` as the difference of the update.
fn handle_frontier(&mut self, export_id: GlobalId, frontier: Timestamp, diff: i8) {
    let ts = self.ts();
    let update = (
        FrontierDatum {
            export_id,
            frontier,
        },
        ts,
        i64::from(diff),
    );
    self.output.frontier.give(update);
}
/// Logs a frontier update for an import of an export, using `diff` as the difference of the
/// update.
fn handle_import_frontier(
    &mut self,
    import_id: GlobalId,
    export_id: GlobalId,
    frontier: Timestamp,
    diff: i8,
) {
    let ts = self.ts();
    let update = (
        ImportFrontierDatum {
            export_id,
            import_id,
            frontier,
        },
        ts,
        diff.into(),
    );
    self.output.import_frontier.give(update);
}
/// Logs a heap size change for a registered arrangement operator and adds it to the
/// operator's running total. Events for unregistered operators are ignored.
fn handle_arrangement_heap_size(&mut self, operator_id: usize, size: isize) {
    let ts = self.ts();
    if let Some(tracked) = self.state.arrangement_size.get_mut(&operator_id) {
        // The amount travels in the diff, not in the datum.
        let update = (ArrangementHeapDatum { operator_id }, ts, Diff::cast_from(size));
        self.output.arrangement_heap_size.give(update);
        tracked.size += size;
    }
}
/// Logs a heap capacity change for a registered arrangement operator and adds it to the
/// operator's running total. Events for unregistered operators are ignored.
fn handle_arrangement_heap_capacity(&mut self, operator_id: usize, capacity: isize) {
    let ts = self.ts();
    if let Some(tracked) = self.state.arrangement_size.get_mut(&operator_id) {
        // The amount travels in the diff, not in the datum.
        let update = (
            ArrangementHeapDatum { operator_id },
            ts,
            Diff::cast_from(capacity),
        );
        self.output.arrangement_heap_capacity.give(update);
        tracked.capacity += capacity;
    }
}
/// Logs a heap allocation count change for a registered arrangement operator and adds it to
/// the operator's running total. Events for unregistered operators are ignored.
fn handle_arrangement_heap_allocations(&mut self, operator_id: usize, count: isize) {
    let ts = self.ts();
    if let Some(tracked) = self.state.arrangement_size.get_mut(&operator_id) {
        // The amount travels in the diff, not in the datum.
        let update = (ArrangementHeapDatum { operator_id }, ts, Diff::cast_from(count));
        self.output.arrangement_heap_allocations.give(update);
        tracked.count += count;
    }
}
fn handle_arrangement_heap_size_operator(&mut self, operator_id: usize, address: Rc<[usize]>) {
let activator = self.state.worker.activator_for(address);
let existing = self
.state
.arrangement_size
.insert(operator_id, Default::default());
if existing.is_some() {
error!(%operator_id, "arrangement size operator already registered");
}
let existing = self
.shared_state
.arrangement_size_activators
.insert(operator_id, activator);
if existing.is_some() {
error!(%operator_id, "arrangement size activator already registered");
}
}
fn handle_arrangement_heap_size_operator_dropped(&mut self, operator_id: usize) {
if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
let ts = self.ts();
let datum = ArrangementHeapDatum { operator_id };
self.output.arrangement_heap_size.give((
datum.clone(),
ts,
-Diff::cast_from(state.size),
));
self.output.arrangement_heap_capacity.give((
datum.clone(),
ts,
-Diff::cast_from(state.capacity),
));
self.output.arrangement_heap_allocations.give((
datum,
ts,
-Diff::cast_from(state.count),
));
}
self.shared_state
.arrangement_size_activators
.remove(&operator_id);
}
/// Records the LIR metadata in `mapping` for `global_id` (extending any mapping already
/// recorded for it) and logs one row per entry.
fn handle_lir_mapping(&mut self, global_id: GlobalId, mapping: Box<[(LirId, LirMetadata)]>) {
    // Remember the entries so they can be retracted when the dataflow shuts down.
    let recorded = self.state.lir_mapping.entry(global_id).or_default();
    recorded.extend(mapping.iter().cloned());

    let ts = self.ts();
    for (lir_id, metadata) in mapping {
        let datum = LirMappingDatum {
            global_id,
            lir_id,
            operator: metadata.operator,
            parent_lir_id: metadata.parent_lir_id,
            nesting: metadata.nesting,
            operator_span: metadata.operator_span,
        };
        self.output.lir_mapping.give((datum, ts, 1));
    }
}
/// Associates `global_id` with the dataflow `id` and logs the association.
///
/// Logs an error if the dataflow was already associated with this `GlobalId`; the row is
/// logged regardless.
fn handle_dataflow_global(&mut self, id: usize, global_id: GlobalId) {
    // `GlobalId` is `Copy`, so it can be inserted by value without cloning. An empty set is
    // created for dataflows seen for the first time, for which the insert always succeeds.
    let globals = self.state.dataflow_global_ids.entry(id).or_default();
    // NB: `BTreeSet::insert` returns `false` when the element was already in the set.
    if !globals.insert(global_id) {
        error!(%id, %global_id, "dataflow mapping already knew about this GlobalId");
    }

    let ts = self.ts();
    let datum = DataflowGlobalDatum { id, global_id };
    self.output.dataflow_global_ids.give((datum, ts, 1));
}
}
/// Logging state maintained for a compute collection.
///
/// Remembers the frontiers it has logged so that it can emit matching retractions when they
/// change and when it is dropped.
pub struct CollectionLogging {
/// Identifier of the collection (the export).
id: GlobalId,
/// Logger to which events are reported.
logger: Logger,
/// The most recently logged frontier of the collection, if any.
logged_frontier: Option<Timestamp>,
/// The most recently logged frontier of each import.
logged_import_frontiers: BTreeMap<GlobalId, Timestamp>,
}
impl CollectionLogging {
pub fn new(
id: GlobalId,
logger: Logger,
dataflow_index: usize,
import_ids: impl Iterator<Item = GlobalId>,
) -> Self {
logger.log(ComputeEvent::Export { id, dataflow_index });
let mut self_ = Self {
id,
logger,
logged_frontier: None,
logged_import_frontiers: Default::default(),
};
let initial_frontier = Some(Timestamp::MIN);
self_.set_frontier(initial_frontier);
import_ids.for_each(|id| self_.set_import_frontier(id, initial_frontier));
self_
}
pub fn set_frontier(&mut self, new_time: Option<Timestamp>) {
let old_time = self.logged_frontier;
self.logged_frontier = new_time;
if old_time != new_time {
let id = self.id;
let retraction = old_time.map(|time| ComputeEvent::Frontier { id, time, diff: -1 });
let insertion = new_time.map(|time| ComputeEvent::Frontier { id, time, diff: 1 });
let events = retraction.into_iter().chain(insertion);
self.logger.log_many(events);
}
}
pub fn set_import_frontier(&mut self, import_id: GlobalId, new_time: Option<Timestamp>) {
let old_time = self.logged_import_frontiers.remove(&import_id);
if let Some(time) = new_time {
self.logged_import_frontiers.insert(import_id, time);
}
if old_time != new_time {
let export_id = self.id;
let retraction = old_time.map(|time| ComputeEvent::ImportFrontier {
import_id,
export_id,
time,
diff: -1,
});
let insertion = new_time.map(|time| ComputeEvent::ImportFrontier {
import_id,
export_id,
time,
diff: 1,
});
let events = retraction.into_iter().chain(insertion);
self.logger.log_many(events);
}
}
pub fn set_hydrated(&self) {
self.logger
.log(ComputeEvent::Hydration { export_id: self.id });
}
}
impl Drop for CollectionLogging {
fn drop(&mut self) {
self.set_frontier(None);
let import_ids: Vec<_> = self.logged_import_frontiers.keys().copied().collect();
for id in import_ids {
self.set_import_frontier(id, None);
}
self.logger.log(ComputeEvent::ExportDropped { id: self.id });
}
}
/// Extension trait to attach `ComputeEvent::ErrorCount` logging operators to collections and
/// batch streams.
pub(crate) trait LogDataflowErrors {
/// Returns `self` with an added operator that logs, for `export_id`, the sum of the diffs
/// of the data passing through it, and forwards the data unchanged.
fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self;
}
impl<G, D> LogDataflowErrors for Collection<G, D, Diff>
where
    G: Scope,
    D: Data,
{
    /// Logs one `ErrorCount` event per input container, carrying the sum of the diffs of the
    /// container's updates, and passes the container on unchanged.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        let logged = self
            .inner
            .unary(Pipeline, "LogDataflowErrorsCollection", |_cap, _info| {
                move |input, output| {
                    input.for_each(|cap, data| {
                        let diff = data.iter().map(|(_data, _time, diff)| diff).sum();
                        logger.log(ComputeEvent::ErrorCount { export_id, diff });

                        output.session(&cap).give_container(data);
                    });
                }
            });
        logged.as_collection()
    }
}
impl<G, B> LogDataflowErrors for Stream<G, B>
where
    G: Scope,
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff> + Clone + 'static,
{
    /// Logs one `ErrorCount` event per input container, carrying the sum of the diffs of all
    /// updates in the container's batches, and passes the container on unchanged.
    fn log_dataflow_errors(self, logger: Logger, export_id: GlobalId) -> Self {
        self.unary(Pipeline, "LogDataflowErrorsStream", |_cap, _info| {
            move |input, output| {
                input.for_each(|cap, batches| {
                    let diff = batches.iter().map(sum_batch_diffs).sum();
                    logger.log(ComputeEvent::ErrorCount { export_id, diff });

                    output.session(&cap).give_container(batches);
                });
            }
        })
    }
}
/// Returns the sum of the diffs of all updates contained in `batch`.
///
/// Walks every key and every value of the batch with a cursor and accumulates the diff of
/// each time entry.
fn sum_batch_diffs<B>(batch: &B) -> Diff
where
    for<'a> B: BatchReader<DiffGat<'a> = &'a Diff>,
{
    let mut total = 0;
    let mut cursor = batch.cursor();

    while cursor.key_valid(batch) {
        while cursor.val_valid(batch) {
            cursor.map_times(batch, |_time, diff| total += diff);
            cursor.step_val(batch);
        }
        cursor.step_key(batch);
    }

    total
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the in-memory size of `ComputeEvent` so that changes to it are noticed.
    #[mz_ore::test]
    fn test_compute_event_size() {
        let actual_size = std::mem::size_of::<ComputeEvent>();
        assert_eq!(48, actual_size)
    }
}