// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::anyhow;
use rdkafka::admin::{NewTopic, TopicReplication};

use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;

pub async fn run_create_topic(
    mut cmd: BuiltinCommand,
    state: &mut State,
) -> Result<ControlFlow, anyhow::Error> {
    let topic_prefix = format!("testdrive-{}", cmd.args.string("topic")?);
    let partitions = cmd.args.opt_parse("partitions")?;
    let replication_factor = cmd.args.opt_parse("replication-factor")?.unwrap_or(1);
    let compression = cmd
        .args
        .opt_string("compression")
        .unwrap_or_else(|| "producer".into());
    let compaction = cmd.args.opt_parse("compaction")?.unwrap_or(false);
    cmd.args.done()?;
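
    // For reference, a testdrive script invokes this action with a directive
    // along these lines (illustrative only; the argument names are the ones
    // parsed above, and the exact directive syntax is defined by the
    // testdrive parser):
    //
    //     $ kafka-create-topic topic=data partitions=4 compaction=true
    //
    // The given topic name is prefixed with "testdrive-" and suffixed with
    // the run's seed, as described in the note below.
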
    // NOTE(benesch): it is critical that we invent a new topic name on
    // every testdrive run. We previously tried to delete and recreate the
    // topic with a fixed name, but ran into serious race conditions in
    // Kafka that would regularly cause CI to hang. Details follow.
    //
    // Kafka topic creation and deletion are documented to be asynchronous.
    // That seems fine at first, as the Kafka admin API exposes an
    // `operation_timeout` option that would appear to allow you to opt into
    // a synchronous request by setting a massive timeout. As it turns out,
    // this parameter doesn't actually do anything [0].
    //
    // So, fine, we can implement our own polling for topic creation and
    // deletion, since the Kafka API exposes the list of topics currently
    // known to Kafka. This polling works well enough for topic creation.
    // After issuing a CreateTopics request, we poll the metadata list until
    // the topic appears with the requested number of partitions; a rough
    // sketch of such a loop follows this comment. (Yes, sometimes the topic
    // will appear with the wrong number of partitions at first, and later
    // sort itself out.)
    //
    // For deletion, though, there's another problem. Not only is deletion
    // of the topic metadata asynchronous, but deletion of the
    // topic data is *also* asynchronous, and independently so. As best I
    // can tell, the following sequence of events is not only plausible,
    // but likely:
    //
    //   1. Client issues DeleteTopics(FOO).
    //   2. Kafka launches garbage collection of topic FOO.
    //   3. Kafka deletes metadata for topic FOO.
    //   4. Client polls and discovers topic FOO's metadata is gone.
    //   5. Client issues CreateTopics(FOO).
    //   6. Client writes some data to topic FOO.
    //   7. Kafka deletes data for topic FOO, including the data that was
    //      written to the second incarnation of topic FOO.
    //   8. Client attempts to read data written to topic FOO and waits
    //      forever, since there is no longer any data in the topic.
    //      Client becomes very confused and sad.
    //
    // There doesn't appear to be any sane way to poll to determine whether
    // the data has been deleted, since Kafka doesn't expose how many
    // messages are in a topic, and it's therefore impossible to distinguish
    // an empty topic from a deleted topic. And that's not even accounting
    // for the behavior when auto.create.topics.enable is true, which it
    // is by default, where asking about a topic that doesn't exist will
    // automatically create it.
    //
    // All this to say: please think twice before changing the topic naming
    // strategy.
    //
    // [0]: https://github.com/confluentinc/confluent-kafka-python/issues/524#issuecomment-456783176
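    //
    // To make the creation-polling strategy above concrete, here is a rough,
    // illustrative sketch of such a loop. This is NOT the code we actually
    // run (that lives in `mz_kafka_util::admin::ensure_topic`); the consumer
    // handle, timeout values, and helper name are assumptions made purely
    // for illustration, and it assumes `use std::time::{Duration, Instant};`:
    //
    //     fn await_topic_created(
    //         consumer: &rdkafka::consumer::BaseConsumer,
    //         topic: &str,
    //         expected_partitions: usize,
    //     ) -> Result<(), anyhow::Error> {
    //         let deadline = Instant::now() + Duration::from_secs(30);
    //         while Instant::now() < deadline {
    //             // Ask the broker for metadata about just this topic and
    //             // check whether it has appeared with the expected number
    //             // of partitions.
    //             let metadata = consumer.fetch_metadata(Some(topic), Duration::from_secs(1))?;
    //             if metadata
    //                 .topics()
    //                 .iter()
    //                 .any(|t| t.name() == topic && t.partitions().len() == expected_partitions)
    //             {
    //                 return Ok(());
    //             }
    //             std::thread::sleep(Duration::from_millis(100));
    //         }
    //         Err(anyhow!("topic {} never appeared with {} partitions", topic, expected_partitions))
    //     }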
    let topic_name = format!("{}-{}", topic_prefix, state.seed);
    let partitions = partitions.unwrap_or(state.kafka_default_partitions);
    println!(
        "Creating Kafka topic {} with partition count of {}",
        topic_name, partitions
    );
    let new_topic = NewTopic::new(
        &topic_name,
        i32::try_from(partitions)
            .map_err(|_| anyhow!("partition count must fit in an i32: {}", partitions))?,
        TopicReplication::Fixed(replication_factor),
    )
    // Disabling retention is very important! Our testdrive tests
    // use hardcoded timestamps that are immediately eligible for
    // deletion by Kafka's garbage collector. E.g., the timestamp
    // "1" is interpreted as January 1, 1970 00:00:01, which
    // breaches the default 7-day retention policy.
    .set("retention.ms", "-1")
    // A compression type of "producer" (the default here) tells Kafka to
    // retain whatever compression codec the producer used.
    .set("compression.type", &compression);
    // Apply aggressive compaction settings when compaction is enabled.
    let new_topic = if compaction {
        new_topic
            .set("cleanup.policy", "compact")
            // eagerly roll over segments
            .set("segment.ms", "100")
            // make sure we get compaction even with low throughput
            .set("min.cleanable.dirty.ratio", "0.01")
            .set("min.compaction.lag.ms", "100")
            .set("delete.retention.ms", "100")
    } else {
        new_topic
    };
    mz_kafka_util::admin::ensure_topic(&state.kafka_admin, &state.kafka_admin_opts, &new_topic)
        .await?;
    state.kafka_topics.insert(topic_name, partitions);
    Ok(ControlFlow::Continue)
}