1use std::collections::BTreeMap;
13
14use mz_dyncfg::ConfigValHandle;
15use mz_ore::cast::CastFrom;
16use mz_storage_client::client::StorageCommand;
17use mz_storage_client::metrics::HistoryMetrics;
18use mz_storage_types::parameters::StorageParameters;
19use timely::PartialOrder;
20use timely::order::TotalOrder;
21
#[derive(Debug)]
pub(crate) struct CommandHistory<T> {
    // Number of commands in the history as of the last reduction; `push`
    // triggers another reduction once the history exceeds twice this size.
    reduced_count: usize,
    // The recorded commands, in the order they were pushed (or the order
    // `reduce` reconstituted them in).
    commands: Vec<StorageCommand<T>>,
    // Per-command-kind counters kept in sync with the history's contents.
    metrics: HistoryMetrics,
    // Dyncfg flag consulted when a reduction advances a sink's as-of.
    enable_snapshot_frontier: ConfigValHandle<bool>,
}
38
39impl<T: timely::progress::Timestamp + TotalOrder> CommandHistory<T> {
40 pub fn new(metrics: HistoryMetrics, enable_snapshot_frontier: ConfigValHandle<bool>) -> Self {
42 metrics.reset();
43
44 Self {
45 reduced_count: 0,
46 commands: Vec::new(),
47 metrics,
48 enable_snapshot_frontier,
49 }
50 }
51
52 pub fn iter(&self) -> impl DoubleEndedIterator<Item = &StorageCommand<T>> {
54 self.commands.iter()
55 }
56
57 pub fn push(&mut self, command: StorageCommand<T>) {
61 use StorageCommand::*;
62
63 self.commands.push(command);
64
65 if self.commands.len() > 2 * self.reduced_count {
66 self.reduce();
67 } else {
68 let command = self.commands.last().expect("pushed above");
71 let metrics = &self.metrics;
72 match command {
73 CreateTimely { .. } => metrics.create_timely_count.inc(),
74 InitializationComplete => metrics.initialization_complete_count.inc(),
75 AllowWrites => metrics.allow_writes_count.inc(),
76 UpdateConfiguration(_) => metrics.update_configuration_count.inc(),
77 RunIngestion(_) => metrics.run_ingestions_count.inc(),
78 RunSink(_) => metrics.run_sinks_count.inc(),
79 AllowCompaction(_, _) => metrics.allow_compaction_count.inc(),
80 RunOneshotIngestion(_) | CancelOneshotIngestion { .. } => {
81 }
83 }
84 }
85 }
86
87 pub fn reduce(&mut self) {
89 use StorageCommand::*;
90
91 let mut create_timely_command = None;
92 let mut initialization_complete = false;
93 let mut allow_writes = false;
94 let mut final_compactions = BTreeMap::new();
95
96 let mut final_ingestions = BTreeMap::new();
100 let mut final_sinks = BTreeMap::new();
101 let mut final_oneshot_ingestions = BTreeMap::new();
102
103 let mut final_configuration = StorageParameters::default();
110
111 for command in self.commands.drain(..) {
112 match command {
113 cmd @ CreateTimely { .. } => create_timely_command = Some(cmd),
114 InitializationComplete => initialization_complete = true,
115 AllowWrites => allow_writes = true,
116 UpdateConfiguration(params) => final_configuration.update(*params),
117 RunIngestion(ingestion) => {
118 final_ingestions.insert(ingestion.id, ingestion);
119 }
120 RunSink(sink) => {
121 final_sinks.insert(sink.id, sink);
122 }
123 AllowCompaction(id, since) => {
124 final_compactions.insert(id, since);
125 }
126 RunOneshotIngestion(oneshot) => {
127 final_oneshot_ingestions.insert(oneshot.ingestion_id, oneshot);
128 }
129 CancelOneshotIngestion(uuid) => {
130 final_oneshot_ingestions.remove(&uuid);
131 }
132 }
133 }
134
135 let mut run_ingestions = Vec::new();
136 let mut run_sinks = Vec::new();
137 let mut allow_compaction = Vec::new();
138
139 for ingestion in final_ingestions.into_values() {
141 if let Some(frontier) = final_compactions.get(&ingestion.id) {
142 if frontier.is_empty() {
143 continue;
144 }
145 }
146
147 let compactions = ingestion
148 .description
149 .collection_ids()
150 .filter_map(|id| final_compactions.remove(&id).map(|f| (id, f)));
151 allow_compaction.extend(compactions);
152
153 run_ingestions.push(ingestion);
154 }
155
156 for mut sink in final_sinks.into_values() {
158 if let Some(frontier) = final_compactions.remove(&sink.id) {
159 if frontier.is_empty() {
160 continue;
161 }
162 if PartialOrder::less_than(&sink.description.as_of, &frontier) {
167 sink.description.as_of = frontier;
168 if self.enable_snapshot_frontier.get() {
169 sink.description.with_snapshot = false;
170 }
171 }
172 }
173
174 run_sinks.push(sink);
175 }
176
177 let count = u64::from(create_timely_command.is_some());
185 self.metrics.create_timely_count.set(count);
186 if let Some(create_timely_command) = create_timely_command {
187 self.commands.push(create_timely_command);
188 }
189
190 let count = u64::from(!final_configuration.all_unset());
191 self.metrics.update_configuration_count.set(count);
192 if !final_configuration.all_unset() {
193 let config = Box::new(final_configuration);
194 self.commands
195 .push(StorageCommand::UpdateConfiguration(config));
196 }
197
198 let count = u64::cast_from(run_ingestions.len());
199 self.metrics.run_ingestions_count.set(count);
200 for ingestion in run_ingestions {
201 self.commands.push(StorageCommand::RunIngestion(ingestion));
202 }
203
204 let count = u64::cast_from(run_sinks.len());
205 self.metrics.run_ingestions_count.set(count);
206 for sink in run_sinks {
207 self.commands.push(StorageCommand::RunSink(sink));
208 }
209
210 for ingestion in final_oneshot_ingestions.into_values() {
215 self.commands
216 .push(StorageCommand::RunOneshotIngestion(ingestion));
217 }
218
219 let count = u64::cast_from(allow_compaction.len());
220 self.metrics.allow_compaction_count.set(count);
221 for (id, since) in allow_compaction {
222 self.commands
223 .push(StorageCommand::AllowCompaction(id, since));
224 }
225
226 let count = u64::from(initialization_complete);
227 self.metrics.initialization_complete_count.set(count);
228 if initialization_complete {
229 self.commands.push(StorageCommand::InitializationComplete);
230 }
231
232 let count = u64::from(allow_writes);
233 self.metrics.allow_writes_count.set(count);
234 if allow_writes {
235 self.commands.push(StorageCommand::AllowWrites);
236 }
237
238 self.reduced_count = self.commands.len();
239 }
240}
241
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use mz_cluster_client::metrics::ControllerMetrics;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_types::PersistLocation;
    use mz_repr::{CatalogItemId, GlobalId, RelationDesc, RelationType};
    use mz_storage_client::client::{RunIngestionCommand, RunSinkCommand};
    use mz_storage_client::metrics::StorageControllerMetrics;
    use mz_storage_types::connections::inline::InlinedConnection;
    use mz_storage_types::connections::{KafkaConnection, Tunnel};
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::instances::StorageInstanceId;
    use mz_storage_types::sinks::{
        KafkaIdStyle, KafkaSinkCompressionType, KafkaSinkConnection, KafkaSinkFormat,
        KafkaSinkFormatType, SinkEnvelope, StorageSinkConnection, StorageSinkDesc,
    };
    use mz_storage_types::sources::load_generator::{
        LoadGenerator, LoadGeneratorOutput, LoadGeneratorSourceExportDetails,
    };
    use mz_storage_types::sources::{
        GenericSourceConnection, IngestionDescription, LoadGeneratorSourceConnection,
        SourceConnection, SourceDesc, SourceEnvelope, SourceExport, SourceExportDataConfig,
        SourceExportDetails,
    };
    use timely::progress::Antichain;

    use super::*;

    /// Constructs an empty `CommandHistory` backed by a fresh metrics
    /// registry, with the snapshot-frontier dyncfg enabled.
    // Fixed: `&registry` was mojibake-garbled to `®istry` (HTML-entity
    // decoding of `&reg`), which does not compile.
    fn history() -> CommandHistory<u64> {
        let registry = MetricsRegistry::new();
        let controller_metrics = ControllerMetrics::new(&registry);
        let metrics = StorageControllerMetrics::new(&registry, controller_metrics)
            .for_instance(StorageInstanceId::system(0).expect("0 is a valid ID"))
            .for_history();

        CommandHistory::new(metrics, ConfigValHandle::disconnected(true))
    }

    /// Builds a load-generator ingestion description exporting the primary
    /// collection, the given subsources, and the remap collection.
    fn ingestion_description<S: Into<Vec<u64>>>(
        ingestion_id: u64,
        subsource_ids: S,
        remap_collection_id: u64,
    ) -> IngestionDescription<CollectionMetadata, InlinedConnection> {
        let export_ids = [ingestion_id, remap_collection_id]
            .into_iter()
            .chain(subsource_ids.into());
        let source_exports = export_ids
            .map(|id| {
                let export = SourceExport {
                    storage_metadata: CollectionMetadata {
                        persist_location: PersistLocation {
                            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                        },
                        remap_shard: Default::default(),
                        data_shard: Default::default(),
                        relation_desc: RelationDesc::new(
                            RelationType {
                                column_types: Default::default(),
                                keys: Default::default(),
                            },
                            Vec::<String>::new(),
                        ),
                        txns_shard: Default::default(),
                    },
                    details: SourceExportDetails::LoadGenerator(LoadGeneratorSourceExportDetails {
                        output: LoadGeneratorOutput::Default,
                    }),
                    data_config: SourceExportDataConfig {
                        encoding: Default::default(),
                        envelope: SourceEnvelope::CdcV2,
                    },
                };
                (GlobalId::User(id), export)
            })
            .collect();

        let connection = GenericSourceConnection::LoadGenerator(LoadGeneratorSourceConnection {
            load_generator: LoadGenerator::Auction,
            tick_micros: Default::default(),
            as_of: Default::default(),
            up_to: Default::default(),
        });
        let primary_export_details = connection.primary_export_details();

        IngestionDescription {
            desc: SourceDesc {
                connection,
                primary_export: SourceExportDataConfig {
                    encoding: Default::default(),
                    envelope: SourceEnvelope::CdcV2,
                },
                primary_export_details,
                timestamp_interval: Default::default(),
            },
            ingestion_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            source_exports,
            instance_id: StorageInstanceId::system(0).expect("0 is a valid ID"),
            remap_collection_id: GlobalId::User(remap_collection_id),
        }
    }

    /// Builds a minimal Kafka sink description with an as-of of 0.
    fn sink_description() -> StorageSinkDesc<CollectionMetadata, u64> {
        StorageSinkDesc {
            from: GlobalId::System(1),
            from_desc: RelationDesc::new(
                RelationType {
                    column_types: Default::default(),
                    keys: Default::default(),
                },
                Vec::<String>::new(),
            ),
            connection: StorageSinkConnection::Kafka(KafkaSinkConnection {
                connection_id: CatalogItemId::System(2),
                connection: KafkaConnection {
                    brokers: Default::default(),
                    default_tunnel: Tunnel::Direct,
                    progress_topic: Default::default(),
                    progress_topic_options: Default::default(),
                    options: Default::default(),
                    tls: Default::default(),
                    sasl: Default::default(),
                },
                format: KafkaSinkFormat {
                    key_format: Default::default(),
                    value_format: KafkaSinkFormatType::Text,
                },
                relation_key_indices: Default::default(),
                key_desc_and_indices: Default::default(),
                headers_index: Default::default(),
                value_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                partition_by: Default::default(),
                topic: Default::default(),
                topic_options: Default::default(),
                compression_type: KafkaSinkCompressionType::None,
                progress_group_id: KafkaIdStyle::Legacy,
                transactional_id: KafkaIdStyle::Legacy,
                topic_metadata_refresh_interval: Default::default(),
            }),
            with_snapshot: Default::default(),
            version: Default::default(),
            envelope: SinkEnvelope::Upsert,
            as_of: Antichain::from_elem(0),
            from_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
            to_storage_metadata: CollectionMetadata {
                persist_location: PersistLocation {
                    blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                    consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                },
                remap_shard: Default::default(),
                data_shard: Default::default(),
                relation_desc: RelationDesc::new(
                    RelationType {
                        column_types: Default::default(),
                        keys: Default::default(),
                    },
                    Vec::<String>::new(),
                ),
                txns_shard: Default::default(),
            },
        }
    }

    /// An ingestion fully compacted to the empty frontier (along with all its
    /// exports) must be removed entirely by `reduce`.
    #[mz_ore::test]
    fn reduce_drops_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    /// An ingestion compacted to non-empty frontiers survives `reduce`
    /// together with its compaction commands.
    #[mz_ore::test]
    fn reduce_keeps_compacted_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
            StorageCommand::AllowCompaction(GlobalId::User(3), Antichain::from_elem(3)),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    /// Dropping only a subsource (not the ingestion itself) keeps both the
    /// ingestion and the subsource compaction in the history.
    #[mz_ore::test]
    fn reduce_keeps_partially_dropped_ingestion() {
        let mut history = history();

        let commands = [
            StorageCommand::RunIngestion(Box::new(RunIngestionCommand {
                id: GlobalId::User(1),
                description: ingestion_description(1, [2], 3),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::new()),
        ];

        for cmd in commands.clone() {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();
        assert_eq!(commands_after, commands);
    }

    /// A sink compacted to the empty frontier must be removed entirely by
    /// `reduce`.
    #[mz_ore::test]
    fn reduce_drops_dropped_sink() {
        let mut history = history();

        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_description(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }

    /// A sink compacted to a non-empty frontier survives `reduce` with its
    /// as-of advanced to that frontier.
    #[mz_ore::test]
    fn reduce_keeps_compacted_sink() {
        let mut history = history();

        let sink_desc = sink_description();
        let commands = [
            StorageCommand::RunSink(Box::new(RunSinkCommand {
                id: GlobalId::User(1),
                description: sink_desc.clone(),
            })),
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::from_elem(42)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().cloned().collect();

        let expected_sink_desc = StorageSinkDesc {
            as_of: Antichain::from_elem(42),
            ..sink_desc
        };
        let expected_commands = [StorageCommand::RunSink(Box::new(RunSinkCommand {
            id: GlobalId::User(1),
            description: expected_sink_desc,
        }))];

        assert_eq!(commands_after, expected_commands);
    }

    /// Compaction commands that target no known ingestion or sink are
    /// discarded by `reduce`.
    #[mz_ore::test]
    fn reduce_drops_stray_compactions() {
        let mut history = history();

        let commands = [
            StorageCommand::AllowCompaction(GlobalId::User(1), Antichain::new()),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(1)),
            StorageCommand::AllowCompaction(GlobalId::User(2), Antichain::from_elem(2)),
        ];

        for cmd in commands {
            history.push(cmd);
        }

        history.reduce();

        let commands_after: Vec<_> = history.iter().collect();
        assert!(commands_after.is_empty(), "{:?}", commands_after);
    }
}