1use std::collections::BTreeMap;
13
14use mz_ore::cast::CastFrom;
15use mz_storage_client::client::StorageCommand;
16use mz_storage_client::metrics::HistoryMetrics;
17use mz_storage_types::parameters::StorageParameters;
18
/// A history of storage commands, periodically reducible to a minimal
/// sequence of commands that describes the same end state.
#[derive(Debug)]
pub(crate) struct CommandHistory {
    /// The number of commands the history contained the last time it was
    /// reduced. `push` uses this to decide when another reduction is
    /// worthwhile.
    reduced_count: usize,
    /// The recorded commands, in the order they were pushed.
    commands: Vec<StorageCommand>,
    /// Metrics tracking the number of each command type in the history.
    metrics: HistoryMetrics,
}
33
impl CommandHistory {
    /// Creates a new, empty command history.
    ///
    /// The provided metrics are reset so they reflect the empty history.
    pub fn new(metrics: HistoryMetrics) -> Self {
        metrics.reset();

        Self {
            reduced_count: 0,
            commands: Vec::new(),
            metrics,
        }
    }

    /// Returns an iterator over the recorded commands, in insertion order.
    pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand> {
        self.commands.iter()
    }

    /// Adds a command to the history, reducing the history when it has more
    /// than doubled in size since the last reduction.
    ///
    /// `reduce` recomputes all command metrics itself, so the per-command
    /// counter is only incremented on the non-reducing path.
    pub fn push(&mut self, command: StorageCommand) {
        self.commands.push(command);

        if self.commands.len() > 2 * self.reduced_count {
            self.reduce();
        } else {
            let command = self.commands.last().expect("pushed above");
            self.metrics.command_counts.for_command(command).inc();
        }
    }

