// mz_compute_client/protocol/history.rs

//! A reducible history of compute commands.

use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet};

use mz_ore::cast::CastFrom;
use mz_ore::metrics::UIntGauge;
use mz_ore::{assert_none, soft_assert_or_log};
use timely::PartialOrder;
use timely::progress::Antichain;

use crate::controller::StorageCollections;
use crate::metrics::HistoryMetrics;
use crate::protocol::command::{ComputeCommand, ComputeParameters};

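/// A history of compute protocol commands that can be reduced to a minimal equivalent sequence
/// while preserving its cumulative effect.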
#[derive(Debug)]
pub struct ComputeCommandHistory<M, T = mz_repr::Timestamp> {
    /// The number of commands in the history at the time of the last reduction.
    reduced_count: usize,
    /// The commands in the history.
    commands: Vec<ComputeCommand<T>>,
    /// Metrics tracked for the history.
    metrics: HistoryMetrics<M>,
}

impl<M, T> ComputeCommandHistory<M, T>
where
    M: Borrow<UIntGauge>,
    T: timely::progress::Timestamp,
{
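    /// Construct a new command history, resetting the given metrics.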
    pub fn new(metrics: HistoryMetrics<M>) -> Self {
        metrics.reset();

        Self {
            reduced_count: 0,
            commands: Vec::new(),
            metrics,
        }
    }

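    /// Add a command to the history.
    ///
    /// This reduces the history every time its length exceeds twice the length it had after the
    /// previous reduction; otherwise it only updates the reported metrics.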
    pub fn push(&mut self, command: ComputeCommand<T>) {
        self.commands.push(command);

        if self.commands.len() > 2 * self.reduced_count {
            self.reduce();
        } else {
            // Refresh the reported metrics for the newly pushed command. `reduce` refreshes all
            // metrics itself, so this is only needed in the non-reducing case.
            let command = self.commands.last().expect("pushed above");
            self.metrics
                .command_counts
                .for_command(command)
                .borrow()
                .inc();
            if matches!(command, ComputeCommand::CreateDataflow(_)) {
                self.metrics.dataflow_count.borrow().inc();
            }
        }
    }

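    /// Reduce the command history to a minimal equivalent sequence of commands, and refresh the
    /// reported metrics to reflect the reduced history.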
    pub fn reduce(&mut self) {
        // The latest compaction frontier requested for each collection.
        let mut final_frontiers = BTreeMap::new();
        // Dataflows and collections created or scheduled by the history.
        let mut created_dataflows = Vec::new();
        let mut scheduled_collections = Vec::new();
        // Peeks that have not been cancelled.
        let mut live_peeks = BTreeMap::new();

        // The unique `CreateInstance` and `CreateTimely` commands, if present.
        let mut create_inst_command = None;
        let mut create_timely_command = None;

        // The merged result of all configuration updates.
        let mut final_configuration = ComputeParameters::default();

        let mut initialization_complete = false;
        let mut allow_writes = false;

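        // Determine the cumulative effect of all commands by draining the history.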
        for command in self.commands.drain(..) {
            match command {
                create_timely @ ComputeCommand::CreateTimely { .. } => {
                    assert_none!(create_timely_command);
                    create_timely_command = Some(create_timely);
                }
                create_inst @ ComputeCommand::CreateInstance(_) => {
                    assert_none!(create_inst_command);
                    create_inst_command = Some(create_inst);
                }
                ComputeCommand::InitializationComplete => {
                    initialization_complete = true;
                }
                ComputeCommand::UpdateConfiguration(params) => {
                    final_configuration.update(*params);
                }
                ComputeCommand::CreateDataflow(dataflow) => {
                    created_dataflows.push(dataflow);
                }
                ComputeCommand::Schedule(id) => {
                    scheduled_collections.push(id);
                }
                ComputeCommand::AllowCompaction { id, frontier } => {
                    final_frontiers.insert(id, frontier.clone());
                }
                ComputeCommand::Peek(peek) => {
                    live_peeks.insert(peek.uuid, peek);
                }
                ComputeCommand::CancelPeek { uuid } => {
                    live_peeks.remove(&uuid);
                }
                ComputeCommand::AllowWrites => {
                    allow_writes = true;
                }
            }
        }

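        // Advance the `as_of` of each created dataflow based on the final frontiers of its
        // exports.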
        for dataflow in created_dataflows.iter_mut() {
            // Compute a new `as_of` for the dataflow: for each export, use its final compaction
            // frontier if one exists, otherwise the initial `as_of`.
            let mut as_of = Antichain::new();
            let initial_as_of = dataflow.as_of.as_ref().unwrap();
            for id in dataflow.export_ids() {
                if let Some(frontier) = final_frontiers.get(&id) {
                    as_of.extend(frontier.clone());
                } else {
                    as_of.extend(initial_as_of.clone());
                }
            }

            soft_assert_or_log!(
                PartialOrder::less_equal(initial_as_of, &as_of),
                "dataflow as-of regression: {:?} -> {:?} (exports={})",
                initial_as_of.elements(),
                as_of.elements(),
                dataflow.display_export_ids(),
            );

            // `AllowCompaction` commands that only advance an export to the new `as_of` are
            // subsumed by the dataflow's `as_of` and can be dropped.
            for id in dataflow.export_ids() {
                if let Some(frontier) = final_frontiers.get(&id) {
                    if frontier == &as_of {
                        final_frontiers.remove(&id);
                    }
                }
            }

            dataflow.as_of = Some(as_of);
        }

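        // Discard dataflows whose new `as_of` is the empty frontier: they can no longer produce
        // any output. Likewise discard `Schedule` commands for collections that are not exported
        // by any retained dataflow.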
        created_dataflows.retain(|dataflow| dataflow.as_of != Some(Antichain::new()));

        let retained_collections: BTreeSet<_> = created_dataflows
            .iter()
            .flat_map(|d| d.export_ids())
            .collect();
        scheduled_collections.retain(|id| retained_collections.contains(id));

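        // Reconstitute the minimal command sequence and refresh the reported metrics.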
        let command_counts = &self.metrics.command_counts;
        let dataflow_count = &self.metrics.dataflow_count;

        let count = u64::from(create_timely_command.is_some());
        command_counts.create_timely.borrow().set(count);
        if let Some(create_timely_command) = create_timely_command {
            self.commands.push(create_timely_command);
        }

        let count = u64::from(create_inst_command.is_some());
        command_counts.create_instance.borrow().set(count);
        if let Some(create_inst_command) = create_inst_command {
            self.commands.push(create_inst_command);
        }

        let count = u64::from(!final_configuration.all_unset());
        command_counts.update_configuration.borrow().set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(ComputeCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(created_dataflows.len());
        command_counts.create_dataflow.borrow().set(count);
        dataflow_count.borrow().set(count);
        for dataflow in created_dataflows {
            self.commands.push(ComputeCommand::CreateDataflow(dataflow));
        }

        let count = u64::cast_from(scheduled_collections.len());
        command_counts.schedule.borrow().set(count);
        for id in scheduled_collections {
            self.commands.push(ComputeCommand::Schedule(id));
        }

        let count = u64::cast_from(live_peeks.len());
        command_counts.peek.borrow().set(count);
        for peek in live_peeks.into_values() {
            self.commands.push(ComputeCommand::Peek(peek));
        }

        // All `CancelPeek` commands are absorbed by removing the corresponding `Peek`s above.
        command_counts.cancel_peek.borrow().set(0);

        let count = u64::cast_from(final_frontiers.len());
        command_counts.allow_compaction.borrow().set(count);
        for (id, frontier) in final_frontiers {
            self.commands
                .push(ComputeCommand::AllowCompaction { id, frontier });
        }

        let count = u64::from(initialization_complete);
        command_counts.initialization_complete.borrow().set(count);
        if initialization_complete {
            self.commands.push(ComputeCommand::InitializationComplete);
        }

        if allow_writes {
            self.commands.push(ComputeCommand::AllowWrites);
        }

        self.reduced_count = self.commands.len();
    }

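    /// Discard all peek commands from the history, adjusting the reported metrics accordingly.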
    pub fn discard_peeks(&mut self) {
        self.commands.retain(|command| {
            use ComputeCommand::*;
            let is_peek = matches!(command, Peek(_) | CancelPeek { .. });
            if is_peek {
                self.metrics
                    .command_counts
                    .for_command(command)
                    .borrow()
                    .dec();
            }
            !is_peek
        });
    }

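    /// Update the source import uppers of all dataflow descriptions in the history to the current
    /// write frontiers of the corresponding storage collections.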
    pub fn update_source_uppers(&mut self, storage_collections: &StorageCollections<T>) {
        for command in &mut self.commands {
            if let ComputeCommand::CreateDataflow(dataflow) = command {
                for (id, (_, _, upper)) in dataflow.source_imports.iter_mut() {
                    let frontiers = storage_collections
                        .collection_frontiers(*id)
                        .expect("collection exists");

                    *upper = frontiers.write_frontier;
                }
            }
        }
    }

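    /// Iterate over the commands in the history.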
    pub fn iter(&self) -> impl Iterator<Item = &ComputeCommand<T>> {
        self.commands.iter()
    }
}