Categories
kafka puppet

Fact for kafka_consumer hash…or kind of

Hi,

A late requirement came in: we need to activate the kafka_consumer functionality of Datadog.

Unfortunately this is a challenge if you don’t have a fixed number of consumer groups and topics (on one client we had a couple of hundred consumer groups).

Here is how it should look in the example file:

  #  consumer_groups:
  #    <CONSUMER_NAME_1>:
  #      <TOPIC_NAME_1>: [0, 1, 4, 12]
  #    <CONSUMER_NAME_2>:
  #      <TOPIC_NAME_2>:
  #    <CONSUMER_NAME_3>
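To make the target structure concrete, here is a minimal sketch of how a nested Ruby hash (the shape a fact would return) maps onto that consumer_groups layout. The group and topic names are made up for illustration, and the nil value simply mirrors the entry that is left empty in the example file.

```ruby
require 'yaml'

# Hypothetical data: group and topic names are invented for this sketch.
consumer_groups = {
  'billing-service' => { 'invoices' => [0, 1, 4, 12] },
  'audit-service'   => { 'events' => nil } # no explicit partition list
}

# Wrap the hash under the consumer_groups key and dump it as YAML.
config = { 'consumer_groups' => consumer_groups }
puts config.to_yaml
```

Psych writes the partition list in block style rather than as [0, 1, 4, 12], but both forms load back to the same data.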

So, if you want to grab the data using the old kafka-consumer-groups.sh, you will have to do this in one iteration. I tried it the way shown below; even though it is finished, it should not be an option for larger clusters, because it calls the script once per group and once more per topic.

require 'facter'

# Custom fact: hash of consumer group => { topic => [partition ids] },
# built by shelling out to kafka-consumer-groups.sh.
Facter.add('kafka_consumers_config') do
  setcode do
    base_cmd = '/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092'
    # List every consumer group known to the broker, one per line.
    groups = Facter::Core::Execution.exec("#{base_cmd} --list").to_s.split("\n").map(&:strip).reject(&:empty?)
    group_hash = {}
    groups.each do |groupid|
      describe_cmd = "#{base_cmd} --group #{groupid} --describe"
      # The first column of the describe output is assumed to be the topic name.
      topic_cmd = "#{describe_cmd} | grep -v TOPIC | awk '{print $1}' | sort -u"
      topics = Facter::Core::Execution.exec(topic_cmd).to_s.split("\n").map(&:strip).reject(&:empty?)
      topic_hash = {}
      topics.each do |topicid|
        # Exact match on the topic column, so "foo" does not also pick up "foo-bar".
        partition_cmd = "#{describe_cmd} | awk -v t='#{topicid}' '$1 == t {print $2}' | sort -n"
        partitions = Facter::Core::Execution.exec(partition_cmd).to_s.split("\n").map(&:strip).reject(&:empty?)
        # Store the partitions as a list of integers, as in the example file.
        topic_hash[topicid] = partitions.map(&:to_i)
      end
      # Key by the stripped names, not by the raw lines with trailing newlines.
      group_hash[groupid] = topic_hash
    end
    group_hash
  end
end

This is posted only as a snapshot and maybe as a source of inspiration. An actual working version should be written in Java or Scala in order to leverage the power of the libraries (from what I know, Python, Go and other programming languages only have libraries for the consumer/producer part).

If I pursue the task of rewriting this with a single query loop, or in Java/Scala, you will see it here.
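As a rough idea of what the single-loop variant could look like: run --describe once per group and parse all topics and partitions from that one output. The sketch below only covers the parsing step. The function name parse_describe_output and the sample text are assumptions for illustration, and it assumes the output has a whitespace-separated header row containing TOPIC and PARTITION, which it uses to locate the columns instead of hard-coding their positions.

```ruby
# Parse the text printed by `kafka-consumer-groups.sh --describe` for one group
# into { topic => [sorted partition ids] }.
# Assumption: a header row with TOPIC and PARTITION precedes the data rows.
def parse_describe_output(text)
  topic_col = nil
  partition_col = nil
  topics = Hash.new { |hash, key| hash[key] = [] }

  text.each_line do |line|
    fields = line.split
    next if fields.empty?

    if fields.include?('TOPIC') && fields.include?('PARTITION')
      # Header row: remember where the two columns of interest are.
      topic_col = fields.index('TOPIC')
      partition_col = fields.index('PARTITION')
      next
    end
    next if topic_col.nil? # skip anything printed before the header

    partition = fields[partition_col].to_s
    # Ignore rows without a numeric partition (for example "-" placeholders).
    next unless partition.match?(/\A\d+\z/)

    topics[fields[topic_col]] << partition.to_i
  end

  topics.transform_values { |parts| parts.uniq.sort }
end

# Made-up sample output, only to exercise the parser.
SAMPLE = <<~OUT
  TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
  orders    1          120             125             5    consumer-1
  orders    0          98              98              0    consumer-1
  payments  0          10              12              2    consumer-2
OUT

p parse_describe_output(SAMPLE)
```

Inside the fact, the result of one Facter::Core::Execution.exec call per group would be passed to this function, which removes the inner per-topic loop of shell pipelines.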

Categories
kafka

Adding custom kafka check consumer.lag to datadog from JMX

Hi,

We needed to add a consumer.lag check to Datadog. Since we did not have access to the kafka.consumer domain, which as far as I know lives on the client side, I decided to connect to the Kafka node using JMX (so JConsole was the tool). In the MBeans tab you will see that what you need for kafka.consumer.max_lag is not there by default. The information you can actually grab is located under kafka.server, more precisely under FetcherLagMetrics.

If you go all the way down the hierarchy, you will get the partition 0 details for one of the topics. I will use this example as the base for constructing the bean_regex:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=ReplicaFetcherThread-0-1001,topic=__consumer_offsets,partition=0

Using the same template you can directly construct the block that you need to add to kafka.yaml and it should look like this:

    - include:
        domain: 'kafka.server'
        bean_regex: 'kafka\.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=.*'
        attribute:
          Value:
            metric_type: rate
            alias: kafka.consumer.lag
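Before restarting the agent, the pattern can be sanity-checked against the bean name copied from JConsole. This is only a sketch in plain Ruby: bean_properties is a hypothetical helper, and Ruby's regex engine stands in for whatever the agent's JMX fetcher uses, so it confirms the pattern's intent rather than the agent's exact matching behaviour.

```ruby
# Sample bean name copied from the JConsole example above.
BEAN = 'kafka.server:type=FetcherLagMetrics,name=ConsumerLag,' \
       'clientId=ReplicaFetcherThread-0-1001,topic=__consumer_offsets,partition=0'

# Same pattern as the bean_regex in the YAML block.
BEAN_REGEX = /kafka\.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=.*/

# Hypothetical helper: split the key=value properties after the domain into a hash.
def bean_properties(bean)
  _domain, props = bean.split(':', 2)
  props.split(',').map { |pair| pair.split('=', 2) }.to_h
end

puts BEAN.match?(BEAN_REGEX)
puts bean_properties(BEAN).inspect
```

The key=value parts of the bean name (clientId, topic, partition) are the pieces that, as far as I can tell, end up available for filtering on the Datadog side.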

After an agent restart you will see that the number of collected metrics increases and that there is a new check in the Datadog web interface.
After that, you can manually add a new graph to the dashboard that uses this check, and you can also filter it on specific criteria such as host, topic or partition.

Hope this is helpful,

Cheers

Categories
cloud kafka newtools

Integrate Kafka with Datadog monitoring using puppet

Hi,

Since I owed you an article on how to integrate Kafka monitoring using Datadog, let me tell you a couple of things about this topic. First of all, we are taking the same config of Kafka with Jolokia that was described in a previous article. From the install of the brokers on our infrastructure, JMX data is published on port 9990 (this will be needed in the Datadog config).

The files you need to create for this task are as follows:

datadogagent.pp

class profiles::datadogagent {
  # Looked up here so the catalog fails early if the API key is missing from Hiera.
  $_check_api_key = hiera('datadog_agent::api_key')

  contain 'datadog_agent'
  contain 'profiles::datadogagent_config_kafka'
  contain 'datadog_agent::integrations::zk'

  # Install and configure the agent before laying down the integration configs.
  Class['datadog_agent'] -> Class['profiles::datadogagent_config_kafka']
  Class['datadog_agent'] -> Class['datadog_agent::integrations::zk']
}

datadogagent_config_kafka.pp

class profiles::datadogagent_config_kafka (
  $servers = [{'host' => 'localhost', 'port' => '9990'}]
) inherits datadog_agent::params {
  include datadog_agent

  validate_array($servers)

  file { "${datadog_agent::params::conf_dir}/kafka.yaml":
    ensure  => file,
    owner   => $datadog_agent::params::dd_user,
    group   => $datadog_agent::params::dd_group,
    mode    => '0600',
    content => template("${module_name}/kafka.yaml.erb"),
    require => Package[$datadog_agent::params::package_name],
    notify  => Service[$datadog_agent::params::service_name],
  }
}

And since the Datadog module does not yet ship a default integration for Kafka (you can find the module here):

https://github.com/DataDog/puppet-datadog-agent

I created the following file in the templates directory:

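kafka.yaml.erb (as you can see from the header, this is actually the template given by Datadog for the Kafka integration, with a specific host and port). Note that the servers loop in this file is only closed on the very last line, so the init_config section is inside it and the template renders valid YAML only for a single entry in $servers.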

##########
# WARNING
##########
# This sample works only for Kafka >= 0.8.2.
# If you are running a version older than that, you can refer to agent 5.2.x released
# sample files, https://raw.githubusercontent.com/DataDog/dd-agent/5.2.1/conf.d/kafka.yaml.example

instances:
<% @servers.each do |server| -%>
  - host: <%= server['host'] %>
    port: <%= server['port'] %> # This is the JMX port on which Kafka exposes its metrics (usually 9999)
    tags:
      kafka: broker

init_config:
  is_jmx: true

  # Metrics collected by this check. You should not have to modify this.
  conf:
    # v0.8.2.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.message_rate
    # v0.8.2.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.max_lag
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MinFetchRate,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.fetch_rate
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.messages_in

    # Offsets committed to ZooKeeper
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.zookeeper_commits
    # Offsets committed to Kafka
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.kafka_commits
    # v0.9.0.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          response-rate:
            metric_type: gauge
            alias: kafka.producer.response_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-rate:
            metric_type: gauge
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-latency-avg:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          outgoing-byte-rate:
            metric_type: gauge
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          io-wait-time-ns-avg:
            metric_type: gauge
            alias: kafka.producer.io_wait

    # v0.9.0.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          bytes-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          records-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.messages_in
    #
    # Aggregate cluster stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_out.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.messages_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_rejected.rate

    #
    # Request timings
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch.failed.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.failed.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.produce.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.produce.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_consumer.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_follower.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=UpdateMetadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Metadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Offsets'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.offsets.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.offsets.time.99percentile
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.handler.avg.idle.pct.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.producer_request_purgatory.size
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=FetchRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.fetch_request_purgatory.size

    #
    # Replication stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.under_replicated_partitions
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrShrinksPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_shrinks.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrExpandsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_expands.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.unclean_leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=OfflinePartitionsCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.offline_partitions_count
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=ActiveControllerCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.active_controller_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=PartitionCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.partition_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=LeaderCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.leader_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.max_lag

    #
    # Log flush stats
    #
    - include:
        domain: 'kafka.log'
        bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.log.flush_rate.rate

<% end -%>
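To see how the instances loop expands, here is a minimal sketch that renders just that part of the template with Ruby's own ERB and parses the result as YAML. The class name KafkaInstances and the two hosts are assumptions for illustration, and the trim_mode keyword assumes Ruby 2.6 or newer. In this sketch the loop is closed right after the instance entry, which is what keeps the output valid when there is more than one server.

```ruby
require 'erb'
require 'yaml'

# Only the instances part of the template, with the loop closed after each entry.
TEMPLATE = <<~'ERB_SRC'
  instances:
  <% @servers.each do |server| -%>
    - host: <%= server['host'] %>
      port: <%= server['port'] %>
      tags:
        kafka: broker
  <% end -%>
ERB_SRC

# Hypothetical wrapper that exposes @servers to the template, like Puppet does.
class KafkaInstances
  def initialize(servers)
    @servers = servers
  end

  # trim_mode '-' makes "-%>" swallow the newline that follows it.
  def render
    ERB.new(TEMPLATE, trim_mode: '-').result(binding)
  end
end

servers = [
  { 'host' => 'broker1.example.com', 'port' => '9990' },
  { 'host' => 'broker2.example.com', 'port' => '9990' }
]
rendered = KafkaInstances.new(servers).render
puts rendered
puts YAML.safe_load(rendered)['instances'].length
```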

To apply all of this to a node, you need to add the class to its fqdn.yaml in this format:

---
classes:
 - profiles::datadogagent

datadog_agent::api_key: [your key]

After this runs, datadog-agent is installed, and you can check it by using ps -ef | grep datadog-agent. You should also take a look in /etc/dd-agent/conf.d, where you will find two new files called kafka.yaml and zk.yaml.

You are done. Please feel free to log in to the Datadog portal and check your host.

Cheers

Categories
cloud kafka

Monitoring Kafka with DataDog

Hi,

Here is a very interesting series of articles worth checking out about one option for Kafka monitoring, using Datadog:

https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/

I will have a task related to this in the near future and will post the outcome when it’s done.

P.S.: It is done, and you can find the implementation here:

Integrate Kafka with Datadog monitoring using puppet

From the metrics point of view, we will see if this is really an option. I have tried the same with Prometheus and Grafana and it seems to work better. I will keep you posted.

Cheers