1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::logging::{
19 BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
20};
21use mz_ore::cast::CastFrom;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::columnar::batcher;
24use mz_timely_util::columnar::builder::ColumnBuilder;
25use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
26use mz_timely_util::columnation::ColumnationChunker;
27use mz_timely_util::replay::MzReplay;
28use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
29use timely::dataflow::operators::InputCapability;
30use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
31use timely::dataflow::operators::generic::operator::empty;
32use timely::dataflow::operators::generic::{OutputBuilder, Session};
33use timely::dataflow::{Scope, StreamVec};
34
35use crate::extensions::arrange::MzArrangeCore;
36use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
37use crate::logging::{
38 DifferentialLog, EventQueue, LogCollection, LogVariant, SharedLoggingState,
39 consolidate_and_pack,
40};
41use crate::typedefs::{KeyBatcher, RowRowSpine};
42use mz_row_spine::RowRowBuilder;
43
/// The result of [`construct`]: the log collections this dataflow fragment exports.
pub(super) struct Return {
    /// Arranged log collections, keyed by log variant.
    ///
    /// Only variants for which `config.index_logs` contains an entry are present;
    /// `construct` skips arranging every other variant.
    pub collections: BTreeMap<LogVariant, LogCollection>,
}
49
50pub(super) fn construct(
58 scope: Scope<'_, Timestamp>,
59 config: &mz_compute_client::logging::LoggingConfig,
60 event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
61 shared_state: Rc<RefCell<SharedLoggingState>>,
62) -> Return {
63 let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
64
65 scope.scoped("differential logging", move |scope| {
66 let enable_logging = config.enable_logging;
67 let (logs, token) = if enable_logging {
68 event_queue.links.mz_replay(
69 scope,
70 "differential logs",
71 config.interval,
72 event_queue.activator,
73 )
74 } else {
75 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
76 (empty(scope), token)
77 };
78
79 let mut demux =
82 OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
83 let mut input = demux.new_input(logs, Pipeline);
84 let (batches_out, batches) = demux.new_output();
85 let (records_out, records) = demux.new_output();
86 let (sharing_out, sharing) = demux.new_output();
87 let (batcher_records_out, batcher_records) = demux.new_output();
88 let (batcher_size_out, batcher_size) = demux.new_output();
89 let (batcher_capacity_out, batcher_capacity) = demux.new_output();
90 let (batcher_allocations_out, batcher_allocations) = demux.new_output();
91
92 let mut batches_out = OutputBuilder::from(batches_out);
93 let mut records_out = OutputBuilder::from(records_out);
94 let mut sharing_out = OutputBuilder::from(sharing_out);
95 let mut batcher_records_out = OutputBuilder::from(batcher_records_out);
96 let mut batcher_size_out = OutputBuilder::from(batcher_size_out);
97 let mut batcher_capacity_out = OutputBuilder::from(batcher_capacity_out);
98 let mut batcher_allocations_out = OutputBuilder::from(batcher_allocations_out);
99
100 let mut demux_state = Default::default();
101 demux.build(move |_capability| {
102 move |_frontiers| {
103 let mut batches = batches_out.activate();
104 let mut records = records_out.activate();
105 let mut sharing = sharing_out.activate();
106 let mut batcher_records = batcher_records_out.activate();
107 let mut batcher_size = batcher_size_out.activate();
108 let mut batcher_capacity = batcher_capacity_out.activate();
109 let mut batcher_allocations = batcher_allocations_out.activate();
110
111 input.for_each_time(|cap, data| {
112 let mut output_buffers = DemuxOutput {
113 batches: batches.session_with_builder(&cap),
114 records: records.session_with_builder(&cap),
115 sharing: sharing.session_with_builder(&cap),
116 batcher_records: batcher_records.session_with_builder(&cap),
117 batcher_size: batcher_size.session_with_builder(&cap),
118 batcher_capacity: batcher_capacity.session_with_builder(&cap),
119 batcher_allocations: batcher_allocations.session_with_builder(&cap),
120 };
121
122 for (time, event) in data.flat_map(|data: &mut Vec<_>| data.drain(..)) {
123 DemuxHandler {
124 state: &mut demux_state,
125 output: &mut output_buffers,
126 logging_interval_ms,
127 time,
128 shared_state: &mut shared_state.borrow_mut(),
129 }
130 .handle(event);
131 }
132 });
133 }
134 });
135
136 fn stream_to_collection<'scope>(
139 input: StreamVec<'scope, Timestamp, ((usize, ()), Timestamp, Diff)>,
140 log: DifferentialLog,
141 worker_id: usize,
142 ) -> timely::dataflow::Stream<
143 'scope,
144 Timestamp,
145 mz_timely_util::columnar::Column<((mz_repr::Row, mz_repr::Row), Timestamp, Diff)>,
146 > {
147 consolidate_and_pack::<
148 ColumnationChunker<_>,
149 KeyBatcher<_, _, _>,
150 ColumnBuilder<_>,
151 _,
152 _,
153 _,
154 >(input, log, move |data, packer, session| {
155 for ((op, ()), time, diff) in data.iter() {
156 let data = packer.pack_slice(&[
157 Datum::UInt64(u64::cast_from(*op)),
158 Datum::UInt64(u64::cast_from(worker_id)),
159 ]);
160 session.give((data, *time, *diff))
161 }
162 })
163 }
164 let worker_id = scope.index();
165
166 let arrangement_batches = stream_to_collection(batches, ArrangementBatches, worker_id);
168 let arrangement_records = stream_to_collection(records, ArrangementRecords, worker_id);
169 let sharing = stream_to_collection(sharing, Sharing, worker_id);
170 let batcher_records = stream_to_collection(batcher_records, BatcherRecords, worker_id);
171 let batcher_size = stream_to_collection(batcher_size, BatcherSize, worker_id);
172 let batcher_capacity = stream_to_collection(batcher_capacity, BatcherCapacity, worker_id);
173 let batcher_allocations =
174 stream_to_collection(batcher_allocations, BatcherAllocations, worker_id);
175
176 use DifferentialLog::*;
177 let logs = [
178 (ArrangementBatches, arrangement_batches),
179 (ArrangementRecords, arrangement_records),
180 (Sharing, sharing),
181 (BatcherRecords, batcher_records),
182 (BatcherSize, batcher_size),
183 (BatcherCapacity, batcher_capacity),
184 (BatcherAllocations, batcher_allocations),
185 ];
186
187 let mut collections = BTreeMap::new();
189 for (variant, collection) in logs {
190 let variant = LogVariant::Differential(variant);
191 if config.index_logs.contains_key(&variant) {
192 let exchange = ExchangeCore::<ColumnBuilder<_>, _>::new_core(
193 columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
194 );
195 let trace = collection
196 .mz_arrange_core::<
197 _,
198 batcher::Chunker<_>,
199 Col2ValBatcher<_, _, _, _>,
200 RowRowBuilder<_, _>,
201 RowRowSpine<_, _>,
202 >(exchange, &format!("Arrange {variant:?}"))
203 .trace;
204 let collection = LogCollection {
205 trace,
206 token: Rc::clone(&token),
207 };
208 collections.insert(variant, collection);
209 }
210 }
211
212 Return { collections }
213 })
214}
215
/// An output session of the demux operator.
///
/// Updates of the form `(D, Timestamp, Diff)` pass through a
/// `ConsolidatingContainerBuilder`, and the session is held open with an
/// `InputCapability<Timestamp>`.
type OutputSession<'a, 'b, D> = Session<
    'a,
    'b,
    Timestamp,
    ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>,
    InputCapability<Timestamp>,
