Categories
cloud kafka newtools

Integrate Kafka with Datadog monitoring using puppet

Hi,

Since I was in debt with an article on how to integrate Kafka monitoring using Datadog, let me tell you a couple of things about this topic. First of all, we are reusing the same Kafka-with-Jolokia configuration that was described in a previous article. From the install of the brokers on our infrastructure, JMX data is published on port 9990 (this will be needed in the Datadog config).

The files you need to create for this task are as follows:

datadogagent.pp

class profiles::datadogagent {
  $_check_api_key = hiera('datadog_agent::api_key')

  contain 'datadog_agent'
  contain 'profiles::datadogagent_config_kafka'
  contain 'datadog_agent::integrations::zk'

  Class['datadog_agent'] -> Class['profiles::datadogagent_config_kafka']
  Class['datadog_agent'] -> Class['datadog_agent::integrations::zk']
}

datadogagent_config_kafka.pp

class profiles::datadogagent_config_kafka (
$servers = [{'host' => 'localhost', 'port' => '9990'}]
) inherits datadog_agent::params {
  include datadog_agent

  validate_array($servers)

  file { "${datadog_agent::params::conf_dir}/kafka.yaml":
    ensure  => file,
    owner   => $datadog_agent::params::dd_user,
    group   => $datadog_agent::params::dd_group,
    mode    => '0600',
    content => template("${module_name}/kafka.yaml.erb"),
    require => Package[$datadog_agent::params::package_name],
    notify  => Service[$datadog_agent::params::service_name],
  }
}

And since there isn’t yet a default integration for Kafka in the Datadog Puppet module, which you can find here:

https://github.com/DataDog/puppet-datadog-agent

I created the following file in the templates directory:

kafka.yaml.erb (as you can see from the header, this is actually the template provided by Datadog for the Kafka integration, with our specific host and port)

##########
# WARNING
##########
# This sample works only for Kafka >= 0.8.2.
# If you are running a version older than that, you can refer to agent 5.2.x released
# sample files, https://raw.githubusercontent.com/DataDog/dd-agent/5.2.1/conf.d/kafka.yaml.example

instances:
<% @servers.each do |server| -%>
  - host: <%= server['host'] %>
    port: <%= server['port'] %> # This is the JMX port on which Kafka exposes its metrics (usually 9999)
    tags:
      kafka: broker

init_config:
  is_jmx: true

  # Metrics collected by this check. You should not have to modify this.
  conf:
    # v0.8.2.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.producer.message_rate
    # v0.8.2.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.max_lag
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MinFetchRate,clientId=.*'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.consumer.fetch_rate
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.messages_in

    # Offsets committed to ZooKeeper
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.zookeeper_commits
    # Offsets committed to Kafka
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=.*'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.consumer.kafka_commits
    # v0.9.0.x Producers
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          response-rate:
            metric_type: gauge
            alias: kafka.producer.response_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-rate:
            metric_type: gauge
            alias: kafka.producer.request_rate
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          request-latency-avg:
            metric_type: gauge
            alias: kafka.producer.request_latency_avg
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          outgoing-byte-rate:
            metric_type: gauge
            alias: kafka.producer.bytes_out
    - include:
        domain: 'kafka.producer'
        bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
        attribute:
          io-wait-time-ns-avg:
            metric_type: gauge
            alias: kafka.producer.io_wait

    # v0.9.0.x Consumers
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          bytes-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.bytes_in
    - include:
        domain: 'kafka.consumer'
        bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
        attribute:
          records-consumed-rate:
            metric_type: gauge
            alias: kafka.consumer.messages_in
    #
    # Aggregate cluster stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_out.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.messages_in.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.net.bytes_rejected.rate

    #
    # Request timings
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch.failed.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.failed.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.produce.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.produce.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.produce.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_consumer.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.fetch_follower.rate
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_consumer.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.fetch_follower.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=UpdateMetadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.update_metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Metadata'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.metadata.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.metadata.time.99percentile
    - include:
        domain: 'kafka.network'
        bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Offsets'
        attribute:
          Mean:
            metric_type: gauge
            alias: kafka.request.offsets.time.avg
          99thPercentile:
            metric_type: gauge
            alias: kafka.request.offsets.time.99percentile
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.request.handler.avg.idle.pct.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.producer_request_purgatory.size
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=FetchRequestPurgatory,name=PurgatorySize'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.request.fetch_request_purgatory.size

    #
    # Replication stats
    #
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.under_replicated_partitions
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrShrinksPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_shrinks.rate
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=IsrExpandsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.isr_expands.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.replication.unclean_leader_elections.rate
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=OfflinePartitionsCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.offline_partitions_count
    - include:
        domain: 'kafka.controller'
        bean: 'kafka.controller:type=KafkaController,name=ActiveControllerCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.active_controller_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=PartitionCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.partition_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaManager,name=LeaderCount'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.leader_count
    - include:
        domain: 'kafka.server'
        bean: 'kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica'
        attribute:
          Value:
            metric_type: gauge
            alias: kafka.replication.max_lag

    #
    # Log flush stats
    #
    - include:
        domain: 'kafka.log'
        bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
        attribute:
          Count:
            metric_type: rate
            alias: kafka.log.flush_rate.rate

<% end -%>

To integrate all of this on a node, you need to add the class to your fqdn.yaml in the format:

---
classes:
 - profiles::datadogagent

datadog_agent::api_key: [your key]

After this runs, the datadog-agent is installed. You can check it using ps -ef | grep datadog-agent, and if you’d like to take a look (you should), you will find two new files added to /etc/dd-agent/conf.d, called kafka.yaml and zk.yaml.
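For reference, with the default $servers value from the class above ({'host' => 'localhost', 'port' => '9990'}), the instances section of the rendered /etc/dd-agent/conf.d/kafka.yaml should come out like this:

```yaml
instances:
  - host: localhost
    port: 9990 # This is the JMX port on which Kafka exposes its metrics (usually 9999)
    tags:
      kafka: broker
```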

You are done; please feel free to log in to the Datadog portal and check your host.

Cheers

Categories
mq

Migrating Websphere MQ 7.0.1 to IBM MQ 8.0.0.x

Hi,

Since there aren’t that many new things I have done regarding Kafka and open-source software, I want to share this with you, so that my IBM MQ migration steps and script are not lost in a distant GitLab repo.

In order to migrate an instance from an older version (out of support) to a newer one, even IBM recommends that you leave it as is: just stop it, migrate the packages, and after that run a script like the one I provide below.

First step is to stop the running queue managers using the following command:

for i in $(dspmq | cut -d'(' -f2 | cut -d')' -f1); do endmqm $i; done
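dspmq prints one line per queue manager, in the form QMNAME(QM1) STATUS(Running); the cut pipeline above just extracts the name between the first pair of parentheses. A quick illustration with a made-up sample line:

```shell
# Sample dspmq output line (hypothetical queue manager name)
sample='QMNAME(QM1) STATUS(Running)'
# Take the text after the first '(' and before the next ')'
echo "$sample" | cut -d'(' -f2 | cut -d')' -f1   # prints: QM1
```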

If it fails and connections remain open, you can try endmqm -i instead of endmqm. After the feedback is returned that the queue managers are stopped, just to be sure, you can double-check with a simple ps -ef | grep amq. Please check the status of the MQ listener as well, with ps -ef | grep mqlsr. Once you are sure everything is stopped, you can contact your AIX admin to commit the new packages. This can be checked using lslpp -l | grep mqm; an example output looks like this:

# lslpp -l | grep mqm
  mqm.base.runtime           8.0.0.6  COMMITTED  IBM WebSphere MQ Runtime for
  mqm.base.samples           8.0.0.6  COMMITTED  IBM WebSphere MQ Samples
  mqm.base.sdk               8.0.0.6  COMMITTED  IBM WebSphere MQ Base Kit for
  mqm.client.rte             8.0.0.6  COMMITTED  IBM WebSphere MQ Client for
  mqm.java.rte               8.0.0.6  COMMITTED  IBM WebSphere MQ Java Client,
  mqm.msg.en_US              8.0.0.6  COMMITTED  IBM WebSphere MQ Messages -
  mqm.server.rte             8.0.0.6  COMMITTED  IBM WebSphere MQ Server
  mqm.base.runtime           8.0.0.6  COMMITTED  IBM WebSphere MQ Runtime for

You now know you can move forward with the migration, and for that purpose you can also use the script below. It’s not bulletproof, but you can change and adapt it to your needs:

#!/usr/bin/ksh
echo ". /usr/mqm/bin/setmqenv -s" >> $HOME/.profile
env | grep PATH | grep /usr/mqm/bin > /dev/null
if [ "$?" -ne "0" ];then
	export PATH=$PATH:/usr/mqm/bin
fi
SYSTEM_CRITICAL_VAL=`mqconfig | grep CRITICAL`
if [ "$?" -eq "0" ];then
	echo "System values for MQ installation are not OK, please check $SYSTEM_CRITICAL_VAL"
	exit
fi
INSTALLATION_NAME=`dspmqver | grep InstName | cut -d':' -f2 | sed 's/ //g'`
for i in $(dspmq | cut -d'(' -f2 | cut -d')' -f1);do
	echo "Queue manager ${i} is being set to installation ${INSTALLATION_NAME}"
	setmqm -m $i -n $INSTALLATION_NAME
	#echo $?
	if ([ "$?" -eq "0" ] || [ "$?" -eq "59" ]);then
		echo "Starting instance $i"
		strmqm $i
		if [ "$?" -eq "0" ];then
		sleep 10
		echo "ALTER QMGR CHLAUTH(DISABLED) " | runmqsc $i
		echo "ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) CHCKLOCL(NONE) CHCKCLNT (NONE)" | runmqsc $i
		echo "REFRESH SECURITY TYPE(AUTHSERV)" | runmqsc $i
		echo "ALTER CHL('SYSTEM.DEF.SVRCONN') CHLTYPE(SVRCONN) MAXMSGL(104857600) HBINT(300) MAXINST(999999999) MCAUSER('mqm') SSLCAUTH(OPTIONAL) "| runmqsc $i
		endmqm -i $i
		sleep 30
		fi
	fi
done	

Once this is done, you should be migrated to MQ 8 and can connect using SYSTEM.DEF.SVRCONN, which has MCAUSER set to mqm. Please keep in mind that this is a temporary solution and that IBM MQ connection policies should be enabled for channel connect (you can also opt for a different user on the server-connection channel and authorize just the objects that your connector needs).

That should be all for today,

Cheers!

Categories
kafka

Automigrate data contained in a topic to different Kafka brokers

Hi,

Let’s keep it short: you have a cluster composed of three Kafka brokers and you add another two. The details of doing that are pretty straightforward. If they are new, give them a unique broker.id and set the zookeeper.connect string so they know which cluster to register with. If they were already registered with an id, please keep in mind that you also need to change it in meta.properties in the log.dirs location.

So, we have now the situation:

Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /brokers/ids
[13, 14, 1003, 1002, 1001]

And we need to migrate one topic which has a replication factor of two and also two partitions:

Topic:test    PartitionCount:2    ReplicationFactor:2    Configs:
    Topic: test    Partition: 0    Leader: 1003    Replicas: 1003,1001    Isr: 1001,1003
    Topic: test    Partition: 1    Leader: 1001    Replicas: 1001,1002    Isr: 1002,1001

In order to achieve this, you will need to follow these steps:

1. Create the file topics-to-move.json with the content:

{"topics": [{"topic": "test"}],
 "version": 1
}
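As a quick sketch, step 1 can be done and sanity-checked from the shell (assuming python3 is available for the JSON validation):

```shell
# Write the topics-to-move file used by kafka-reassign-partitions.sh
cat > topics-to-move.json <<'EOF'
{"topics": [{"topic": "test"}],
 "version": 1
}
EOF
# Fail loudly if the file is not valid JSON
python3 -m json.tool topics-to-move.json > /dev/null && echo "topics-to-move.json is valid JSON"
```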

2. Run the following command: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "13,14" --generate
Output:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1003,1001]},{"topic":"test","partition":1,"replicas":[1001,1002]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[14,13]},{"topic":"test","partition":1,"replicas":[13,14]}]}

3. Create the file expand-cluster-reassignment.json with the following content:

{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[14,13]},{"topic":"test","partition":1,"replicas":[13,14]}]}

4. Run the following command for the topic migration: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Output:

Current partition replica assignment

{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1003,1001]},{"topic":"test","partition":1,"replicas":[1001,1002]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

5. Verify the status of the migration with: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

Output:
Status of partition reassignment: 
Reassignment of partition [test,0] completed successfully
Reassignment of partition [test,1] completed successfully

Final status:

./kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
Topic:test	PartitionCount:2	ReplicationFactor:2	Configs:
	Topic: test	Partition: 0	Leader: 13	Replicas: 13,14	Isr: 14,13
	Topic: test	Partition: 1	Leader: 14	Replicas: 14,13	Isr: 13,14

This is actually a follow-up of the steps described here:

https://kafka.apache.org/documentation.html#basic_ops_automigrate

Cheers!

Categories
kafka newtools

How to deploy Prometheus infrastructure for Kafka monitoring using puppet

Hi,

In the last couple of days I worked on the deployment of a Prometheus server and agents for Kafka monitoring. To that purpose, I will share with you the main points that you need to cover in order to achieve this.

The first thing to do is to use the prometheus and grafana modules, which you will find at the following links:

https://forge.puppet.com/puppet/prometheus
https://forge.puppet.com/puppet/grafana

After these are imported into Puppet, you need to create the following puppet files:

grafana.pp

class profiles::grafana {
    class { '::grafana':
      cfg => {
        app_mode => 'production',
        server   => {
          http_port     => 8080,
        },
        database => {
          type     => 'sqlite3',
          host     => '127.0.0.1:3306',
          name     => 'grafana',
          user     => 'root',
          password => 'grafana',
        },
        users    => {
          allow_sign_up => false,
        },
      },
    }
}

prometheusserver.pp

class profiles::prometheusserver {
    $kafka_nodes = hiera('profiles::prometheusserver::nodes')

    if $kafka_nodes {
      class {'::prometheus':
        global_config  => { 'scrape_interval'=> '15s', 'evaluation_interval'=> '15s', 'external_labels'=> { 'monitor'=>'master'}},
        rule_files     => [ "/etc/prometheus/alert.rules" ],
        scrape_configs => [ {'job_name'=>'prometheus','scrape_interval'=> '30s','scrape_timeout'=>'30s','static_configs'=> [{'targets'=>['localhost:9090'], 'labels'=> { 'alias'=>'Prometheus'}}]},{'job_name'=>'kafka', 'scrape_interval'=> '10s', 'scrape_timeout'=> '10s', 'static_configs'=> [{'targets'=> $kafka_nodes }]}],
      }
    } else {
      class {'::prometheus':
        global_config  => { 'scrape_interval'=> '15s', 'evaluation_interval'=> '15s', 'external_labels'=> { 'monitor'=>'master'}},
        rule_files     => [ "/etc/prometheus/alert.rules" ],
        scrape_configs => [ {'job_name'=>'prometheus','scrape_interval'=> '30s','scrape_timeout'=>'30s','static_configs'=> [{'targets'=>['localhost:9090'], 'labels'=> { 'alias'=>'Prometheus'}}]}],
      }
    }
}

prometheusnode.pp

class profiles::prometheusnode(
  $jmxexporter_dir     = hiera('jmxexporter::dir', '/opt/jmxexporter'),
  $jmxexporter_version = hiera('jmxexporter::version', '0.9')
){
  include ::prometheus::node_exporter

  file { $jmxexporter_dir:
    ensure => 'directory',
  }
  file { "${jmxexporter_dir}/prometheus_config.yaml":
    source => 'puppet:///modules/profiles/prometheus_config',
  }
  wget::fetch { "https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/${jmxexporter_version}/jmx_prometheus_javaagent-${jmxexporter_version}.jar":
    destination => "${jmxexporter_dir}/",
    cache_dir   => '/tmp/',
    timeout     => 0,
    verbose     => false,
    unless      => "test -e ${jmxexporter_dir}/jmx_prometheus_javaagent-${jmxexporter_version}.jar",
  }
}

It is true that I used the wget module to fetch the JMX exporter, so I must give you that as well:

https://forge.puppet.com/leonardothibes/wget

Also required is the JMX exporter configuration file, which translates the JMX data exposed by Kafka into fields that can be imported into Prometheus.

prometheus_config:

lowercaseOutputName: true
rules:
- pattern : kafka.cluster<type=(.+), name=(.+), topic=(.+), partition=(.+)><>Value
  name: kafka_cluster_$1_$2
  labels:
    topic: "$3"
    partition: "$4"
- pattern : kafka.log<type=Log, name=(.+), topic=(.+), partition=(.+)><>Value
  name: kafka_log_$1
  labels:
    topic: "$2"
    partition: "$3"
- pattern : kafka.controller<type=(.+), name=(.+)><>(Count|Value)
  name: kafka_controller_$1_$2
- pattern : kafka.network<type=(.+), name=(.+)><>Value
  name: kafka_network_$1_$2
- pattern : kafka.network<type=(.+), name=(.+)PerSec, request=(.+)><>Count
  name: kafka_network_$1_$2_total
  labels:
    request: "$3"
- pattern : kafka.network<type=(.+), name=(\w+), networkProcessor=(.+)><>Count
  name: kafka_network_$1_$2
  labels:
    request: "$3"
  type: COUNTER
- pattern : kafka.network<type=(.+), name=(\w+), request=(\w+)><>Count
  name: kafka_network_$1_$2
  labels:
    request: "$3"
- pattern : kafka.network<type=(.+), name=(\w+)><>Count
  name: kafka_network_$1_$2
- pattern : kafka.server<type=(.+), name=(.+)PerSec\w*, topic=(.+)><>Count
  name: kafka_server_$1_$2_total
  labels:
    topic: "$3"
- pattern : kafka.server<type=(.+), name=(.+)PerSec\w*><>Count
  name: kafka_server_$1_$2_total
  type: COUNTER

- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>(Count|Value)
  name: kafka_server_$1_$2
  labels:
    clientId: "$3"
    topic: "$4"
    partition: "$5"
- pattern : kafka.server<type=(.+), name=(.+), topic=(.+), partition=(.*)><>(Count|Value)
  name: kafka_server_$1_$2
  labels:
    topic: "$3"
    partition: "$4"
- pattern : kafka.server<type=(.+), name=(.+), topic=(.+)><>(Count|Value)
  name: kafka_server_$1_$2
  labels:
    topic: "$3"
  type: COUNTER

- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>(Count|Value)
  name: kafka_server_$1_$2
  labels:
    clientId: "$3"
    broker: "$4:$5"
- pattern : kafka.server<type=(.+), name=(.+), clientId=(.+)><>(Count|Value)
  name: kafka_server_$1_$2
  labels:
    clientId: "$3"
- pattern : kafka.server<type=(.+), name=(.+)><>(Count|Value)
  name: kafka_server_$1_$2

- pattern : kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
  name: kafka_$1_$2_$3_total
- pattern : kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, topic=(.+)><>Count
  name: kafka_$1_$2_$3_total
  labels:
    topic: "$4"
  type: COUNTER
- pattern : kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, topic=(.+), partition=(.+)><>Count
  name: kafka_$1_$2_$3_total
  labels:
    topic: "$4"
    partition: "$5"
  type: COUNTER
- pattern : kafka.(\w+)<type=(.+), name=(.+)><>(Count|Value)
  name: kafka_$1_$2_$3_$4
  type: COUNTER
- pattern : kafka.(\w+)<type=(.+), name=(.+), (\w+)=(.+)><>(Count|Value)
  name: kafka_$1_$2_$3_$6
  labels:
    "$4": "$5"
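To get a feel for what these rules do, here is a rough shell approximation of a single rule (the kafka.server ...PerSec one) applied to one bean name. The real rewriting is done by the JMX exporter itself, together with lowercaseOutputName: true; this is only an illustration:

```shell
bean='kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>Count'
# Apply the rule kafka.server<type=(.+), name=(.+)PerSec\w*><>Count
# -> kafka_server_$1_$2_total, then lowercase the result
echo "$bean" \
  | sed -E 's/kafka\.server<type=(.+), name=(.+)PerSec[A-Za-z]*><>Count/kafka_server_\1_\2_total/' \
  | tr '[:upper:]' '[:lower:]'
# prints: kafka_server_brokertopicmetrics_messagesin_total
```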

Ok, so in order to put this together, we will use plain old hiera :). For the server on which you want to configure the Prometheus server, you will need to create a role or just put the classes in its fqdn.yaml, which looks like this:

prometheus.yaml

---
classes:
  - 'profiles::prometheusserver'
  - 'profiles::grafana'

alertrules:
    -
        name: 'InstanceDown'
        condition:  'up == 0'
        timeduration: '5m'
        labels:
            -
                name: 'severity'
                content: 'critical'
        annotations:
            -
                name: 'summary'
                content: 'Instance {{ $labels.instance }} down'
            -
                name: 'description'
                content: '{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes.'

This is the default installation. Because it’s a “role”, on each Prometheus host I also created a specific fqdn.yaml file to tell it which nodes should be scraped for exposed metrics. Here is an example:

---
profiles::prometheusserver::nodes:
    - 'kafka0:7071'
    - 'kafka1:7071'
    - 'kafka2:7071'

The three nodes are just an example; you can put all the nodes on which you include the prometheus node class.
Let me show you how that fqdn.yaml should look:

---
classes:
 - 'profiles::prometheusnode'
 
profiles::kafka::jolokia: '-javaagent:/usr/share/java/jolokia-jvm-agent.jar -javaagent:/opt/jmxexporter/jmx_prometheus_javaagent-0.9.jar=7071:/opt/jmxexporter/prometheus_config.yaml'

Now I need to explain that jolokia variable, right? Yeah, it’s pretty straightforward. The Kafka installation was already written, it included the Jolokia agent, and our broker definition block looks like this:


 class { '::kafka::broker':
    config    => $broker_config,
    opts      => hiera('profiles::kafka::jolokia', '-javaagent:/usr/share/java/jolokia-jvm-agent.jar'),
    heap_opts => "-Xmx${jvm_heap_size}M -Xms${jvm_heap_size}M",
  }
}
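The jmx_prometheus_javaagent argument follows the agent’s jar=&lt;port&gt;:&lt;config&gt; convention, so with the hiera value above the broker JVM ends up started with both agents. An illustrative rendering of the resulting opts (the exact wrapper variable they land in depends on your init script):

```
-javaagent:/usr/share/java/jolokia-jvm-agent.jar
-javaagent:/opt/jmxexporter/jmx_prometheus_javaagent-0.9.jar=7071:/opt/jmxexporter/prometheus_config.yaml
```

(shown on two lines for readability; it is a single opts string), which is why the metrics become reachable on port 7071.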

So I needed to put the JMX exporter agent beside Jolokia on Kafka startup, and when this is deployed you will see the jmxexporter started as an agent. Anyhow, when all is deployed you will have a Prometheus config that should look like this:

---
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    monitor: master
rule_files:
- /etc/prometheus/alert.rules
scrape_configs:
- job_name: prometheus
  scrape_interval: 30s
  scrape_timeout: 30s
  static_configs:
  - targets:
    - localhost:9090
    labels:
      alias: Prometheus
- job_name: kafka
  scrape_interval: 10s
  scrape_timeout: 10s
  static_configs:
  - targets:
    - kafka0:7071
    - kafka1:7071
    - kafka2:7071

You can also see the nodes under Status -> Targets in the menu, and yeah, all the metrics are available per node at http://[kafka-node]:7071/metrics.

I think this should be it. I don’t know if I covered everything, and there are a lot of details related to our custom installation, but at least I managed to provide some details about it. The article that helped me very much can be found here:

https://www.robustperception.io/monitoring-kafka-with-prometheus/

Cheers!

Categories
linux

List paths created by package install on Ubuntu

Hi,

I was searching this morning to see what paths and files were created by a package installed with Puppet, and I found this:

root@test:~# apt list --installed | grep goss

WARNING: apt does not have a stable CLI interface yet. Use with caution in scripts.

goss/trusty,now 0.3.0-3 amd64 [installed]
root@test:~# dpkg-query -L goss
/.
/usr
/usr/bin
/usr/bin/goss
/usr/share
/usr/share/doc
/usr/share/doc/goss
/usr/share/doc/goss/changelog.gz

Nothing else to add.
Cheers!

Categories
kafka

Wrong Kafka configuration and deployment using puppet

Hi,

I just want to share with you a case that we had last week, involving a wrong Kafka deployment from Puppet that filled up the filesystems and affected the colleagues who were using it as a transport layer for their ELK stack.

Let’s start at the beginning: we have some puppet code to deploy and configure Kafka machines. To keep it simple, the broker config block from Puppet looks like this:

$broker_config = {
    'broker.id'                     => '-1', # always set broker.id to -1.
    # broker specific config
    'zookeeper.connect'             => hiera('::kafka::zookeeper_connect', $zookeeper_connect),
    'inter.broker.protocol.version' => hiera('::kafka::inter_broker_protocol_version', $kafka_version),
    'log.dir'                       => hiera('::kafka::log_dir', '/srv/kafka-logs'),
    'log.dirs'                      => hiera('::kafka::log_dirs', '/srv/kafka-logs'),
    'log.retention.hours'           => hiera('::kafka::log_retention_hours', $days7),
    'log.retention.bytes'           => hiera('::kafka::log_retention_bytes', '-1'),
    # configure availability
    'num.partitions'                => hiera('::kafka::num_partitions', 256),
    'default.replication.factor'    => hiera('::kafka::default_replication_factor', $default_replication_factor),
    # configure administratability (this is a word now)
    'delete.topic.enable'           => hiera('::kafka::delete_topic_enable', 'true'),
  }

As you can see, there are two fields that need to be configured carefully: one is log_retention_bytes and the other is num_partitions. I am underlining this for a very simple reason: if we take a look at the Kafka server.properties file, which the engine uses to start the broker, we will see

log.retention.bytes=107374182400
num.partitions=256

Now, the first one is the default value (and it means the maximum size per partition); if you go to an online converter you will see that it means 100GB. This shouldn’t be a problem if you override it per topic, and even less so if you manually create a topic with one or two partitions and you have the required space. But our case was different: they were using it for Logstash, which created a topic with the default configuration of 256 partitions, and if the maximum size per partition is 100GB, then in the worst case it can grow to 25600GB which, if I am not mistaken, translates to 25TB (yeah, that is usually more than enough for an ELK instance).
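The arithmetic is easy to check from the shell; a quick worst-case sketch (assuming every one of the 256 partitions actually reaches its retention limit):

```shell
retention_bytes=107374182400              # default log.retention.bytes, per partition
partitions=256
gib=$((1024 * 1024 * 1024))

per_partition_gib=$((retention_bytes / gib))        # 100 GiB per partition
worst_case_gib=$((partitions * per_partition_gib))  # 25600 GiB
worst_case_tib=$((worst_case_gib / 1024))           # 25 TiB

echo "worst case: ${worst_case_gib} GiB (~${worst_case_tib} TiB)"
```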

Since we were running only three nodes with 100GB filesystem each, you can imagine that it filled up and the servers crashed.

Now there are two ways to avoid this, depending on the number of consumers in the consumer group and on the retention period. If you have only one consumer and you want to keep data for a longer period of time, you can leave it at 100GB (if you have that much storage available) and manually create a topic with 1 partition and a replication factor equal to the number of nodes that you want to use for high availability. Or you can leave the partition number at 256 and strongly decrease the retention bytes value. The latter translates into an equal distribution of data across multiple consumers, but it comes with a shorter storage period (it is also true that this depends on the amount and type of data you are transferring).

The solution that we adopted is to leave the partition number unchanged and decrease the partition size to 200MB. I’ll keep you informed on how it works. 🙂

Cheers!