1use std::collections::BTreeMap;
13
14use mz_ore::cast::CastFrom;
15use mz_storage_client::client::StorageCommand;
16use mz_storage_client::metrics::HistoryMetrics;
17use mz_storage_types::parameters::StorageParameters;
18use timely::order::TotalOrder;
19
20#[derive(Debug)]
22pub(crate) struct CommandHistory<T> {
23 reduced_count: usize,
25 commands: Vec<StorageCommand<T>>,
31 metrics: HistoryMetrics,
33}
34
35impl<T: timely::progress::Timestamp + TotalOrder> CommandHistory<T> {
36 pub fn new(metrics: HistoryMetrics) -> Self {
38 metrics.reset();
39
40 Self {
41 reduced_count: 0,
42 commands: Vec::new(),
43 metrics,
44 }
45 }
46
47 pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand<T>> {
49 self.commands.iter()
50 }
51
52 pub fn push(&mut self, command: StorageCommand<T>) {
56 self.commands.push(command);
57
58 if self.commands.len() > 2 * self.reduced_count {
59 self.reduce();
60 } else {
61 let command = self.commands.last().expect("pushed above");
64 self.metrics.command_counts.for_command(command).inc();
65 }
66 }
67
68 pub fn reduce(&mut self) {
70 use StorageCommand::*;
71
72 let mut hello_command = None;
73 let mut initialization_complete = false;
74 let mut allow_writes = false;
75 let mut final_compactions = BTreeMap::new();
76
77 let mut final_ingestions = BTreeMap::new();
81 let mut final_sinks = BTreeMap::new();
82 let mut final_oneshot_ingestions = BTreeMap::new();
83
84 let mut final_configuration = StorageParameters::default();
91
92 for command in self.commands.drain(..) {
93 match command {
94 cmd @ Hello { .. } => hello_command = Some(cmd),
95 InitializationComplete => initialization_complete = true,
96 AllowWrites => allow_writes = true,
97 UpdateConfiguration(params) => final_configuration.update(*params),
98 RunIngestion(ingestion) => {
99 final_ingestions.insert(ingestion.id, ingestion);
100 }
101 RunSink(sink) => {
102 final_sinks.insert(sink.id, sink);
103 }
104 AllowCompaction(id, since) => {
105 final_compactions.insert(id, since);
106 }
107 RunOneshotIngestion(oneshot) => {
108 final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
109 }
110 CancelOneshotIngestion(uuid) => {
111 final_oneshot_ingestions.remove(&uuid);
112 }
113 }
114 }
115
116 let mut run_ingestions = Vec::new();
117 let mut run_sinks = Vec::new();
118 let mut allow_compaction = Vec::new();
119
120 for ingestion in final_ingestions.into_values() {
122 if let Some(frontier) = final_compactions.get(&ingestion.id) {
123 if frontier.is_empty() {
124 continue;
125 }
126 }
127
128 let compactions = ingestion
129 .description
130 .collection_ids()
131 .filter_map(|id| final_compactions.remove(&id).map(|f| (id, f)));
132 allow_compaction.extend(compactions);
133
134 run_ingestions.push(ingestion);
135 }
136
137 for sink in final_sinks.into_values() {
139 if let Some(frontier) = final_compactions.remove(&sink.id) {
140 if frontier.is_empty() {
141 continue;
142 }
143 }
144
145 run_sinks.push(sink);
146 }
147
148 let command_counts = &self.metrics.command_counts;
155
156 let count = u64::from(hello_command.is_some());
157 command_counts.hello.set(count);
158 if let Some(hello) = hello_command {
159 self.commands.push(hello);
160 }
161
162 let count = u64::from(!final_configuration.all_unset());
163 command_counts.update_configuration.set(count);
164 if !final_configuration.all_unset() {
165 let config = Box::new(final_configuration);
166 self.commands
167 .push(StorageCommand::UpdateConfiguration(config));
168 }
169
170 let count = u64::cast_from(run_ingestions.len());
171 command_counts.run_ingestion.set(count);
172 for ingestion in run_ingestions {
173 self.commands.push(StorageCommand::RunIngestion(ingestion));
174 }
175
176 let count = u64::cast_from(run_sinks.len());
177 command_counts.run_sink.set(count);
178 for sink in run_sinks {
179 self.commands.push(StorageCommand::RunSink(sink));
180 }
181
182 let count = u64::cast_from(final_oneshot_ingestions.len());
185 command_counts.run_oneshot_ingestion.set(count);
186 for ingestion in final_oneshot_ingestions.into_values() {
187 self.commands
188 .push(StorageCommand::RunOneshotIngestion(ingestion));
189 }
190
191 command_counts.cancel_oneshot_ingestion.set(0);
192
193 let count = u64::cast_from(allow_compaction.len());
194 command_counts.allow_compaction.set(count);
195 for (id, since) in allow_compaction {
196 self.commands
197 .push(StorageCommand::AllowCompaction(id, since));
198 }
199
200 let count = u64::from(initialization_complete);
201 command_counts.initialization_complete.set(count);
202 if initialization_complete {
203 self.commands.push(StorageCommand::InitializationComplete);
204 }
205
206 let count = u64::from(allow_writes);
207 command_counts.allow_writes.set(count);
208 if allow_writes {
209 self.commands.push(StorageCommand::AllowWrites);
210 }
211
212 self.reduced_count = self.commands.len();
213 }
214}
215
216#[cfg(test)]
217mod tests {
218 use std::str::FromStr;
219
220 use mz_cluster_client::metrics::ControllerMetrics;
221 use mz_ore::metrics::MetricsRegistry;
222 use mz_ore::url::SensitiveUrl;
223 use mz_persist_types::PersistLocation;
224 use mz_repr::{CatalogItemId, GlobalId, RelationDesc, SqlRelationType};
225 use mz_storage_client::client::{RunIngestionCommand, RunSinkCommand};
226 use mz_storage_client::metrics::StorageControllerMetrics;
227 use mz_storage_types::connections::inline::InlinedConnection;
228 use mz_storage_types::connections::{KafkaConnection, Tunnel};
229 use mz_storage_types::controller::CollectionMetadata;
230 use mz_storage_types::instances::StorageInstanceId;
231 use mz_storage_types::sinks::{
232 KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
233 KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection, StorageSinkDesc,
234 };
235 use mz_storage_types::sources::load_generator::{
236 LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
237 };
238 use mz_storage_types::sources::{
239 GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection, SourceDesc,
240 SourceEnvelope, SourceExport, SourceExportDataConfig, SourceExportDetails,
241 };
242 use timely::progress::Antichain;
243
244 use super::*;
245
246 fn history() -> CommandHistory<u64> {
247 let registry = MetricsRegistry::new();
248 let controller_metrics = ControllerMetrics::new(®istry);
249 let metrics = StorageControllerMetrics::new(®istry, controller_metrics)
250 .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
251 .for_history();
252
253 CommandHistory::new(metrics)
254 }
255
256 fn ingestion_description<S: Into<Vec<u64>>>(
257 ingestion_id: u64,
258 subsource_ids: S,
259 remap_collection_id: u64,
260 ) -> IngestionDescription<CollectionMetadata, InlinedConnection> {
261 let export_ids = [ingestion_id, remap_collection_id]
262 .into_iter()
263 .chain(subsource_ids.into());
264 let source_exports = export_ids
265 .map(|id| {
266 let export = SourceExport {
267 storage_metadata: CollectionMetadata {
268 persist_location: PersistLocation {
269 blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
270 consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
271 },
272 data_shard: Default::default(),
273 relation_desc: RelationDesc::new(
274 SqlRelationType {
275 column_types: Default::default(),
276 keys: Default::default(),
277 },
278 Vec::<String>::new(),
279 ),
280 txns_shard: Default::default(),
281 },
282 details: SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
283 output: LoadGeneratorOutput::Default,
284 }),
285 data_config: SourceExportDataConfig {
286 encoding: Default::default(),
287 envelope: SourceEnvelope::CdcV2,
288 },
289 };
290 (GlobalId::User(id), export)
291 })
292 .collect();
293
294 let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
295 load_generator: LoadGenerator::Auction,
296 tick_micros: Default::default(),
297 as_of: Default::default(),
298 up_to: Default::default(),
299 });
300
301 IngestionDescription {
302 desc: SourceDesc {
303 connection,
304 timestamp_interval: Default::default(),
305 },
306 remap_metadata: CollectionMetadata {
307 persist_location: PersistLocation {
308 blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
309 consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
310 },
311 data_shard: Default::default(),
312 relation_desc: RelationDesc::new(
313 SqlRelationType {
314 column_types: Default::default(),
315 keys: Default::default(),
316 },
317 Vec::<String>::new(),
318 ),
319 txns_shard: Default::default(),
320 },
321 source_exports,
322 instance_id: StorageInstanceId::system(0).expect("0 is a valid ID"),
323 remap_collection_id: GlobalId::User(remap_collection_id),
324 }
325 }
326
327 fn sink_description() -> StorageSinkDesc<CollectionMetadata, u64> {
328 StorageSinkDesc {
329 from: GlobalId::System(1),
330 from_desc: RelationDesc::new(
331 SqlRelationType {
332 column_types: Default::default(),
333 keys: Default::default(),
334 },
335 Vec::<String>::new(),
336 ),
337 connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
338 connection_id: CatalogItemId::System(2),
339 connection: KafkaConnection {
340 brokers: Default::default(),
341 default_tunnel: Tunnel::Direct,
342 progress_topic: Default::default(),
343 progress_topic_options: Default::default(),
344 options: Default::default(),
345 tls: Default::default(),
346 sasl: Default::default(),
347 },
348 format: KafkaSinkFormat {
349 key_format: Default::default(),
350 value_format: KafkaSinkFormatType::Text,
351 },
352 relation_key_indices: Default::default(),
353 key_desc_and_indices: Default::default(),
354 headers_index: Default::default(),
355 value_desc: RelationDesc::new(
356 SqlRelationType {
357 column_types: Default::default(),
358 keys: Default::default(),
359 },
360 Vec::<String>::new(),
361 ),
362 partition_by: Default::default(),
363 topic: Default::default(),
364 topic_options: Default::default(),
365 compression_type: KafkaSinkCompressionType::None,
366 progress_group_id: KafkaIdStyle::Legacy,
367 transactional_id: KafkaIdStyle::Legacy,
368 topic_metadata_refresh_interval: Default::default(),
369 }),
370 with_snapshot: Default::default(),
371 version: Default::default(),
372 envelope: SinkEnvelope::Upsert,
373 as_of: Antichain::from_elem(0),
374 from_storage_metadata: CollectionMetadata {
375 persist_location: PersistLocation {
376 blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
377 consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
378 },
379 data_shard: Default::default(),
380 relation_desc: RelationDesc::new(
381 SqlRelationType {
382 column_types: Default::default(),
383 keys: Default::default(),
384 },
385 Vec::<String>::new(),
386 ),
387 txns_shard: Default::default(),
388 },
389 to_storage_metadata: CollectionMetadata {
390 persist_location: PersistLocation {
391 blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
392 consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
393 },
394 data_shard: Default::default(),
395 relation_desc: RelationDesc::new(
396 SqlRelationType {
397 column_types: Default::default(),
398 keys: Default::default(),
399 },
400 Vec::<String>::new(),
401 ),
402 txns_shard: Default::default(),
403 },
404 }
405 }
406
407 #[mz_ore::test]
408 fn reduce_drops_dropped_ingestion() {
409 let mut history = history();
410
411 let commands = [
412 StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
413 id: GlobalId::User(1),
414 description: ingestion_description(1, [2], 3),
415 })),
416 StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
417 StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
418 StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::new()),
419 ];
420
421 for cmd in commands {
422 history.push(cmd);
423 }
424
425 history.reduce();
426
427 let commands_after: Vec<_> = history.iter().collect();
428 assert!(commands_after.is_empty(), "{:?}", commands_after);
429 }
430
431 #[mz_ore::test]
432 fn reduce_keeps_compacted_ingestion() {
433 let mut history = history();
434
435 let commands = [
436 StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
437 id: GlobalId::User(1),
438 description: ingestion_description(1, [2], 3),
439 })),
440 StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(1)),
441 StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
442 StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::from_elem(3)),
443 ];
444
445 for cmd in commands.clone() {
446 history.push(cmd);
447 }
448
449 history.reduce();
450
451 let commands_after: Vec<_> = history.iter().cloned().collect();
452 assert_eq!(commands_after, commands);
453 }
454
455 #[mz_ore::test]
456 fn reduce_keeps_partially_dropped_ingestion() {
457 let mut history = history();
458
459 let commands = [
460 StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
461 id: GlobalId::User(1),
462 description: ingestion_description(1, [2], 3),
463 })),
464 StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
465 ];
466
467 for cmd in commands.clone() {
468 history.push(cmd);
469 }
470
471 history.reduce();
472
473 let commands_after: Vec<_> = history.iter().cloned().collect();
474 assert_eq!(commands_after, commands);
475 }
476
477 #[mz_ore::test]
478 fn reduce_drops_dropped_sink() {
479 let mut history = history();
480
481 let commands = [
482 StorageCommand::RunSink(Box::new(RunSinkCommand {
483 id: GlobalId::User(1),
484 description: sink_description(),
485 })),
486 StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
487 ];
488
489 for cmd in commands {
490 history.push(cmd);
491 }
492
493 history.reduce();
494
495 let commands_after: Vec<_> = history.iter().collect();
496 assert!(commands_after.is_empty(), "{:?}", commands_after);
497 }
498
499 #[mz_ore::test]
500 fn reduce_keeps_compacted_sink() {
501 let mut history = history();
502
503 let sink_desc = sink_description();
504 let commands = [
505 StorageCommand::RunSink(Box::new(RunSinkCommand {
506 id: GlobalId::User(1),
507 description: sink_desc.clone(),
508 })),
509 StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42)),
510 ];
511
512 for cmd in commands {
513 history.push(cmd);
514 }
515
516 history.reduce();
517
518 let commands_after: Vec<_> = history.iter().cloned().collect();
519
520 let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
521 id: GlobalId::User(1),
522 description: sink_desc,
523 }))];
524
525 assert_eq!(commands_after, expected_commands);
526 }
527
528 #[mz_ore::test]
529 fn reduce_drops_stray_compactions() {
530 let mut history = history();
531
532 let commands = [
533 StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
534 StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(1)),
535 StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
536 ];
537
538 for cmd in commands {
539 history.push(cmd);
540 }
541
542 history.reduce();
543
544 let commands_after: Vec<_> = history.iter().collect();
545 assert!(commands_after.is_empty(), "{:?}", commands_after);
546 }
547}