1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::logging::{
19 BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
20};
21use mz_ore::cast::CastFrom;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::columnar::builder::ColumnBuilder;
24use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
25use mz_timely_util::replay::MzReplay;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::InputCapability;
28use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
29use timely::dataflow::operators::generic::operator::empty;
30use timely::dataflow::operators::generic::{OutputBuilder, Session};
31use timely::dataflow::{Scope, StreamVec};
32
33use crate::extensions::arrange::MzArrangeCore;
34use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
35use crate::logging::{
36 DifferentialLog, EventQueue, LogCollection, LogVariant, SharedLoggingState,
37 consolidate_and_pack,
38};
39use crate::row_spine::RowRowBuilder;
40use crate::typedefs::{KeyBatcher, RowRowSpine};
41
/// The return type of [`construct`].
pub(super) struct Return {
    /// Per-variant log collections, keyed by the `LogVariant` they render.
    /// Only variants present in the config's `index_logs` are included.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
47
/// Construct the dataflow that renders logs of differential dataflow events.
///
/// Replays the raw `DifferentialEvent` stream from `event_queue`, demuxes it
/// into one stream per `DifferentialLog` variant, packs each into rows, and
/// arranges the variants requested by `config.index_logs` into traces.
///
/// # Arguments
///
/// * `scope` - the timely scope to build the logging dataflow in.
/// * `config` - logging configuration (interval, enablement, indexed logs).
/// * `event_queue` - source of replayable `(Duration, DifferentialEvent)` batches.
/// * `shared_state` - state shared with other logging dataflows (loggers,
///   arrangement-size activators).
pub(super) fn construct(
    scope: Scope<'_, Timestamp>,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    // Clamp to at least 1 ms so `DemuxHandler::ts` never divides by zero.
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("differential logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = if enable_logging {
            // Replay the logged events into this scope at the configured
            // interval.
            event_queue.links.mz_replay(
                scope,
                "differential logs",
                config.interval,
                event_queue.activator,
            )
        } else {
            // Logging disabled: substitute an empty stream plus a dummy token.
            // NOTE(review): the inner `Box::new(())` looks redundant —
            // `Rc::new(())` would likely suffice; confirm nothing relies on
            // the boxed layout.
            let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
            (empty(scope), token)
        };

        // Demux operator: one input of raw events, one output per log variant.
        let mut demux =
            OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(logs, Pipeline);
        let (batches_out, batches) = demux.new_output();
        let (records_out, records) = demux.new_output();
        let (sharing_out, sharing) = demux.new_output();
        let (batcher_records_out, batcher_records) = demux.new_output();
        let (batcher_size_out, batcher_size) = demux.new_output();
        let (batcher_capacity_out, batcher_capacity) = demux.new_output();
        let (batcher_allocations_out, batcher_allocations) = demux.new_output();

        // Wrap the raw output handles in builders that consolidate updates.
        let mut batches_out = OutputBuilder::from(batches_out);
        let mut records_out = OutputBuilder::from(records_out);
        let mut sharing_out = OutputBuilder::from(sharing_out);
        let mut batcher_records_out = OutputBuilder::from(batcher_records_out);
        let mut batcher_size_out = OutputBuilder::from(batcher_size_out);
        let mut batcher_capacity_out = OutputBuilder::from(batcher_capacity_out);
        let mut batcher_allocations_out = OutputBuilder::from(batcher_allocations_out);

        let mut demux_state = Default::default();
        demux.build(move |_capability| {
            move |_frontiers| {
                // Activate every output up front; sessions for all of them are
                // handed to the `DemuxHandler` per input time.
                let mut batches = batches_out.activate();
                let mut records = records_out.activate();
                let mut sharing = sharing_out.activate();
                let mut batcher_records = batcher_records_out.activate();
                let mut batcher_size = batcher_size_out.activate();
                let mut batcher_capacity = batcher_capacity_out.activate();
                let mut batcher_allocations = batcher_allocations_out.activate();

                input.for_each_time(|cap, data| {
                    let mut output_buffers = DemuxOutput {
                        batches: batches.session_with_builder(&cap),
                        records: records.session_with_builder(&cap),
                        sharing: sharing.session_with_builder(&cap),
                        batcher_records: batcher_records.session_with_builder(&cap),
                        batcher_size: batcher_size.session_with_builder(&cap),
                        batcher_capacity: batcher_capacity.session_with_builder(&cap),
                        batcher_allocations: batcher_allocations.session_with_builder(&cap),
                    };

                    // Route each event to the appropriate output session(s).
                    for (time, event) in data.flat_map(|data: &mut Vec<_>| data.drain(..)) {
                        DemuxHandler {
                            state: &mut demux_state,
                            output: &mut output_buffers,
                            logging_interval_ms,
                            time,
                            shared_state: &mut shared_state.borrow_mut(),
                        }
                        .handle(event);
                    }
                });
            }
        });

        /// Consolidate a demuxed `(operator, ())` update stream and pack each
        /// update into a row of `(operator_id, worker_id)` datums.
        fn stream_to_collection<'scope>(
            input: StreamVec<'scope, Timestamp, ((usize, ()), Timestamp, Diff)>,
            log: DifferentialLog,
            worker_id: usize,
        ) -> timely::dataflow::Stream<
            'scope,
            Timestamp,
            mz_timely_util::columnar::Column<((mz_repr::Row, mz_repr::Row), Timestamp, Diff)>,
        > {
            consolidate_and_pack::<KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
                input,
                log,
                move |data, packer, session| {
                    for ((op, ()), time, diff) in data.iter() {
                        let data = packer.pack_slice(&[
                            Datum::UInt64(u64::cast_from(*op)),
                            Datum::UInt64(u64::cast_from(worker_id)),
                        ]);
                        session.give((data, *time, *diff))
                    }
                },
            )
        }
        let worker_id = scope.index();

        let arrangement_batches = stream_to_collection(batches, ArrangementBatches, worker_id);
        let arrangement_records = stream_to_collection(records, ArrangementRecords, worker_id);
        let sharing = stream_to_collection(sharing, Sharing, worker_id);
        let batcher_records = stream_to_collection(batcher_records, BatcherRecords, worker_id);
        let batcher_size = stream_to_collection(batcher_size, BatcherSize, worker_id);
        let batcher_capacity = stream_to_collection(batcher_capacity, BatcherCapacity, worker_id);
        let batcher_allocations =
            stream_to_collection(batcher_allocations, BatcherAllocations, worker_id);

        use DifferentialLog::*;
        let logs = [
            (ArrangementBatches, arrangement_batches),
            (ArrangementRecords, arrangement_records),
            (Sharing, sharing),
            (BatcherRecords, batcher_records),
            (BatcherSize, batcher_size),
            (BatcherCapacity, batcher_capacity),
            (BatcherAllocations, batcher_allocations),
        ];

        // Arrange only the variants the config asks to index; each arranged
        // trace shares the replay `token` to keep the source alive.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Differential(variant);
            if config.index_logs.contains_key(&variant) {
                let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                    columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
                );
                let trace = collection
                    .mz_arrange_core::<
                        _,
                        Col2ValBatcher<_, _, _, _>,
                        RowRowBuilder<_, _>,
                        RowRowSpine<_, _>,
                    >(exchange, &format!("Arrange {variant:?}"))
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections }
    })
}
209
/// An output session for the demux operator, using a consolidating container
/// builder so updates with equal `(data, time)` are merged before emission.
type OutputSession<'a, 'b, D> = Session<
    'a,
    'b,
    Timestamp,
    ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>,
    InputCapability<Timestamp>,
