Skip to content

Kafka (KRaft)

Since v0.24.0

Introduction

The Testcontainers module for KRaft: Apache Kafka Without ZooKeeper.

Adding this module to your project dependencies

Please run the following command to add the Kafka module to your Go dependencies:

go get github.com/testcontainers/testcontainers-go/modules/kafka

Usage example

ctx := context.Background()

kafkaContainer, err := kafka.Run(ctx,
    "confluentinc/confluent-local:7.5.0",
    kafka.WithClusterID("test-cluster"),
)
defer func() {
    if err := testcontainers.TerminateContainer(kafkaContainer); err != nil {
        log.Printf("failed to terminate container: %s", err)
    }
}()
if err != nil {
    log.Printf("failed to start container: %s", err)
    return
}

Module Reference

Run function

Info

The RunContainer(ctx, opts...) function is deprecated and will be removed in the next major release of Testcontainers for Go.

The Kafka module exposes one entrypoint function to create the Kafka container, and this function receives three parameters:

func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error)
  • context.Context, the Go context.
  • string, the Docker image to use.
  • testcontainers.ContainerCustomizer, a variadic argument for passing options.

Image

Use the second argument in the Run function to set a valid Docker image. In example: Run(context.Background(), "confluentinc/confluent-local:7.5.0").

Warning

The minimal required version of Kafka for KRaft mode is confluentinc/confluent-local:7.4.0. If you are using an image that is different from the official one, please make sure that it's compatible with KRaft mode, as the module won't check the version for you.

Environment variables

The environment variables that are already set by default are:

"KAFKA_LISTENERS":                                "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS":                   "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP":           "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME":               "BROKER",
"KAFKA_BROKER_ID":                                "1",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR":         "1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS":             "1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR":            "1",
"KAFKA_LOG_FLUSH_INTERVAL_MESSAGES":              strconv.Itoa(math.MaxInt),
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS":         "0",
"KAFKA_NODE_ID":                                  "1",
"KAFKA_PROCESS_ROLES":                            "broker,controller",
"KAFKA_CONTROLLER_LISTENER_NAMES":                "CONTROLLER",

Init script

The Kafka container will be started using a custom shell script:

    starterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
echo '' > /etc/confluent/docker/ensure
/etc/confluent/docker/configure
/etc/confluent/docker/launch`



        return fmt.Errorf("copy to container: %w", err)

Container Options

When starting the Kafka container, you can pass options in a variadic way to configure it.

The following options are exposed by the testcontainers package.

Basic Options

Lifecycle Options

Files & Mounts Options

Build Options

Logging Options

Image Options

Networking Options

Advanced Options

Experimental Options

Container Methods

The Kafka container exposes the following methods:

Brokers

The Brokers(ctx) method returns the Kafka brokers as a string slice, containing the host and the random port defined by Kafka's public port (9093/tcp).

brokers, err := kafkaContainer.Brokers(ctx)