//! Logging dataflows for events generated by various subsystems.

pub mod compute;
mod differential;
pub(super) mod initialize;
mod reachability;
mod timely;

use std::any::Any;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::Duration;

use ::timely::container::CapacityContainerBuilder;
use ::timely::dataflow::StreamCore;
use ::timely::dataflow::channels::pact::Pipeline;
use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
use ::timely::dataflow::operators::generic::Session;
use ::timely::dataflow::operators::{InputCapability, Operator};
use ::timely::progress::Timestamp as TimelyTimestamp;
use ::timely::scheduling::Activator;
use ::timely::{Container, ContainerBuilder};
use differential_dataflow::trace::Batcher;
use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
use mz_expr::{MirScalarExpr, permutation_for_arrangement};
use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
use mz_timely_util::activator::RcActivator;
use mz_timely_util::columnar::builder::ColumnBuilder;
use mz_timely_util::operator::consolidate_pact;

use crate::logging::compute::Logger as ComputeLogger;
use crate::typedefs::RowRowAgent;

pub use crate::logging::initialize::initialize;

/// An update of value `D`, at a time, with a diff.
pub(super) type Update<D> = (D, Timestamp, Diff);
/// An output session backed by the container builder `CB`.
pub(super) type OutputSession<'a, 'b, CB> =
    Session<'a, 'b, Timestamp, CB, InputCapability<Timestamp>>;
/// An output session for vector containers of updates `D`.
pub(super) type OutputSessionVec<'a, 'b, D> =
    OutputSession<'a, 'b, CapacityContainerBuilder<Vec<D>>>;
/// An output session for columnar containers of updates `D`.
pub(super) type OutputSessionColumnar<'a, 'b, D> = OutputSession<'a, 'b, ColumnBuilder<D>>;

/// Logs events from a timely stream as batches, interleaved with progress statements.
struct BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Time in milliseconds of the current expressed capability.
    time_ms: Timestamp,
    /// Pushes events and progress into the logging dataflow.
    event_pusher: P,
    /// Each time is advanced to the next multiple of this interval, so that capability
    /// changes (and thus downstream work) happen at most once per interval.
    interval_ms: u128,
    /// Marker for the container type `C`.
    _marker: PhantomData<C>,
}

impl<C, P> BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// Creates a new batch logger that rounds progress to multiples of `interval_ms`.
    fn new(event_pusher: P, interval_ms: u128) -> Self {
        BatchLogger {
            time_ms: Timestamp::minimum(),
            event_pusher,
            interval_ms,
            _marker: PhantomData,
        }
    }
}

impl<C, P> BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
    C: Container,
{
    /// Publishes a batch of logged events at the current logger time.
    fn publish_batch(&mut self, data: C) {
        self.event_pusher.push(Event::Messages(self.time_ms, data));
    }

    /// Advances the logger to the smallest multiple of `interval_ms` strictly greater
    /// than `time`, announcing the change as a progress statement.
    ///
    /// Returns `true` iff the logger's time advanced.
    fn report_progress(&mut self, time: Duration) -> bool {
        let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms;
        let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
        if self.time_ms < new_time_ms {
            self.event_pusher
                .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
            self.time_ms = new_time_ms;
            true
        } else {
            false
        }
    }
}

impl<C, P> Drop for BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    fn drop(&mut self) {
        // Retract the final capability, so downstream consumers know that no
        // further events will arrive.
        self.event_pusher
            .push(Event::Progress(vec![(self.time_ms, -1)]));
    }
}

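// A minimal sketch (hypothetical; not used elsewhere in this module) of the progress
// rounding above: with an interval of 1_000 ms, a wall-clock time of 1_500 ms advances
// the logger's capability to 2_000 ms, and later times within the same interval are
// no-ops.
#[allow(dead_code)]
fn batch_logger_rounding_sketch() {
    let link = Rc::new(EventLink::<Timestamp, Vec<Update<Row>>>::new());
    let mut logger = BatchLogger::new(Rc::clone(&link), 1_000);
    // 1_500 ms rounds up to 2_000 ms, advancing the initial (minimum) time.
    assert!(logger.report_progress(Duration::from_millis(1_500)));
    // 1_800 ms also rounds to 2_000 ms, so the time does not advance again.
    assert!(!logger.report_progress(Duration::from_millis(1_800)));
    // 2_000 ms rounds to the strictly next multiple, 3_000 ms.
    assert!(logger.report_progress(Duration::from_millis(2_000)));
}
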
/// A shared event queue connecting log event producers to a logging dataflow.
///
/// The `N` parameter is the number of event links. Producers that cannot share a single
/// link (the linked-list `EventLink` supports only a single writer) each get their own.
#[derive(Clone)]
struct EventQueue<C, const N: usize = 1> {
    links: [Rc<EventLink<Timestamp, C>>; N],
    activator: RcActivator,
}

impl<C, const N: usize> EventQueue<C, N> {
    /// Creates a new event queue whose activator is named after `name`.
    fn new(name: &str) -> Self {
        let activator_name = format!("{name}_activator");
        let activate_after = 128;
        Self {
            links: [(); N].map(|_| Rc::new(EventLink::new())),
            activator: RcActivator::new(activator_name, activate_after),
        }
    }
}

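// A minimal sketch (hypothetical; not used elsewhere in this module) of wiring a
// `BatchLogger` to an `EventQueue`: the logger pushes events into one of the shared
// links, from which a logging dataflow can replay them, and the activator is used to
// schedule that dataflow when new events are available.
#[allow(dead_code)]
fn event_queue_wiring_sketch() {
    let mut queue = EventQueue::<Vec<Update<Row>>>::new("example");
    let mut logger = BatchLogger::new(Rc::clone(&queue.links[0]), 1_000);
    logger.publish_batch(vec![]);
    // Wake up any dataflow registered with this activator to consume the events.
    queue.activator.activate();
}
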
/// State shared between the logging dataflow fragments of a worker.
#[derive(Default)]
struct SharedLoggingState {
    /// Activators for arrangement heap size operators.
    arrangement_size_activators: BTreeMap<usize, Activator>,
    /// Shared compute logger.
    compute_logger: Option<ComputeLogger>,
}

/// Helper to pack datums into a key and value row, following the column permutation of
/// a log variant's index.
pub(crate) struct PermutedRowPacker {
    /// Indexes of the columns that form the key.
    key: Vec<usize>,
    /// Indexes of the columns that form the value.
    value: Vec<usize>,
    /// Scratch space for the key row.
    key_row: Row,
    /// Scratch space for the value row.
    value_row: Row,
}

impl PermutedRowPacker {
    /// Creates a new packer for the given log variant.
    pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
        let variant = variant.into();
        let key = variant.index_by();
        let (_, value) = permutation_for_arrangement(
            &key.iter()
                .cloned()
                .map(MirScalarExpr::column)
                .collect::<Vec<_>>(),
            variant.desc().arity(),
        );
        Self {
            key,
            value,
            key_row: Row::default(),
            value_row: Row::default(),
        }
    }

    /// Packs a slice of datums, ordered as in the log variant's relation description.
    pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) {
        self.pack_by_index(|packer, index| packer.push(datums[index]))
    }

    /// Packs key and value rows by invoking `logic` with each target column index.
    pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(
        &mut self,
        logic: F,
    ) -> (&RowRef, &RowRef) {
        let mut packer = self.key_row.packer();
        for index in &self.key {
            logic(&mut packer, *index);
        }

        let mut packer = self.value_row.packer();
        for index in &self.value {
            logic(&mut packer, *index);
        }

        (&self.key_row, &self.value_row)
    }
}

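// A minimal sketch (hypothetical datums; not used elsewhere in this module) of packing a
// `TimelyLog::Operates` update, whose rows carry an operator id, a worker id, and a name.
// The variant's `index_by` permutation decides which datums land in the key row and which
// in the value row.
#[allow(dead_code)]
fn permuted_row_packer_sketch() {
    let mut packer = PermutedRowPacker::new(LogVariant::Timely(TimelyLog::Operates));
    let (key, value) = packer.pack_slice(&[
        Datum::UInt64(42),
        Datum::UInt64(0),
        Datum::String("example"),
    ]);
    let _ = (key, value);
}
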
/// A collection of log records exported from a logging dataflow.
struct LogCollection {
    /// Trace handle providing access to the logged records.
    trace: RowRowAgent<Timestamp, Diff>,
    /// Token that drops the collection when dropped.
    token: Rc<dyn Any>,
}

/// Consolidates an update stream and packs the consolidated updates into rows.
///
/// The input is first consolidated with batcher `B` under the `Pipeline` pact. Then
/// `logic` is applied to every consolidated update to pack it into key/value rows and
/// feed it into an output session backed by the container builder `CB`.
pub(super) fn consolidate_and_pack<G, B, CB, L, F>(
    input: &StreamCore<G, B::Input>,
    log: L,
    mut logic: F,
) -> StreamCore<G, CB::Container>
where
    G: ::timely::dataflow::Scope<Timestamp = Timestamp>,
    B: Batcher<Time = G::Timestamp> + 'static,
    B::Input: Container + Clone + 'static,
    B::Output: Clone,
    CB: ContainerBuilder,
    L: Into<LogVariant>,
    F: for<'a> FnMut(B::Output, &mut PermutedRowPacker, &mut OutputSession<CB>) + 'static,
{
    let log = log.into();
    let c_name = &format!("Consolidate {log:?}");
    let u_name = &format!("ToRow {log:?}");
    let mut packer = PermutedRowPacker::new(log);
    let consolidated = consolidate_pact::<B, _, _>(input, Pipeline, c_name);
    consolidated.unary::<CB, _, _, _>(Pipeline, u_name, |_, _| {
        move |input, output| {
            input.for_each_time(|time, data| {
                let mut session = output.session_with_builder(&time);
                for item in data.flatten().flat_map(|data| data.drain(..)) {
                    logic(item, &mut packer, &mut session);
                }
            });
        }
    })
}
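
// A schematic (not compiled here) of how a logging dataflow might call
// `consolidate_and_pack`. `KeyBatcher` stands in for a suitable `Batcher`
// implementation, `worker_id` is captured from the environment, and the
// datum layout is illustrative:
//
//     let packed = consolidate_and_pack::<_, KeyBatcher<_, _, _>, ColumnBuilder<_>, _, _>(
//         &updates,
//         TimelyLog::Operates,
//         move |((id, name), time, diff), packer, session| {
//             let data = packer.pack_slice(&[
//                 Datum::UInt64(id),
//                 Datum::UInt64(worker_id),
//                 Datum::String(&name),
//             ]);
//             session.give((data, time, diff));
//         },
//     );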