Tag: kafka

  • Kafka problem that wasn’t a problem after all

    Hi,

    Don’t repeat the mistake I made over the last couple of weeks while trying to connect to a “secured” Kafka cluster using TLS. Some time ago I wrote the following article, http://log-it.tech/2017/07/27/configure-kafka-truststore-keystore-using-puppet/, and I know that it’s far from bulletproof, but it does the job.
    Now let’s get to the subject: once this is activated, you can no longer connect to the node using localhost. I figured this out by testing the port with the openssl command.
    The config in server.properties is:

    'listeners'                     => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093", #both listeners are enabled
    'advertised.listeners'          => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093",

    So please keep in mind that the broker is configured to listen on the FQDN, which means that normally the external interface is the target, not the loopback adapter.
    If you try to test it using localhost, you will get this output:

    /opt/kafka/bin# openssl s_client -debug -connect localhost:9093 -tls1
    connect: Connection refused
    connect:errno=111

    Don’t waste time wondering whether the firewall is in the way or whether the port is open; you can quickly rule both out using iptables -L or netstat -tulpen | grep 9093. The problem is that instead of localhost you should be using the FQDN, as in openssl s_client -debug -connect ${fqdn}:9093 -tls1, and then you will see a lot of keys and certificates in the output.
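
    To make the reason concrete, here is a minimal Python sketch (my own illustration, not something that ships with Kafka) that parses a listeners value like the one above and checks which host name a listener is bound to. The host name kafka1.example.com and the helpers parse_listeners and bound_to are hypothetical, and comparing host names as plain strings is a simplification of what really happens when the socket is bound.

```python
def parse_listeners(listeners):
    """Split a Kafka 'listeners' value into {protocol: (host, port)}."""
    bound = {}
    for entry in listeners.split(","):
        protocol, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        bound[protocol] = (host, int(port))
    return bound


def bound_to(listeners, protocol, host):
    """Return True if the listener for `protocol` is bound to `host`.

    0.0.0.0 is treated as "all interfaces", which includes loopback.
    """
    bound_host, _port = parse_listeners(listeners)[protocol]
    return bound_host in ("0.0.0.0", host)


# Hypothetical FQDN standing in for ${::fqdn}.
listeners = "PLAINTEXT://kafka1.example.com:9092,SSL://kafka1.example.com:9093"
print(bound_to(listeners, "SSL", "localhost"))           # False
print(bound_to(listeners, "SSL", "kafka1.example.com"))  # True
```

    In other words, the SSL listener simply is not there on localhost, which is why the connection is refused rather than rejected during the TLS handshake.
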
    Now, if you want to use, for example, the standard .sh scripts that ship with the Kafka installation, you should create a file called config.properties (for example) and pass it as a parameter. For commands that connect through ZooKeeper (with the --zookeeper parameter) this is not needed, but if you want to start a console consumer or producer, or check the consumer groups, it will be. Let me give you an example:

    /opt/kafka/bin# ./kafka-consumer-groups.sh --command-config /root/config.properties --bootstrap-server ${fqdn}:9093 --list
    Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
    
    console-consumer-30514
    KMOffsetCache-kafka2
    KMOffsetCache-kafka0
    KMOffsetCache-kafka1
    

    Otherwise, it will not work. My config file looks like this:

    security.protocol=SSL
    ssl.truststore.location=/home/kafka/kafka.client.truststore.jks
    ssl.truststore.password=password
    ssl.keystore.location=/home/kafka/kafka.client.keystore.jks
    ssl.keystore.password=password
    ssl.key.password=password
    

    I cannot give you all the details for every command, but at least I am confident I have put you on the right track.

    Cheers

  • Implementing logrotate for kafka

    Hi,

    Yes, we also need to implement log rotation if we want to keep Kafka under control. My solution was with Puppet, as you probably expected. After taking a look at the documentation on log4j properties, I think I figured out a configuration, which looks like the following erb template:

    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    log4j.rootLogger=INFO, stdout
    
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
    log4j.appender.kafkaAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.kafkaAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
    log4j.appender.stateChangeAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.stateChangeAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
    log4j.appender.requestAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.requestAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
    log4j.appender.cleanerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.cleanerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
    log4j.appender.controllerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.controllerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
    log4j.appender.authorizerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.authorizerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    # Turn on all our debugging info
    #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
    #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
    #log4j.logger.kafka.perf=DEBUG, kafkaAppender
    #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
    #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
    log4j.logger.kafka=INFO, kafkaAppender
    
    log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
    log4j.additivity.kafka.network.RequestChannel$=false
    
    #log4j.logger.kafka.network.Processor=TRACE, requestAppender
    #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
    #log4j.additivity.kafka.server.KafkaApis=false
    log4j.logger.kafka.request.logger=WARN, requestAppender
    log4j.additivity.kafka.request.logger=false
    
    log4j.logger.kafka.controller=TRACE, controllerAppender
    log4j.additivity.kafka.controller=false
    
    log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
    log4j.additivity.kafka.log.LogCleaner=false
    
    log4j.logger.state.change.logger=TRACE, stateChangeAppender
    log4j.additivity.state.change.logger=false
    
    #Change this to debug to get the actual audit log for authorizer.
    log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
    log4j.additivity.kafka.authorizer.logger=false
    
    

    It just adds the two most important values required for a working log rotation setup, MaxFileSize and MaxBackupIndex. The first sets the maximum size of a log file, and the second sets the number of rolled-over files of each type to keep.
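
    To get a feeling for what these two values mean on disk, here is a rough Python sketch of the arithmetic. It assumes that each RollingFileAppender keeps the active file plus MaxBackupIndex backups, and that the template defines six file appenders; the function names are my own. The result is only an approximate upper bound, since a file is rolled once it grows past the limit.

```python
# Size suffixes as used in values such as '50MB'.
UNITS_IN_MB = {"KB": 1 / 1024, "MB": 1, "GB": 1024}


def size_in_mb(max_file_size):
    """Convert a value such as '50MB' into megabytes."""
    value, unit = max_file_size[:-2], max_file_size[-2:].upper()
    return float(value) * UNITS_IN_MB[unit]


def worst_case_mb(max_file_size, max_backup_index, appenders=6):
    """Approximate disk usage when every appender has filled all its files."""
    files_per_appender = max_backup_index + 1  # backups plus the active file
    return size_in_mb(max_file_size) * files_per_appender * appenders


print(worst_case_mb("50MB", 10))  # 3300.0
```

    With a size of 50MB and 10 backups that comes to roughly 3.3 GB per broker, so check it against the space available where the logs are written.
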
    To use it, you need to put it in a class, which in my case is constructed as follows:

    class profiles::kafkalogrotate {

      $filesize    = hiera('kafkalogrotate::size', '50MB')
      $backupindex = hiera('kafkalogrotate::backupindex', 10)
      validate_string($filesize)
      validate_integer($backupindex)

      file { 'adding_log4j':
        path    => '/opt/kafka/config/log4j.properties',
        content => template("${module_name}/log4j.properties.erb"),
        replace => true,
        owner   => 'kafka',
        group   => 'kafka',
        mode    => '0644',
      }
    }
    

    I set the path this way because we generally install Kafka in /opt/kafka, but that can be changed. In our case an immediate restart of the Kafka brokers is not needed, so I just called the class in my Kafka manifests.

    That is all. If I think of anything to add to what I wrote, I will post it.

    Cheers!

  • Securing kafka-manager endpoints with iptables rules behind traefik

    Hi,

    One addition to my traefik balancing article, http://log-it.tech/2017/08/19/puppet-implementation-traefik-load-balancer-kafka-manager/, is that even though we now have the balancing capability, we still need to restrict access to the unsecured endpoint. I designed all the code to be deployable on all of the nodes. With that in mind, our issue with the firewall rules is easily solved using the puppetlabs module https://github.com/puppetlabs/puppetlabs-firewall, and the code that I included looks like this:

    $hosts_count = $kafka_hosts.count

    package { 'iptables-persistent':
      ensure => installed,
    }
    resources { 'firewall':
      purge => true,
    }

    # Accept kafka-manager traffic (port 9000) from each Kafka host.
    $kafka_hosts.each |Integer $index, String $host| {
      firewall { "10${index} adding tcp rule kafka manager node ${index}":
        proto       => 'tcp',
        dport       => 9000,
        source      => $host,
        destination => $::fqdn,
        action      => 'accept',
      }
    }
    # Drop everything else aimed at port 9000.
    firewall { "10${hosts_count} dropping rest of kafka manager calls":
      proto       => 'tcp',
      dport       => 9000,
      destination => $::fqdn,
      action      => 'drop',
    }

    This should add rules that allow traffic on port 9000 only between the Kafka hosts that have kafka-manager installed.
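
    One thing to watch is the numeric prefix in the rule names. As far as I understand, the firewall module uses the rule name to order the rules, so the drop rule has to sort after all the accept rules. Here is a small Python sketch of that ordering, assuming the names are compared as plain strings (that is my assumption, so check it against the module documentation); the host names and helper functions are hypothetical.

```python
def manager_rule_names(kafka_hosts):
    """Build rule names the same way the manifest does: '10<index> ...'."""
    names = [
        f"10{index} adding tcp rule kafka manager node {index}"
        for index, _host in enumerate(kafka_hosts)
    ]
    names.append(f"10{len(kafka_hosts)} dropping rest of kafka manager calls")
    return names


def drop_rule_is_last(kafka_hosts):
    """True if the drop rule still sorts after every accept rule."""
    names = manager_rule_names(kafka_hosts)
    return sorted(names)[-1] == names[-1]


three_hosts = ["kafka0", "kafka1", "kafka2"]      # hypothetical host names
eleven_hosts = [f"kafka{n}" for n in range(11)]   # hypothetical larger cluster
print(drop_rule_is_last(three_hosts))    # True
print(drop_rule_is_last(eleven_hosts))   # False
```

    For a three node setup like ours the naming is fine. With ten or more hosts the prefix would need padding (or a separate, higher number for the drop rule) under this assumption.
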
    Cheers

  • Fixing the keystore/truststore distribution code

    Hi,

    There is one more thing to add to my article http://log-it.tech/2017/07/27/configure-kafka-truststore-keystore-using-puppet/

    As it stands, the code copies the files on every Puppet run to the other nodes, which do not contain the keystore generation code. To fix this I used yet another Puppet module, one that shares data between the nodes; you can find it here: https://github.com/WhatsARanjit/puppet-share_data

    As far as I saw, it gets the job done. To use it, you will need to add the following pieces of code to your repo. First of all, a custom fact:

    
    require 'facter'

    # Reports whether the Kafka server keystore already exists on this node.
    filename = '/home/kafka/kafka.server.keystore.jks'
    Facter.add(:kafkakeystore) do
      setcode do
        File.file?(filename) ? 'enabled' : 'disabled'
      end
    end
    

    If the file is present, the setup is probably already activated. In the Kafka manifests, on any node other than the one that generates the keystore, we need to share the fact we just added, in this form:

        share_data { "${fqdn}":
          data => [ $::fqdn,$::kafkakeystore ],
          label => 'keystore',
        }
    

    On the node that actually generates and copies the keystore, we need to include the following piece in the class that does this, kafka_security_gen:

    $data = share_data::retrieve('keystore')
    $data.each |$item| {
      # $item is [fqdn, kafkakeystore fact]; copy only to known servers without a keystore.
      if (member($servers, $item[0]) and $item[1] == 'disabled') {
        exec { "copy files to ${item[0]}":
          cwd     => '/home/kafka',
          path    => '/usr/bin:/usr/sbin:/bin',
          command => "scp /home/kafka/kafka* kafka@${item[0]}:/home/kafka",
          user    => 'kafka',
        }
      }
    }
    

    This should ensure that Puppet will not try to copy the keystore to nodes that already have it. Come to think of it, refreshing the store will be a problem with this approach, but I will think about a fix for that as well and come back.
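
    The selection logic is small enough to sketch outside of Puppet. A minimal Python version, assuming the shared data comes back as a list of [fqdn, kafkakeystore] pairs (which is how the manifest indexes it); the host names and the function name are hypothetical.

```python
def nodes_needing_keystore(shared_data, servers):
    """Return the known servers that reported no keystore yet."""
    return [
        fqdn
        for fqdn, keystore_state in shared_data
        if fqdn in servers and keystore_state == "disabled"
    ]


# Hypothetical data as the generating node might retrieve it.
shared = [
    ["kafka0.example.com", "enabled"],
    ["kafka1.example.com", "disabled"],
    ["other.example.com", "disabled"],   # not in the server list, so skipped
]
servers = ["kafka0.example.com", "kafka1.example.com"]
print(nodes_needing_keystore(shared, servers))  # ['kafka1.example.com']
```

    Once the copy has happened, the fact on the target node flips to enabled on its next run, so the node drops out of this list.
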

    Cheers!

  • Balancing requests to kafka-manager using traefik

    Hi,

    I just wanted to share a small and simple config to balance traffic between three machines that have kafka-manager installed. For this I used traefik, since it was new to me and I wanted to gain a little experience with it.

    It’s an interesting solution, but it took me a while to get the pieces working. I will post my config here and explain the parts needed to get it working.

    logLevel = "DEBUG"
    defaultEntryPoints = ["http"]
    [entryPoints]
      [entryPoints.http]
      address = ":80"
    [web]
    address = ":8080"
    
    [file]
    watch = true
    
    [backends]
      [backends.backend1]
        [backends.backend1.LoadBalancer]
          method = "drr"
        [backends.backend1.servers.server1]
        url = "http://[kafka1.hostname]:9000"
        weight = 1
        [backends.backend1.servers.server2]
        url = "http://[kafka2.hostname]:9000"
        weight = 2
        [backends.backend1.servers.server3]
        url = "http://[kafka3.hostname]:9000"
        weight = 1
    [frontends]
      [frontends.frontend1]
      entrypoint = ["http"]
      backend = "backend1"
      passHostHeader = true
      priority = 10
    

    This is very basic, as you can see, but it took me a while to understand that you need the file block with watch = true for the daemon to see and parse the rules that are listed. You can also keep the rules in a separate file; for that it is best to consult the traefik documentation.

    I still have to do the redirect from HTTP to HTTPS in order to secure the connection to the frontend. The idea of traefik is that it works like entrypoint -> frontend -> backend, and as far as I saw this will be done at the entrypoint level.

    Two more things: you need a default entry point so that your frontend is not ignored, and you should set the log level to DEBUG, because otherwise it won’t log much.
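
    The weights in the backend block are easier to reason about with a bit of arithmetic. A minimal Python sketch, assuming plain weighted round robin where each server receives requests in proportion to its weight. With method = "drr" traefik adjusts the weights at runtime, as far as I understand from its documentation, so treat these numbers as the starting point only. The function name is my own.

```python
def expected_shares(weights):
    """Fraction of requests each server gets under static weighted round robin."""
    total = sum(weights.values())
    return {name: weight / total for name, weight in weights.items()}


# Weights taken from the backend1 servers in the config above.
weights = {"server1": 1, "server2": 2, "server3": 1}
print(expected_shares(weights))
# {'server1': 0.25, 'server2': 0.5, 'server3': 0.25}
```
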

    I’ll keep you posted on the progress. You can find the traefik documentation here: https://docs.traefik.io

    Cheers!

  • Install Kafka Manager with Puppet

    Hi,

    I will continue along this line with the installation of a management tool called Kafka Manager, using the same old Puppet.

    The main source of the project is here:

    https://github.com/yahoo/kafka-manager

    If you scroll down to the packaging section, you will see that you can create a .deb package for an Ubuntu or Debian setup, or an rpm. To do this, just clone the project, go into it and run the command

    sbt debian:packageBin
    

    It will take some time, but eventually it will create the required package. Now that you have the package, you can ask a friend to upload it to your pipeline apt repo (this is what I did; I don’t yet have the experience to do that myself) and install and configure it via Puppet. To do that, I “wrote” the following manifest:

    kafkamanager.pp

    class profiles::kafkamanager {

      $zookeeper_connect = hiera('kafkamanager::zookeeperconnect')

      package { 'kafka-manager':
        ensure => installed,
      }
      group { 'kafka-manager':
        ensure => 'present',
      }
      user { 'kafka-manager':
        ensure => 'present',
        groups => 'kafka-manager',
      }

      Group['kafka-manager'] -> User['kafka-manager']

      file { '/usr/share/kafka-manager':
        ensure  => directory,
        owner   => 'kafka-manager',
        group   => 'kafka-manager',
        require => [ User['kafka-manager'], Group['kafka-manager'] ],
        recurse => true,
      }

      file_line { 'config_zookeeper':
        path    => '/etc/kafka-manager/application.conf',
        match   => 'kafka-manager.zkhosts=\"kafka-manager-zookeeper:2181\"',
        line    => "kafka-manager.zkhosts=\"${zookeeper_connect}\"",
        replace => true,
      }

      file_line { 'enable_auth':
        path    => '/etc/kafka-manager/application.conf',
        match   => 'basicAuthentication.enabled=false',
        line    => 'basicAuthentication.enabled=true',
        replace => true,
      }

      service { 'kafka-manager':
        ensure  => 'running',
        enable  => true,
        require => [ File['/usr/share/kafka-manager'], File_line['config_zookeeper'] ],
      }
    }

    That should be all. You are now free to log in with the user and password that you find in the application.conf file and start mapping Kafka clusters.
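
    If file_line is new to you, this is roughly what the two resources above do to application.conf. A simplified Python sketch, not the real stdlib implementation: a line matching the pattern is replaced, and the line is appended if nothing matches and it is not there already. The function name and the other.setting line are hypothetical.

```python
import re


def ensure_line(lines, match, line):
    """Replace lines matching `match` with `line`; append `line` if it is absent."""
    pattern = re.compile(match)
    result = [line if pattern.search(existing) else existing for existing in lines]
    if line not in result:
        result.append(line)
    return result


conf = ["basicAuthentication.enabled=false", "other.setting=1"]
updated = ensure_line(
    conf,
    match="basicAuthentication.enabled=false",
    line="basicAuthentication.enabled=true",
)
print(updated)  # ['basicAuthentication.enabled=true', 'other.setting=1']
```

    Running it a second time on the result changes nothing, which is the behaviour you want from a Puppet resource.
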

    Cheers!

  • Integrate Kafka with Datadog monitoring using puppet

    Hi,

    Since I owed you an article on how to integrate Kafka monitoring using Datadog, let me tell you a couple of things about this topic. First of all, we are using the same Kafka config with Jolokia that was described in an earlier article. From the install of the brokers on our infrastructure, JMX data is published on port 9990 (this will be needed in the Datadog config).

    The files you need to create for this task are as follows:

    datadogagent.pp

    class profiles::datadogagent {
      $_check_api_key = hiera('datadog_agent::api_key')
    
      contain 'datadog_agent'
      contain 'profiles::datadogagent_config_kafka'
      contain 'datadog_agent::integrations::zk'
    
      Class['datadog_agent'] -> Class['profiles::datadogagent_config_kafka']
      Class['datadog_agent'] -> Class['datadog_agent::integrations::zk']
    }

    datadogagent_config_kafka.pp

    class profiles::datadogagent_config_kafka (
    $servers = [{'host' => 'localhost', 'port' => '9990'}]
    ) inherits datadog_agent::params {
      include datadog_agent
    
      validate_array($servers)
    
      file { "${datadog_agent::params::conf_dir}/kafka.yaml":
        ensure  => file,
        owner   => $datadog_agent::params::dd_user,
        group   => $datadog_agent::params::dd_group,
        mode    => '0600',
        content => template("${module_name}/kafka.yaml.erb"),
        require => Package[$datadog_agent::params::package_name],
        notify  => Service[$datadog_agent::params::service_name],
      }
    }
    

    Since there isn’t yet a default Kafka integration in the Datadog module, which you can find here:

    https://github.com/DataDog/puppet-datadog-agent

    I created the following file in the templates directory:

    kafka.yaml.erb (as you can see from the header, this is actually the template provided by Datadog for the Kafka integration, with a specific host and port)

    ##########
    # WARNING
    ##########
    # This sample works only for Kafka >= 0.8.2.
    # If you are running a version older than that, you can refer to agent 5.2.x released
    # sample files, https://raw.githubusercontent.com/DataDog/dd-agent/5.2.1/conf.d/kafka.yaml.example
    
    instances:
    <% @servers.each do |server| -%>
      - host: <%= server['host'] %>
        port: <%= server['port'] %> # This is the JMX port on which Kafka exposes its metrics (usually 9999)
        tags:
          kafka: broker
    <% end -%>
    
    init_config:
      is_jmx: true
    
      # Metrics collected by this check. You should not have to modify this.
      conf:
        # v0.8.2.x Producers
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.producer.request_rate
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.producer.request_latency_avg
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.producer.bytes_out
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.producer.message_rate
        # v0.8.2.x Consumers
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=.*'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.consumer.max_lag
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MinFetchRate,clientId=.*'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.consumer.fetch_rate
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.consumer.bytes_in
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.consumer.messages_in
    
        # Offsets committed to ZooKeeper
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.consumer.zookeeper_commits
        # Offsets committed to Kafka
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.consumer.kafka_commits
        # v0.9.0.x Producers
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              response-rate:
                metric_type: gauge
                alias: kafka.producer.response_rate
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              request-rate:
                metric_type: gauge
                alias: kafka.producer.request_rate
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              request-latency-avg:
                metric_type: gauge
                alias: kafka.producer.request_latency_avg
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              outgoing-byte-rate:
                metric_type: gauge
                alias: kafka.producer.bytes_out
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              io-wait-time-ns-avg:
                metric_type: gauge
                alias: kafka.producer.io_wait
    
        # v0.9.0.x Consumers
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
            attribute:
              bytes-consumed-rate:
                metric_type: gauge
                alias: kafka.consumer.bytes_in
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
            attribute:
              records-consumed-rate:
                metric_type: gauge
                alias: kafka.consumer.messages_in
        #
        # Aggregate cluster stats
        #
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.net.bytes_out.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.net.bytes_in.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.messages_in.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.net.bytes_rejected.rate
    
        #
        # Request timings
        #
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.fetch.failed.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.produce.failed.rate
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.produce.rate
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.produce.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.produce.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.fetch_consumer.rate
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.fetch_follower.rate
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.fetch_consumer.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.fetch_consumer.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.fetch_follower.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.fetch_follower.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=UpdateMetadata'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.update_metadata.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.update_metadata.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Metadata'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.metadata.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.metadata.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Offsets'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.offsets.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.offsets.time.99percentile
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.handler.avg.idle.pct.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.request.producer_request_purgatory.size
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=FetchRequestPurgatory,name=PurgatorySize'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.request.fetch_request_purgatory.size
    
        #
        # Replication stats
        #
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.under_replicated_partitions
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=IsrShrinksPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.replication.isr_shrinks.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=IsrExpandsPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.replication.isr_expands.rate
        - include:
            domain: 'kafka.controller'
            bean: 'kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.replication.leader_elections.rate
        - include:
            domain: 'kafka.controller'
            bean: 'kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.replication.unclean_leader_elections.rate
        - include:
            domain: 'kafka.controller'
            bean: 'kafka.controller:type=KafkaController,name=OfflinePartitionsCount'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.offline_partitions_count
        - include:
            domain: 'kafka.controller'
            bean: 'kafka.controller:type=KafkaController,name=ActiveControllerCount'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.active_controller_count
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=PartitionCount'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.partition_count
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=LeaderCount'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.leader_count
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.max_lag
    
        #
        # Log flush stats
        #
        - include:
            domain: 'kafka.log'
            bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.log.flush_rate.rate
    
    <% end -%>

    To apply all of this to a node, add the class to its fqdn.yaml in the following format:

    ---
    classes:
     - profiles::datadogagent
    
    datadog_agent::api_key: [your key]
    

    After Puppet runs, datadog-agent is installed and you can check it by using ps -ef | grep datadog-agent. It is also worth taking a look in /etc/dd-agent/conf.d, where you will find two new files called kafka.yaml and zk.yaml.

    You are done, please feel free to log in to the datadog portal and check your host.

    Cheers

  • Automigrate data contained in a topic to different Kafka brokers

    Hi,

    Let’s keep it short: you have a cluster composed of three Kafka brokers and you add another two. The details of doing that are pretty straightforward. If they are new, give each one a unique broker.id and set the zookeeper.connect string so that they know which cluster to register with. If they were already registered with an id, please keep in mind that you also need to change it in meta.properties in the log.dirs location.

    So, we have now the situation:

    Connecting to localhost:2181
    Welcome to ZooKeeper!
    JLine support is disabled
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    ls /brokers/ids
    [13, 14, 1003, 1002, 1001]

    And we need to migrate one topic which has a replication factor of two and also two partitions:

    Topic:test    PartitionCount:2    ReplicationFactor:2    Configs:
        Topic: test    Partition: 0    Leader: 1003    Replicas: 1003,1001    Isr: 1001,1003
        Topic: test    Partition: 1    Leader: 1001    Replicas: 1001,1002    Isr: 1002,1001

    In order to achieve this you will need to follow these steps:

    1 Create a JSON file called topics-to-move.json with the content:

    {"topics": [{"topic": "test"}],
      "version":1
      }

    2 Run the following command: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "13,14" --generate
    Output:

    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1003,1001]},{"topic":"test","partition":1,"replicas":[1001,1002]}]}
    Proposed partition reassignment configuration
    
    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[14,13]},{"topic":"test","partition":1,"replicas":[13,14]}]}

    3 Create the file expand-cluster-reassignment.json with the following content (the proposed assignment from step 2):

    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[14,13]},{"topic":"test","partition":1,"replicas":[13,14]}]}

    4 Run the following command for topic migration: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
    Output:

    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1003,1001]},{"topic":"test","partition":1,"replicas":[1001,1002]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.

    5 Verify the status of the migration with the command: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

    Output:
    Status of partition reassignment: 
    Reassignment of partition [test,0] completed successfully
    Reassignment of partition [test,1] completed successfully
    

    Final status:

    ./kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
    Topic:test	PartitionCount:2	ReplicationFactor:2	Configs:
    	Topic: test	Partition: 0	Leader: 13	Replicas: 13,14	Isr: 14,13
    	Topic: test	Partition: 1	Leader: 14	Replicas: 14,13	Isr: 13,14

    This is actually a follow-up of the steps described here:

    https://kafka.apache.org/documentation.html#basic_ops_automigrate
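
    Before running step 4 it can be worth sanity-checking the proposed assignment. The following is only a minimal sketch in Python and not part of the Kafka tooling: the function name check_reassignment is made up for illustration, and it simply compares the two JSON documents printed in step 2 against the list of target brokers.

```python
import json


def check_reassignment(current_json, proposed_json, target_brokers):
    """Return a list of problems found in a proposed reassignment (empty if none)."""
    current = {(p["topic"], p["partition"]): p["replicas"]
               for p in json.loads(current_json)["partitions"]}
    proposed = {(p["topic"], p["partition"]): p["replicas"]
                for p in json.loads(proposed_json)["partitions"]}
    problems = []
    for key, old_replicas in current.items():
        new_replicas = proposed.get(key)
        if new_replicas is None:
            problems.append(f"{key}: missing from proposed assignment")
            continue
        if len(new_replicas) != len(old_replicas):
            problems.append(f"{key}: replication factor changes")
        if len(set(new_replicas)) != len(new_replicas):
            problems.append(f"{key}: duplicate broker in replica list")
        stray = set(new_replicas) - set(target_brokers)
        if stray:
            problems.append(f"{key}: replicas on non-target brokers {sorted(stray)}")
    return problems


# The two documents printed by --generate in step 2.
CURRENT = ('{"version":1,"partitions":['
           '{"topic":"test","partition":0,"replicas":[1003,1001]},'
           '{"topic":"test","partition":1,"replicas":[1001,1002]}]}')
PROPOSED = ('{"version":1,"partitions":['
            '{"topic":"test","partition":0,"replicas":[14,13]},'
            '{"topic":"test","partition":1,"replicas":[13,14]}]}')

print(check_reassignment(CURRENT, PROPOSED, [13, 14]))  # prints []
```

    An empty list means every partition keeps its replication factor and lands only on brokers 13 and 14.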

    Cheers!

  • How to deploy Prometheus infrastructure for Kafka monitoring using puppet

    Hi,

    In the last couple of days I worked on deploying a Prometheus server and agent for Kafka monitoring. For that purpose I will share with you the main things that you need to do in order to achieve this.

    First thing to do is to use the prometheus and grafana modules that you will find at the following links:

    https://forge.puppet.com/puppet/prometheus
    https://forge.puppet.com/puppet/grafana

    After these are imported in puppet you need to create the following puppet files:

    grafana.pp

    class profiles::grafana {
        class { '::grafana':
          cfg => {
            app_mode => 'production',
            server   => {
              http_port     => 8080,
            },
            database => {
              type     => 'sqlite3',
              host     => '127.0.0.1:3306',
              name     => 'grafana',
              user     => 'root',
              password => 'grafana',
            },
            users    => {
              allow_sign_up => false,
            },
          },
        }
    }

    prometheusserver.pp

    class profiles::prometheusserver {
        # Default to false so that the else branch is used when no nodes are defined in hiera
        $kafka_nodes = hiera('profiles::prometheusserver::nodes', false)

        if $kafka_nodes {
            # Scrape both the Prometheus server itself and the Kafka nodes
            class { '::prometheus':
                global_config  => { 'scrape_interval' => '15s', 'evaluation_interval' => '15s', 'external_labels' => { 'monitor' => 'master' } },
                rule_files     => [ '/etc/prometheus/alert.rules' ],
                scrape_configs => [ { 'job_name' => 'prometheus', 'scrape_interval' => '30s', 'scrape_timeout' => '30s', 'static_configs' => [ { 'targets' => ['localhost:9090'], 'labels' => { 'alias' => 'Prometheus' } } ] }, { 'job_name' => 'kafka', 'scrape_interval' => '10s', 'scrape_timeout' => '10s', 'static_configs' => [ { 'targets' => $kafka_nodes } ] } ],
            }
        } else {
            # No Kafka nodes defined: scrape only the Prometheus server itself
            class { '::prometheus':
                global_config  => { 'scrape_interval' => '15s', 'evaluation_interval' => '15s', 'external_labels' => { 'monitor' => 'master' } },
                rule_files     => [ '/etc/prometheus/alert.rules' ],
                scrape_configs => [ { 'job_name' => 'prometheus', 'scrape_interval' => '30s', 'scrape_timeout' => '30s', 'static_configs' => [ { 'targets' => ['localhost:9090'], 'labels' => { 'alias' => 'Prometheus' } } ] } ],
            }
        }
    }

    prometheusnode.pp

    class profiles::prometheusnode(
    	$jmxexporter_dir = hiera('jmxexporter::dir','/opt/jmxexporter'),
    	$jmxexporter_version = hiera('jmxexporter::version','0.9')
    ){
    	include ::prometheus::node_exporter
    	#validate_string($jmxexporter_dir)
    
    	file {"${jmxexporter_dir}":
    		ensure => 'directory',
    	}
    	file {"${jmxexporter_dir}/prometheus_config.yaml":
    		source => 'puppet:///modules/profiles/prometheus_config',
    	}
    	wget::fetch {"https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/${jmxexporter_version}/jmx_prometheus_javaagent-${jmxexporter_version}.jar":
    	destination => "${jmxexporter_dir}/",
    	cache_dir => '/tmp/',
    	timeout => 0,
    	verbose => false,
    	unless => "test -e ${jmxexporter_dir}/jmx_prometheus_javaagent-${jmxexporter_version}.jar",
    	}	
    }

    It is true that I used the wget module to fetch the JMX exporter, so I must give you that as well:

     https://forge.puppet.com/leonardothibes/wget

    Also required is the configuration file for the jmx exporter, which translates the JMX data provided by Kafka into fields that can be imported into Prometheus.
    prometheus_config:

    lowercaseOutputName: true
    rules:
    - pattern : kafka.cluster<type=(.+), name=(.+), topic=(.+), partition=(.+)><>Value
      name: kafka_cluster_$1_$2
      labels:
        topic: "$3"
        partition: "$4"
    - pattern : kafka.log<type=Log, name=(.+), topic=(.+), partition=(.+)><>Value
      name: kafka_log_$1
      labels:
        topic: "$2"
        partition: "$3"
    - pattern : kafka.controller<type=(.+), name=(.+)><>(Count|Value)
      name: kafka_controller_$1_$2
    - pattern : kafka.network<type=(.+), name=(.+)><>Value
      name: kafka_network_$1_$2
    - pattern : kafka.network<type=(.+), name=(.+)PerSec, request=(.+)><>Count
      name: kafka_network_$1_$2_total
      labels:
        request: "$3"
    - pattern : kafka.network<type=(.+), name=(\w+), networkProcessor=(.+)><>Count
      name: kafka_network_$1_$2
      labels:
        request: "$3"
      type: COUNTER
    - pattern : kafka.network<type=(.+), name=(\w+), request=(\w+)><>Count
      name: kafka_network_$1_$2
      labels:
        request: "$3"
    - pattern : kafka.network<type=(.+), name=(\w+)><>Count
      name: kafka_network_$1_$2
    - pattern : kafka.server<type=(.+), name=(.+)PerSec\w*, topic=(.+)><>Count
      name: kafka_server_$1_$2_total
      labels:
        topic: "$3"
    - pattern : kafka.server<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_server_$1_$2_total
      type: COUNTER
    
    - pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>(Count|Value)
      name: kafka_server_$1_$2
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern : kafka.server<type=(.+), name=(.+), topic=(.+), partition=(.*)><>(Count|Value)
      name: kafka_server_$1_$2
      labels:
        topic: "$3"
        partition: "$4"
    - pattern : kafka.server<type=(.+), name=(.+), topic=(.+)><>(Count|Value)
      name: kafka_server_$1_$2
      labels:
        topic: "$3"
      type: COUNTER
    
    - pattern : kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>(Count|Value)
      name: kafka_server_$1_$2
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern : kafka.server<type=(.+), name=(.+), clientId=(.+)><>(Count|Value)
      name: kafka_server_$1_$2
      labels:
        clientId: "$3"
    - pattern : kafka.server<type=(.+), name=(.+)><>(Count|Value)
      name: kafka_server_$1_$2
    
    - pattern : kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
    - pattern : kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, topic=(.+)><>Count
      name: kafka_$1_$2_$3_total
      labels:
        topic: "$4"
      type: COUNTER
    - pattern : kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, topic=(.+), partition=(.+)><>Count
      name: kafka_$1_$2_$3_total
      labels:
        topic: "$4"
        partition: "$5"
      type: COUNTER
    - pattern : kafka.(\w+)<type=(.+), name=(.+)><>(Count|Value)
      name: kafka_$1_$2_$3_$4
      type: COUNTER
    - pattern : kafka.(\w+)<type=(.+), name=(.+), (\w+)=(.+)><>(Count|Value)
      name: kafka_$1_$2_$3_$6
      labels:
        "$4": "$5"

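    To get a feeling for what these rules do, here is a small Python sketch of the matching logic. It is only an illustration and not the exporter's real code: the function names are made up, only three of the kafka.server rules are copied, and the bean strings are simplified to the domain<properties><>attribute form that the patterns above are written against. It assumes, as the jmx exporter documentation describes, that rules are tried in order and the first match wins.

```python
import re

# (pattern, metric name template, label templates), copied from prometheus_config
RULES = [
    (r"kafka.server<type=(.+), name=(.+)PerSec\w*, topic=(.+)><>Count",
     "kafka_server_$1_$2_total", {"topic": "$3"}),
    (r"kafka.server<type=(.+), name=(.+)PerSec\w*><>Count",
     "kafka_server_$1_$2_total", {}),
    (r"kafka.server<type=(.+), name=(.+)><>(Count|Value)",
     "kafka_server_$1_$2", {}),
]


def expand(template, match):
    """Replace $1, $2, ... in a template with the matching capture groups."""
    return re.sub(r"\$(\d+)", lambda m: match.group(int(m.group(1))), template)


def translate(bean, rules=RULES, lowercase_output_name=True):
    """Return (metric_name, labels) for the first rule that matches, else None."""
    for pattern, name_template, label_templates in rules:
        match = re.search(pattern, bean)
        if match is None:
            continue
        name = expand(name_template, match)
        labels = {key: expand(value, match) for key, value in label_templates.items()}
        if lowercase_output_name:
            name = name.lower()  # mirrors lowercaseOutputName: true (names only)
        return name, labels
    return None


print(translate("kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec, topic=test><>Count"))
# ('kafka_server_brokertopicmetrics_messagesin_total', {'topic': 'test'})
print(translate("kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value"))
# ('kafka_server_replicamanager_underreplicatedpartitions', {})
```
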
    Ok, so in order to put this together, we will use plain old hiera :). For the server on which you want to configure prometheus server you will need to create a role or just put it in the fqdn.yaml that looks like this:

    prometheus.yaml

    ---
    classes:
      - 'profiles::prometheusserver'
      - 'profiles::grafana'
    
    alertrules:
        -
            name: 'InstanceDown'
            condition:  'up == 0'
            timeduration: '5m'
            labels:
                -
                    name: 'severity'
                    content: 'critical'
            annotations:
                -
                    name: 'summary'
                    content: 'Instance {{ $labels.instance }} down'
                -
                    name: 'description'
                    content: '{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 5 minutes.'
    

    This is the default installation because it’s a “role”. On each prometheus host I also created a specific fqdn.yaml file that tells it which nodes should be checked for exposed metrics. Here is an example:

    ---
    profiles::prometheusserver::nodes:
        - 'kafka0:7071'
        - 'kafka1:7071'
        - 'kafka2:7071'
    

    The three nodes are just an example; you can list all the nodes on which you include the prometheus node class.
    Let me show you how the hiera for such a node should look:

    ---
    classes:
     - 'profiles::prometheusnode'
     
    profiles::kafka::jolokia: '-javaagent:/usr/share/java/jolokia-jvm-agent.jar -javaagent:/opt/jmxexporter/jmx_prometheus_javaagent-0.9.jar=7071:/opt/jmxexporter/prometheus_config.yaml'

    Now I need to explain that jolokia variable, right? Yeah, it’s pretty straightforward. The kafka installation was already written, and it included the jolokia agent. Our broker definition block looks like this:

    
     class { '::kafka::broker':
        config    => $broker_config,
        opts      => hiera('profiles::kafka::jolokia', '-javaagent:/usr/share/java/jolokia-jvm-agent.jar'),
        heap_opts => "-Xmx${jvm_heap_size}M -Xms${jvm_heap_size}M",
      }
    }

    So I needed to put the jmx exporter agent beside jolokia on kafka startup, and once this is deployed you will see the jmxexporter started as an agent. Anyhow, when all is deployed you will have a prometheus config that should look like:

    ---
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
      external_labels:
        monitor: master
    rule_files:
    - /etc/prometheus/alert.rules
    scrape_configs:
    - job_name: prometheus
      scrape_interval: 30s
      scrape_timeout: 30s
      static_configs:
      - targets:
        - localhost:9090
        labels:
          alias: Prometheus
    - job_name: kafka
      scrape_interval: 10s
      scrape_timeout: 10s
      static_configs:
      - targets:
        - kafka0:7071
        - kafka1:7071
        - kafka2:7071

    You can also see the nodes at Status -> Targets from the menu, and yeah, all the metrics are available by node at http://[kafka-node]:7071/metrics.
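
    The if/else in profiles::prometheusserver boils down to "always scrape Prometheus itself, and add a kafka job only when nodes are defined". As a rough sketch of that logic (plain Python, with a made-up function name, just to show the shape of the resulting scrape_configs data):

```python
def build_scrape_configs(kafka_nodes=None):
    """Mirror the Puppet logic: the kafka job is added only if nodes are given."""
    configs = [{
        "job_name": "prometheus",
        "scrape_interval": "30s",
        "scrape_timeout": "30s",
        "static_configs": [{"targets": ["localhost:9090"],
                            "labels": {"alias": "Prometheus"}}],
    }]
    if kafka_nodes:
        configs.append({
            "job_name": "kafka",
            "scrape_interval": "10s",
            "scrape_timeout": "10s",
            "static_configs": [{"targets": list(kafka_nodes)}],
        })
    return configs


jobs = build_scrape_configs(["kafka0:7071", "kafka1:7071", "kafka2:7071"])
print([job["job_name"] for job in jobs])  # ['prometheus', 'kafka']
```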

    I think this should be it. I don’t know if I covered everything, and there are a lot of details specific to our custom installation, but at least I managed to provide some details about it. The article that helped me very much can be found here:

    https://www.robustperception.io/monitoring-kafka-with-prometheus/

    Cheers!

  • Wrong Kafka configuration and deployment using puppet

    Hi,

    I just want to share with you one case that we had last week. It involved a wrong Kafka deployment from puppet that filled the filesystems and affected our colleagues who were using it as a transport layer for ELK.

    Let’s start from the beginning: we have some puppet code to deploy and configure kafka machines. To keep it simple, the broker config block from puppet looks like this:

    $broker_config = {
        'broker.id'                     => '-1', # always set broker.id to -1.
        # broker specific config
        'zookeeper.connect'             => hiera('::kafka::zookeeper_connect', $zookeeper_connect),
        'inter.broker.protocol.version' => hiera('::kafka::inter_broker_protocol_version', $kafka_version),
        'log.dir'                       => hiera('::kafka::log_dir', '/srv/kafka-logs'),
        'log.dirs'                      => hiera('::kafka::log_dirs', '/srv/kafka-logs'),
        'log.retention.hours'           => hiera('::kafka::log_retention_hours', $days7),
        'log.retention.bytes'           => hiera('::kafka::log_retention_bytes', '-1'),
        # configure availability
        'num.partitions'                => hiera('::kafka::num_partitions', 256),
        'default.replication.factor'    => hiera('::kafka::default_replication_factor', $default_replication_factor),
        # configure administratability (this is a word now)
        'delete.topic.enable'           => hiera('::kafka::delete_topic_enable', 'true'),
      }
    

    As you can see, there are two fields that need to be carefully configured: one is log_retention_bytes and the other is num_partitions. I am underlining this for a very simple reason: if we take a look in the Kafka server.properties file, which the engine uses to start the broker, we will see

    log.retention.bytes=107374182400
    num.partitions=256

    Now, the first one is the default value (and it means the maximum size per partition), and if you go to an online converter you will see that it means 100GB. This shouldn’t be a problem if you override it per topic, and even less so if you manually create a topic with one or two partitions and you have the required space. But our case was different: they were using it for logstash, which created a topic with the default configuration of 256 partitions, and with a maximum size of 100GB per partition Kafka assumed it could keep up to 25600GB, which, if I am not mistaken, translates to 25TB (yeah, that is usually more than enough for an ELK instance).

    Since we were running only three nodes with 100GB filesystem each, you can imagine that it filled up and the servers crashed.
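
    The arithmetic is easy to reproduce. Here is a small Python sketch (the function name is made up for illustration, and it treats the "100GB" filesystems as 100 GiB) that computes the worst-case size of one copy of the topic and compares it with the raw capacity of the three nodes:

```python
GIB = 1024 ** 3
TIB = 1024 ** 4


def worst_case_topic_bytes(partitions, retention_bytes, replication_factor=1):
    """Upper bound on what a topic may keep: log.retention.bytes applies per partition."""
    return partitions * retention_bytes * replication_factor


retention_bytes = 107374182400          # log.retention.bytes from server.properties
assert retention_bytes == 100 * GIB     # i.e. 100GB per partition

topic_bytes = worst_case_topic_bytes(256, retention_bytes)
cluster_bytes = 3 * 100 * GIB           # three nodes, 100GB filesystem each

print(topic_bytes / TIB)                # 25.0
print(topic_bytes > cluster_bytes)      # True
```

    Replication only makes it worse, since every replica keeps its own copy of the partition.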

    Now there are two ways to avoid this, depending on the number of consumers in the consumer group and also on the retention period. If you have only one consumer and you want to keep data for a longer period of time, you can leave it at 100GB (if you have that much storage available) and manually create a topic with 1 partition and a replication factor equal to the number of nodes that you want to use for high availability. Alternatively, you can leave the partition number at 256 and greatly decrease the retention bytes value. This lets the data be distributed evenly across multiple consumers, but it comes with a shorter storage period (it is also true that this depends on the amount and type of data that you are transferring).

    The solution that we adopted is to leave the partition number unchanged and decrease the partition size to 200MB. I will keep you informed on how it works. 🙂
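
    If you want to derive such a value instead of guessing it, you can turn the calculation around. This is a rough Python sketch under a few assumptions that are mine and not from our setup: a replication factor of 2, 80% of the disk usable for Kafka data, and one topic using the whole budget. Keep in mind that retention only removes closed log segments, so real usage can temporarily go above the limit.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def max_retention_bytes(nodes, disk_bytes_per_node, partitions,
                        replication_factor, usable_percent=80):
    """Largest per-partition log.retention.bytes that fits the cluster disk budget."""
    usable = nodes * disk_bytes_per_node * usable_percent // 100
    return usable // (partitions * replication_factor)


# Three nodes with 100GB each and 256 partitions; replication factor 2 is assumed.
limit = max_retention_bytes(3, 100 * GIB, 256, 2)
print(limit // MIB)          # 480
print(200 * MIB <= limit)    # True
```

    With these numbers anything up to about 480MB per partition would fit, so 200MB leaves room for other topics.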

    Cheers!