mz_compute_client/protocol/
history.rs1use std::borrow::Borrow;
13use std::collections::{BTreeMap, BTreeSet};
14
15use mz_ore::cast::CastFrom;
16use mz_ore::metrics::UIntGauge;
17use mz_ore::{assert_none, soft_assert_or_log};
18use timely::PartialOrder;
19use timely::progress::Antichain;
20
21use crate::controller::StorageCollections;
22use crate::metrics::HistoryMetrics;
23use crate::protocol::command::{ComputeCommand, ComputeParameters};
24
/// A history of [`ComputeCommand`]s that can be compacted with `reduce`.
///
/// `M` is the handle type of the gauges held by the tracked [`HistoryMetrics`] (the methods
/// require `M: Borrow<UIntGauge>`); `T` is the timestamp type of the recorded commands and
/// defaults to `mz_repr::Timestamp`.
#[derive(Debug)]
pub struct ComputeCommandHistory<M, T = mz_repr::Timestamp> {
    /// Length of `commands` right after the most recent `reduce` call (0 for a new history).
    ///
    /// `push` compares against twice this value to decide when to reduce again.
    reduced_count: usize,
    /// The recorded commands, in push order, or in the order emitted by the last `reduce`.
    commands: Vec<ComputeCommand<T>>,
    /// Gauges that are kept in step with the contents of `commands`.
    metrics: HistoryMetrics<M>,
}
39
40impl<M, T> ComputeCommandHistory<M, T>
41where
42    M: Borrow<UIntGauge>,
43    T: timely::progress::Timestamp,
44{
45    pub fn new(metrics: HistoryMetrics<M>) -> Self {
47        metrics.reset();
48
49        Self {
50            reduced_count: 0,
51            commands: Vec::new(),
52            metrics,
53        }
54    }
55
56    pub fn push(&mut self, command: ComputeCommand<T>) {
60        self.commands.push(command);
61
62        if self.commands.len() > 2 * self.reduced_count {
63            self.reduce();
64        } else {
65            let command = self.commands.last().expect("pushed above");
68            self.metrics
69                .command_counts
70                .for_command(command)
71                .borrow()
72                .inc();
73            if matches!(command, ComputeCommand::CreateDataflow(_)) {
74                self.metrics.dataflow_count.borrow().inc();
75            }
76        }
77    }
78
79    pub fn reduce(&mut self) {
86        let mut final_frontiers = BTreeMap::new();
90        let mut created_dataflows = Vec::new();
91        let mut scheduled_collections = Vec::new();
92        let mut live_peeks = BTreeMap::new();
93
94        let mut hello_command = None;
95        let mut create_inst_command = None;
96
97        let mut final_configuration = ComputeParameters::default();
102
103        let mut initialization_complete = false;
104        let mut allow_writes = false;
105
106        for command in self.commands.drain(..) {
107            match command {
108                hello @ ComputeCommand::Hello { .. } => {
109                    assert_none!(hello_command);
110                    hello_command = Some(hello);
111                }
112                create_inst @ ComputeCommand::CreateInstance(_) => {
114                    assert_none!(create_inst_command);
115                    create_inst_command = Some(create_inst);
116                }
117                ComputeCommand::InitializationComplete => {
118                    initialization_complete = true;
119                }
120                ComputeCommand::UpdateConfiguration(params) => {
121                    final_configuration.update(*params);
122                }
123                ComputeCommand::CreateDataflow(dataflow) => {
124                    created_dataflows.push(dataflow);
125                }
126                ComputeCommand::Schedule(id) => {
127                    scheduled_collections.push(id);
128                }
129                ComputeCommand::AllowCompaction { id, frontier } => {
130                    final_frontiers.insert(id, frontier.clone());
131                }
132                ComputeCommand::Peek(peek) => {
133                    live_peeks.insert(peek.uuid, peek);
134                }
135                ComputeCommand::CancelPeek { uuid } => {
136                    live_peeks.remove(&uuid);
137                }
138                ComputeCommand::AllowWrites => {
139                    allow_writes = true;
140                }
141            }
142        }
143
144        for dataflow in created_dataflows.iter_mut() {
147            let mut as_of = Antichain::new();
148            let initial_as_of = dataflow.as_of.as_ref().unwrap();
149            for id in dataflow.export_ids() {
150                if let Some(frontier) = final_frontiers.get(&id) {
152                    as_of.extend(frontier.clone());
153                } else {
154                    as_of.extend(initial_as_of.clone());
155                }
156            }
157
158            soft_assert_or_log!(
159                PartialOrder::less_equal(initial_as_of, &as_of),
160                "dataflow as-of regression: {:?} -> {:?} (exports={})",
161                initial_as_of.elements(),
162                as_of.elements(),
163                dataflow.display_export_ids(),
164            );
165
166            for id in dataflow.export_ids() {
168                if let Some(frontier) = final_frontiers.get(&id) {
169                    if frontier == &as_of {
170                        final_frontiers.remove(&id);
171                    }
172                }
173            }
174
175            dataflow.as_of = Some(as_of);
176        }
177
178        created_dataflows.retain(|dataflow| dataflow.as_of != Some(Antichain::new()));
180        let retained_collections: BTreeSet<_> = created_dataflows
181            .iter()
182            .flat_map(|d| d.export_ids())
183            .collect();
184        scheduled_collections.retain(|id| retained_collections.contains(id));
185
186        let command_counts = &self.metrics.command_counts;
191        let dataflow_count = &self.metrics.dataflow_count;
192
193        let count = u64::from(hello_command.is_some());
194        command_counts.hello.borrow().set(count);
195        if let Some(hello) = hello_command {
196            self.commands.push(hello);
197        }
198
199        let count = u64::from(create_inst_command.is_some());
200        command_counts.create_instance.borrow().set(count);
201        if let Some(create_inst_command) = create_inst_command {
202            self.commands.push(create_inst_command);
203        }
204
205        let count = u64::from(!final_configuration.all_unset());
206        command_counts.update_configuration.borrow().set(count);
207        if !final_configuration.all_unset() {
208            let config = Box::new(final_configuration);
209            self.commands
210                .push(ComputeCommand::UpdateConfiguration(config));
211        }
212
213        let count = u64::cast_from(created_dataflows.len());
214        command_counts.create_dataflow.borrow().set(count);
215        dataflow_count.borrow().set(count);
216        for dataflow in created_dataflows {
217            self.commands.push(ComputeCommand::CreateDataflow(dataflow));
218        }
219
220        let count = u64::cast_from(scheduled_collections.len());
221        command_counts.schedule.borrow().set(count);
222        for id in scheduled_collections {
223            self.commands.push(ComputeCommand::Schedule(id));
224        }
225
226        let count = u64::cast_from(live_peeks.len());
227        command_counts.peek.borrow().set(count);
228        for peek in live_peeks.into_values() {
229            self.commands.push(ComputeCommand::Peek(peek));
230        }
231
232        command_counts.cancel_peek.borrow().set(0);
233
234        let count = u64::cast_from(final_frontiers.len());
236        command_counts.allow_compaction.borrow().set(count);
237        for (id, frontier) in final_frontiers {
238            self.commands
239                .push(ComputeCommand::AllowCompaction { id, frontier });
240        }
241
242        let count = u64::from(initialization_complete);
243        command_counts.initialization_complete.borrow().set(count);
244        if initialization_complete {
245            self.commands.push(ComputeCommand::InitializationComplete);
246        }
247
248        if allow_writes {
249            self.commands.push(ComputeCommand::AllowWrites);
250        }
251
252        self.reduced_count = self.commands.len();
253    }
254
255    pub fn discard_peeks(&mut self) {
257        self.commands.retain(|command| {
258            use ComputeCommand::*;
259            let is_peek = matches!(command, Peek(_) | CancelPeek { .. });
260            if is_peek {
261                self.metrics
262                    .command_counts
263                    .for_command(command)
264                    .borrow()
265                    .dec();
266            }
267            !is_peek
268        });
269    }
270
271    pub fn update_source_uppers(&mut self, storage_collections: &StorageCollections<T>) {
277        for command in &mut self.commands {
278            if let ComputeCommand::CreateDataflow(dataflow) = command {
279                for (id, (_, _, upper)) in dataflow.source_imports.iter_mut() {
280                    let frontiers = storage_collections
281                        .collection_frontiers(*id)
282                        .expect("collection exists");
283
284                    *upper = frontiers.write_frontier;
285                }
286            }
287        }
288    }
289
290    pub fn iter(&self) -> impl Iterator<Item = &ComputeCommand<T>> {
292        self.commands.iter()
293    }
294}