Fact for kafka_consumer hash…or kind of

Hi,

There is a late requirement that we activate the kafka_consumer functionality of Datadog.

Unfortunately, this is a challenge if you don’t have a fixed number of consumer groups and topics (on one client we had a couple of hundred consumer groups).

Here is how it should look in the example file:

  #  consumer_groups:
  #    <CONSUMER_NAME_1>:
  #      <TOPIC_NAME_1>: [0, 1, 4, 12]
  #    <CONSUMER_NAME_2>:
  #      <TOPIC_NAME_2>:
  #    <CONSUMER_NAME_3>
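
To make the target shape concrete, here is a minimal Ruby sketch that renders a nested hash of groups, topics and partitions as YAML under the consumer_groups key. The group and topic names are invented for illustration, and using nil to stand for a topic with no partition list is an assumption based on the empty values in the example above.

```ruby
require 'yaml'

# Hypothetical sample data; these group and topic names are made up.
SAMPLE_GROUPS = {
  'billing-group' => { 'invoices' => [0, 1, 4, 12] },
  'audit-group'   => { 'events' => nil }, # assumption: nil means no explicit partition list
}.freeze

# Wrap the hash under the consumer_groups key and serialise it to YAML.
def consumer_groups_yaml(groups)
  { 'consumer_groups' => groups }.to_yaml
end

puts consumer_groups_yaml(SAMPLE_GROUPS)
```

A fact that returns a hash of this shape could then be dropped into the check configuration by whatever templating is already in use.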

So, if you want to grab the data using the old kafka-consumer-groups.sh, you would ideally do it in a single iteration. I tried it the way shown below, and although it works, it should not be an option for larger clusters, because it calls the script once per group and again for every topic.

require 'facter'

# Custom fact: { consumer group => { topic => [partitions] } }, built by
# shelling out to kafka-consumer-groups.sh (slow: one call per group and topic).
Facter.add('kafka_consumers_config') do
  setcode do
    base_cmd = '/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092'
    # Turn command output into a list of stripped, non-empty lines.
    lines = ->(cmd) { Facter::Core::Execution.exec(cmd).to_s.split("\n").map(&:strip).reject(&:empty?) }
    group_hash = {}
    lines.call("#{base_cmd} --list").each do |groupid|
      describe_cmd = "#{base_cmd} --group #{groupid} --describe | grep -v TOPIC"
      topic_hash = {}
      # Distinct topics of this group (first column of the describe output).
      lines.call("#{describe_cmd} | awk '{print $1}' | sort -u").each do |topicid|
        # Partitions of this topic (second column), matched on the exact topic name.
        partitions = lines.call("#{describe_cmd} | awk -v t=#{topicid} '$1 == t {print $2}' | sort -n")
        # Keep numeric partitions only and store them as integers, as in the example file.
        topic_hash[topicid] = partitions.grep(/\A\d+\z/).map(&:to_i)
      end
      # Key by the stripped group name, not the raw line with its trailing newline.
      group_hash[groupid] = topic_hash
    end
    group_hash
  end
end

This is posted only as a snapshot and maybe as a source of inspiration. An actual working version should be written in Java or Scala in order to leverage the power of the libraries (as far as I know, Python, Go and other programming languages only have libraries for the consumer/producer part).

If I pursue the task of rewriting this with a single query loop or in Java/Scala, I will post it.
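
As a rough idea of what the single-loop variant might look like, the sketch below parses the output of one --describe call per group into a topic-to-partitions hash, so the script no longer has to be run again for every topic. It assumes the same column layout the fact above relies on (topic in the first column, partition in the second); the parse_describe helper and the sample output are hypothetical and would need adjusting for other Kafka versions.

```ruby
# Parse the text of one `--describe` call into { topic => [partitions] }.
# Assumption: column 1 is the topic and column 2 is the partition.
def parse_describe(output)
  topics = Hash.new { |hash, key| hash[key] = [] }
  output.to_s.each_line do |line|
    topic, partition = line.split
    next if topic.nil? || topic == 'TOPIC'            # skip blank lines and the header row
    next unless partition.to_s.match?(/\A\d+\z/)      # skip rows without a numeric partition
    topics[topic] << partition.to_i
  end
  topics.transform_values { |parts| parts.uniq.sort }
end

# Made-up sample output, only to show the expected input shape.
SAMPLE_DESCRIBE = <<~OUT
  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
  orders    1          10              12              2
  orders    0          5               5               0
  payments  0          7               9               2
OUT

p parse_describe(SAMPLE_DESCRIBE)
```

Inside the fact, the inner topic loop could then be replaced by a single call such as parse_describe(Facter::Core::Execution.exec(describe_cmd)) per group, which cuts the number of script invocations to one per consumer group plus the initial --list.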