// mz_testdrive/action/kafka/create_topic.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use crate::action::{ControlFlow, State};
11use crate::parser::BuiltinCommand;
12use anyhow::anyhow;
13use mz_kafka_util::admin::EnsureTopicConfig;
14use rdkafka::admin::{NewTopic, TopicReplication};
15
16pub async fn run_create_topic(
17 mut cmd: BuiltinCommand,
18 state: &mut State,
19) -> Result<ControlFlow, anyhow::Error> {
20 let topic_prefix = format!("testdrive-{}", cmd.args.string("topic")?);
21 let partitions = cmd.args.opt_parse("partitions")?;
22
23 let replication_factor = cmd.args.opt_parse("replication-factor")?.unwrap_or(1);
24 let compression = cmd
25 .args
26 .opt_string("compression")
27 .unwrap_or_else(|| "producer".into());
28 let compaction = cmd.args.opt_parse("compaction")?.unwrap_or(false);
29 cmd.args.done()?;
30
31 // NOTE(benesch): it is critical that we invent a new topic name on
32 // every testdrive run. We previously tried to delete and recreate the
33 // topic with a fixed name, but ran into serious race conditions in
34 // Kafka that would regularly cause CI to hang. Details follow.
35 //
36 // Kafka topic creation and deletion is documented to be asynchronous.
37 // That seems fine at first, as the Kafka admin API exposes an
38 // `operation_timeout` option that would appear to allow you to opt into
39 // a synchronous request by setting a massive timeout. As it turns out,
40 // this parameter doesn't actually do anything [0].
41 //
42 // So, fine, we can implement our own polling for topic creation and
43 // deletion, since the Kafka API exposes the list of topics currently
44 // known to Kafka. This polling works well enough for topic creation.
45 // After issuing a CreateTopics request, we poll the metadata list until
46 // the topic appears with the requested number of partitions. (Yes,
47 // sometimes the topic will appear with the wrong number of partitions
48 // at first, and later sort itself out.)
49 //
50 // For deletion, though, there's another problem. Not only is deletion
51 // of the topic metadata asynchronous, but deletion of the
52 // topic data is *also* asynchronous, and independently so. As best as
53 // I can tell, the following sequence of events is not only plausible,
54 // but likely:
55 //
56 // 1. Client issues DeleteTopics(FOO).
57 // 2. Kafka launches garbage collection of topic FOO.
58 // 3. Kafka deletes metadata for topic FOO.
59 // 4. Client polls and discovers topic FOO's metadata is gone.
60 // 5. Client issues CreateTopics(FOO).
61 // 6. Client writes some data to topic FOO.
62 // 7. Kafka deletes data for topic FOO, including the data that was
63 // written to the second incarnation of topic FOO.
64 // 8. Client attempts to read data written to topic FOO and waits
65 // forever, since there is no longer any data in the topic.
66 // Client becomes very confused and sad.
67 //
68 // There doesn't appear to be any sane way to poll to determine whether
69 // the data has been deleted, since Kafka doesn't expose how many
70 // messages are in a topic, and it's therefore impossible to distinguish
71 // an empty topic from a deleted topic. And that's not even accounting
72 // for the behavior when auto.create.topics.enable is true, which it
73 // is by default, where asking about a topic that doesn't exist will
74 // automatically create it.
75 //
76 // All this to say: please think twice before changing the topic naming
77 // strategy.
78 //
79 // [0]: https://github.com/confluentinc/confluent-kafka-python/issues/524#issuecomment-456783176
80 let topic_name = format!("{}-{}", topic_prefix, state.seed);
81 let partitions = partitions.unwrap_or(state.kafka_default_partitions);
82
83 println!(
84 "Creating Kafka topic {} with partition count of {}",
85 topic_name, partitions
86 );
87
88 let new_topic = NewTopic::new(
89 &topic_name,
90 i32::try_from(partitions)
91 .map_err(|_| anyhow!("partition count must fit in an i32: {}", partitions))?,
92 TopicReplication::Fixed(replication_factor),
93 )
94 // Disabling retention is very important! Our testdrive tests
95 // use hardcoded timestamps that are immediately eligible for
96 // deletion by Kafka's garbage collector. E.g., the timestamp
97 // "1" is interpreted as January 1, 1970 00:00:01, which is
98 // breaches the default 7-day retention policy.
99 .set("retention.ms", "-1")
100 .set("compression.type", &compression);
101
102 // aggressive compaction, when it is enabled
103 let new_topic = if compaction {
104 new_topic
105 .set("cleanup.policy", "compact")
106 // eagerly roll over segments
107 .set("segment.ms", "100")
108 // make sure we get compaction even with low throughput
109 .set("min.cleanable.dirty.ratio", "0.01")
110 .set("min.compaction.lag.ms", "100")
111 .set("delete.retention.ms", "100")
112 } else {
113 new_topic
114 };
115
116 mz_kafka_util::admin::ensure_topic(
117 &state.kafka_admin,
118 &state.kafka_admin_opts,
119 &new_topic,
120 EnsureTopicConfig::Check,
121 )
122 .await?;
123 state.kafka_topics.insert(topic_name, partitions);
124 Ok(ControlFlow::Continue)
125}