Kafka (KRaft)¶
Since testcontainers-go v0.24.0
Introduction¶
The Testcontainers module for KRaft: Apache Kafka Without ZooKeeper.
Adding this module to your project dependencies¶
Please run the following command to add the Kafka module to your Go dependencies:
go get github.com/testcontainers/testcontainers-go/modules/kafka
Usage example¶
ctx := context.Background()
kafkaContainer, err := kafka.RunContainer(ctx,
	kafka.WithClusterID("test-cluster"),
	testcontainers.WithImage("confluentinc/confluent-local:7.5.0"),
)
if err != nil {
	panic(err)
}
// Clean up the container after
defer func() {
	if err := kafkaContainer.Terminate(ctx); err != nil {
		panic(err)
	}
}()
Module reference¶
The Kafka module exposes one entrypoint function to create the Kafka container, and this function receives two parameters:
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error)
- context.Context, the Go context.
- testcontainers.ContainerCustomizer, a variadic argument for passing options.
Container Options¶
When starting the Kafka container, you can pass options in a variadic way to configure it.
Image¶
If you need to set a different Kafka Docker image, you can use testcontainers.WithImage with a valid Docker image for Kafka, e.g. testcontainers.WithImage("confluentinc/confluent-local:7.5.0").
Warning
The minimum required version of Kafka for KRaft mode is confluentinc/confluent-local:7.4.0. If you are using an image that is different from the official one, please make sure that it's compatible with KRaft mode, as the module won't check the version for you.
Init script¶
The Kafka container will be started using a custom shell script:
starterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
echo '' > /etc/confluent/docker/ensure
/etc/confluent/docker/configure
/etc/confluent/docker/launch`
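The KAFKA_ADVERTISED_LISTENERS line in the script contains three format verbs (%s, %d, %s), so it has to be rendered before the script is usable. The following is a minimal sketch of that rendering step, not the module's actual code. It assumes the verbs stand for the externally reachable host, the mapped public port, and the container hostname; the function name, parameter names, and sample values are hypothetical.

```go
package main

import "fmt"

// advertisedListenersTemplate copies the value of KAFKA_ADVERTISED_LISTENERS
// from the starter script, with its three format verbs left in place.
const advertisedListenersTemplate = "PLAINTEXT://%s:%d,BROKER://%s:9092"

// renderAdvertisedListeners fills the template. The parameter names are
// assumptions made for this sketch; the script does not name its placeholders.
func renderAdvertisedListeners(host string, mappedPort int, containerHostname string) string {
	return fmt.Sprintf(advertisedListenersTemplate, host, mappedPort, containerHostname)
}

func main() {
	// Hypothetical values: a Docker host reachable as localhost, a randomly
	// mapped port, and a container hostname.
	fmt.Println(renderAdvertisedListeners("localhost", 55012, "3f2a1b4c5d6e"))
	// prints "PLAINTEXT://localhost:55012,BROKER://3f2a1b4c5d6e:9092"
}
```

Under this reading, the PLAINTEXT listener is the address that clients outside the container would use, while the BROKER listener stays on the fixed internal port 9092.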
Environment variables¶
The environment variables that are already set by default are:
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
"KAFKA_BROKER_ID": "1",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS": "1",
"KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR": "1",
"KAFKA_TRANSACTION_STATE_LOG_MIN_ISR": "1",
"KAFKA_LOG_FLUSH_INTERVAL_MESSAGES": fmt.Sprintf("%d", math.MaxInt64),
"KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS": "0",
"KAFKA_NODE_ID": "1",
"KAFKA_PROCESS_ROLES": "broker,controller",
"KAFKA_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
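To see how these defaults fit together, note that every listener named in KAFKA_LISTENERS has an entry in KAFKA_LISTENER_SECURITY_PROTOCOL_MAP. The sketch below parses both values with the standard library only and prints the pairing. The helper names listenerNames and protocolMap are made up for this illustration and are not part of the module.

```go
package main

import (
	"fmt"
	"strings"
)

// listenerNames extracts the listener names from a KAFKA_LISTENERS-style
// value such as "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092".
func listenerNames(listeners string) []string {
	var names []string
	for _, entry := range strings.Split(listeners, ",") {
		if name, _, found := strings.Cut(entry, "://"); found {
			names = append(names, name)
		}
	}
	return names
}

// protocolMap parses a KAFKA_LISTENER_SECURITY_PROTOCOL_MAP-style value
// ("NAME:PROTOCOL,NAME:PROTOCOL") into a map keyed by listener name.
func protocolMap(value string) map[string]string {
	m := make(map[string]string)
	for _, entry := range strings.Split(value, ",") {
		if name, protocol, found := strings.Cut(entry, ":"); found {
			m[name] = protocol
		}
	}
	return m
}

func main() {
	// Values copied from the defaults listed above.
	listeners := "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094"
	protocols := protocolMap("BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")

	for _, name := range listenerNames(listeners) {
		fmt.Printf("%s -> %s\n", name, protocols[name])
	}
}
```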
Wait Strategies¶
If you need to set a different wait strategy for the container, you can use testcontainers.WithWaitStrategy with a valid wait strategy.
Info
The default deadline for the wait strategy is 60 seconds.
You can also set a wait strategy together with a custom deadline by using testcontainers.WithWaitStrategyAndDeadline.
Startup Commands¶
- Not available until the next release of testcontainers-go main
Testcontainers exposes the WithStartupCommand(e ...Executable) option to run arbitrary commands in the container right after it's started.
Info
To better understand how this feature works, please read the Create containers: Lifecycle Hooks documentation.
Testcontainers also exports an Executable interface, which defines a single method, AsCommand(). This method returns a slice of strings representing the command and the positional arguments to be executed in the container.
You could use this feature to run a custom script, or to run a command that is not supported by the module right after the container is started.
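As an illustration of the shape such a command could take, here is a minimal sketch. The Executable interface is redeclared locally so the example compiles without testcontainers-go, and it mirrors only the single method described above. CreateTopic is a hypothetical type, and the broker address localhost:9092 passed to kafka-topics is an assumption based on the BROKER listener port.

```go
package main

import (
	"fmt"
	"strconv"
)

// Executable is a local copy of the interface described above, declared here
// only so the sketch is self-contained.
type Executable interface {
	AsCommand() []string
}

// CreateTopic is a hypothetical command that creates a topic with the
// kafka-topics CLI from inside the container.
type CreateTopic struct {
	Name       string
	Partitions int
}

// AsCommand returns the command followed by its arguments.
func (c CreateTopic) AsCommand() []string {
	return []string{
		"kafka-topics",
		"--bootstrap-server", "localhost:9092", // assumed in-container address
		"--create",
		"--topic", c.Name,
		"--partitions", strconv.Itoa(c.Partitions),
	}
}

func main() {
	var cmd Executable = CreateTopic{Name: "orders", Partitions: 3}
	fmt.Println(cmd.AsCommand())
}
```

A value like this would then be passed to WithStartupCommand; check the Executable definition in the testcontainers-go version you use, since the sketch only covers the method named in this page.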
Docker type modifiers¶
If you need an advanced configuration for the container, you can leverage the following Docker type modifiers:
testcontainers.WithConfigModifier
testcontainers.WithHostConfigModifier
testcontainers.WithEndpointSettingsModifier
Please read the Create containers: Advanced Settings documentation for more information.
Container Methods¶
The Kafka container exposes the following methods:
Brokers¶
The Brokers(ctx) method returns the Kafka brokers as a string slice. Each entry contains the host and the random host port mapped to Kafka's public port (9093/tcp).
brokers, err := kafkaContainer.Brokers(ctx)
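Many Kafka clients expect the broker list as a single comma-separated string. The sketch below shows one way to validate and join the returned slice using only the standard library. The bootstrapServers helper is hypothetical, and the sample address stands in for whatever Brokers(ctx) returns at runtime.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// bootstrapServers checks that every broker address has the host:port form
// and joins the addresses into one comma-separated string.
func bootstrapServers(brokers []string) (string, error) {
	for _, b := range brokers {
		if _, _, err := net.SplitHostPort(b); err != nil {
			return "", fmt.Errorf("invalid broker address %q: %w", b, err)
		}
	}
	return strings.Join(brokers, ","), nil
}

func main() {
	// Hypothetical value standing in for the result of kafkaContainer.Brokers(ctx).
	brokers := []string{"localhost:55012"}

	servers, err := bootstrapServers(brokers)
	if err != nil {
		panic(err)
	}
	fmt.Println(servers)
}
```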