1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::logging::{
19 BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
20};
21use mz_ore::cast::CastFrom;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::containers::{
24 Col2ValBatcher, ColumnBuilder, ProvidedBuilder, columnar_exchange,
25};
26use mz_timely_util::replay::MzReplay;
27use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
28use timely::dataflow::channels::pushers::buffer::Session;
29use timely::dataflow::channels::pushers::{Counter, Tee};
30use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
31use timely::dataflow::{Scope, Stream};
32
33use crate::extensions::arrange::MzArrangeCore;
34use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
35use crate::logging::{
36 DifferentialLog, EventQueue, LogCollection, LogVariant, SharedLoggingState,
37 consolidate_and_pack,
38};
39use crate::row_spine::RowRowBuilder;
40use crate::typedefs::{KeyBatcher, RowRowSpine};
41
42pub(super) struct Return {
44 pub collections: BTreeMap<LogVariant, LogCollection>,
46}
47
/// Constructs the logging dataflow fragment for differential dataflow logs.
///
/// Replays `event_queue` inside a nested "differential logging" scope and demultiplexes the
/// replayed events into one stream per [`DifferentialLog`] variant. Each stream is
/// consolidated and packed (via `consolidate_and_pack`) into rows of
/// `(operator id, worker id)`. Every variant that has an entry in `config.index_logs` is then
/// arranged.
///
/// * `scope`: the scope in which the nested logging scope is created.
/// * `config`: logging configuration; this function reads `interval`, `enable_logging` and
///   `index_logs`.
/// * `event_queue`: source of `(Duration, DifferentialEvent)` log events to replay.
/// * `shared_state`: logging state shared with other logging fragments. It is handed to the
///   demux handler, which uses it to activate arrangement-size operators and to log
///   compute events.
///
/// Returns a [`Return`] holding one [`LogCollection`] per indexed variant. Each collection
/// holds a clone of the replay token.
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
    mut scope: G,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> Return {
    // Event times are rounded up to multiples of this interval (see `DemuxHandler::ts`).
    // It is clamped to at least 1ms because `ts` divides by it.
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());

    scope.scoped("differential logging", move |scope| {
        let enable_logging = config.enable_logging;
        let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>(
            scope,
            "differential logs",
            config.interval,
            event_queue.activator,
            move |mut session, mut data| {
                // When logging is disabled, replayed events are dropped here. The rest of the
                // dataflow (including any arrangements built below) is still constructed and
                // simply stays empty.
                if enable_logging {
                    session.give_container(data.to_mut())
                }
            },
        );

        // Build a demux operator that splits the replayed event stream into one output
        // stream per differential log variant.
        let mut demux =
            OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut batches_out, batches) = demux.new_output();
        let (mut records_out, records) = demux.new_output();
        let (mut sharing_out, sharing) = demux.new_output();
        let (mut batcher_records_out, batcher_records) = demux.new_output();
        let (mut batcher_size_out, batcher_size) = demux.new_output();
        let (mut batcher_capacity_out, batcher_capacity) = demux.new_output();
        let (mut batcher_allocations_out, batcher_allocations) = demux.new_output();

        // Demux state (per-operator sharing counts) that persists across invocations of the
        // operator logic below; its type is inferred from `DemuxHandler::state`.
        let mut demux_state = Default::default();
        // The initial capability is not retained: output is only produced at the
        // capabilities of incoming input batches.
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut batches = batches_out.activate();
                let mut records = records_out.activate();
                let mut sharing = sharing_out.activate();
                let mut batcher_records = batcher_records_out.activate();
                let mut batcher_size = batcher_size_out.activate();
                let mut batcher_capacity = batcher_capacity_out.activate();
                let mut batcher_allocations = batcher_allocations_out.activate();

                input.for_each(|cap, data| {
                    // Open one session per output at this input batch's capability.
                    let mut output_buffers = DemuxOutput {
                        batches: batches.session_with_builder(&cap),
                        records: records.session_with_builder(&cap),
                        sharing: sharing.session_with_builder(&cap),
                        batcher_records: batcher_records.session_with_builder(&cap),
                        batcher_size: batcher_size.session_with_builder(&cap),
                        batcher_capacity: batcher_capacity.session_with_builder(&cap),
                        batcher_allocations: batcher_allocations.session_with_builder(&cap),
                    };

                    for (time, event) in data.drain(..) {
                        DemuxHandler {
                            state: &mut demux_state,
                            output: &mut output_buffers,
                            logging_interval_ms,
                            time,
                            // The shared state is borrowed anew for every event, and only
                            // for the duration of this statement.
                            shared_state: &mut shared_state.borrow_mut(),
                        }
                        .handle(event);
                    }
                });
            }
        });

        // Consolidates one demuxed stream and packs each `(operator_id, ())` key into a row
        // of `(operator_id, worker_id)` as two `UInt64` datums.
        let stream_to_collection = |input: &Stream<_, ((usize, ()), Timestamp, Diff)>, log| {
            let worker_id = scope.index();
            consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
                input,
                log,
                move |((op, ()), time, diff), packer, session| {
                    let data = packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(*op)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ]);
                    session.give((data, *time, *diff))
                },
            )
        };

        let arrangement_batches = stream_to_collection(&batches, ArrangementBatches);
        let arrangement_records = stream_to_collection(&records, ArrangementRecords);
        let sharing = stream_to_collection(&sharing, Sharing);
        let batcher_records = stream_to_collection(&batcher_records, BatcherRecords);
        let batcher_size = stream_to_collection(&batcher_size, BatcherSize);
        let batcher_capacity = stream_to_collection(&batcher_capacity, BatcherCapacity);
        let batcher_allocations = stream_to_collection(&batcher_allocations, BatcherAllocations);

        // A `use` inside a block applies to the whole block, which is why the variant names
        // used above already resolve through this import.
        use DifferentialLog::*;
        let logs = [
            (ArrangementBatches, arrangement_batches),
            (ArrangementRecords, arrangement_records),
            (Sharing, sharing),
            (BatcherRecords, batcher_records),
            (BatcherSize, batcher_size),
            (BatcherCapacity, batcher_capacity),
            (BatcherAllocations, batcher_allocations),
        ];

        // Arrange only the variants requested in `config.index_logs`; the streams for the
        // remaining variants are built but not arranged.
        let mut collections = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Differential(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
                        ExchangeCore::<ColumnBuilder<_>, _>::new_core(
                            columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>,
                        ),
                        &format!("Arrange {variant:?}"),
                    )
                    .trace;
                let collection = LogCollection {
                    trace,
                    // Every collection shares the replay token; presumably this keeps the
                    // replay operator alive while any collection is in use (confirm).
                    token: Rc::clone(&token),
                };
                collections.insert(variant, collection);
            }
        }

        Return { collections, }
    })
}
188
189type Pusher<D> =
190 Counter<Timestamp, Vec<(D, Timestamp, Diff)>, Tee<Timestamp, Vec<(D, Timestamp, Diff)>>>;
191type OutputSession<'a, D> =
192 Session<'a, Timestamp, ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>, Pusher<D>>;
193
194struct DemuxOutput<'a> {
196 batches: OutputSession<'a, (usize, ())>,
197 records: OutputSession<'a, (usize, ())>,
198 sharing: OutputSession<'a, (usize, ())>,
199 batcher_records: OutputSession<'a, (usize, ())>,
200 batcher_size: OutputSession<'a, (usize, ())>,
201 batcher_capacity: OutputSession<'a, (usize, ())>,
202 batcher_allocations: OutputSession<'a, (usize, ())>,
203}
204
/// State that the demux operator keeps across events and invocations.
#[derive(Default)]
struct DemuxState {
    /// Current trace-share count per arrangement operator id. An entry is removed as soon
    /// as its count drops back to zero.
    sharing: BTreeMap<usize, usize>,
}
211
212struct DemuxHandler<'a, 'b> {
214 state: &'a mut DemuxState,
216 output: &'a mut DemuxOutput<'b>,
218 logging_interval_ms: u128,
220 time: Duration,
222 shared_state: &'a mut SharedLoggingState,
224}
225
226impl DemuxHandler<'_, '_> {
227 fn ts(&self) -> Timestamp {
230 let time_ms = self.time.as_millis();
231 let interval = self.logging_interval_ms;
232 let rounded = (time_ms / interval + 1) * interval;
233 rounded.try_into().expect("must fit")
234 }
235
236 fn handle(&mut self, event: DifferentialEvent) {
238 use DifferentialEvent::*;
239
240 match event {
241 Batch(e) => self.handle_batch(e),
242 Merge(e) => self.handle_merge(e),
243 Drop(e) => self.handle_drop(e),
244 TraceShare(e) => self.handle_trace_share(e),
245 Batcher(e) => self.handle_batcher_event(e),
246 _ => (),
247 }
248 }
249
250 fn handle_batch(&mut self, event: BatchEvent) {
251 let ts = self.ts();
252 let operator_id = event.operator;
253 self.output.batches.give(((operator_id, ()), ts, Diff::ONE));
254
255 let diff = Diff::try_from(event.length).expect("must fit");
256 self.output.records.give(((operator_id, ()), ts, diff));
257 self.notify_arrangement_size(operator_id);
258 }
259
260 fn handle_merge(&mut self, event: MergeEvent) {
261 let Some(done) = event.complete else { return };
262
263 let ts = self.ts();
264 let operator_id = event.operator;
265 self.output
266 .batches
267 .give(((operator_id, ()), ts, Diff::MINUS_ONE));
268
269 let diff = Diff::try_from(done).expect("must fit")
270 - Diff::try_from(event.length1 + event.length2).expect("must fit");
271 if diff != Diff::ZERO {
272 self.output.records.give(((operator_id, ()), ts, diff));
273 }
274 self.notify_arrangement_size(operator_id);
275 }
276
277 fn handle_drop(&mut self, event: DropEvent) {
278 let ts = self.ts();
279 let operator_id = event.operator;
280 self.output
281 .batches
282 .give(((operator_id, ()), ts, Diff::MINUS_ONE));
283
284 let diff = -Diff::try_from(event.length).expect("must fit");
285 if diff != Diff::ZERO {
286 self.output.records.give(((operator_id, ()), ts, diff));
287 }
288 self.notify_arrangement_size(operator_id);
289 }
290
291 fn handle_trace_share(&mut self, event: TraceShare) {
292 let ts = self.ts();
293 let operator_id = event.operator;
294 let diff = Diff::cast_from(event.diff);
295 debug_assert_ne!(diff, Diff::ZERO);
296 self.output.sharing.give(((operator_id, ()), ts, diff));
297
298 let sharing = self.state.sharing.entry(operator_id).or_default();
299 *sharing = (Diff::try_from(*sharing).expect("must fit") + diff)
300 .into_inner()
301 .try_into()
302 .expect("under/overflow");
303 if *sharing == 0 {
304 self.state.sharing.remove(&operator_id);
305 self.shared_state.compute_logger.as_ref().map(|logger| {
306 logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
307 ArrangementHeapSizeOperatorDrop { operator_id },
308 ))
309 });
310 }
311 }
312
313 fn handle_batcher_event(&mut self, event: BatcherEvent) {
314 let ts = self.ts();
315 let operator_id = event.operator;
316 let records_diff = Diff::cast_from(event.records_diff);
317 let size_diff = Diff::cast_from(event.size_diff);
318 let capacity_diff = Diff::cast_from(event.capacity_diff);
319 let allocations_diff = Diff::cast_from(event.allocations_diff);
320 self.output
321 .batcher_records
322 .give(((operator_id, ()), ts, records_diff));
323 self.output
324 .batcher_size
325 .give(((operator_id, ()), ts, size_diff));
326 self.output
327 .batcher_capacity
328 .give(((operator_id, ()), ts, capacity_diff));
329 self.output
330 .batcher_allocations
331 .give(((operator_id, ()), ts, allocations_diff));
332 }
333
334 fn notify_arrangement_size(&self, operator: usize) {
335 if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
340 activator.activate();
341 }
342 }
343}