mz_compute_client/protocol/
history.rs1use std::borrow::Borrow;
13use std::collections::{BTreeMap, BTreeSet};
14
15use mz_ore::cast::CastFrom;
16use mz_ore::metrics::UIntGauge;
17use mz_ore::{assert_none, soft_assert_or_log};
18use timely::PartialOrder;
19use timely::progress::Antichain;
20
21use crate::controller::StorageCollections;
22use crate::metrics::HistoryMetrics;
23use crate::protocol::command::{ComputeCommand, ComputeParameters};
24
/// A history of compute commands that can be periodically reduced to an
/// equivalent, more compact form.
#[derive(Debug)]
pub struct ComputeCommandHistory<M, T = mz_repr::Timestamp> {
    // Number of commands in the history at the time of the last `reduce`
    // call; `push` uses this to decide when the history has grown enough
    // (more than 2x) to warrant another reduction.
    reduced_count: usize,
    // The recorded commands, in the order they were pushed.
    commands: Vec<ComputeCommand<T>>,
    // Gauges tracking per-command-kind counts and the dataflow count.
    metrics: HistoryMetrics<M>,
}
39
40impl<M, T> ComputeCommandHistory<M, T>
41where
42 M: Borrow<UIntGauge>,
43 T: timely::progress::Timestamp,
44{
45 pub fn new(metrics: HistoryMetrics<M>) -> Self {
47 metrics.reset();
48
49 Self {
50 reduced_count: 0,
51 commands: Vec::new(),
52 metrics,
53 }
54 }
55
56 pub fn push(&mut self, command: ComputeCommand<T>) {
60 self.commands.push(command);
61
62 if self.commands.len() > 2 * self.reduced_count {
63 self.reduce();
64 } else {
65 let command = self.commands.last().expect("pushed above");
68 self.metrics
69 .command_counts
70 .for_command(command)
71 .borrow()
72 .inc();
73 if matches!(command, ComputeCommand::CreateDataflow(_)) {
74 self.metrics.dataflow_count.borrow().inc();
75 }
76 }
77 }
78
    /// Reduces the command history to an equivalent, more compact form.
    ///
    /// Drains the current history, folding commands of the same kind into
    /// aggregate state (merging all `UpdateConfiguration`s, keeping only the
    /// last `AllowCompaction` frontier per collection, dropping peeks that
    /// were subsequently cancelled), then re-emits one representative command
    /// per surviving item in a fixed order. All command-count gauges are set
    /// to reflect the reduced history.
    pub fn reduce(&mut self) {
        // Latest compaction frontier recorded for each collection.
        let mut final_frontiers = BTreeMap::new();
        let mut created_dataflows = Vec::new();
        let mut scheduled_collections = Vec::new();
        // Peeks that have not been cancelled, keyed by UUID.
        let mut live_peeks = BTreeMap::new();

        let mut hello_command = None;
        let mut create_inst_command = None;

        // Merged contents of all `UpdateConfiguration` commands.
        let mut final_configuration = ComputeParameters::default();

        let mut initialization_complete = false;
        let mut allow_writes = BTreeSet::new();

        // Drain the history, folding each command into the aggregation state
        // above.
        for command in self.commands.drain(..) {
            match command {
                hello @ ComputeCommand::Hello { .. } => {
                    // At most one `Hello` may appear in the history.
                    assert_none!(hello_command);
                    hello_command = Some(hello);
                }
                create_inst @ ComputeCommand::CreateInstance(_) => {
                    // At most one `CreateInstance` may appear in the history.
                    assert_none!(create_inst_command);
                    create_inst_command = Some(create_inst);
                }
                ComputeCommand::InitializationComplete => {
                    initialization_complete = true;
                }
                ComputeCommand::UpdateConfiguration(params) => {
                    final_configuration.update(*params);
                }
                ComputeCommand::CreateDataflow(dataflow) => {
                    created_dataflows.push(dataflow);
                }
                ComputeCommand::Schedule(id) => {
                    scheduled_collections.push(id);
                }
                ComputeCommand::AllowCompaction { id, frontier } => {
                    // Later frontiers overwrite earlier ones for the same id.
                    final_frontiers.insert(id, frontier.clone());
                }
                ComputeCommand::Peek(peek) => {
                    live_peeks.insert(peek.uuid, peek);
                }
                ComputeCommand::CancelPeek { uuid } => {
                    // A cancellation annihilates the matching peek; neither
                    // survives the reduction.
                    live_peeks.remove(&uuid);
                }
                ComputeCommand::AllowWrites(id) => {
                    allow_writes.insert(id);
                }
            }
        }

        // Advance each dataflow's as-of: for every export, use its recorded
        // compaction frontier if one exists, otherwise fall back to the
        // dataflow's initial as-of.
        for dataflow in created_dataflows.iter_mut() {
            let mut as_of = Antichain::new();
            let initial_as_of = dataflow.as_of.as_ref().unwrap();
            for id in dataflow.export_ids() {
                if let Some(frontier) = final_frontiers.get(&id) {
                    as_of.extend(frontier.clone());
                } else {
                    as_of.extend(initial_as_of.clone());
                }
            }

            // The computed as-of must never regress below the initial one.
            soft_assert_or_log!(
                PartialOrder::less_equal(initial_as_of, &as_of),
                "dataflow as-of regression: {:?} -> {:?} (exports={})",
                initial_as_of.elements(),
                as_of.elements(),
                dataflow.display_export_ids(),
            );

            // A compaction frontier equal to the new as-of is implied by the
            // `CreateDataflow` command, so the explicit `AllowCompaction` can
            // be dropped.
            for id in dataflow.export_ids() {
                if let Some(frontier) = final_frontiers.get(&id) {
                    if frontier == &as_of {
                        final_frontiers.remove(&id);
                    }
                }
            }

            dataflow.as_of = Some(as_of);
        }

        // Dataflows whose as-of advanced to the empty frontier need not be
        // replayed at all; also drop `Schedule`/`AllowWrites` commands that
        // refer to collections of such removed dataflows.
        created_dataflows.retain(|dataflow| dataflow.as_of != Some(Antichain::new()));
        let retained_collections: BTreeSet<_> = created_dataflows
            .iter()
            .flat_map(|d| d.export_ids())
            .collect();
        scheduled_collections.retain(|id| retained_collections.contains(id));
        allow_writes.retain(|id| retained_collections.contains(id));

        let command_counts = &self.metrics.command_counts;
        let dataflow_count = &self.metrics.dataflow_count;

        // Re-emit the retained commands, setting each gauge to the reduced
        // count as we go.
        let count = u64::from(hello_command.is_some());
        command_counts.hello.borrow().set(count);
        if let Some(hello) = hello_command {
            self.commands.push(hello);
        }

        let count = u64::from(create_inst_command.is_some());
        command_counts.create_instance.borrow().set(count);
        if let Some(create_inst_command) = create_inst_command {
            self.commands.push(create_inst_command);
        }

        let count = u64::from(!final_configuration.all_unset());
        command_counts.update_configuration.borrow().set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(ComputeCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(created_dataflows.len());
        command_counts.create_dataflow.borrow().set(count);
        dataflow_count.borrow().set(count);
        for dataflow in created_dataflows {
            self.commands.push(ComputeCommand::CreateDataflow(dataflow));
        }

        let count = u64::cast_from(scheduled_collections.len());
        command_counts.schedule.borrow().set(count);
        for id in scheduled_collections {
            self.commands.push(ComputeCommand::Schedule(id));
        }

        let count = u64::cast_from(live_peeks.len());
        command_counts.peek.borrow().set(count);
        for peek in live_peeks.into_values() {
            self.commands.push(ComputeCommand::Peek(peek));
        }

        // Cancellations were resolved against `live_peeks` above, so no
        // `CancelPeek` ever survives a reduction.
        command_counts.cancel_peek.borrow().set(0);

        let count = u64::cast_from(final_frontiers.len());
        command_counts.allow_compaction.borrow().set(count);
        for (id, frontier) in final_frontiers {
            self.commands
                .push(ComputeCommand::AllowCompaction { id, frontier });
        }

        let count = u64::cast_from(allow_writes.len());
        command_counts.allow_writes.borrow().set(count);
        for id in allow_writes {
            self.commands.push(ComputeCommand::AllowWrites(id));
        }

        let count = u64::from(initialization_complete);
        command_counts.initialization_complete.borrow().set(count);
        if initialization_complete {
            self.commands.push(ComputeCommand::InitializationComplete);
        }

        // Remember the reduced size; `push` uses it to decide when the
        // history warrants another reduction.
        self.reduced_count = self.commands.len();
    }
258
259 pub fn discard_peeks(&mut self) {
261 self.commands.retain(|command| {
262 use ComputeCommand::*;
263 let is_peek = matches!(command, Peek(_) | CancelPeek { .. });
264 if is_peek {
265 self.metrics
266 .command_counts
267 .for_command(command)
268 .borrow()
269 .dec();
270 }
271 !is_peek
272 });
273 }
274
275 pub fn update_source_uppers(&mut self, storage_collections: &StorageCollections<T>) {
281 for command in &mut self.commands {
282 if let ComputeCommand::CreateDataflow(dataflow) = command {
283 for (id, (_, _, upper)) in dataflow.source_imports.iter_mut() {
284 let frontiers = storage_collections
285 .collection_frontiers(*id)
286 .expect("collection exists");
287
288 *upper = frontiers.write_frontier;
289 }
290 }
291 }
292 }
293
294 pub fn iter(&self) -> impl Iterator<Item = &ComputeCommand<T>> {
296 self.commands.iter()
297 }
298}