mz_testdrive/action/kafka/create_topic.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;
use anyhow::anyhow;
use mz_kafka_util::admin::EnsureTopicConfig;
use rdkafka::admin::{NewTopic, TopicReplication};

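/// Creates a Kafka topic for the current testdrive run.
///
/// In a testdrive script this action is invoked roughly as follows (a
/// sketch inferred from the arguments parsed below; the option names mirror
/// the `cmd.args` lookups, and the values are invented):
///
/// ```text
/// $ kafka-create-topic topic=data partitions=4 compaction=true
/// ```
///
/// The `topic` argument is automatically prefixed with `testdrive-` and
/// suffixed with the run's seed, per the note on topic naming below.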
pub async fn run_create_topic(
    mut cmd: BuiltinCommand,
    state: &mut State,
) -> Result<ControlFlow, anyhow::Error> {
    let topic_prefix = format!("testdrive-{}", cmd.args.string("topic")?);
    let partitions = cmd.args.opt_parse("partitions")?;

    let replication_factor = cmd.args.opt_parse("replication-factor")?.unwrap_or(1);
    let compression = cmd
        .args
        .opt_string("compression")
        .unwrap_or_else(|| "producer".into());
    let compaction = cmd.args.opt_parse("compaction")?.unwrap_or(false);
    cmd.args.done()?;

    // NOTE(benesch): it is critical that we invent a new topic name on
    // every testdrive run. We previously tried to delete and recreate the
    // topic with a fixed name, but ran into serious race conditions in
    // Kafka that would regularly cause CI to hang. Details follow.
    //
    // Kafka topic creation and deletion is documented to be asynchronous.
    // That seems fine at first, as the Kafka admin API exposes an
    // `operation_timeout` option that would appear to allow you to opt into
    // a synchronous request by setting a massive timeout. As it turns out,
    // this parameter doesn't actually do anything [0].
    //
    // So, fine, we can implement our own polling for topic creation and
    // deletion, since the Kafka API exposes the list of topics currently
    // known to Kafka. This polling works well enough for topic creation.
    // After issuing a CreateTopics request, we poll the metadata list until
    // the topic appears with the requested number of partitions. (Yes,
    // sometimes the topic will appear with the wrong number of partitions
    // at first, and later sort itself out.)
    //
    // For deletion, though, there's another problem. Not only is deletion
    // of the topic metadata asynchronous, but deletion of the
    // topic data is *also* asynchronous, and independently so. As best as
    // I can tell, the following sequence of events is not only plausible,
    // but likely:
    //
    //     1. Client issues DeleteTopics(FOO).
    //     2. Kafka launches garbage collection of topic FOO.
    //     3. Kafka deletes metadata for topic FOO.
    //     4. Client polls and discovers topic FOO's metadata is gone.
    //     5. Client issues CreateTopics(FOO).
    //     6. Client writes some data to topic FOO.
    //     7. Kafka deletes data for topic FOO, including the data that was
    //        written to the second incarnation of topic FOO.
    //     8. Client attempts to read data written to topic FOO and waits
    //        forever, since there is no longer any data in the topic.
    //        Client becomes very confused and sad.
    //
    // There doesn't appear to be any sane way to poll to determine whether
    // the data has been deleted, since Kafka doesn't expose how many
    // messages are in a topic, and it's therefore impossible to distinguish
    // an empty topic from a deleted topic. And that's not even accounting
    // for the behavior when auto.create.topics.enable is true, which it
    // is by default, where asking about a topic that doesn't exist will
    // automatically create it.
    //
    // All this to say: please think twice before changing the topic naming
    // strategy.
    //
    // [0]: https://github.com/confluentinc/confluent-kafka-python/issues/524#issuecomment-456783176
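    //
    // For illustration only, the creation-side polling described above looks
    // roughly like the sketch below. This is not the actual implementation
    // (that lives in `mz_kafka_util::admin::ensure_topic`); `await_topic` and
    // its retry bounds are invented for the example, while `fetch_metadata`
    // is the real rdkafka call:
    //
    //     use std::{thread, time::Duration};
    //     use rdkafka::client::Client;
    //
    //     fn await_topic(client: &Client, topic: &str, partitions: usize) -> bool {
    //         for _ in 0..30 {
    //             // Ask the broker for metadata about just this topic and
    //             // check that it reports the expected partition count.
    //             if let Ok(md) = client.fetch_metadata(Some(topic), Duration::from_secs(1)) {
    //                 if md.topics().iter().any(|t| {
    //                     t.name() == topic && t.partitions().len() == partitions
    //                 }) {
    //                     return true;
    //                 }
    //             }
    //             thread::sleep(Duration::from_millis(100));
    //         }
    //         false
    //     }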
    let topic_name = format!("{}-{}", topic_prefix, state.seed);
    let partitions = partitions.unwrap_or(state.kafka_default_partitions);

    println!(
        "Creating Kafka topic {} with partition count of {}",
        topic_name, partitions
    );

    let new_topic = NewTopic::new(
        &topic_name,
        i32::try_from(partitions)
            .map_err(|_| anyhow!("partition count must fit in an i32: {}", partitions))?,
        TopicReplication::Fixed(replication_factor),
    )
    // Disabling retention is very important! Our testdrive tests
    // use hardcoded timestamps that are immediately eligible for
    // deletion by Kafka's garbage collector. E.g., the timestamp
    // "1" is interpreted as January 1, 1970 00:00:01, which
    // breaches the default 7-day retention policy.
    .set("retention.ms", "-1")
    .set("compression.type", &compression);

    // Apply aggressive compaction settings when compaction is enabled, so
    // that the log cleaner kicks in even on tiny, low-throughput topics.
    let new_topic = if compaction {
        new_topic
            .set("cleanup.policy", "compact")
            // eagerly roll over segments
            .set("segment.ms", "100")
            // make sure we get compaction even with low throughput
            .set("min.cleanable.dirty.ratio", "0.01")
            // let records become eligible for compaction almost immediately
            .set("min.compaction.lag.ms", "100")
            // drop delete tombstones shortly after compaction
            .set("delete.retention.ms", "100")
    } else {
        new_topic
    };
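
    // With the settings above, duplicate writes to the same key, e.g.
    //
    //     ("k1", "v1"), ("k2", "v1"), ("k1", "v2")
    //
    // are typically reduced to ("k2", "v1"), ("k1", "v2") within a few
    // hundred milliseconds, so tests can observe compacted reads without
    // long waits. (Illustrative only; exact log-cleaner timing varies.)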

    mz_kafka_util::admin::ensure_topic(
        &state.kafka_admin,
        &state.kafka_admin_opts,
        &new_topic,
        EnsureTopicConfig::Check,
    )
    .await?;
    state.kafka_topics.insert(topic_name, partitions);
    Ok(ControlFlow::Continue)
}