1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::rc::Rc;
15use std::time::Duration;
16
17use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
18use differential_dataflow::logging::{
19 BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
20};
21use mz_ore::cast::CastFrom;
22use mz_repr::{Datum, Diff, Timestamp};
23use mz_timely_util::columnar::builder::ColumnBuilder;
24use mz_timely_util::columnar::{Col2ValBatcher, columnar_exchange};
25use mz_timely_util::replay::MzReplay;
26use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
27use timely::dataflow::operators::InputCapability;
28use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
29use timely::dataflow::operators::generic::operator::empty;
30use timely::dataflow::operators::generic::{OutputBuilder, Session};
31use timely::dataflow::{Scope, Stream};
32
33use crate::extensions::arrange::MzArrangeCore;
34use crate::logging::compute::{ArrangementHeapSizeOperatorDrop, ComputeEvent};
35use crate::logging::{
36 DifferentialLog, EventQueue, LogCollection, LogVariant, SharedLoggingState,
37 consolidate_and_pack,
38};
39use crate::row_spine::RowRowBuilder;
40use crate::typedefs::{KeyBatcher, RowRowSpine};
41
42pub(super) struct Return {
44 pub collections: BTreeMap<LogVariant, LogCollection>,
46}
47
48pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
56 mut scope: G,
57 config: &mz_compute_client::logging::LoggingConfig,
58 event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
59 shared_state: Rc<RefCell<SharedLoggingState>>,
60) -> Return {
61 let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
62
63 scope.scoped("differential logging", move |scope| {
64 let enable_logging = config.enable_logging;
65 let (logs, token) = if enable_logging {
66 event_queue.links
67 .mz_replay(
68 scope,
69 "differential logs",
70 config.interval,
71 event_queue.activator,
72 )
73 } else {
74 let token: Rc<dyn std::any::Any> = Rc::new(Box::new(()));
75 (empty(scope), token)
76 };
77
78 let mut demux =
81 OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
82 let mut input = demux.new_input(&logs, Pipeline);
83 let (batches_out, batches) = demux.new_output();
84 let (records_out, records) = demux.new_output();
85 let (sharing_out, sharing) = demux.new_output();
86 let (batcher_records_out, batcher_records) = demux.new_output();
87 let (batcher_size_out, batcher_size) = demux.new_output();
88 let (batcher_capacity_out, batcher_capacity) = demux.new_output();
89 let (batcher_allocations_out, batcher_allocations) = demux.new_output();
90
91 let mut batches_out = OutputBuilder::from(batches_out);
92 let mut records_out = OutputBuilder::from(records_out);
93 let mut sharing_out = OutputBuilder::from(sharing_out);
94 let mut batcher_records_out = OutputBuilder::from(batcher_records_out);
95 let mut batcher_size_out = OutputBuilder::from(batcher_size_out);
96 let mut batcher_capacity_out = OutputBuilder::from(batcher_capacity_out);
97 let mut batcher_allocations_out = OutputBuilder::from(batcher_allocations_out);
98
99 let mut demux_state = Default::default();
100 demux.build(move |_capability| {
101 move |_frontiers| {
102 let mut batches = batches_out.activate();
103 let mut records = records_out.activate();
104 let mut sharing = sharing_out.activate();
105 let mut batcher_records = batcher_records_out.activate();
106 let mut batcher_size = batcher_size_out.activate();
107 let mut batcher_capacity = batcher_capacity_out.activate();
108 let mut batcher_allocations = batcher_allocations_out.activate();
109
110 input.for_each_time(|cap, data| {
111 let mut output_buffers = DemuxOutput {
112 batches: batches.session_with_builder(&cap),
113 records: records.session_with_builder(&cap),
114 sharing: sharing.session_with_builder(&cap),
115 batcher_records: batcher_records.session_with_builder(&cap),
116 batcher_size: batcher_size.session_with_builder(&cap),
117 batcher_capacity: batcher_capacity.session_with_builder(&cap),
118 batcher_allocations: batcher_allocations.session_with_builder(&cap),
119 };
120
121 for (time, event) in data.flat_map(|data: &mut Vec<_>| data.drain(..)) {
122 DemuxHandler {
123 state: &mut demux_state,
124 output: &mut output_buffers,
125 logging_interval_ms,
126 time,
127 shared_state: &mut shared_state.borrow_mut(),
128 }
129 .handle(event);
130 }
131 });
132 }
133 });
134
135 let stream_to_collection = |input: &Stream<_, ((usize, ()), Timestamp, Diff)>, log| {
138 let worker_id = scope.index();
139 consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
140 input,
141 log,
142 move |data, packer, session| {
143 for ((op, ()), time, diff) in data.iter() {
144 let data = packer.pack_slice(&[
145 Datum::UInt64(u64::cast_from(*op)),
146 Datum::UInt64(u64::cast_from(worker_id)),
147 ]);
148 session.give((data, *time, *diff))
149 }
150 },
151 )
152 };
153
154 let arrangement_batches = stream_to_collection(&batches, ArrangementBatches);
156 let arrangement_records = stream_to_collection(&records, ArrangementRecords);
157 let sharing = stream_to_collection(&sharing, Sharing);
158 let batcher_records = stream_to_collection(&batcher_records, BatcherRecords);
159 let batcher_size = stream_to_collection(&batcher_size, BatcherSize);
160 let batcher_capacity = stream_to_collection(&batcher_capacity, BatcherCapacity);
161 let batcher_allocations = stream_to_collection(&batcher_allocations, BatcherAllocations);
162
163 use DifferentialLog::*;
164 let logs = [
165 (ArrangementBatches, arrangement_batches),
166 (ArrangementRecords, arrangement_records),
167 (Sharing, sharing),
168 (BatcherRecords, batcher_records),
169 (BatcherSize, batcher_size),
170 (BatcherCapacity, batcher_capacity),
171 (BatcherAllocations, batcher_allocations),
172 ];
173
174 let mut collections = BTreeMap::new();
176 for (variant, collection) in logs {
177 let variant = LogVariant::Differential(variant);
178 if config.index_logs.contains_key(&variant) {
179 let trace = collection
180 .mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
181 ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<mz_repr::Row, mz_repr::Row, Timestamp, mz_repr::Diff>),
182 &format!("Arrange {variant:?}"),
183 )
184 .trace;
185 let collection = LogCollection {
186 trace,
187 token: Rc::clone(&token),
188 };
189 collections.insert(variant, collection);
190 }
191 }
192
193 Return { collections, }
194 })
195}
196
197type OutputSession<'a, 'b, D> = Session<
198 'a,
199 'b,
200 Timestamp,
201 ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>,
202 InputCapability<Timestamp>,
203>;
204
205struct DemuxOutput<'a, 'b> {
207 batches: OutputSession<'a, 'b, (usize, ())>,
208 records: OutputSession<'a, 'b, (usize, ())>,
209 sharing: OutputSession<'a, 'b, (usize, ())>,
210 batcher_records: OutputSession<'a, 'b, (usize, ())>,
211 batcher_size: OutputSession<'a, 'b, (usize, ())>,
212 batcher_capacity: OutputSession<'a, 'b, (usize, ())>,
213 batcher_allocations: OutputSession<'a, 'b, (usize, ())>,
214}
215
/// State kept by the demux operator across events.
#[derive(Default)]
struct DemuxState {
    /// Accumulated trace share count per operator id.
    ///
    /// An entry is removed again once its count returns to zero.
    sharing: BTreeMap<usize, usize>,
}
222
223struct DemuxHandler<'a, 'b, 'c> {
225 state: &'a mut DemuxState,
227 output: &'a mut DemuxOutput<'b, 'c>,
229 logging_interval_ms: u128,
231 time: Duration,
233 shared_state: &'a mut SharedLoggingState,
235}
236
237impl DemuxHandler<'_, '_, '_> {
238 fn ts(&self) -> Timestamp {
241 let time_ms = self.time.as_millis();
242 let interval = self.logging_interval_ms;
243 let rounded = (time_ms / interval + 1) * interval;
244 rounded.try_into().expect("must fit")
245 }
246
247 fn handle(&mut self, event: DifferentialEvent) {
249 use DifferentialEvent::*;
250
251 match event {
252 Batch(e) => self.handle_batch(e),
253 Merge(e) => self.handle_merge(e),
254 Drop(e) => self.handle_drop(e),
255 TraceShare(e) => self.handle_trace_share(e),
256 Batcher(e) => self.handle_batcher_event(e),
257 _ => (),
258 }
259 }
260
261 fn handle_batch(&mut self, event: BatchEvent) {
262 let ts = self.ts();
263 let operator_id = event.operator;
264 self.output.batches.give(((operator_id, ()), ts, Diff::ONE));
265
266 let diff = Diff::try_from(event.length).expect("must fit");
267 self.output.records.give(((operator_id, ()), ts, diff));
268 self.notify_arrangement_size(operator_id);
269 }
270
271 fn handle_merge(&mut self, event: MergeEvent) {
272 let Some(done) = event.complete else { return };
273
274 let ts = self.ts();
275 let operator_id = event.operator;
276 self.output
277 .batches
278 .give(((operator_id, ()), ts, Diff::MINUS_ONE));
279
280 let diff = Diff::try_from(done).expect("must fit")
281 - Diff::try_from(event.length1 + event.length2).expect("must fit");
282 if diff != Diff::ZERO {
283 self.output.records.give(((operator_id, ()), ts, diff));
284 }
285 self.notify_arrangement_size(operator_id);
286 }
287
288 fn handle_drop(&mut self, event: DropEvent) {
289 let ts = self.ts();
290 let operator_id = event.operator;
291 self.output
292 .batches
293 .give(((operator_id, ()), ts, Diff::MINUS_ONE));
294
295 let diff = -Diff::try_from(event.length).expect("must fit");
296 if diff != Diff::ZERO {
297 self.output.records.give(((operator_id, ()), ts, diff));
298 }
299 self.notify_arrangement_size(operator_id);
300 }
301
302 fn handle_trace_share(&mut self, event: TraceShare) {
303 let ts = self.ts();
304 let operator_id = event.operator;
305 let diff = Diff::cast_from(event.diff);
306 debug_assert_ne!(diff, Diff::ZERO);
307 self.output.sharing.give(((operator_id, ()), ts, diff));
308
309 let sharing = self.state.sharing.entry(operator_id).or_default();
310 *sharing = (Diff::try_from(*sharing).expect("must fit") + diff)
311 .into_inner()
312 .try_into()
313 .expect("under/overflow");
314 if *sharing == 0 {
315 self.state.sharing.remove(&operator_id);
316 self.shared_state.compute_logger.as_ref().map(|logger| {
317 logger.log(&ComputeEvent::ArrangementHeapSizeOperatorDrop(
318 ArrangementHeapSizeOperatorDrop { operator_id },
319 ))
320 });
321 }
322 }
323
324 fn handle_batcher_event(&mut self, event: BatcherEvent) {
325 let ts = self.ts();
326 let operator_id = event.operator;
327 let records_diff = Diff::cast_from(event.records_diff);
328 let size_diff = Diff::cast_from(event.size_diff);
329 let capacity_diff = Diff::cast_from(event.capacity_diff);
330 let allocations_diff = Diff::cast_from(event.allocations_diff);
331 self.output
332 .batcher_records
333 .give(((operator_id, ()), ts, records_diff));
334 self.output
335 .batcher_size
336 .give(((operator_id, ()), ts, size_diff));
337 self.output
338 .batcher_capacity
339 .give(((operator_id, ()), ts, capacity_diff));
340 self.output
341 .batcher_allocations
342 .give(((operator_id, ()), ts, allocations_diff));
343 }
344
345 fn notify_arrangement_size(&self, operator: usize) {
346 if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
351 activator.activate();
352 }
353 }
354}