1pub mod compute;
13mod differential;
14pub(super) mod initialize;
15mod reachability;
16mod timely;
17
18use std::any::Any;
19use std::collections::BTreeMap;
20use std::marker::PhantomData;
21use std::rc::Rc;
22use std::time::Duration;
23
24use ::timely::Container;
25use ::timely::container::{CapacityContainerBuilder, ContainerBuilder};
26use ::timely::dataflow::StreamCore;
27use ::timely::dataflow::channels::pact::Pipeline;
28use ::timely::dataflow::channels::pushers::buffer::Session;
29use ::timely::dataflow::channels::pushers::{Counter, Tee};
30use ::timely::dataflow::operators::Operator;
31use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher};
32use ::timely::progress::Timestamp as TimelyTimestamp;
33use ::timely::scheduling::Activator;
34use differential_dataflow::trace::Batcher;
35use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog};
36use mz_expr::{MirScalarExpr, permutation_for_arrangement};
37use mz_repr::{Datum, Diff, Row, RowPacker, RowRef, Timestamp};
38use mz_timely_util::activator::RcActivator;
39use mz_timely_util::containers::ColumnBuilder;
40use mz_timely_util::operator::consolidate_pact;
41
42use crate::logging::compute::Logger as ComputeLogger;
43use crate::typedefs::RowRowAgent;
44
45pub use crate::logging::initialize::initialize;
46
/// A single logged update: a datum of type `D` paired with a [`Timestamp`] and a [`Diff`].
pub(super) type Update<D> = (D, Timestamp, Diff);
/// The pusher type for containers `C`: a [`Counter`] wrapping a [`Tee`], both at [`Timestamp`].
pub(super) type Pusher<C> = Counter<Timestamp, C, Tee<Timestamp, C>>;
/// An output [`Session`] driven by the container builder `CB`, pushing the builder's
/// container type through a [`Pusher`].
pub(super) type OutputSession<'a, CB> =
    Session<'a, Timestamp, CB, Pusher<<CB as ContainerBuilder>::Container>>;