    /// Reduces the command history to a minimal sequence of commands that
    /// describes the same end state.
    ///
    /// The reduced sequence is rebuilt in a fixed order: `Hello`,
    /// `UpdateConfiguration`, `RunIngestion`, `RunSink`,
    /// `RunOneshotIngestion`, `AllowCompaction`, `InitializationComplete`,
    /// `AllowWrites`. All command metrics gauges are updated to reflect the
    /// reduced history.
    pub fn reduce(&mut self) {
        use StorageCommand::*;

        // Phase 1: fold every command into its final effect. The last
        // `Hello` wins, the flags are sticky, configuration updates are
        // merged, and the last `Run*`/`AllowCompaction` per object ID wins.
        // A `CancelOneshotIngestion` erases a previously recorded oneshot
        // ingestion.
        let mut hello_command = None;
        let mut initialization_complete = false;
        let mut allow_writes = false;
        let mut final_compactions = BTreeMap::new();

        let mut final_ingestions = BTreeMap::new();
        let mut final_sinks = BTreeMap::new();
        let mut final_oneshot_ingestions = BTreeMap::new();

        let mut final_configuration = StorageParameters::default();

        for command in self.commands.drain(..) {
            match command {
                cmd @ Hello { .. } => hello_command = Some(cmd),
                InitializationComplete => initialization_complete = true,
                AllowWrites => allow_writes = true,
                UpdateConfiguration(params) => final_configuration.update(*params),
                RunIngestion(ingestion) => {
                    final_ingestions.insert(ingestion.id, ingestion);
                }
                RunSink(sink) => {
                    final_sinks.insert(sink.id, sink);
                }
                AllowCompaction(id, since) => {
                    final_compactions.insert(id, since);
                }
                RunOneshotIngestion(oneshot) => {
                    final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
                }
                CancelOneshotIngestion(uuid) => {
                    final_oneshot_ingestions.remove(&uuid);
                }
            }
        }

        // Phase 2: drop objects whose read frontier was compacted to the
        // empty frontier (i.e. that were dropped), along with their
        // compaction commands.
        let mut run_ingestions = Vec::new();
        let mut run_sinks = Vec::new();
        let mut allow_compaction = Vec::new();

        for ingestion in final_ingestions.into_values() {
            // An empty compaction frontier for the ingestion means it was
            // dropped; skip it (and implicitly discard the now-stray
            // compactions of its collections, which are never re-emitted).
            if let Some(frontier) = final_compactions.get(&ingestion.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            // Keep the compaction commands that apply to the surviving
            // ingestion's exported collections.
            let compactions = ingestion
                .description
                .collection_ids()
                .filter_map(|id| final_compactions.remove(&id).map(|f| (id, f)));
            allow_compaction.extend(compactions);

            run_ingestions.push(ingestion);
        }

        for sink in final_sinks.into_values() {
            // The sink's compaction entry is consumed here either way, so a
            // surviving sink's compaction is never re-emitted (see the
            // `reduce_keeps_compacted_sink` test below).
            if let Some(frontier) = final_compactions.remove(&sink.id) {
                if frontier.is_empty() {
                    continue;
                }
            }

            run_sinks.push(sink);
        }

        let command_counts = &self.metrics.command_counts;

        // Phase 3: re-push the reduced commands in a fixed order, setting
        // each metrics gauge to match the new history contents as we go.
        let count = u64::from(hello_command.is_some());
        command_counts.hello.set(count);
        if let Some(hello) = hello_command {
            self.commands.push(hello);
        }

        let count = u64::from(!final_configuration.all_unset());
        command_counts.update_configuration.set(count);
        if !final_configuration.all_unset() {
            let config = Box::new(final_configuration);
            self.commands
                .push(StorageCommand::UpdateConfiguration(config));
        }

        let count = u64::cast_from(run_ingestions.len());
        command_counts.run_ingestion.set(count);
        for ingestion in run_ingestions {
            self.commands.push(StorageCommand::RunIngestion(ingestion));
        }

        let count = u64::cast_from(run_sinks.len());
        command_counts.run_sink.set(count);
        for sink in run_sinks {
            self.commands.push(StorageCommand::RunSink(sink));
        }

        let count = u64::cast_from(final_oneshot_ingestions.len());
        command_counts.run_oneshot_ingestion.set(count);
        for ingestion in final_oneshot_ingestions.into_values() {
            self.commands
                .push(StorageCommand::RunOneshotIngestion(ingestion));
        }

        // Cancellations never survive a reduction: each either erased a
        // pending oneshot ingestion in phase 1 or had nothing to cancel.
        command_counts.cancel_oneshot_ingestion.set(0);

        let count = u64::cast_from(allow_compaction.len());
        command_counts.allow_compaction.set(count);
        for (id, since) in allow_compaction {
            self.commands
                .push(StorageCommand::AllowCompaction(id, since));
        }

        let count = u64::from(initialization_complete);
        command_counts.initialization_complete.set(count);
        if initialization_complete {
            self.commands.push(StorageCommand::InitializationComplete);
        }

        let count = u64::from(allow_writes);
        command_counts.allow_writes.set(count);
        if allow_writes {
            self.commands.push(StorageCommand::AllowWrites);
        }

        // Remember the reduced size so `push` can schedule the next
        // reduction.
        self.reduced_count = self.commands.len();
    }
}
214
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_cluster_client::metrics::ControllerMetrics;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_types::PersistLocation;
    use mz_repr::{CatalogItemId, GlobalId, RelationDesc, SqlRelationType};
    use mz_storage_client::client::{RunIngestionCommand, RunSinkCommand};
    use mz_storage_client::metrics::StorageControllerMetrics;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::connections::{KafkaConnection, Tunnel};
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::instances::StorageInstanceId;
    use mz_storage_types::sinks::{
        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection, StorageSinkDesc,
    };
    use mz_storage_types::sources::load_generator::{
        LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
    };
    use mz_storage_types::sources::{
        GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection, SourceDesc,
        SourceEnvelope, SourceExport, SourceExportDataConfig, SourceExportDetails,
    };
    use timely::progress::Antichain;

    use super::*;

    /// Constructs an empty `CommandHistory` backed by a fresh metrics
    /// registry.
    fn history() -> CommandHistory {
        let registry = MetricsRegistry::new();
        // NOTE: these two calls previously read `®istry` — a mojibake of
        // `&registry` (`&reg` corrupted into the `®` entity) — which does not
        // compile. Fixed to pass the registry by reference.
        let controller_metrics = ControllerMetrics::new(&registry);
        let metrics = StorageControllerMetrics::new(&registry, controller_metrics)
            .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
            .for_history();

        CommandHistory::new(metrics)
    }

