Morning,
Just a small post about retrieving the consumer group coordinator for a specific consumer group.
We had an issue where consumer groups kept rebalancing, and we didn't know whether it was related to application logic or whether the Consumer Group Coordinator was changing, with the Kafka cluster assigning a different one each time. So a small piece of code was needed. I used the libraries shipped with Kafka 1.0.0 for this test, so remember to update the classpath if you want to modify this.
To run the test, I started a standalone Confluent Kafka image, which normally listens on port 29092. For more details, please consult their documentation.
I also created a test topic with one partition and a replication factor of one. I then produced some messages to the topic and started a console consumer:
sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic test --from-beginning
test message
test message
Once it is running, you can also see it with the consumer-groups command:
sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-consumer-groups.sh --bootstrap-server localhost:29092 --list
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-49198
console-consumer-66063
console-consumer-77631
My console consumer is identified by console-consumer-77631, and to see its group coordinator you run something like:
./getconsumercoordinator.sh localhost 29092 console-consumer-77631
warning: there were three deprecation warnings; re-run with -deprecation for details
one warning found
Creating connection to: localhost 29092
log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Channel connected
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
GroupCoordinatorResponse(Some(BrokerEndPoint(1001,localhost,29092)),NONE,0)
Since we have only one broker, it is clearly also the coordinator: the response shows broker id 1001 at localhost:29092.
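To tell whether the coordinator actually moves between rebalances, the response line can be reduced to the broker endpoint and compared across runs. The following is only a minimal sketch: the helper names extract_coordinator and report_change are made up for illustration, and it assumes the response is printed in the BrokerEndPoint(id,host,port) form shown above.

```shell
# Hypothetical helper: read the script output on stdin and print
# "id,host,port" taken from the BrokerEndPoint(...) in the response line.
# Prints nothing if no BrokerEndPoint is present.
extract_coordinator() {
  sed -n 's/.*BrokerEndPoint(\([^)]*\)).*/\1/p'
}

# Hypothetical helper: print a message when two endpoints differ.
report_change() {
  if [ "$1" != "$2" ]; then
    echo "coordinator changed: $1 -> $2"
  fi
}

# Example polling loop (assumes getconsumercoordinator.sh is in the
# current directory; the 30 second interval is arbitrary):
#   prev=""
#   while sleep 30; do
#     cur=$(./getconsumercoordinator.sh localhost 29092 console-consumer-77631 2>/dev/null | extract_coordinator)
#     [ -n "$prev" ] && report_change "$prev" "$cur"
#     prev=$cur
#   done
```

If the endpoint stays the same while the group keeps rebalancing, the cause is more likely on the application side.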
For the details of the code I used this link. Also, since I don't have a Scala project, just a script, the following command was of great use for finding which jar contains each dependency (replace ClassName with the class you are looking for):
for i in *.jar; do jar -tvf "$i" | grep -Hsi ClassName && echo "$i"; done
Here is also the code:
#!/bin/sh
exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/kafka-clients-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/kafka_2.11-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/slf4j-api-1.7.25.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-core-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-databind-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-annotations-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/log4j-1.2.17.jar" "$0" "$@"
!#
import com.fasterxml.jackson.core._
import com.fasterxml.jackson.databind.ObjectMapper
import kafka.network.BlockingChannel
import kafka.api.GroupCoordinatorRequest
import kafka.api.GroupCoordinatorResponse
import org.slf4j.LoggerFactory
// Expected arguments: broker host, broker port, consumer group id
val hostname = args(0)
val port = args(1).toInt
val group = args(2)
println("Creating connection to: " + hostname + " " + port + " ")
// 1 MB read and write buffers, 50 second read timeout
val channel = new BlockingChannel(hostname, port, 1048576, 1048576, readTimeoutMs = 50000)
channel.connect()
if (channel.isConnected) {
  println("Channel connected")
  // Ask the broker which node coordinates this group and print the reply
  channel.send(GroupCoordinatorRequest(group))
  val metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive.payload())
  println(metadataResponse)
}
channel.disconnect()
Regarding the code, the first part lets the shell run the file as a Scala script. You need to update the classpath with all the required libraries and forward the command-line arguments to the script; it expects three: host, port and consumer group. Also, if you don't add all of the jackson, log4j and slf4j dependencies, it won't work.
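Because the script reads args(0) to args(2) directly, a missing argument only shows up as a Scala exception. A small check in the shell part of the header, before the exec line, should be able to give a friendlier message. This is just a sketch under that assumption; check_args is a hypothetical helper and not part of the original script.

```shell
# Hypothetical helper: validate the three expected arguments
# (host, port, consumer group) before launching the Scala script.
check_args() {
  if [ "$#" -ne 3 ]; then
    echo "usage: getconsumercoordinator.sh <host> <port> <group>" >&2
    return 1
  fi
  case "$2" in
    ''|*[!0-9]*)
      # Empty or containing a non-digit: not a usable port
      echo "port must be a number, got: $2" >&2
      return 1
      ;;
  esac
  return 0
}

# Possible use in the header, before the exec line:
#   check_args "$@" || exit 1
```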
P.S.: It also works with a wildcard classpath, by running exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/*"
Cheers!