Integrate Kafka with Datadog monitoring using Puppet

Hi,

Since I owed you an article on how to integrate Kafka monitoring with Datadog, let me tell you a couple of things about this topic. First of all, we are using the same Kafka configuration with Jolokia that was described in the following article. With the brokers installed this way on our infrastructure, JMX data is published on port 9990 (this will be needed in the Datadog config).
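
Before wiring up the agent, it can help to confirm that the JMX port is reachable from the host where the agent will run. Here is a minimal Ruby sketch, assuming the broker listens on localhost:9990 as described above. The helper name port_open? is hypothetical and not part of any Datadog or Kafka tooling, and it only checks that a TCP connection can be opened, not that JMX data is actually served:

```ruby
require 'socket'

# Hypothetical helper: returns true if a TCP connection to host:port
# can be opened within `timeout` seconds, false otherwise.
def port_open?(host, port, timeout: 2)
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError, IOError
  false
end

if __FILE__ == $PROGRAM_NAME
  # Host and port are the values used in this article; adjust as needed.
  puts "JMX port reachable: #{port_open?('localhost', 9990)}"
end
```

If this prints false on a broker host, the Datadog check will most likely not be able to collect anything either.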

The files you need to create for this task are as follows:

datadogagent.pp

class profiles::datadogagent {
  $_check_api_key = hiera('datadog_agent::api_key')

  contain 'datadog_agent'
  contain 'profiles::datadogagent_config_kafka'
  contain 'datadog_agent::integrations::zk'

  Class['datadog_agent'] -> Class['profiles::datadogagent_config_kafka']
  Class['datadog_agent'] -> Class['datadog_agent::integrations::zk']
}

datadogagent_config_kafka.pp

class profiles::datadogagent_config_kafka (
  $servers = [{'host' => 'localhost', 'port' => '9990'}]
) inherits datadog_agent::params {
  include datadog_agent

  validate_array($servers)

  file { "${datadog_agent::params::conf_dir}/kafka.yaml":
    ensure  => file,
    owner   => $datadog_agent::params::dd_user,
    group   => $datadog_agent::params::dd_group,
    mode    => '0600',
    content => template("${module_name}/kafka.yaml.erb"),
    require => Package[$datadog_agent::params::package_name],
    notify  => Service[$datadog_agent::params::service_name],
  }
}

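The Datadog Puppet module, which you can find here:

https://github.com/DataDog/puppet-datadog-agent

does not yet ship a Kafka integration by default, so I created the following file in the templates directory:

kafka.yaml.erb (as you can see from the header, this is actually the template provided by Datadog for the Kafka integration, with the host and port filled in from $servers). Note that the loop over $servers only closes at the very end of the file, so as written the template renders valid YAML only when $servers has a single entry, as in the default above.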

##########
# WARNING
##########
# This sample works only for Kafka >= 0.8.2.
# If you are running a version older than that, you can refer to agent 5.2.x released
# sample files, https://raw.githubusercontent.com/DataDog/dd-agent/5.2.1/conf.d/kafka.yaml.example

instances:
<% @servers.each do |server| -%>
  - host: <%= server['host'] %>
    port: <%= server['port'] %> # This is the JMX port on which Kafka exposes its metrics (usually 9999)
    tags:
      kafka: broker

init_config:
  is_jmx: true

  # Metrics collected by this check. You should not have to modify this.
  conf:
    # v0.8.2.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.message_rate
    # v0.8.2.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.max_lag
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MinFetchRate,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.fetch_rate
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.messages_in

    # Offsets committed to ZooKeeper
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.zookeeper_commits
    # Offsets committed to Kafka
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.kafka_commits
    # v0.9.0.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          response-rate:
            metric_type: gauge
            alias: kafka.producer.response_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-rate:
            metric_type: gauge
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-latency-avg:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          outgoing-byte-rate:
            metric_type: gauge
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          io-wait-time-ns-avg:
            metric_type: gauge
            alias: kafka.producer.io_wait

    # v0.9.0.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          bytes-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          records-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.messages_in
    #
    # Aggregate cluster stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_out.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.messages_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_rejected.rate

    #
    # Request timings
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch.failed.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.failed.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.produce.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.produce.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_consumer.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_follower.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=UpdateMetadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Metadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Offsets'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.offsets.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.offsets.time.99percentile
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.handler.avg.idle.pct.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.producer_request_purgatory.size
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=FetchRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.fetch_request_purgatory.size

    #
    # Replication stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.under_replicated_partitions
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrShrinksPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_shrinks.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrExpandsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_expands.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.unclean_leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=OfflinePartitionsCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.offline_partitions_count
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=ActiveControllerCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.active_controller_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=PartitionCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.partition_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=LeaderCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.leader_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.max_lag

    #
    # Log flush stats
    #
    - include:
        domain: 'kafka.log'
        bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.log.flush_rate.rate

<% end -%>
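
If you want to see what the per-server part of the template produces without running Puppet, you can render it with plain Ruby. This is only a sketch under a few assumptions: the fragment below is just the instances section copied from kafka.yaml.erb (the inline comment is dropped), the loop is closed right after the tags block so the fragment can be rendered on its own, and TemplateScope and render_instances are hypothetical names used for illustration. Puppet's own template() function evaluates ERB in its own scope, so this approximates it rather than reproducing it exactly:

```ruby
require 'erb'
require 'yaml'

# Only the per-server fragment of kafka.yaml.erb, closed after the tags block.
INSTANCES_TEMPLATE = <<~TEMPLATE
  instances:
  <% @servers.each do |server| -%>
    - host: <%= server['host'] %>
      port: <%= server['port'] %>
      tags:
        kafka: broker
  <% end -%>
TEMPLATE

# Hypothetical stand-in for the Puppet scope: exposes @servers to the template.
class TemplateScope
  def initialize(servers)
    @servers = servers
  end

  def render(template)
    # trim_mode '-' makes "-%>" swallow the trailing newline, as in Puppet templates.
    ERB.new(template, trim_mode: '-').result(binding)
  end
end

def render_instances(servers)
  TemplateScope.new(servers).render(INSTANCES_TEMPLATE)
end

if __FILE__ == $PROGRAM_NAME
  rendered = render_instances([{ 'host' => 'localhost', 'port' => '9990' }])
  puts rendered
  # Parsing the result is a cheap check that the fragment is valid YAML.
  puts YAML.safe_load(rendered).inspect
end
```

Parsing the rendered text with YAML.safe_load is a quick way to catch indentation mistakes before the file ever reaches the agent.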

To apply all of this to a node, add the class to its fqdn.yaml in this format:

---
classes:
 - profiles::datadogagent

datadog_agent::api_key: [your key]

After the Puppet run, datadog-agent is installed; you can check this with ps -ef | grep datadog-agent. You should also take a look in /etc/dd-agent/conf.d, where you will find two new files: kafka.yaml and zk.yaml.
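
To double check what ended up in the generated file, a small Ruby sketch like the one below can list the JMX endpoints it contains. It assumes the file lives at /etc/dd-agent/conf.d/kafka.yaml as mentioned above and that you run it as a user who can read it (the manifest sets mode 0600). The helper name jmx_endpoints is hypothetical:

```ruby
require 'yaml'

# Hypothetical helper: returns "host:port" for every entry under `instances`
# in a Datadog check configuration file.
def jmx_endpoints(path)
  config = YAML.safe_load(File.read(path)) || {}
  Array(config['instances']).map { |instance| "#{instance['host']}:#{instance['port']}" }
end

if __FILE__ == $PROGRAM_NAME
  path = '/etc/dd-agent/conf.d/kafka.yaml'
  if File.readable?(path)
    puts jmx_endpoints(path)
  else
    warn "Cannot read #{path}; try running this as root or as the agent user."
  end
end
```

With the default $servers from the manifest, you would expect a single endpoint pointing at localhost and port 9990.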

You are done. Feel free to log in to the Datadog portal and check your host.

Cheers