    /// Builds a load-generator ingestion description exporting the ingestion
    /// itself, its remap collection, and the given subsources, all with
    /// in-memory persist locations.
    fn ingestion_description<S: Into<Vec<u64>>>(
        ingestion_id: u64,
        subsource_ids: S,
        remap_collection_id: u64,
    ) -> IngestionDescription<CollectionMetadata, InlinedConnection> {
        let export_ids = [ingestion_id, remap_collection_id]
            .into_iter()
            .chain(subsource_ids.into());
        let source_exports = export_ids
            .map(|id| {
                let export = SourceExport {
                    storage_metadata: CollectionMetadata {
                        persist_location: PersistLocation {
                            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                        },
                        data_shard: Default::default(),
                        relation_desc: RelationDesc::new(
                            SqlRelationType {
                                column_types: Default::default(),
                                keys: Default::default(),
                            },
                            Vec::<String>::new(),
                        ),
                        txns_shard: Default::default(),
                    },
                    details: SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                        output: LoadGeneratorOutput::Default,
                    }),
                    data_config: SourceExportDataConfig {
                        encoding: Default::default(),
                        envelope: SourceEnvelope::CdcV2,
                    },
                };
                (GlobalId::User(id), export)
            })
            .collect();

        let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::Auction,
            tick_micros: Default::default(),
            as_of: Default::default(),
            up_to: Default::default(),
        });

        IngestionDescription {
            desc: SourceDesc {
                connection,
                timestamp_interval: Default::default(),
            },
            remap_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            source_exports,
            instance_id: StorageInstanceId::system(0).expect("0 is a valid ID"),
            remap_collection_id: GlobalId::User(remap_collection_id),
        }
    }

    /// Builds a minimal Kafka sink description with in-memory persist
    /// locations, suitable for exercising history reduction.
    fn sink_description() -> StorageSinkDesc<CollectionMetadata> {
        StorageSinkDesc {
            from: GlobalId::System(1),
            from_desc: RelationDesc::new(
                SqlRelationType {
                    column_types: Default::default(),
                    keys: Default::default(),
                },
                Vec::<String>::new(),
            ),
            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
                connection_id: CatalogItemId::System(2),
                connection: KafkaConnection {
                    brokers: Default::default(),
                    default_tunnel: Tunnel::Direct,
                    progress_topic: Default::default(),
                    progress_topic_options: Default::default(),
                    options: Default::default(),
                    tls: Default::default(),
                    sasl: Default::default(),
                },
                format: KafkaSinkFormat {
                    key_format: Default::default(),
                    value_format: KafkaSinkFormatType::Text,
                },
                relation_key_indices: Default::default(),
                key_desc_and_indices: Default::default(),
                headers_index: Default::default(),
                value_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                partition_by: Default::default(),
                topic: Default::default(),
                topic_options: Default::default(),
                compression_type: KafkaSinkCompressionType::None,
                progress_group_id: KafkaIdStyle::Legacy,
                transactional_id: KafkaIdStyle::Legacy,
                topic_metadata_refresh_interval: Default::default(),
            }),
            with_snapshot: Default::default(),
            version: Default::default(),
            envelope: SinkEnvelope::Upsert,
            as_of: Antichain::from_elem(0.into()),
            from_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            to_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    SqlRelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            commit_interval: Default::default(),
        }
    }

    /// An ingestion compacted to the empty frontier was dropped, so reduction
    /// removes it and all compactions of its collections.
    #[mz_ore::test]
    fn reduce_drops_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    /// An ingestion compacted to a non-empty frontier is still live, so
    /// reduction keeps both it and its compaction commands.
    #[mz_ore::test]
    fn reduce_keeps_compacted_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(1.into())),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2.into())),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::from_elem(3.into())),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    /// Dropping only a subsource does not drop the ingestion: reduction keeps
    /// the ingestion and the subsource's empty-frontier compaction.
    #[mz_ore::test]
    fn reduce_keeps_partially_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    /// A sink compacted to the empty frontier was dropped, so reduction
    /// removes both the sink and its compaction.
    #[mz_ore::test]
    fn reduce_drops_dropped_sink() {
        let mut history = history();

        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_description(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    /// A sink compacted to a non-empty frontier survives reduction, but its
    /// compaction command is consumed and not re-emitted.
    #[mz_ore::test]
    fn reduce_keeps_compacted_sink() {
        let mut history = history();

        let sink_desc = sink_description();
        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_desc.clone(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42.into())),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();

        let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
            id: GlobalId::User(1),
            description: sink_desc,
        }))];

        assert_eq!(commands_after, expected_commands);
    }

    /// Compactions that target no known ingestion or sink are discarded by
    /// reduction.
    #[mz_ore::test]
    fn reduce_drops_stray_compactions() {
        let mut history = history();

        let commands = [
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(1.into())),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2.into())),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }
}