>;
223
/// Bundle of demux output sessions, one per differential log variant.
///
/// Every session carries updates keyed by `(operator_id, ())`.
struct DemuxOutput<'a, 'b> {
    /// Changes in the number of batches per arrangement operator.
    batches: OutputSession<'a, 'b, (usize, ())>,
    /// Changes in the number of records per arrangement operator.
    records: OutputSession<'a, 'b, (usize, ())>,
    /// Changes in the trace sharing count per arrangement operator.
    sharing: OutputSession<'a, 'b, (usize, ())>,
    /// Changes in batcher record counts, from `BatcherEvent::records_diff`.
    batcher_records: OutputSession<'a, 'b, (usize, ())>,
    /// Changes in batcher size, from `BatcherEvent::size_diff`.
    batcher_size: OutputSession<'a, 'b, (usize, ())>,
    /// Changes in batcher capacity, from `BatcherEvent::capacity_diff`.
    batcher_capacity: OutputSession<'a, 'b, (usize, ())>,
    /// Changes in batcher allocation counts, from `BatcherEvent::allocations_diff`.
    batcher_allocations: OutputSession<'a, 'b, (usize, ())>,
}
234
/// State the demux operator keeps across events.
#[derive(Default)]
struct DemuxState {
    /// Current sharing count per arrangement operator id.
    ///
    /// An entry is removed as soon as its count drops to zero.
    sharing: BTreeMap<usize, usize>,
}
241
/// Handles a single differential log event, writing updates to the demux outputs.
struct DemuxHandler<'a, 'b, 'c> {
    /// State kept across events by the demux operator.
    state: &'a mut DemuxState,
    /// Output sessions the handler writes updates into.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// The logging interval in milliseconds; timestamps are rounded up to multiples of it.
    logging_interval_ms: u128,
    /// The time at which the event being handled was logged.
    time: Duration,
    /// Logging state shared with the other logging dataflow fragments.
    shared_state: &'a mut SharedLoggingState,
}
255
256impl DemuxHandler<'_, '_, '_> {
257 fn ts(&self) -> Timestamp {
260 let time_ms = self.time.as_millis();
261 let interval = self.logging_interval_ms;
262 let rounded = (time_ms / interval + 1) * interval;
263 rounded.try_into().expect("must fit")
264 }
265
266 fn handle(&mut self, event: DifferentialEvent) {
268 use DifferentialEvent::*;
269
270 match event {
271 Batch(e) => self.handle_batch(e),
272 Merge(e) => self.handle_merge(e),
273 Drop(e) => self.handle_drop(e),
274 TraceShare(e) => self.handle_trace_share(e),
275 Batcher(e) => self.handle_batcher_event(e),
276 MergeShortfall(_) => (),
277 }
278 }
279
280 fn handle_batch(&mut self, event: BatchEvent) {
281 let ts = self.ts();
282 let operator_id = event.operator;
283 self.output.batches.give(((operator_id, ()), ts, Diff::ONE));
284
285 let diff = Diff::try_from(event.length).expect("must fit");
286 self.output.records.give(((operator_id, ()), ts, diff));
287 self.notify_arrangement_size(operator_id);
288 }
289
290 fn handle_merge(&mut self, event: MergeEvent) {
291 let Some(done) = event.complete else { return };
292
293 let ts = self.ts();
294 let operator_id = event.operator;
295 self.output
296 .batches
297 .give(((operator_id, ()), ts, Diff::MINUS_ONE));
298
299 let diff = Diff::try_from(done).expect("must fit")
300 - Diff::try_from(event.length1 + event.length2).expect("must fit");
301 if diff != Diff::ZERO {
302 self.output.records.give(((operator_id, ()), ts, diff));
303 }
304 self.notify_arrangement_size(operator_id);
305 }
306
307 fn handle_drop(&mut self, event: DropEvent) {
308 let ts = self.ts();
309 let operator_id = event.operator;
310 self.output
311 .batches
312 .give(((operator_id, ()), ts, Diff::MINUS_ONE));
313
314 let diff = -Diff::try_from(event.length).expect("must fit");
315 if diff != Diff::ZERO {
316 self.output.records.give(((operator_id, ()), ts, diff));
317 }
318 self.notify_arrangement_size(operator_id);
319 }
320
321 fn handle_trace_share(&mut self, event: TraceShare) {
322 let ts = self.ts();
323 let operator_id = event.operator;
324 let diff = Diff::cast_from(event.diff);
325 debug_assert_ne!(diff, Diff::ZERO);
326 self.output.sharing.give(((operator_id, ()), ts, diff));
327
328 let sharing = self.state.sharing.entry(operator_id).or_default();
329 *sharing = (Diff::try_from(*sharing).expect("must fit") + diff)
330 .into_inner()
331 .try_into()
332 .expect("under/overflow");
333 if *sharing == 0 {
334 self.state.sharing.remove(&operator_id);
335 self.shared_state.compute_logger.as_ref().map(|logger| {
336 logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
337 ArrangementHeapSizeOperatorDrop { operator_id },
338 ))
339 });
340 }
341 }
342
343 fn handle_batcher_event(&mut self, event: BatcherEvent) {
344 let ts = self.ts();
345 let operator_id = event.operator;
346 let records_diff = Diff::cast_from(event.records_diff);
347 let size_diff = Diff::cast_from(event.size_diff);
348 let capacity_diff = Diff::cast_from(event.capacity_diff);
349 let allocations_diff = Diff::cast_from(event.allocations_diff);
350 self.output
351 .batcher_records
352 .give(((operator_id, ()), ts, records_diff));
353 self.output
354 .batcher_size
355 .give(((operator_id, ()), ts, size_diff));
356 self.output
357 .batcher_capacity
358 .give(((operator_id, ()), ts, capacity_diff));
359 self.output
360 .batcher_allocations
361 .give(((operator_id, ()), ts, allocations_diff));
362 }
363
364 fn notify_arrangement_size(&self, operator: usize) {
365 if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
370 activator.activate();
371 }
372 }
373}