1use std::collections::BTreeMap;
13
14use mz_ore::cast::CastFrom;
15use mz_storage_client::client::StorageCommand;
16use mz_storage_client::metrics::HistoryMetrics;
17use mz_storage_types::parameters::StorageParameters;
18use timely::order::TotalOrder;
19
/// A history of storage commands, which can be reduced to a shorter command
/// sequence with the same effect.
#[derive(Debug)]
pub(crate) struct CommandHistory<T> {
    /// Number of commands the history contained after the last `reduce` call.
    /// `push` uses this to decide when the history has grown enough to be
    /// worth reducing again.
    reduced_count: usize,
    /// The recorded commands, in the order they were pushed.
    commands: Vec<StorageCommand<T>>,
    /// Per-command-kind counters describing the current history contents.
    metrics: HistoryMetrics,
}
34
35impl<T: timely::progress::Timestamp + TotalOrder> CommandHistory<T> {
36 pub fn new(metrics: HistoryMetrics) -> Self {
38 metrics.reset();
39
40 Self {
41 reduced_count: 0,
42 commands: Vec::new(),
43 metrics,
44 }
45 }
46
47 pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand<T>> {
49 self.commands.iter()
50 }
51
52 pub fn push(&mut self, command: StorageCommand<T>) {
56 self.commands.push(command);
57
58 if self.commands.len() > 2 * self.reduced_count {
59 self.reduce();
60 } else {
61 let command = self.commands.last().expect("pushed above");
64 self.metrics.command_counts.for_command(command).inc();
65 }
66 }
67
68 pub fn reduce(&mut self) {
70 use StorageCommand::*;
71
72 let mut hello_command = None;
73 let mut initialization_complete = false;
74 let mut allow_writes = false;
75 let mut final_compactions = BTreeMap::new();
76
77 let mut final_ingestions = BTreeMap::new();
81 let mut final_sinks = BTreeMap::new();
82 let mut final_oneshot_ingestions = BTreeMap::new();
83
84 let mut final_configuration = StorageParameters::default();
91
92 for command in self.commands.drain(..) {
93 match command {
94 cmd @ Hello { .. } => hello_command = Some(cmd),
95 InitializationComplete => initialization_complete = true,
96 AllowWrites => allow_writes = true,
97 UpdateConfiguration(params) => final_configuration.update(*params),
98 RunIngestion(ingestion) => {
99 final_ingestions.insert(ingestion.id, ingestion);
100 }
101 RunSink(sink) => {
102 final_sinks.insert(sink.id, sink);
103 }
104 AllowCompaction(id, since) => {
105 final_compactions.insert(id, since);
106 }
107 RunOneshotIngestion(oneshot) => {
108 final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
109 }
110 CancelOneshotIngestion(uuid) => {
111 final_oneshot_ingestions.remove(&uuid);
112 }
113 }
114 }
115
116 let mut run_ingestions = Vec::new();
117 let mut run_sinks = Vec::new();
118 let mut allow_compaction = Vec::new();
119
120 for ingestion in final_ingestions.into_values() {
122 if let Some(frontier) = final_compactions.get(&ingestion.id) {
123 if frontier.is_empty() {
124 continue;
125 }
126 }
127
128 let compactions = ingestion
129 .description
130 .collection_ids()
131 .filter_map(|id| final_compactions.remove(&id).map(|f| (id, f)));
132 allow_compaction.extend(compactions);
133
134 run_ingestions.push(ingestion);
135 }
136
137 for sink in final_sinks.into_values() {
139 if let Some(frontier) = final_compactions.remove(&sink.id) {
140 if frontier.is_empty() {
141 continue;
142 }
143 }
144
145 run_sinks.push(sink);
146 }
147
148 let command_counts = &self.metrics.command_counts;
155
156 let count = u64::from(hello_command.is_some());
157 command_counts.hello.set(count);
158 if let Some(hello) = hello_command {
159 self.commands.push(hello);
160 }
161
162 let count = u64::from(!final_configuration.all_unset());
163 command_counts.update_configuration.set(count);
164 if !final_configuration.all_unset() {
165 let config = Box::new(final_configuration);
166 self.commands
167 .push(StorageCommand::UpdateConfiguration(config));
168 }
169
170 let count = u64::cast_from(run_ingestions.len());
171 command_counts.run_ingestion.set(count);
172 for ingestion in run_ingestions {
173 self.commands.push(StorageCommand::RunIngestion(ingestion));
174 }
175
176 let count = u64::cast_from(run_sinks.len());
177 command_counts.run_sink.set(count);
178 for sink in run_sinks {
179 self.commands.push(StorageCommand::RunSink(sink));
180 }
181
182 let count = u64::cast_from(final_oneshot_ingestions.len());
185 command_counts.run_oneshot_ingestion.set(count);
186 for ingestion in final_oneshot_ingestions.into_values() {
187 self.commands
188 .push(StorageCommand::RunOneshotIngestion(ingestion));
189 }
190
191 command_counts.cancel_oneshot_ingestion.set(0);
192
193 let count = u64::cast_from(allow_compaction.len());
194 command_counts.allow_compaction.set(count);
195 for (id, since) in allow_compaction {
196 self.commands
197 .push(StorageCommand::AllowCompaction(id, since));
198 }
199
200 let count = u64::from(initialization_complete);
201 command_counts.initialization_complete.set(count);
202 if initialization_complete {
203 self.commands.push(StorageCommand::InitializationComplete);
204 }
205
206 let count = u64::from(allow_writes);
207 command_counts.allow_writes.set(count);
208 if allow_writes {
209 self.commands.push(StorageCommand::AllowWrites);
210 }
211
212 self.reduced_count = self.commands.len();
213 }
214}
215
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_cluster_client::metrics::ControllerMetrics;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_types::PersistLocation;
    use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationType};
    use mz_storage_client::client::{RunIngestionCommand, RunSinkCommand};
    use mz_storage_client::metrics::StorageControllerMetrics;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::connections::{KafkaConnection, Tunnel};
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::instances::StorageInstanceId;
    use mz_storage_types::sinks::{
        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection, StorageSinkDesc,
    };
    use mz_storage_types::sources::load_generator::{
        LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
    };
    use mz_storage_types::sources::{
        GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection,
        SourceConnection, SourceDesc, SourceEnvelope, SourceExport, SourceExportDataConfig,
        SourceExportDetails,
    };
    use timely::progress::Antichain;

    use super::*;

    /// Constructs an empty `CommandHistory` backed by a fresh metrics registry.
    fn history() -> CommandHistory<u64> {
        let registry = MetricsRegistry::new();
        // NOTE: fixed mis-encoded `&registry` arguments (were garbled as `®istry`).
        let controller_metrics = ControllerMetrics::new(&registry);
        let metrics = StorageControllerMetrics::new(&registry, controller_metrics)
            .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
            .for_history();

        CommandHistory::new(metrics)
    }

    /// Builds a load-generator ingestion description whose exports cover the
    /// ingestion itself, the given subsources, and the remap collection.
    fn ingestion_description<S: Into<Vec<u64>>>(
        ingestion_id: u64,
        subsource_ids: S,
        remap_collection_id: u64,
    ) -> IngestionDescription<CollectionMetadata, InlinedConnection> {
        let export_ids = [ingestion_id, remap_collection_id]
            .into_iter()
            .chain(subsource_ids.into());
        let source_exports = export_ids
            .map(|id| {
                let export = SourceExport {
                    storage_metadata: CollectionMetadata {
                        persist_location: PersistLocation {
                            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                        },
                        remap_shard: Default::default(),
                        data_shard: Default::default(),
                        relation_desc: RelationDesc::new(
                            RelationType {
                                column_types: Default::default(),
                                keys: Default::default(),
                            },
                            Vec::<String>::new(),
                        ),
                        txns_shard: Default::default(),
                    },
                    details: SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                        output: LoadGeneratorOutput::Default,
                    }),
                    data_config: SourceExportDataConfig {
                        encoding: Default::default(),
                        envelope: SourceEnvelope::CdcV2,
                    },
                };
                (GlobalId::User(id), export)
            })
            .collect();

        let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::Auction,
            tick_micros: Default::default(),
            as_of: Default::default(),
            up_to: Default::default(),
        });
        let primary_export_details = connection.primary_export_details();

        IngestionDescription {
            desc: SourceDesc {
                connection,
                primary_export: SourceExportDataConfig {
                    encoding: Default::default(),
                    envelope: SourceEnvelope::CdcV2,
                },
                primary_export_details,
                timestamp_interval: Default::default(),
            },
            ingestion_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            source_exports,
            instance_id: StorageInstanceId::system(0).expect("0 is a valid ID"),
            remap_collection_id: GlobalId::User(remap_collection_id),
        }
    }

    /// Builds a minimal Kafka sink description for use in the tests below.
    fn sink_description() -> StorageSinkDesc<CollectionMetadata, u64> {
        StorageSinkDesc {
            from: GlobalId::System(1),
            from_desc: RelationDesc::new(
                RelationType {
                    column_types: Default::default(),
                    keys: Default::default(),
                },
                Vec::<String>::new(),
            ),
            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
                connection_id: CatalogItemId::System(2),
                connection: KafkaConnection {
                    brokers: Default::default(),
                    default_tunnel: Tunnel::Direct,
                    progress_topic: Default::default(),
                    progress_topic_options: Default::default(),
                    options: Default::default(),
                    tls: Default::default(),
                    sasl: Default::default(),
                },
                format: KafkaSinkFormat {
                    key_format: Default::default(),
                    value_format: KafkaSinkFormatType::Text,
                },
                relation_key_indices: Default::default(),
                key_desc_and_indices: Default::default(),
                headers_index: Default::default(),
                value_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                partition_by: Default::default(),
                topic: Default::default(),
                topic_options: Default::default(),
                compression_type: KafkaSinkCompressionType::None,
                progress_group_id: KafkaIdStyle::Legacy,
                transactional_id: KafkaIdStyle::Legacy,
                topic_metadata_refresh_interval: Default::default(),
            }),
            with_snapshot: Default::default(),
            version: Default::default(),
            envelope: SinkEnvelope::Upsert,
            as_of: Antichain::from_elem(0),
            from_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            to_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
        }
    }

    /// An ingestion compacted to the empty frontier is dropped entirely,
    /// together with the compactions of all its collections.
    #[mz_ore::test]
    fn reduce_drops_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    /// An ingestion compacted to non-empty frontiers survives reduction,
    /// along with its compaction commands.
    #[mz_ore::test]
    fn reduce_keeps_compacted_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::from_elem(3)),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    /// Dropping only a subsource does not drop the ingestion itself.
    #[mz_ore::test]
    fn reduce_keeps_partially_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    /// A sink compacted to the empty frontier is dropped entirely.
    #[mz_ore::test]
    fn reduce_drops_dropped_sink() {
        let mut history = history();

        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_description(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    /// A sink compacted to a non-empty frontier is kept, but its compaction
    /// command is absorbed rather than replayed.
    #[mz_ore::test]
    fn reduce_keeps_compacted_sink() {
        let mut history = history();

        let sink_desc = sink_description();
        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_desc.clone(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();

        let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
            id: GlobalId::User(1),
            description: sink_desc,
        }))];

        assert_eq!(commands_after, expected_commands);
    }

    /// Compaction commands that target no known ingestion or sink are dropped.
    #[mz_ore::test]
    fn reduce_drops_stray_compactions() {
        let mut history = history();

        let commands = [
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }
}