/// An [`OutputSession`] that builds `Vec<D>` containers with a [`CapacityContainerBuilder`].
pub(super) type OutputSessionVec<'a, D> = OutputSession<'a, CapacityContainerBuilder<Vec<D>>>;
/// An [`OutputSession`] that builds containers with a [`ColumnBuilder`] over `D`.
pub(super) type OutputSessionColumnar<'a, D> = OutputSession<'a, ColumnBuilder<D>>;
58
/// Forwards batches of logged data and progress statements to an event pusher.
///
/// `C` is the container type of the published batches and `P` is the pusher that
/// receives the resulting events.
struct BatchLogger<C, P>
where
    P: EventPusher<Timestamp, C>,
{
    /// The time currently held by the logger. Published batches are stamped with it, and it
    /// is retracted when progress is reported or the logger is dropped.
    time_ms: Timestamp,
    /// Destination for message and progress events.
    event_pusher: P,
    /// Granularity of reported times: progress is rounded up to the next multiple of this
    /// interval (in milliseconds) that lies strictly after the reported time.
    interval_ms: u128,
    /// Ties the container type `C` to the struct without storing a value of it.
    _marker: PhantomData<C>,
}
74
75impl<C, P> BatchLogger<C, P>
76where
77 P: EventPusher<Timestamp, C>,
78{
79 fn new(event_pusher: P, interval_ms: u128) -> Self {
81 BatchLogger {
82 time_ms: Timestamp::minimum(),
83 event_pusher,
84 interval_ms,
85 _marker: PhantomData,
86 }
87 }
88}
89
90impl<C, P> BatchLogger<C, P>
91where
92 P: EventPusher<Timestamp, C>,
93 C: Container,
94{
95 fn publish_batch(&mut self, data: C) {
97 self.event_pusher.push(Event::Messages(self.time_ms, data));
98 }
99
100 fn report_progress(&mut self, time: Duration) -> bool {
104 let time_ms = ((time.as_millis() / self.interval_ms) + 1) * self.interval_ms;
105 let new_time_ms: Timestamp = time_ms.try_into().expect("must fit");
106 if self.time_ms < new_time_ms {
107 self.event_pusher
108 .push(Event::Progress(vec![(new_time_ms, 1), (self.time_ms, -1)]));
109 self.time_ms = new_time_ms;
110 true
111 } else {
112 false
113 }
114 }
115}
116
117impl<C, P> Drop for BatchLogger<C, P>
118where
119 P: EventPusher<Timestamp, C>,
120{
121 fn drop(&mut self) {
122 self.event_pusher
123 .push(Event::Progress(vec![(self.time_ms, -1)]));
124 }
125}
126
/// A bundle of `N` shared event links together with an activator.
///
/// `C` is the container type carried by the links; `N` defaults to a single link.
#[derive(Clone)]
struct EventQueue<C, const N: usize = 1> {
    /// The reference-counted event links, all at time type [`Timestamp`].
    links: [Rc<EventLink<Timestamp, C>>; N],
    /// Activator associated with this queue.
    // NOTE(review): presumably used to schedule whoever reads from `links` — confirm
    // against the users of this type.
    activator: RcActivator,
}
141
142impl<C, const N: usize> EventQueue<C, N> {
143 fn new(name: &str) -> Self {
144 let activator_name = format!("{name}_activator");
145 let activate_after = 128;
146 Self {
147 links: [(); N].map(|_| Rc::new(EventLink::new())),
148 activator: RcActivator::new(activator_name, activate_after),
149 }
150 }
151}
152
/// State bundled together for sharing across the logging code.
///
/// The default value has no activators and no compute logger.
#[derive(Default)]
struct SharedLoggingState {
    /// Activators keyed by a `usize` identifier.
    // NOTE(review): the name suggests these belong to arrangement-size operators keyed by
    // operator id — confirm against the code that populates the map.
    arrangement_size_activators: BTreeMap<usize, Activator>,
    /// The compute logger, if one has been installed.
    compute_logger: Option<ComputeLogger>,
}
161
/// Packs columns into a key row and a value row according to fixed column index lists.
pub(crate) struct PermutedRowPacker {
    /// Column indices packed, in order, into `key_row`.
    key: Vec<usize>,
    /// Column indices packed, in order, into `value_row`.
    value: Vec<usize>,
    /// Row that receives the key columns on every pack call.
    key_row: Row,
    /// Row that receives the value columns on every pack call.
    value_row: Row,
}
169
170impl PermutedRowPacker {
171 pub(crate) fn new<V: Into<LogVariant>>(variant: V) -> Self {
173 let variant = variant.into();
174 let key = variant.index_by();
175 let (_, value) = permutation_for_arrangement(
176 &key.iter()
177 .cloned()
178 .map(MirScalarExpr::Column)
179 .collect::<Vec<_>>(),
180 variant.desc().arity(),
181 );
182 Self {
183 key,
184 value,
185 key_row: Row::default(),
186 value_row: Row::default(),
187 }
188 }
189
190 pub(crate) fn pack_slice(&mut self, datums: &[Datum]) -> (&RowRef, &RowRef) {
192 self.pack_by_index(|packer, index| packer.push(datums[index]))
193 }
194
195 pub(crate) fn pack_slice_owned(&mut self, datums: &[Datum]) -> (Row, Row) {
201 let (key, value) = self.pack_slice(datums);
202 (key.to_owned(), value.to_owned())
203 }
204
205 pub(crate) fn pack_by_index<F: Fn(&mut RowPacker, usize)>(
207 &mut self,
208 logic: F,
209 ) -> (&RowRef, &RowRef) {
210 let mut packer = self.key_row.packer();
211 for index in &self.key {
212 logic(&mut packer, *index);
213 }
214
215 let mut packer = self.value_row.packer();
216 for index in &self.value {
217 logic(&mut packer, *index);
218 }
219
220 (&self.key_row, &self.value_row)
221 }
222}
223
/// A trace handle for a log collection, bundled with an opaque token.
struct LogCollection {
    /// Trace agent over `(Row, Row)` data at [`Timestamp`] times with [`Diff`] differences.
    trace: RowRowAgent<Timestamp, Diff>,
    /// Opaque, reference-counted token kept alongside the trace.
    // NOTE(review): presumably dropping the last clone of this token tears down the
    // collection — confirm against where the token is created.
    token: Rc<dyn Any>,
}
231
232pub(super) fn consolidate_and_pack<G, B, CB, L, F>(
239 input: &StreamCore<G, B::Input>,
240 log: L,
241 mut logic: F,
242) -> StreamCore<G, CB::Container>
243where
244 G: ::timely::dataflow::Scope<Timestamp = Timestamp>,
245 B: Batcher<Time = G::Timestamp> + 'static,
246 B::Input: Container + Clone + 'static,
247 B::Output: Container + Clone + 'static,
248 CB: ContainerBuilder,
249 L: Into<LogVariant>,
250 F: for<'a> FnMut(
251 <B::Output as Container>::ItemRef<'a>,
252 &mut PermutedRowPacker,
253 &mut OutputSession<CB>,
254 ) + 'static,
255{
256 let log = log.into();
257 let c_name = &format!("Consolidate {log:?}");
259 let u_name = &format!("ToRow {log:?}");
260 let mut packer = PermutedRowPacker::new(log);
261 let consolidated = consolidate_pact::<B, _, _>(input, Pipeline, c_name);
262 consolidated.unary::<CB, _, _, _>(Pipeline, u_name, |_, _| {
263 move |input, output| {
264 while let Some((time, data)) = input.next() {
265 let mut session = output.session_with_builder(&time);
266 for item in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
267 logic(item, &mut packer, &mut session);
268 }
269 }
270 }
271 })
272}