// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::anyhow;
use rdkafka::admin::{NewTopic, TopicReplication};

use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;

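/// Creates a Kafka topic for a testdrive run.
///
/// A sketch of an invocation, assuming the action is registered as
/// `kafka-create-topic` (the action name is not visible in this file):
///
///     $ kafka-create-topic topic=data partitions=4
///
/// Only `topic` is required; `partitions`, `replication-factor`,
/// `compression`, and `compaction` are optional and default to the
/// testdrive-wide partition count, 1, "producer", and false, respectively.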
pub async fn run_create_topic(
    mut cmd: BuiltinCommand,
    state: &mut State,
) -> Result<ControlFlow, anyhow::Error> {
    let topic_prefix = format!("testdrive-{}", cmd.args.string("topic")?);
    let partitions = cmd.args.opt_parse("partitions")?;

    let replication_factor = cmd.args.opt_parse("replication-factor")?.unwrap_or(1);
    let compression = cmd
        .args
        .opt_string("compression")
        .unwrap_or_else(|| "producer".into());
    let compaction = cmd.args.opt_parse("compaction")?.unwrap_or(false);
    cmd.args.done()?;

    // NOTE(benesch): it is critical that we invent a new topic name on
    // every testdrive run. We previously tried to delete and recreate the
    // topic with a fixed name, but ran into serious race conditions in
    // Kafka that would regularly cause CI to hang. Details follow.
    //
    // Kafka topic creation and deletion is documented to be asynchronous.
    // That seems fine at first, as the Kafka admin API exposes an
    // `operation_timeout` option that would appear to allow you to opt into
    // a synchronous request by setting a massive timeout. As it turns out,
    // this parameter doesn't actually do anything [0].
    //
    // So, fine, we can implement our own polling for topic creation and
    // deletion, since the Kafka API exposes the list of topics currently
    // known to Kafka. This polling works well enough for topic creation.
    // After issuing a CreateTopics request, we poll the metadata list until
    // the topic appears with the requested number of partitions. (Yes,
    // sometimes the topic will appear with the wrong number of partitions
    // at first, and later sort itself out.)
    //
    // For deletion, though, there's another problem. Not only is deletion
    // of the topic metadata asynchronous, but deletion of the
    // topic data is *also* asynchronous, and independently so. As best as
    // I can tell, the following sequence of events is not only plausible,
    // but likely:
    //
    //     1. Client issues DeleteTopics(FOO).
    //     2. Kafka launches garbage collection of topic FOO.
    //     3. Kafka deletes metadata for topic FOO.
    //     4. Client polls and discovers topic FOO's metadata is gone.
    //     5. Client issues CreateTopics(FOO).
    //     6. Client writes some data to topic FOO.
    //     7. Kafka deletes data for topic FOO, including the data that was
    //        written to the second incarnation of topic FOO.
    //     8. Client attempts to read data written to topic FOO and waits
    //        forever, since there is no longer any data in the topic.
    //        Client becomes very confused and sad.
    //
    // There doesn't appear to be any sane way to poll to determine whether
    // the data has been deleted, since Kafka doesn't expose how many
    // messages are in a topic, and it's therefore impossible to distinguish
    // an empty topic from a deleted topic. And that's not even accounting
    // for the behavior when auto.create.topics.enable is true, which it
    // is by default, where asking about a topic that doesn't exist will
    // automatically create it.
    //
    // All this to say: please think twice before changing the topic naming
    // strategy.
    //
    // [0]: https://github.com/confluentinc/confluent-kafka-python/issues/524#issuecomment-456783176
    let topic_name = format!("{}-{}", topic_prefix, state.seed);
    let partitions = partitions.unwrap_or(state.kafka_default_partitions);

    println!(
        "Creating Kafka topic {} with partition count of {}",
        topic_name, partitions
    );

    let new_topic = NewTopic::new(
        &topic_name,
        i32::try_from(partitions)
            .map_err(|_| anyhow!("partition count must fit in an i32: {}", partitions))?,
        TopicReplication::Fixed(replication_factor),
    )
    // Disabling retention is very important! Our testdrive tests
    // use hardcoded timestamps that are immediately eligible for
    // deletion by Kafka's garbage collector. E.g., the timestamp
    // "1" is interpreted as January 1, 1970 00:00:01, which
    // breaches the default 7-day retention policy.
    .set("retention.ms", "-1")
    .set("compression.type", &compression);

    // Apply aggressive compaction settings when compaction is enabled.
    let new_topic = if compaction {
        new_topic
            .set("cleanup.policy", "compact")
            // eagerly roll over segments
            .set("segment.ms", "100")
            // make sure we get compaction even with low throughput
            .set("min.cleanable.dirty.ratio", "0.01")
            .set("min.compaction.lag.ms", "100")
            .set("delete.retention.ms", "100")
    } else {
        new_topic
    };

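    // Create the topic via the shared admin helper; see the discussion above
    // for why the topic name is invented fresh on every run.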
    mz_kafka_util::admin::ensure_topic(&state.kafka_admin, &state.kafka_admin_opts, &new_topic)
        .await?;
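    // Record the topic and its partition count so later actions in this run
    // can reference them.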
    state.kafka_topics.insert(topic_name, partitions);
    Ok(ControlFlow::Continue)
}