Skip to main content
Version: 5.1

Configuring Kafka Nodes

Sample configuration files defining a node's role in the cluster are located in the /app/kafka/config/kraft directory and include controller.properties and broker.properties.

The table below lists the key configuration parameters that require modification during the basic Kafka setup:

Key Kafka Configuration Parameters

ParameterDescriptionПримечание
process.rolesDefines the node role: broker or controller.-
node.idUnique node ID within the cluster.-
controller.quorum.votersList of controllers with their addresses and ports.-
listenersList of URIs and listener names through which Kafka accepts connections. Each listener must have a unique name and port. Use 0.0.0.0 to bind to all interfaces or leave the host empty for the default interface.Examples: PLAINTEXT://myhost:9092, SSL://:9091, CLIENT://0.0.0.0:9092, REPLICATION://localhost:9093.
controller.listener.namesSets the addresses and ports for client (producer/consumer) connections to brokers. Particularly relevant when working behind NAT or with multiple network interfaces.-
listener.security.protocol.mapMaps listener names to security protocols. Allows different security settings (SSL, SASL) for different ports or IPs. Each listener is specified only once.Example for INTERNAL: listener.name.internal.ssl.keystore.location. If not set, the global value (ssl.keystore.location) is used. In KRaft, if no explicit mapping is defined, listeners from controller.listener.names default to PLAINTEXT.
log.dirsDirectories where Kafka stores log data.A comma-separated list of paths.
num.partitionsSets the default number of partitions for each topic.Overridden if specified during topic creation.
offsets.topic.replication.factorDefines the replication factor for the offsets topic.Increase for higher availability. The internal topic won’t be created until the cluster size meets this requirement.
transaction.state.log.replication.factorSets the replication factor for the transactions topic.Increase for higher availability. The internal topic won’t be created until the cluster size meets this requirement.
transaction.state.log.min.isrOverrides min.insync.replicas for the transactions topic.-
min.insync.replicasMinimum number of replicas that must acknowledge a write when using acks=all (or -1).If the required number of replicas is unavailable, the producer will receive an error producer (NotEnoughReplicas or NotEnoughReplicasAfterAppend). Tuning acks and min.insync.replicas together improves message delivery reliability..
log.retention.hoursDefines log retention time (in hours) before deletion.Alternative parameters: log.retention.ms, log.retention.minutes.
log.segment.bytesMaximum size of a single partition segment file.-
ssl.keystore.locationLocation of the keystore file.-
ssl.keystore.passwordPassword for the keystore file, required when using SSL with a specified keystore path.Not needed if ssl.keystore.location is unset. Not supported for PEM format.
ssl.truststore.locationLocation of the truststore file.-
ssl.truststore.passwordPassword for the truststore file.If no password is set, the file will be used without integrity checks. Not supported for PEM format.
ssl.client.authDetermines whether client authentication is required for SSL connections.Supported values: required — client must authenticate. requested — authentication is preferred but optional. none — client authentication is disabled.
super.usersList of superusers with full access rights.Bypasses ACL restrictions.
ssl.principal.mapping.rulesRules for transforming the full certificate CN into a short username.Rules are applied in order; the first matching rule is used, and the rest are ignored.

Configuring Kafka Controller Nodes

Example configuration for the first controller node /app/kafka/config/kraft/controller.properties:

Example
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.0.51:9093,2@192.168.0.52:9093,3@192.168.0.53:9093
listeners=CONTROLLER://192.168.0.51:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/logs/kraft-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=5
transaction.state.log.replication.factor=5
transaction.state.log.min.isr=3
min.insync.replicas=3
log.retention.hours=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/app/certs/kafka.keystore.jks
ssl.keystore.password=PASSWORDKeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PASSWORDTruststore
ssl.client.auth=required
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
super.users=User:controller1;User:conroller2;User:controller3;User:broker1;User:broker2;User:broker3;User:broker4;User:broker5;User:adminkfk
allow.everyone.if.no.acl.found=False
ssl.principal.mapping.rules=RULE:^CN=(adminkfk),O=Work,L=Dubai,ST=Dubai,C=AE$/$1/L,RULE:^CN=(controller[1-3]),O=Work,L=Dubai,ST=Dubai,C=AE$/$1/L, RULE:^ CN=(broker[1-5]),O=Work,L=Dubai,ST=Dubai,C=AE$/$1/L
Note

The remaining controller nodes must be configured similarly.

Configuring Kafka Broker Nodes

Example configuration for the first broker node:

Example
process.roles=broker
node.id=4
controller.quorum.voters=1@192.168.0.51:9093,2@192.168.0.52:9093,3@192.168.0.53:9093
listeners=SSL:// 192.168.0.54:9092
inter.broker.listener.name=SSL
advertised.listeners=SSL:// 192.168.0.54:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL,SSL:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/logs/kraft-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=5
transaction.state.log.replication.factor=5
transaction.state.log.min.isr=3
min.insync.replicas=3
log.retention.hours=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/app/certs/kafka.keystore.jks
ssl.keystore.password=PASSWORDkeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PASSWORDtruststore
ssl.client.auth=requested
allow.everyone.if.no.acl.found=False
auto.create.topics.enable=false
super.users=User:contreoller1;User:controller2;User:controller3;User:broker1;User:broker2;User:broker3;User:broker4;User:broker5;User:adminkfk
ssl.principal.mapping.rules=RULE:^CN=producer_hosts[1-2],O=Work,L=Dubai,ST=Dubai,C=AE$/producer_user/L,RULE:^CN=consumer_hosts[1-3],O=Work,L=Dubai,ST=Dubai,C=AE$/consumer_user/L,RULE:^CN=(adminkfk),O=Innotech,L=Dubai,ST=Dubai,C=AE$/$1/L

Configuring Kafka Services

Kafka servers require service configuration files for managing the Kafka service. The filenames are identical for both controller and broker nodes: /etc/systemd/system/kafka.service.

  1. Example service configuration for a controller node:
[Unit]
Description=Apache Kafka server (controller)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
WorkingDirectory=/app/kafka
User=kafka
#User=root
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=LOG_DIR="/app/logs/kafka"
XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
Environment=JAVA_HOME="/app/jdk/"
ExecStart=/app/kafka/bin/kafka-server-start.sh /app/kafka/config/kraft/controller.properties
ExecStop=/app/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
LimitNOFILE=infinity

Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
  1. Example service configuration /app/service/kafka.service for a broker node:
 [Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
WorkingDirectory=/app/kafka
User=kafka
#User=root
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=LOG_DIR="/app/logs/kafka"
XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
Environment=JAVA_HOME="/app/jdk/"
ExecStart=/app/kafka/bin/kafka-server-start.sh /app/kafka/config/kraft/broker.properties
ExecStop=/app/kafka/bin/kafka-server-stop.sh

TimeoutSec=30
LimitNOFILE=infinity

Restart=on-failure
RestartSec=10s

[Install]
WantedBy=multi-user.target
kafka.service_broker

After creating the service files, reload all unit files from standard paths (/etc/systemd/system, /usr/lib/systemd/system, etc.) by running:

systemctl daemon-reload