Hi,
Since I still owed you an article on how to integrate Kafka monitoring with Datadog, let me tell you a couple of things about this topic. First of all, we start from the same Kafka-with-Jolokia configuration that was described in the previous article. From the broker installation on our infrastructure, JMX data is published on port 9990 (this will be needed in the Datadog config).
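Before wiring up the agent, it's worth confirming that the JMX port is actually reachable from the host where the agent will run. A minimal sketch in Python (the `port_open` helper is mine, not part of any tool here; 9990 is the port from our broker install):

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example: check the JMX port Kafka publishes on in our setup
# port_open("localhost", 9990)
```

If this returns False on the broker itself, fix the JMX/Jolokia setup before blaming the Datadog config.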
The files you need to create for this task are as follows:
datadogagent.pp
class profiles::datadogagent {
  $_check_api_key = hiera('datadog_agent::api_key')

  contain 'datadog_agent'
  contain 'profiles::datadogagent_config_kafka'
  contain 'datadog_agent::integrations::zk'

  Class['datadog_agent'] -> Class['profiles::datadogagent_config_kafka']
  Class['datadog_agent'] -> Class['datadog_agent::integrations::zk']
}
datadogagent_config_kafka.pp
class profiles::datadogagent_config_kafka (
  $servers = [{'host' => 'localhost', 'port' => '9990'}]
) inherits datadog_agent::params {
  include datadog_agent
  validate_array($servers)

  file { "${datadog_agent::params::conf_dir}/kafka.yaml":
    ensure  => file,
    owner   => $datadog_agent::params::dd_user,
    group   => $datadog_agent::params::dd_group,
    mode    => '0600',
    content => template("${module_name}/kafka.yaml.erb"),
    require => Package[$datadog_agent::params::package_name],
    notify  => Service[$datadog_agent::params::service_name],
  }
}
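The $servers parameter can be overridden from Hiera, and validate_array only checks that the value is an array, not that each entry has the keys the template expects. A quick sanity check of the full shape before a Puppet run can save a round trip; a small sketch (the `valid_servers` helper is mine, not part of the module):

```python
def valid_servers(servers):
    """Check that servers is a list of dicts, each with 'host' and 'port' keys,
    matching the shape the kafka.yaml.erb template iterates over."""
    return (
        isinstance(servers, list)
        and all(isinstance(s, dict) and {"host", "port"} <= set(s) for s in servers)
    )

print(valid_servers([{"host": "localhost", "port": "9990"}]))  # the class default
```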
And since the Datadog Puppet module, which you can find here:
https://github.com/DataDog/puppet-datadog-agent
does not yet ship a default integration for Kafka, I created the following file in the templates directory:
kafka.yaml.erb (as you can see from the header, this is the template Datadog provides for the Kafka integration, filled in with our specific host and port)
##########
# WARNING
##########
# This sample works only for Kafka >= 0.8.2.
# If you are running a version older than that, you can refer to the agent 5.2.x
# released sample files:
# https://raw.githubusercontent.com/DataDog/dd-agent/5.2.1/conf.d/kafka.yaml.example

instances:
<% @servers.each do |server| -%>
  - host: <%= server['host'] %>
    port: <%= server['port'] %> # The JMX port on which Kafka exposes its metrics (9990 in our setup)
    tags:
      kafka: broker
<% end -%>

init_config:
  is_jmx: true

  # Metrics collected by this check. You should not have to modify this.
  conf:
    # v0.8.2.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.message_rate
    # v0.8.2.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.max_lag
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MinFetchRate,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.fetch_rate
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.messages_in
    # Offsets committed to ZooKeeper
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.zookeeper_commits
    # Offsets committed to Kafka
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.kafka_commits
    # v0.9.0.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          response-rate:
            metric_type: gauge
            alias: kafka.producer.response_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-rate:
            metric_type: gauge
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-latency-avg:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          outgoing-byte-rate:
            metric_type: gauge
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          io-wait-time-ns-avg:
            metric_type: gauge
            alias: kafka.producer.io_wait
    # v0.9.0.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          bytes-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          records-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.messages_in
    #
    # Aggregate cluster stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_out.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.messages_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_rejected.rate
    #
    # Request timings
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch.failed.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.failed.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.produce.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.produce.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_consumer.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_follower.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=UpdateMetadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Metadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Offsets'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.offsets.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.offsets.time.99percentile
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.handler.avg.idle.pct.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.producer_request_purgatory.size
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=FetchRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.fetch_request_purgatory.size
    #
    # Replication stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.under_replicated_partitions
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrShrinksPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_shrinks.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrExpandsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_expands.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.unclean_leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=OfflinePartitionsCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.offline_partitions_count
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=ActiveControllerCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.active_controller_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=PartitionCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.partition_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=LeaderCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.leader_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.max_lag
    #
    # Log flush stats
    #
    - include:
        domain: 'kafka.log'
        bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.log.flush_rate.rate
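To see what the template actually renders for the default $servers value, here is a small Python sketch that mimics the ERB loop over the instances section (only the loop; the init_config part of the file is static):

```python
servers = [{"host": "localhost", "port": "9990"}]  # the class default

# Mimic the ERB loop that renders the instances section of kafka.yaml
instances_yaml = "instances:\n" + "".join(
    "  - host: {host}\n"
    "    port: {port}\n"
    "    tags:\n"
    "      kafka: broker\n".format(**s)
    for s in servers
)
print(instances_yaml)
```

With more entries in $servers (e.g. several brokers behind one agent), each one becomes another item under instances.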
To integrate all of this on a node, you need to add the class to your fqdn.yaml in the following format:
---
classes:
- profiles::datadogagent
datadog_agent::api_key: [your key]
After the Puppet run completes, the datadog-agent is installed; you can check it with ps -ef | grep datadog-agent. Also, if you take a look (and you should), you will find two new files in /etc/dd-agent/conf.d, called kafka.yaml and zk.yaml.
You are done. Please feel free to log in to the Datadog portal and check your host.
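Checking for those two files can also be scripted, which is handy if you roll this out to many brokers. A minimal sketch (the conf.d path is the agent-5 default mentioned above; the `missing_checks` helper is mine):

```python
import os

def missing_checks(conf_dir, expected=("kafka.yaml", "zk.yaml")):
    """Return the expected check configs that are not present in conf_dir."""
    return [f for f in expected if not os.path.isfile(os.path.join(conf_dir, f))]

# Example: after a successful Puppet run this should print []
# print(missing_checks("/etc/dd-agent/conf.d"))
```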
Cheers