Hi,
We recently got a requirement to activate the kafka_consumer functionality of Datadog.
Unfortunately, this is a challenge if you don’t have a fixed number of consumer groups and topics (on one client we had a couple of hundred consumer groups).
Here is how it should look in the example file:
# consumer_groups:
#   <CONSUMER_NAME_1>:
#     <TOPIC_NAME_1>: [0, 1, 4, 12]
#   <CONSUMER_NAME_2>:
#     <TOPIC_NAME_2>:
#   <CONSUMER_NAME_3>
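To make the nesting concrete (consumer groups at the top level, topics under each group, a partition list under each topic), here is a minimal sketch that renders a Ruby hash into that shape with the standard yaml library. The group and topic names and the helper consumer_groups_yaml are made up for illustration, and the nil values simply mirror the blank entries in the example file.

```ruby
require 'yaml'

# Hypothetical data: group => topic => partitions.
# nil mirrors the entries that are left blank in the example file.
consumer_groups = {
  'my_consumer'    => { 'my_topic' => [0, 1, 4, 12] },
  'other_consumer' => { 'other_topic' => nil },
  'third_consumer' => nil
}

# Wrap the hash under the consumer_groups key and dump it as YAML text.
def consumer_groups_yaml(groups)
  { 'consumer_groups' => groups }.to_yaml
end

puts consumer_groups_yaml(consumer_groups)
```

The YAML library writes the partitions as a block list rather than the inline [0, 1, 4, 12] form, but both parse to the same structure.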
So, if you want to grab the data using the old kafka-consumer-groups.sh, you should really do it in a single pass. I tried it the way shown below, and although it works, it should not be an option for larger clusters, because it runs the script once per consumer group and then once more per topic.
require 'facter'

# Custom fact: consumer group => topic => partitions, for the Datadog kafka_consumer config.
Facter.add('kafka_consumers_config') do
  setcode do
    kafka_consumers_cmd = '/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list'
    kafka_consumers_result = Facter::Core::Execution.exec(kafka_consumers_cmd)
    group_hash = {}
    # to_s keeps the loop safe if exec returns nil.
    kafka_consumers_result.to_s.each_line do |group|
      groupid = group.strip
      next if groupid.empty?
      # Assumes TOPIC is column 1 and PARTITION is column 2 of the --describe output.
      kafka_consumer_topic_list_cmd = "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group #{groupid} --describe | grep -v TOPIC | awk '{print $1}' | sort | uniq"
      kafka_consumer_topic_list_result = Facter::Core::Execution.exec(kafka_consumer_topic_list_cmd)
      topic_hash = {}
      kafka_consumer_topic_list_result.to_s.each_line do |topic|
        topicid = topic.strip
        next if topicid.empty?
        # Exact match on the topic column, so "foo" does not also match "foo-bar".
        kafka_consumer_topic_partition_cmd = "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group #{groupid} --describe | awk '$1 == \"#{topicid}\" {print $2}' | sort -n"
        kafka_consumer_topic_partition_result = Facter::Core::Execution.exec(kafka_consumer_topic_partition_cmd)
        # Collapse the newline-separated partitions into one space-separated string.
        topic_hash[topicid] = kafka_consumer_topic_partition_result.to_s.split.join(' ')
      end
      # Key by the stripped names so no trailing newlines end up in the fact.
      group_hash[groupid] = topic_hash
    end
    group_hash
  end
end
This is posted only as a snapshot and maybe as a source of inspiration. A production-ready version should be written in Java or Scala in order to use the Kafka libraries directly (as far as I know, Python, Go and other programming languages only have libraries for the consumer/producer part).
If I pursue the task of rewriting this with a single query loop, or in Java/Scala, I will post it here.
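As a rough idea of what the single-loop variant could look like, the sketch below parses the text of one --describe call per group in plain Ruby, instead of re-running the script for every topic. It has not been run against a real cluster. The helper name parse_describe_output and the sample text are made up for illustration, and it assumes the output contains a header row with TOPIC and PARTITION columns and that no field contains whitespace.

```ruby
# Parse the text of one `kafka-consumer-groups.sh --describe` run into
# { topic => [partition, ...] }. Column positions are taken from the header
# row, so an extra leading column (for example GROUP) does not matter.
def parse_describe_output(output)
  topics = Hash.new { |hash, key| hash[key] = [] }
  topic_col = nil
  partition_col = nil

  output.to_s.each_line do |line|
    fields = line.split
    next if fields.empty?

    # Header row: remember where the two columns of interest are.
    if fields.include?('TOPIC') && fields.include?('PARTITION')
      topic_col = fields.index('TOPIC')
      partition_col = fields.index('PARTITION')
      next
    end

    next if topic_col.nil? # ignore anything printed before the header

    topic = fields[topic_col]
    partition = fields[partition_col]
    # Skip rows that do not carry a numeric partition.
    next unless topic && partition.to_s.match?(/\A\d+\z/)

    topics[topic] << partition.to_i
  end

  # Return a plain hash with de-duplicated, numerically sorted partitions.
  topics.each_with_object({}) do |(topic, partitions), result|
    result[topic] = partitions.uniq.sort
  end
end

# Made-up sample in the shape assumed above.
sample = <<~OUTPUT
  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
  orders    1          42              50              8    -            -     -
  orders    0          10              10              0    -            -     -
  payments  0          7               9               2    -            -     -
OUTPUT

p parse_describe_output(sample)
```

Inside the fact, the result of a single Facter::Core::Execution.exec call with --group and --describe would be passed to this helper for each group, which drops the per-topic invocations entirely.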