// mz_compute_client/protocol/history.rs

use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet};

use mz_ore::cast::CastFrom;
use mz_ore::metrics::UIntGauge;
use mz_ore::{assert_none, soft_assert_or_log};
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::controller::StorageCollections;
use crate::metrics::HistoryMetrics;
use crate::protocol::command::{ComputeCommand, ComputeParameters};
/// A history of compute commands, maintained in a reduced form that produces
/// the same effect as the original sequence when replayed against a fresh
/// replica.
#[derive(Debug)]
pub struct ComputeCommandHistory<M> {
    // Number of commands in the history at the time of the last reduction;
    // used by `push` to decide when the history has doubled and must be
    // reduced again.
    reduced_count: usize,
    // The reduced sequence of commands, in replay order.
    commands: Vec<ComputeCommand>,
    // Metrics reporting command counts; kept in sync with `commands`.
    metrics: HistoryMetrics<M>,
}
39
40impl<M> ComputeCommandHistory<M>
41where
42 M: Borrow<UIntGauge>,
43{
44 pub fn new(metrics: HistoryMetrics<M>) -> Self {
46 metrics.reset();
47
48 Self {
49 reduced_count: 0,
50 commands: Vec::new(),
51 metrics,
52 }
53 }
54
55 pub fn push(&mut self, command: ComputeCommand) {
59 self.commands.push(command);
60
61 if self.commands.len() > 2 * self.reduced_count {
62 self.reduce();
63 } else {
64 let command = self.commands.last().expect("pushed above");
67 self.metrics
68 .command_counts
69 .for_command(command)
70 .borrow()
71 .inc();
72 if matches!(command, ComputeCommand::CreateDataflow(_)) {
73 self.metrics.dataflow_count.borrow().inc();
74 }
75 }
76 }
77
78 pub fn reduce(&mut self) {
85 let mut final_frontiers = BTreeMap::new();
89 let mut created_dataflows = Vec::new();
90 let mut scheduled_collections = Vec::new();
91 let mut live_peeks = BTreeMap::new();
92
93 let mut hello_command = None;
94 let mut create_inst_command = None;
95
96 let mut final_configuration = ComputeParameters::default();
101
102 let mut initialization_complete = false;
103 let mut allow_writes = BTreeSet::new();
104
105 for command in self.commands.drain(..) {
106 match command {
107 hello @ ComputeCommand::Hello { .. } => {
108 assert_none!(hello_command);
109 hello_command = Some(hello);
110 }
111 create_inst @ ComputeCommand::CreateInstance(_) => {
113 assert_none!(create_inst_command);
114 create_inst_command = Some(create_inst);
115 }
116 ComputeCommand::InitializationComplete => {
117 initialization_complete = true;
118 }
119 ComputeCommand::UpdateConfiguration(params) => {
120 final_configuration.update(*params);
121 }
122 ComputeCommand::CreateDataflow(dataflow) => {
123 created_dataflows.push(dataflow);
124 }
125 ComputeCommand::Schedule(id) => {
126 scheduled_collections.push(id);
127 }
128 ComputeCommand::AllowCompaction { id, frontier } => {
129 final_frontiers.insert(id, frontier.clone());
130 }
131 ComputeCommand::Peek(peek) => {
132 live_peeks.insert(peek.uuid, peek);
133 }
134 ComputeCommand::CancelPeek { uuid } => {
135 live_peeks.remove(&uuid);
136 }
137 ComputeCommand::AllowWrites(id) => {
138 allow_writes.insert(id);
139 }
140 }
141 }
142
143 for dataflow in created_dataflows.iter_mut() {
146 let mut as_of = Antichain::new();
147 let initial_as_of = dataflow.as_of.as_ref().unwrap();
148 for id in dataflow.export_ids() {
149 if let Some(frontier) = final_frontiers.get(&id) {
151 as_of.extend(frontier.clone());
152 } else {
153 as_of.extend(initial_as_of.clone());
154 }
155 }
156
157 soft_assert_or_log!(
158 PartialOrder::less_equal(initial_as_of, &as_of),
159 "dataflow as-of regression: {:?} -> {:?} (exports={})",
160 initial_as_of.elements(),
161 as_of.elements(),
162 dataflow.display_export_ids(),
163 );
164
165 for id in dataflow.export_ids() {
167 if let Some(frontier) = final_frontiers.get(&id) {
168 if frontier == &as_of {
169 final_frontiers.remove(&id);
170 }
171 }
172 }
173
174 dataflow.as_of = Some(as_of);
175 }
176
177 created_dataflows.retain(|dataflow| dataflow.as_of != Some(Antichain::new()));
179 let retained_collections: BTreeSet<_> = created_dataflows
180 .iter()
181 .flat_map(|d| d.export_ids())
182 .collect();
183 scheduled_collections.retain(|id| retained_collections.contains(id));
184 allow_writes.retain(|id| retained_collections.contains(id));
186
187 let command_counts = &self.metrics.command_counts;
192 let dataflow_count = &self.metrics.dataflow_count;
193
194 let count = u64::from(hello_command.is_some());
195 command_counts.hello.borrow().set(count);
196 if let Some(hello) = hello_command {
197 self.commands.push(hello);
198 }
199
200 let count = u64::from(create_inst_command.is_some());
201 command_counts.create_instance.borrow().set(count);
202 if let Some(create_inst_command) = create_inst_command {
203 self.commands.push(create_inst_command);
204 }
205
206 let count = u64::from(!final_configuration.all_unset());
207 command_counts.update_configuration.borrow().set(count);
208 if !final_configuration.all_unset() {
209 let config = Box::new(final_configuration);
210 self.commands
211 .push(ComputeCommand::UpdateConfiguration(config));
212 }
213
214 let count = u64::cast_from(created_dataflows.len());
215 command_counts.create_dataflow.borrow().set(count);
216 dataflow_count.borrow().set(count);
217 for dataflow in created_dataflows {
218 self.commands.push(ComputeCommand::CreateDataflow(dataflow));
219 }
220
221 let count = u64::cast_from(scheduled_collections.len());
222 command_counts.schedule.borrow().set(count);
223 for id in scheduled_collections {
224 self.commands.push(ComputeCommand::Schedule(id));
225 }
226
227 let count = u64::cast_from(live_peeks.len());
228 command_counts.peek.borrow().set(count);
229 for peek in live_peeks.into_values() {
230 self.commands.push(ComputeCommand::Peek(peek));
231 }
232
233 command_counts.cancel_peek.borrow().set(0);
234
235 let count = u64::cast_from(final_frontiers.len());
237 command_counts.allow_compaction.borrow().set(count);
238 for (id, frontier) in final_frontiers {
239 self.commands
240 .push(ComputeCommand::AllowCompaction { id, frontier });
241 }
242
243 let count = u64::cast_from(allow_writes.len());
244 command_counts.allow_writes.borrow().set(count);
245 for id in allow_writes {
246 self.commands.push(ComputeCommand::AllowWrites(id));
247 }
248
249 let count = u64::from(initialization_complete);
250 command_counts.initialization_complete.borrow().set(count);
251 if initialization_complete {
252 self.commands.push(ComputeCommand::InitializationComplete);
253 }
254
255 self.reduced_count = self.commands.len();
256 }
257
258 pub fn discard_peeks(&mut self) {
260 self.commands.retain(|command| {
261 use ComputeCommand::*;
262 let is_peek = matches!(command, Peek(_) | CancelPeek { .. });
263 if is_peek {
264 self.metrics
265 .command_counts
266 .for_command(command)
267 .borrow()
268 .dec();
269 }
270 !is_peek
271 });
272 }
273
274 pub fn update_source_uppers(&mut self, storage_collections: &StorageCollections) {
280 for command in &mut self.commands {
281 if let ComputeCommand::CreateDataflow(dataflow) = command {
282 for (id, import) in dataflow.source_imports.iter_mut() {
283 let frontiers = storage_collections
284 .collection_frontiers(*id)
285 .expect("collection exists");
286
287 import.upper = frontiers.write_frontier;
288 }
289 }
290 }
291 }
292
293 pub fn iter(&self) -> impl Iterator<Item = &ComputeCommand> {
295 self.commands.iter()
296 }
297}