>;
217
/// Bundled output sessions used by the demux operator, one per
/// `DifferentialLog` variant. Each carries `((operator_id, ()), time, diff)`
/// updates.
struct DemuxOutput<'a, 'b> {
    // Arrangement batch count changes.
    batches: OutputSession<'a, 'b, (usize, ())>,
    // Arrangement record count changes.
    records: OutputSession<'a, 'b, (usize, ())>,
    // Trace share count changes.
    sharing: OutputSession<'a, 'b, (usize, ())>,
    // Batcher record count changes.
    batcher_records: OutputSession<'a, 'b, (usize, ())>,
    // Batcher size changes.
    batcher_size: OutputSession<'a, 'b, (usize, ())>,
    // Batcher capacity changes.
    batcher_capacity: OutputSession<'a, 'b, (usize, ())>,
    // Batcher allocation count changes.
    batcher_allocations: OutputSession<'a, 'b, (usize, ())>,
}
228
/// State maintained by the demux operator across invocations.
#[derive(Default)]
struct DemuxState {
    // Accumulated trace share count per operator id; entries are removed
    // once the count returns to zero (see `DemuxHandler::handle_trace_share`).
    sharing: BTreeMap<usize, usize>,
}
235
/// Per-event context for routing a single `DifferentialEvent` to the demux
/// outputs.
struct DemuxHandler<'a, 'b, 'c> {
    // Persistent demux state (trace share counts).
    state: &'a mut DemuxState,
    // Output sessions to emit updates into.
    output: &'a mut DemuxOutput<'b, 'c>,
    // Logging interval in milliseconds; timestamps are rounded up to
    // multiples of this value.
    logging_interval_ms: u128,
    // Wall-clock offset at which the event occurred.
    time: Duration,
    // Logging state shared with other logging dataflows.
    shared_state: &'a mut SharedLoggingState,
}
249
250impl DemuxHandler<'_, '_, '_> {
251 fn ts(&self) -> Timestamp {
254 let time_ms = self.time.as_millis();
255 let interval = self.logging_interval_ms;
256 let rounded = (time_ms / interval + 1) * interval;
257 rounded.try_into().expect("must fit")
258 }
259
260 fn handle(&mut self, event: DifferentialEvent) {
262 use DifferentialEvent::*;
263
264 match event {
265 Batch(e) => self.handle_batch(e),
266 Merge(e) => self.handle_merge(e),
267 Drop(e) => self.handle_drop(e),
268 TraceShare(e) => self.handle_trace_share(e),
269 Batcher(e) => self.handle_batcher_event(e),
270 MergeShortfall(_) => (),
271 }
272 }
273
274 fn handle_batch(&mut self, event: BatchEvent) {
275 let ts = self.ts();
276 let operator_id = event.operator;
277 self.output.batches.give(((operator_id, ()), ts, Diff::ONE));
278
279 let diff = Diff::try_from(event.length).expect("must fit");
280 self.output.records.give(((operator_id, ()), ts, diff));
281 self.notify_arrangement_size(operator_id);
282 }
283
284 fn handle_merge(&mut self, event: MergeEvent) {
285 let Some(done) = event.complete else { return };
286
287 let ts = self.ts();
288 let operator_id = event.operator;
289 self.output
290 .batches
291 .give(((operator_id, ()), ts, Diff::MINUS_ONE));
292
293 let diff = Diff::try_from(done).expect("must fit")
294 - Diff::try_from(event.length1 + event.length2).expect("must fit");
295 if diff != Diff::ZERO {
296 self.output.records.give(((operator_id, ()), ts, diff));
297 }
298 self.notify_arrangement_size(operator_id);
299 }
300
301 fn handle_drop(&mut self, event: DropEvent) {
302 let ts = self.ts();
303 let operator_id = event.operator;
304 self.output
305 .batches
306 .give(((operator_id, ()), ts, Diff::MINUS_ONE));
307
308 let diff = -Diff::try_from(event.length).expect("must fit");
309 if diff != Diff::ZERO {
310 self.output.records.give(((operator_id, ()), ts, diff));
311 }
312 self.notify_arrangement_size(operator_id);
313 }
314
315 fn handle_trace_share(&mut self, event: TraceShare) {
316 let ts = self.ts();
317 let operator_id = event.operator;
318 let diff = Diff::cast_from(event.diff);
319 debug_assert_ne!(diff, Diff::ZERO);
320 self.output.sharing.give(((operator_id, ()), ts, diff));
321
322 let sharing = self.state.sharing.entry(operator_id).or_default();
323 *sharing = (Diff::try_from(*sharing).expect("must fit") + diff)
324 .into_inner()
325 .try_into()
326 .expect("under/overflow");
327 if *sharing == 0 {
328 self.state.sharing.remove(&operator_id);
329 self.shared_state.compute_logger.as_ref().map(|logger| {
330 logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
331 ArrangementHeapSizeOperatorDrop { operator_id },
332 ))
333 });
334 }
335 }
336
337 fn handle_batcher_event(&mut self, event: BatcherEvent) {
338 let ts = self.ts();
339 let operator_id = event.operator;
340 let records_diff = Diff::cast_from(event.records_diff);
341 let size_diff = Diff::cast_from(event.size_diff);
342 let capacity_diff = Diff::cast_from(event.capacity_diff);
343 let allocations_diff = Diff::cast_from(event.allocations_diff);
344 self.output
345 .batcher_records
346 .give(((operator_id, ()), ts, records_diff));
347 self.output
348 .batcher_size
349 .give(((operator_id, ()), ts, size_diff));
350 self.output
351 .batcher_capacity
352 .give(((operator_id, ()), ts, capacity_diff));
353 self.output
354 .batcher_allocations
355 .give(((operator_id, ()), ts, allocations_diff));
356 }
357
358 fn notify_arrangement_size(&self, operator: usize) {
359 if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
364 activator.activate();
365 }
366 }
367}