Apache Kafka Tips

Apache Kafka

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

I decided to centralize the Apache Kafka commands and tool tips I use most often, so I can find them quickly. This page covers the main Kafka command lines.

Concepts

LEADER

A Leader is the broker/replica that accepts produce requests for a given topic-partition.

FOLLOWER

A Follower is a broker/replica that can join the ISR list and participate in advancing the high watermark (which the leader uses when acknowledging messages back to the producer).

OBSERVER

An Observer is a broker/replica that also keeps a copy of the data for a given topic-partition, and consumers are allowed to read from it even though it is not the leader (known as “Follower Fetching”).

The data is copied asynchronously from the leader, so a producer does not wait on observers for an acknowledgment.

By default, observers do not participate in the ISR list and cannot automatically become the leader if the current leader fails, but if a user manually changes the leader assignment, they can join the ISR list.

ISR List

An ISR list (in-sync replicas) contains the replicas of a given topic-partition that are fully caught up with the leader. With acks=all, the data is copied from the leader to every member of the ISR before the producer gets an acknowledgment. Any follower in the ISR can become the leader if the current leader fails.
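
These roles are visible in the kafka-topics --describe output. A minimal sketch (the broker IDs 1-3 are made up): broker 1 is the leader and all three replicas are in sync:

./kafka-topics.sh --bootstrap-server $kf_brokers --describe --topic $topic
# Topic: my-topic  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3

With acks=all, the min.insync.replicas topic config sets the minimum number of in-sync replicas required for the leader to accept a write.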


Kafka Auth Settings

When you are using authentication, which is often required for Confluent Cloud clusters, you must create a cluster-with-auth.properties file with content similar to:

~/kafka_2.12-3.3.2/config/cluster-with-auth.properties

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<USERNAME-HERE>" password="<PASSWORD-HERE>";
./<command-here> <parameters-here> \
    --command-config ~/kafka_2.12-3.3.2/config/cluster-with-auth.properties
...<kafka-command-line> --command-config ../config/cluster-with-auth.properties
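
For example, listing topics on an authenticated cluster with the properties file created above:

./kafka-topics.sh --bootstrap-server $kf_brokers \
    --list \
    --command-config ~/kafka_2.12-3.3.2/config/cluster-with-auth.properties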

Apache Kafka - Command lines

Performance tests

# All producer properties must come after --producer-props
time kafka-producer-perf-test --topic $topic \
    --num-records $messages \
    --record-size $size \
    --throughput $throughput \
    --print-metrics \
    --producer-props bootstrap.servers=$kf_brokers \
        enable.idempotence=true \
        compression.type=snappy \
        linger.ms=20
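
The distribution also ships a consumer benchmark; a minimal sketch (flags as in the stock kafka-consumer-perf-test tool):

time kafka-consumer-perf-test --bootstrap-server $kf_brokers \
    --topic $topic \
    --messages $messages \
    --print-metrics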

Kafka-configs

# Describe current topic configs
./kafka-configs.sh --bootstrap-server $kf_brokers \
    --describe \
    --entity-name $topic \
    --entity-type topics

# Note: the partition count is not a config; change it with kafka-topics --alter --partitions (see the Topics section)

topic-reg.cfg (replica placement with observers is a Confluent Platform feature)

{
    "version": 1,
    "replicas": [
        {
            "count": 3,
            "constraints": {"rack": "us-east-1"}
        }
    ],
    "observers": [
        {
            "count": 3,
            "constraints": {"rack": "central-us-1"}
        }
    ]
}
# Apply the replica placement
./kafka-configs.sh --bootstrap-server $kf_brokers \
    --alter \
    --entity-name $topic \
    --entity-type topics \
    --replica-placement topic-reg.cfg

# Add a topic config (x=y is a placeholder key=value pair)
./kafka-configs.sh --bootstrap-server $kf_brokers \
    --alter \
    --entity-name $topic \
    --entity-type topics \
    --add-config \
    x=y

# Delete a topic config (takes only the config key, not key=value)
./kafka-configs.sh --bootstrap-server $kf_brokers \
    --alter \
    --entity-name $topic \
    --entity-type topics \
    --delete-config \
    x

Topics

./kafka-console-consumer.sh --bootstrap-server $kf_brokers --topic $topic
./kafka-console-consumer --bootstrap-server $kf_brokers --topic $topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true
./kafka-console-consumer.sh --bootstrap-server $kf_brokers --topic $topic --from-beginning
kafka-topics --bootstrap-server $kf_brokers --create --topic $topic --partitions $partition_number --replication-factor $replica_factor --config retention.ms=$retention_time --config segment.ms=10000

# Tip: add --if-not-exists to skip the error when the topic already exists
kafka-topics --bootstrap-server $kf_brokers --create --topic $topic --partitions $partition_number --replication-factor $replica_factor --replica-placement /etc/kafka/topic-reg.cfg
./kafka-topics.sh --bootstrap-server $kf_brokers --list
./kafka-topics --bootstrap-server $kf_brokers --topic $topic --describe 
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $kf_brokers --topic $topic
./kafka-configs.sh --bootstrap-server $kf_brokers --describe --entity-type topics --entity-name $topic
./kafka-topics.sh --bootstrap-server $kf_brokers --delete --topic $topic
./kafka-topics.sh --bootstrap-server $kf_brokers --alter --topic $topic --partitions $partition_number
./kafka-topics.sh --bootstrap-server $kf_brokers --describe --under-replicated-partitions
# Check topic configurations
./kafka-topics.sh --bootstrap-server $kf_brokers --topic $topic --describe

topic-reassign.json

{
  "version":1,
  "partitions":[
    {
      "topic":"topic-one",
      "partition":0,
      "replicas":[1,2,6],
      "observers":[2,6]
    },
    {
      "topic":"topic-one",
      "partition":1,
      "replicas":[1,2,6],
      "observers":[2,6],
    },
    {
      "topic":"topic-one",
      "partition":2,
      "replicas":[1,2,6],
      "observers":[2,6]
    }
  ]
}

Execute the kafka-reassign-partitions command line

./kafka-reassign-partitions.sh --bootstrap-server $kf_brokers --reassignment-json-file topic-reassign.json --execute
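
Progress can be tracked by passing the same JSON file back with --verify until every partition reports completion:

./kafka-reassign-partitions.sh --bootstrap-server $kf_brokers --reassignment-json-file topic-reassign.json --verify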

Increase the topic partition count to 3

./kafka-topics.sh --bootstrap-server $kf_brokers --alter --topic $topic --partitions 3

For old Kafka versions, use the --zookeeper argument instead of --bootstrap-server.

# List partition sizes in GB, sorted, across all broker log dirs
./kafka-log-dirs.sh --bootstrap-server $kf_brokers --describe  | grep -E "^{"  | jq '[.brokers[].logDirs[].partitions[]] |  sort_by(.size) | map({ (.partition): (.size / (1024*1024*1024) | round | tostring + " GB") })'  | grep GB
# Retention reference (ms → days)
#   259200000  = 3 days
#   604800000  = 7 days
#   1209600000 = 14 days
#   2592000000 = 30 days
# log.retention.ms=-1 means infinite retention

./kafka-configs.sh --bootstrap-server $kf_brokers --alter --entity-name $topic --entity-type topics --add-config retention.ms=$retention_time

# then validate the topic settings
./kafka-topics.sh --describe --bootstrap-server $kf_brokers --topic $topic

As a workaround, drop the retention to one second (retention.ms=1000). This purges the records quickly, and then you can simply return the retention time to its original setting.

./kafka-configs.sh --bootstrap-server $kf_brokers --alter --entity-type topics --entity-name $topic --add-config retention.ms=1000

Generally, when the data retention of a topic is very long or infinite, it can fill the disk and cause major incidents for everyone using the cluster.
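
For a quick disk check per topic, assuming the broker keeps its logs under /var/lib/kafka (adjust to your log.dirs; there is one directory per partition):

du -sh /var/lib/kafka/${topic}-*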

Generate a JSON file:

{"partitions": [
    {"topic": "topic_name", "partition": 0, "offset":  200000000},
    {"topic": "topic_name", "partition": 1, "offset":  200000000},
    {"topic": "topic_name", "partition": 2, "offset":  200000000},
    {"topic": "topic_name", "partition": 3, "offset":  200000000},
    {"topic": "topic_name", "partition": 4, "offset":  200000000}
    ],
 "version":1 }

When the command below is executed, all records before the given offset (200000000) are removed from each listed partition:

./kafka-delete-records.sh --bootstrap-server $kf_brokers --offset-json-file ./file.json
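
After the deletion, the new earliest offsets can be confirmed with GetOffsetShell (--time -2 means earliest):

./kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server $kf_brokers --topic $topic --time -2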

Election types

./kafka-leader-election --bootstrap-server $kf_brokers --all-topic-partitions --election-type preferred

Or using a JSON file:

{"partitions":
   [
      {"topic": "topic-name", "partition": 0}
   ]
}
# ./kafka-leader-election --bootstrap-server $kf_brokers --election-type preferred --path-to-json-file topic.json
./kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server $kf_brokers --topic $topic --time -1   # -1 = latest offsets
./kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server $kf_brokers --topic $topic --time -2   # -2 = earliest offsets
...
# Pass --command-config <FILE-WITH-SSL-CONFIGs> when the cluster requires authentication

Consumer-groups

./kafka-consumer-groups.sh --bootstrap-server $kf_brokers --list
./kafka-consumer-groups.sh --bootstrap-server $kf_brokers --all-groups --describe
./kafka-consumer-groups.sh --bootstrap-server $kf_brokers --describe --group $consumer_group_name --verbose
# dry-run
./kafka-consumer-groups.sh --bootstrap-server $kf_brokers --topic $topic:$partition_number --group $group_name --reset-offsets --to-latest --dry-run

# execute
./kafka-consumer-groups.sh --bootstrap-server $kf_brokers --topic $topic:$partition_number --group $group_name --reset-offsets --to-latest --execute
# dry-run
./kafka-consumer-groups.sh --bootstrap-server $kf_brokers --topic $topic:$partition_number --group $group_name --reset-offsets --to-earliest --dry-run

# execute
./kafka-consumer-groups.sh --bootstrap-server $kf_brokers --topic $topic:$partition_number --group $group_name --reset-offsets --to-earliest --execute
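
Offsets can also be rewound to a point in time with --to-datetime (format YYYY-MM-DDTHH:mm:SS.sss); the timestamp below is only a placeholder:

./kafka-consumer-groups.sh --bootstrap-server $kf_brokers --topic $topic --group $group_name --reset-offsets --to-datetime 2024-01-01T00:00:00.000 --dry-run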

Producer

./kafka-console-producer.sh --broker-list $kf_brokers --topic $topic

# from a file
./kafka-console-producer.sh --broker-list $kf_brokers --topic $topic < input-messages.txt
kafka-console-producer.sh --broker-list $kf_brokers --topic $topic --property parse.key=true \
    --property key.separator=:

# e.g.
# >key:value
# >foo:bar
# >anotherKey:another value
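
# To confirm the keys landed as expected, consume with the key printed
# using the same separator:
./kafka-console-consumer.sh --bootstrap-server $kf_brokers --topic $topic --from-beginning \
    --property print.key=true \
    --property key.separator=: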

Zookeeper Commands

./zookeeper-shell.sh $zookeeper_broker:2181
./zookeeper-shell $zookeeper_broker:2181 ls /brokers/ids
./zookeeper-shell $zookeeper_broker:2181 get /brokers/ids/$broker_id
./zookeeper-shell $zookeeper_broker:2181 get /brokers/topics/$topic
echo stat | nc $zk_broker 2181   # server statistics and connections
echo ruok | nc $zk_broker 2181   # health check; replies "imok" when healthy
echo mntr | nc $zk_broker 2181   # monitoring metrics
echo isro | nc $zk_broker 2181   # read-only status; replies "rw" or "ro"
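
Note that newer ZooKeeper versions require the four-letter words to be allowlisted, otherwise the commands above return nothing. A minimal sketch for zookeeper.properties:

4lw.commands.whitelist=stat,ruok,mntr,isro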

Tools - KT

Install

brew tap fgeller/tap && brew install kt

How to use

$ kt -help
kt is a tool for Kafka.

Usage:

        kt command [arguments]

The commands are:

        consume        consume messages.
        produce        produce messages.
        topic          topic information.
        group          consumer group information and modification.
        admin          basic cluster administration.

Use "kt [command] -help" for for information about the command.

Authentication:

Authentication with Kafka can be configured via a JSON file.
You can set the file name via an "-auth" flag to each command or
set it via the environment variable KT_AUTH.
# Example: point kt at the brokers via the KT_BROKERS environment variable
KT_BROKERS=b-1.domain.com,b-2.domain.com,b-3.domain.com kt consume -topic topic.name_001

Tools - KAF

Install

# Install via installer script
curl https://raw.githubusercontent.com/infinimesh/kaf/master/godownloader.sh | BINDIR=$HOME/bin bash

# Or via Go
go install github.com/birdayz/kaf/cmd/kaf@latest

# Or via Homebrew
brew tap birdayz/kaf
brew install kaf

# Register and select a cluster
kaf config add-cluster local -b localhost:9092
kaf config select-cluster

Start Zookeeper and Kafka (if running locally)

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Usage

$ kaf --help                  
Kafka Command Line utility for cluster management

Usage:
  kaf [command]

Available Commands:
  completion  Generate completion script for bash, zsh, fish or powershell
  config      Handle kaf configuration
  consume     Consume messages
  group       Display information about consumer groups.
  groups      List groups
  help        Help about any command
  node        Describe and List nodes
  nodes       List nodes in a cluster
  produce     Produce record. Reads data from stdin.
  query       Query topic by key
  topic       Create and describe topics.
  topics      List topics

Flags:
  -b, --brokers strings          Comma separated list of broker ip:port pairs
  -c, --cluster string           set a temporary current cluster
      --config string            config file (default is $HOME/.kaf/config)
  -h, --help                     help for kaf
      --schema-registry string   URL to a Confluent schema registry. Used for attempting to decode Avro-encoded messages
  -v, --verbose                  Whether to turn on sarama logging
      --version                  version for kaf

Use "kaf [command] --help" for more information about a command.
kaf node ls
kaf topics
kaf topics describe $topic
kaf groups
kaf group describe $consumer_group_name
kaf consume $topic --raw --offset oldest | grep "ID-TO-FILTER"
# Reset a consumer group's committed offsets
kaf group commit $consumer_group_name -t $topic --offset latest --all-partitions
kaf group commit $consumer_group_name -t $topic --offset oldest --all-partitions
kaf group commit $consumer_group_name -t $topic --offset 1001 --partition 0

Kafkacat

docker run -it --rm confluentinc/cp-kafkacat:7.1.1 /bin/bash
kafkacat -C -b $kf_brokers -t $topic -o s@$(date -d "today 00:00" +%s%3N) -o e@1729779300000
kafkacat -Q -b $kf_brokers -t $topic:$partition:$time -c 1   # look up the offset closest to a given timestamp

NOTE: 1729779300000 is a Unix epoch timestamp in milliseconds.
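
One more recipe I keep handy: read only the last N messages per partition and exit; the -o negative offset and -e (exit at end) flags are standard kafkacat options:

# last 10 messages from each partition, then exit
kafkacat -C -b $kf_brokers -t $topic -o -10 -e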