Configuring Kafka Nodes
Sample configuration files defining a node's role in the cluster are located in the /app/kafka/config/kraft directory and include controller.properties and broker.properties.
The table below lists the key configuration parameters that require modification during the basic Kafka
setup:
Key Kafka Configuration Parameters
Parameter | Description | Note |
---|---|---|
process.roles | Defines the node role: broker or controller . | - |
node.id | Unique node ID within the cluster. | - |
controller.quorum.voters | List of controllers with their addresses and ports. | - |
listeners | List of URIs and listener names through which Kafka accepts connections. Each listener must have a unique name and port. Use 0.0.0.0 to bind to all interfaces or leave the host empty for the default interface. | Examples: PLAINTEXT://myhost:9092 , SSL://:9091 , CLIENT://0.0.0.0:9092 , REPLICATION://localhost:9093 . |
controller.listener.names | Comma-separated list of listener names used by the controller for quorum communication. | - |
advertised.listeners | Addresses and ports that brokers advertise for client (producer/consumer) connections. Particularly relevant when working behind NAT or with multiple network interfaces. | If not set, the value of listeners is used. |
listener.security.protocol.map | Maps listener names to security protocols. Allows different security settings (SSL, SASL) for different ports or IPs. Each listener is specified only once. | Example for INTERNAL: listener.name.internal.ssl.keystore.location . If not set, the global value (ssl.keystore.location ) is used. In KRaft, if no explicit mapping is defined, listeners from controller.listener.names default to PLAINTEXT. |
log.dirs | Directories where Kafka stores log data. | A comma-separated list of paths. |
num.partitions | Sets the default number of partitions for each topic. | Overridden if specified during topic creation. |
offsets.topic.replication.factor | Defines the replication factor for the offsets topic. | Increase for higher availability. The internal topic won’t be created until the cluster size meets this requirement. |
transaction.state.log.replication.factor | Sets the replication factor for the transactions topic. | Increase for higher availability. The internal topic won’t be created until the cluster size meets this requirement. |
transaction.state.log.min.isr | Overrides min.insync.replicas for the transactions topic. | - |
min.insync.replicas | Minimum number of replicas that must acknowledge a write when using acks=all (or -1 ). | If the required number of replicas is unavailable, the producer receives an error (NotEnoughReplicas or NotEnoughReplicasAfterAppend ). Tuning acks and min.insync.replicas together improves message delivery reliability. |
log.retention.hours | Defines log retention time (in hours) before deletion. | Alternative parameters: log.retention.ms , log.retention.minutes . |
log.segment.bytes | Maximum size of a single partition segment file. | - |
ssl.keystore.location | Location of the keystore file. | - |
ssl.keystore.password | Password for the keystore file, required when using SSL with a specified keystore path. | Not needed if ssl.keystore.location is unset. Not supported for PEM format. |
ssl.truststore.location | Location of the truststore file. | - |
ssl.truststore.password | Password for the truststore file. | If no password is set, the file will be used without integrity checks. Not supported for PEM format. |
ssl.client.auth | Determines whether client authentication is required for SSL connections. | Supported values: required — client must authenticate. requested — authentication is preferred but optional. none — client authentication is disabled. |
super.users | List of superusers with full access rights. | Bypasses ACL restrictions. |
ssl.principal.mapping.rules | Rules for transforming the full certificate CN into a short username. | Rules are applied in order; the first matching rule is used, and the rest are ignored. |
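The ssl.principal.mapping.rules entries used in the examples below follow Kafka's RULE:pattern/replacement/L syntax, where the trailing L lowercases the result. As a rough illustration of how such a rule reduces a certificate's distinguished name to a short principal, here is a sketch using sed and tr (Kafka itself applies Java regular expressions, not sed; the DN below is taken from the examples in this document):

```shell
# Hypothetical DN from a certificate subject; the pattern keeps only the CN
# capture group and lowercases it, mimicking RULE:^CN=(...),...$/$1/L.
dn='CN=adminkfk,O=Work,L=Dubai,ST=Dubai,C=AE'
principal=$(printf '%s' "$dn" \
  | sed -E 's/^CN=([^,]+),O=Work,L=Dubai,ST=Dubai,C=AE$/\1/' \
  | tr '[:upper:]' '[:lower:]')
echo "$principal"   # adminkfk
```

Because rules are applied in order and matching stops at the first hit, the most specific patterns should come first.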
Configuring Kafka Controller Nodes
Example configuration for the first controller node, /app/kafka/config/kraft/controller.properties:
Example
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.0.51:9093,2@192.168.0.52:9093,3@192.168.0.53:9093
listeners=CONTROLLER://192.168.0.51:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/logs/kraft-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=5
transaction.state.log.replication.factor=5
transaction.state.log.min.isr=3
min.insync.replicas=3
log.retention.hours=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/app/certs/kafka.keystore.jks
ssl.keystore.password=PASSWORDKeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PASSWORDTruststore
ssl.client.auth=required
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
super.users=User:controller1;User:controller2;User:controller3;User:broker1;User:broker2;User:broker3;User:broker4;User:broker5;User:adminkfk
allow.everyone.if.no.acl.found=false
ssl.principal.mapping.rules=RULE:^CN=(adminkfk),O=Work,L=Dubai,ST=Dubai,C=AE$/$1/L,RULE:^CN=(controller[1-3]),O=Work,L=Dubai,ST=Dubai,C=AE$/$1/L,RULE:^CN=(broker[1-5]),O=Work,L=Dubai,ST=Dubai,C=AE$/$1/L
The remaining controller nodes must be configured similarly.
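Before the KRaft services are started for the first time, each node's log directory has to be formatted with a shared cluster ID. A minimal sketch, assuming the Kafka distribution lives under /app/kafka as in the examples above:

```shell
# Generate one cluster ID (run once, then reuse the same value on every node).
CLUSTER_ID=$(/app/kafka/bin/kafka-storage.sh random-uuid)

# Format the storage directory on a controller node; on broker nodes,
# point -c at broker.properties instead.
/app/kafka/bin/kafka-storage.sh format \
  -t "$CLUSTER_ID" \
  -c /app/kafka/config/kraft/controller.properties
```

The format step writes cluster metadata into log.dirs; a node whose storage has not been formatted will refuse to start in KRaft mode.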
Configuring Kafka Broker Nodes
Example configuration for the first broker node:
Example
process.roles=broker
node.id=4
controller.quorum.voters=1@192.168.0.51:9093,2@192.168.0.52:9093,3@192.168.0.53:9093
listeners=SSL://192.168.0.54:9092
inter.broker.listener.name=SSL
advertised.listeners=SSL://192.168.0.54:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SSL,SSL:SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/app/logs/kraft-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=5
transaction.state.log.replication.factor=5
transaction.state.log.min.isr=3
min.insync.replicas=3
log.retention.hours=3
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/app/certs/kafka.keystore.jks
ssl.keystore.password=PASSWORDkeystore
ssl.truststore.location=/app/certs/ca-truststore.jks
ssl.truststore.password=PASSWORDtruststore
ssl.client.auth=requested
allow.everyone.if.no.acl.found=false
auto.create.topics.enable=false
super.users=User:controller1;User:controller2;User:controller3;User:broker1;User:broker2;User:broker3;User:broker4;User:broker5;User:adminkfk
ssl.principal.mapping.rules=RULE:^CN=producer_hosts[1-2],O=Work,L=Dubai,ST=Dubai,C=AE$/producer_user/L,RULE:^CN=consumer_hosts[1-3],O=Work,L=Dubai,ST=Dubai,C=AE$/consumer_user/L,RULE:^CN=(adminkfk),O=Work,L=Dubai,ST=Dubai,C=AE$/$1/L
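Once a broker is running, its SSL listener can be probed from a client host. A quick check with openssl (the certificate and key file names below are hypothetical; the examples in this document use JKS stores, so PEM copies of the client certificate, key, and CA would need to be exported first):

```shell
# Present the CA bundle and a client certificate, since client auth is enabled.
openssl s_client -connect 192.168.0.54:9092 \
  -CAfile /app/certs/ca.crt \
  -cert /app/certs/client.crt -key /app/certs/client.key </dev/null
```

A successful handshake (Verify return code: 0) confirms that the keystore, truststore, and listener configuration agree before any Kafka clients are pointed at the broker.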
Configuring Kafka Services
Kafka servers require service configuration files for managing the Kafka service. The filename is identical for both controller and broker nodes: /etc/systemd/system/kafka.service.
- Example service configuration for a controller node:
[Unit]
Description=Apache Kafka server (controller)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
WorkingDirectory=/app/kafka
User=kafka
#User=root
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=LOG_DIR="/app/logs/kafka"
Environment=KAFKA_JVM_PERFORMANCE_OPTS="-XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
Environment=JAVA_HOME="/app/jdk/"
ExecStart=/app/kafka/bin/kafka-server-start.sh /app/kafka/config/kraft/controller.properties
ExecStop=/app/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
LimitNOFILE=infinity
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target
- Example service configuration /app/service/kafka.service for a broker node:
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
WorkingDirectory=/app/kafka
User=kafka
#User=root
Environment=KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
Environment=LOG_DIR="/app/logs/kafka"
Environment=KAFKA_JVM_PERFORMANCE_OPTS="-XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
Environment=JAVA_HOME="/app/jdk/"
ExecStart=/app/kafka/bin/kafka-server-start.sh /app/kafka/config/kraft/broker.properties
ExecStop=/app/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
LimitNOFILE=infinity
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target
After creating the service files, reload all unit files from the standard paths (/etc/systemd/system, /usr/lib/systemd/system, etc.) by running:
systemctl daemon-reload
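After the reload, the service can be enabled and started on each node, for example:

```shell
systemctl enable --now kafka    # start now and on every subsequent boot
systemctl status kafka          # confirm the unit is active
journalctl -u kafka -f          # follow the Kafka service log
```

Start the controller nodes first so the quorum is available when the brokers register.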