• Puppet implementation of traefik load balancer for kafka-manager

    Hi,

    It’s time to share the Puppet implementation for the small Traefik case. It is related to the following article: http://log-it.tech/2017/08/08/balancing-requests-kafka-manager-using-traefik/

    Starting from that, I tried to find a Puppet module that can install the package more or less accurately, and I found this one: https://forge.puppet.com/praekeltfoundation/traefik

    Now, the service install works, but defining traefik.toml and rules.toml was a real pain. First of all, one of the function calls in the module does not work, and even after fixing it, it doesn’t really lay out the toml file as required, so I decided to take a simpler approach. I put traefik.toml in a static file, since it doesn’t really contain anything dynamically related to our environment. It looks like this:

    accessLogsFile = "/var/log/traefik/access.log"
    traefikLogsFile = "/var/log/traefik/traefik.log"
    logLevel = "DEBUG"
    defaultEntryPoints = ["https"]
    [entryPoints]
      [entryPoints.http]
      address = ":80"
        [entryPoints.http.redirect]
          entryPoint = "https"
      [entryPoints.https]
      address = ":443"
        [entryPoints.https.tls]
          [[entryPoints.https.tls.certificates]]
          CertFile = "/etc/traefik/traefik.crt"
          KeyFile = "/etc/traefik/traefik.key"
    
    
    [web]
    address = ":8080"
    
    [file]
    filename = "/etc/traefik/rules.toml"
    watch = true
    

    Now, the config files are stored in /etc/traefik, and I adopted the convention of storing the self-generated certificate for HTTPS in the same location. Sure, you can set it dynamically, but for a small load balancer in front of a cluster of a few nodes this should not be a problem.
    Ok, as you can see, we reference a separate rules.toml file, which in our case is created from an ERB template. The source is:

    [backends]
      [backends.kafka-manager]
        [backends.kafka-manager.LoadBalancer]
          method = "drr"
        <% @kafka_hosts_hash.each do |value, index| %>
        [backends.kafka-manager.servers.server<%= index %>]
        url = "http://<%= value %>:9000"
        weight = 1
        <% end %>
    [frontends]
      [frontends.kafka-manager]
      entrypoints = ["http","https"]
      backend = "kafka-manager"
      passHostHeader = true
      priority = 10
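
    For reference, with two hypothetical Kafka hosts (the names below are made up), the rendered rules.toml would look roughly like this, with the [frontends] section rendered as-is:

    [backends]
      [backends.kafka-manager]
        [backends.kafka-manager.LoadBalancer]
          method = "drr"
        [backends.kafka-manager.servers.server1]
        url = "http://kafka1.example.com:9000"
        weight = 1
        [backends.kafka-manager.servers.server2]
        url = "http://kafka2.example.com:9000"
        weight = 1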
    

    This is pretty straightforward, and it is tied together by the last piece of the puzzle, the Puppet class, which looks like this:

    class profiles::traefikinstall {
      $version = hiera("profiles::traefik::version",'1.3.5')
    
      class {'traefik':
        version => $version,
      }
      exec {'generate_cert':
        command => "openssl req -newkey rsa:4096 -nodes -sha512 -x509 -days 3650 -subj \"/CN=${fqdn}/OU=traefik/O=log-it.tech/L=Bucharest/ST=Romania/C=RO\" -out /etc/traefik/traefik.crt -keyout /etc/traefik/traefik.key",
        path    => ['/usr/bin','/usr/sbin','/bin','/sbin'],
        onlyif  => "test ! -f /etc/traefik/traefik.crt",
      } ->
      file {"/etc/traefik/traefik.toml":
        source  => 'puppet:///modules/profiles/traefik.toml',
        mode    => '0644',
        replace => false,
        notify  => Service['traefik'],
      }
      $kafka_hosts = query_nodes("role='kafka'").sort #here it should be any role or fact that indicates that it should have kafka-manager installed
      $kafka_hosts_hash = $kafka_hosts.map | $index, $value| { [$value,$index+1] }.hash
    
      file {"/etc/traefik/rules.toml":
        content => template("${module_name}/rules.toml.erb"),
        mode => '0644',
        replace => false,
      }
    }
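
    Since the version is read from hiera with a 1.3.5 fallback, it can be pinned from yaml. A minimal snippet for the role file (the exact file depends on your hierarchy):

    profiles::traefik::version: '1.3.5'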
    

    And this is all the code you need to deploy a Traefik instance that is “secured” via HTTPS and load balances between all kafka-manager instances. Now, it’s true that you can further secure it by adding iptables rules that restrict traffic on port 9000 (the default kafka-manager port) to just the hosts in the cluster; a sketch of that follows, and I will come back with that part in the future if it gets done.
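
    A minimal sketch of those iptables rules, with placeholder cluster IPs:

    # allow the kafka-manager port only from the cluster nodes, drop everything else
    for ip in 10.0.0.11 10.0.0.12 10.0.0.13; do
        iptables -A INPUT -p tcp --dport 9000 -s "$ip" -j ACCEPT
    done
    iptables -A INPUT -p tcp --dport 9000 -j DROP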

    Cheers!

  • Install puppet gems on puppet master using hiera

    Morning,

    I needed to install the toml-rb gem in order for my Puppet Traefik module to work, and I just want to post a short workaround for doing that automatically. There was some code in our repo for this, but it only handled a hiera array, so I had to write a very short class that can take a hash for the install process. It looks like this:

    class profiles::puppetinstall {
        $packages = hiera_hash('profiles::puppetinstall::packages', undef)
        if $packages {
            ensure_packages($packages)
        }
    }

    And in my role file, called puppetmaster.yaml in this case, I had to put:

    
    classes:
     - 'profiles::puppetinstall'
    
    profiles::puppetinstall::packages:
       toml-rb:
          provider: 'puppet_gem'

    Now, I know that maybe it’s not that elegant, but it fixed my problem. Hopefully I will soon post all the details related to the Traefik implementation. And yes, if you are wondering where you can get the ensure_packages resource from, it is included in the stdlib package: https://forge.puppet.com/puppetlabs/stdlib#ensure_packages

    P.S.: That was for the Puppet agent and standard gems; for the gems that need to be installed on the Puppet server, I needed to write the following piece of code:

    $packages_puppetserver = hiera_array('profiles::puppetinstall::puppetserver_packages', undef)
    if $packages_puppetserver {
        $packages_puppetserver.each |String $package_name| {
            exec {"install ${package_name}":
                command => "/opt/puppetlabs/bin/puppetserver gem install ${package_name}",
                path    => [ '/usr/bin','/usr/sbin','/bin','/sbin' ],
                # note: grep does a substring match, so a gem name that is
                # contained in another installed gem's name would be skipped
                unless  => "/opt/puppetlabs/bin/puppetserver gem list | grep ${package_name}",
            }
        }
    }

    The way to put the packages in hiera is similar:

    profiles::puppetinstall::puppetserver_packages:
     - 'toml-rb'
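
    After a Puppet run you can verify the result by hand with the same command the unless guard uses:

    /opt/puppetlabs/bin/puppetserver gem list | grep toml-rb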

    Cheers!

  • Balancing requests to kafka-manager using traefik

    Hi,

    Just wanted to share with you a quite small and simple config to balance the traffic between three machines that have kafka-manager installed. For this I used Traefik, since it was new to me and I wanted to gain a little bit of experience with it.

    It’s an interesting solution, but it took me a while to get the pieces working. I will post my config here and explain the parts needed to get it working.

    logLevel = "DEBUG"
    defaultEntryPoints = ["http"]
    [entryPoints]
      [entryPoints.http]
      address = ":80"
    [web]
    address = ":8080"
    
    [file]
    watch = true
    
    [backends]
      [backends.backend1]
        [backends.backend1.LoadBalancer]
          method = "drr"
        [backends.backend1.servers.server1]
        url = "http://[kafka1.hostname]:9000"
        weight = 1
        [backends.backend1.servers.server2]
        url = "http://[kafka2.hostname]:9000"
        weight = 2
        [backends.backend1.servers.server3]
        url = "http://[kafka3.hostname]:9000"
        weight = 1
    [frontends]
      [frontends.frontend1]
      entrypoints = ["http"]
      backend = "backend1"
      passHostHeader = true
      priority = 10
    

    This is very basic, as you can see, but it took me a while to understand that you need the [file] block with watch = true in order for the daemon to see and parse the rules listed below it. You can also keep the rules in a separate file; for that, it is best to consult the Traefik documentation.

    The next step is the redirect from HTTP to HTTPS, in order to secure the connection to the frontend. The idea of Traefik is that it works like entrypoint -> frontend -> backend, and as far as I saw, this is done at the entrypoint level. A sketch of what that should look like is below.
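
    A sketch of that redirect, assuming a self-signed certificate already placed in /etc/traefik (this matches the traefik.toml from the Puppet post earlier on this page):

    defaultEntryPoints = ["https"]
    [entryPoints]
      [entryPoints.http]
      address = ":80"
        [entryPoints.http.redirect]
          entryPoint = "https"
      [entryPoints.https]
      address = ":443"
        [entryPoints.https.tls]
          [[entryPoints.https.tls.certificates]]
          CertFile = "/etc/traefik/traefik.crt"
          KeyFile = "/etc/traefik/traefik.key"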

    Two extra notes: you need a default entry point in order for your frontend not to be ignored, and you should put the log level on DEBUG, because otherwise it won’t log much.

    I’ll keep you posted on the progress. You can also find Traefik here: https://docs.traefik.io

    Cheers!

  • Configure kafka truststore and keystore using puppet

    Hi,

    Since the last article was about the template needed to generate the truststore and keystore, now it’s time to give you the rest of the fragments for the deployment with Puppet.
    So, we have the kafka_security_gen class that was called in the last post; in its final form it should look like this:

    class profiles::kafka_security_gen {
        $pass = hiera('profiles::kafka_security_gen::password','password')
        $keystorepass = hiera('profiles::kafka_security_gen::keystorepass','password')

        $cluster_servers = query_nodes("role='kafka'")
        $servers = $cluster_servers.delete("${::fqdn}")

        file {'/home/kafka/security.sh':
            owner   => kafka,
            group   => kafka,
            ensure  => file,
            mode    => '0755',
            content => template("${module_name}/security.sh.erb"),
            replace => 'no',
        } ->
        exec {'generate_keystore':
            path    => '/usr/bin:/usr/sbin:/bin',
            command => '/home/kafka/security.sh',
            user    => 'kafka',
            unless  => 'test -f /home/kafka/kafka.server.keystore.jks',
        }
        $servers.each |String $server| {
            exec {"copy files to ${server}":
                cwd     => '/home/kafka',
                path    => '/usr/bin:/usr/sbin:/bin',
                command => "scp /home/kafka/kafka* kafka@${server}:/home/kafka",
                user    => 'kafka',
            }
        }
        ssh_keygen {'kafka':
            home => '/home/kafka/',
            type => rsa,
        }
        @@file {"/home/kafka/.ssh/authorized_keys":
            ensure  => present,
            mode    => '0600',
            owner   => 'kafka',
            group   => 'kafka',
            content => "${::sharedkey}",
            tag     => "${::instance_tag}",
        }
    }

    The only thing I did not manage to fix is the copying of the stores: they are copied on every run. I will fix that in the near future with some custom facts, but a possible interim approach is sketched below.
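
    A minimal sketch of that interim fix: the exec inside the $servers.each loop gets an ssh check as its unless guard (the check only looks for the server keystore, which is an assumption on my side):

        exec {"copy files to ${server}":
            cwd     => '/home/kafka',
            path    => '/usr/bin:/usr/sbin:/bin',
            command => "scp /home/kafka/kafka* kafka@${server}:/home/kafka",
            user    => 'kafka',
            # skip the copy if the keystore already exists on the remote host
            unless  => "ssh kafka@${server} test -f /home/kafka/kafka.server.keystore.jks",
        }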
    Ok, now let’s see the integration part with the Kafka code. Unfortunately, I cannot provide all the code, but what I can provide is:

        
    $kafkadirs = ['/home/kafka/', '/home/kafka/.ssh/']
    file { $kafkadirs:
        ensure => directory,
        mode   => '0700',
        owner  => 'kafka',
        group  => 'kafka',
    }
    $security_enabled = hiera('profiles::kafka::security', false)
    if ($security_enabled == false) {
      $broker_config = {
        ....
      }
    } else {
        # if the hostname starts with kafka0 then it's the CA authority (in our case the kafka server names have the form kafka0...)
        if ($hostname == 'kafka0') {
            contain 'profiles::kafka_security_gen'
        }
        # this shares the public key to the other servers, which are isolated by a tag
        File <<| tag == "${::instance_tag}" |>>
        #path for keystore/truststore
        $keystore_location = '/home/kafka/kafka.server.keystore.jks'
        $truststore_location = '/home/kafka/kafka.server.truststore.jks'
        #broker config
        $broker_config = {
        ....
        'security.inter.broker.protocol' => 'SSL',
        'ssl.client.auth'               => 'none',
        'ssl.enabled.protocols'         => ['TLSv1.2', 'TLSv1.1', 'TLSv1'],
        'ssl.key.password'              => hiera('profiles::kafka_security_gen::password','password'),
        'ssl.keystore.location'         => $keystore_location,
        'ssl.keystore.password'         => hiera('profiles::kafka_security_gen::keystorepass','password'),
        'ssl.keystore.type'             => 'JKS',
        'ssl.protocol'                  => 'TLS',
        'ssl.truststore.location'       => $truststore_location,
        'ssl.truststore.password'       => hiera('profiles::kafka_security_gen::keystorepass','password'),
        'ssl.truststore.type'           => 'JKS',
        'listeners'                     => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093", #both listeners are enabled
        'advertised.listeners'          => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093",
        }
      }
    class { '::kafka::broker':
        config    => $broker_config,
        opts      => hiera('profiles::kafka::jolokia', '-javaagent:/usr/share/java/jolokia-jvm-agent.jar'),
        heap_opts => "-Xmx${jvm_heap_size}M -Xms${jvm_heap_size}M",
      }
    

    This can be easily activated by putting the profiles::kafka::security option in the hierarchy with the value true, and it should do all the work. Once it’s done, it can be tested using openssl s_client -debug -connect localhost:9093 -tls1
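
    For example, in the role yaml (the file name depends on your hierarchy; both passwords fall back to 'password' if not set):

    profiles::kafka::security: true
    profiles::kafka_security_gen::password: 'pemkeypassword'
    profiles::kafka_security_gen::keystorepass: 'storepassword'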

    The link to the article about generating the security script template is http://log-it.tech/2017/07/23/generate-kafka-keytrust-store-tls-activation/ and there is also a quite interesting guideline from Confluent: http://docs.confluent.io/current/kafka/ssl.html#configuring-kafka-brokers

    That’s all folks. 🙂

    Cheers

  • Generate Kafka key/trust store for TLS activation

    Morning folks,

    Just wanted to share with you the code for the script required to generate the keystore and truststore for Kafka. It is not much, but it might be helpful someday.

    So, the ERB script should look like this (I will show you as well how to integrate it with Puppet):

    #!/bin/bash
    HOST=<%= @fqdn %>
    PASSWORD=<%= @pass %>
    KEYSTOREPASS=<%= @keystorepass %>
    VALIDITY=365
    
    keytool -keystore kafka.server.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=Test, O=Test, L=Bucharest, ST=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -subj "/CN=${HOST}/OU=Tests/O=Test/L=Bucharest/ST=Romania/C=RO" -passout pass:$PASSWORD
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.client.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=Test, O=Test, L=Bucharest, ST=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    
    <% @servers.each do |server| -%>
    # <%= server %>
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=Test, O=Test, L=Bucharest, ST=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.host -out cert-signed-<%= server %>.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.host -storepass $KEYSTOREPASS -noprompt
    
    keytool -keystore kafka.client.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=Test, O=Test, L=Bucharest, ST=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.client.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.client -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.client -out cert-signed-<%= server %>.client -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.client.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.client -storepass $KEYSTOREPASS -noprompt
    
    <% end -%>

    As you probably saw already, there are some variables that are taken from the Puppet side, like the list of servers and the passwords for the PEM key and the truststore/keystore.

    Now let’s take a look at the Puppet file that generates this:

    class profiles::kafka_security_gen {
        $pass = hiera('profiles::kafka_security_gen::password','password')
        $keystorepass = hiera('profiles::kafka_security_gen::keystorepass','password')

        $cluster_servers = query_nodes("role='kafka'")
        $servers = $cluster_servers.delete("${::fqdn}")

        file {'/tmp/security.sh':
            ensure  => file,
            mode    => '0755',
            content => template("${module_name}/security.sh.erb"),
        }
    }
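
    As a quick sanity check, assuming the script was rendered to /tmp as in the manifest, you can run it by hand and inspect the result with keytool:

    cd /tmp && ./security.sh
    keytool -list -keystore kafka.server.keystore.jks -storepass <keystorepass>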

    This should suffice so far. The idea is that you need to create the truststore and keystore separately on the host that generates them, then generate and import the CA root certificate; once that is done, the remaining steps to build the stores are straightforward.
    I am currently working on a solution to automatically distribute and integrate this on all machines; I’ll keep you posted.

    Cheers

  • Jupyter Notebook – very very interesting tool

    Hi,

    As I was taking a look at the Docker newsletter, beside Moby and other articles related to it, I found this interesting tool, along with a tutorial/presentation on it.

    Besides that, you can find the official site here: http://jupyter.org

    This caught my attention and I will certainly try it on a machine. I am pretty curious, since I believe this is used to power the Wolfram Notebook.

    Cheers!

  • Install Kafka Manager with Puppet

    Hi,

    I will continue along this line with the install of a management tool called Kafka Manager, using the same old Puppet.

    The main source of the project is here:

    https://github.com/yahoo/kafka-manager

    If you scroll down to the packaging section, you will see that you have the possibility to create a .deb package for an Ubuntu or Debian setup, or an rpm. To do this, just clone the project, cd into it and run the command:

    sbt debian:packageBin
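
    For the rpm flavour, the analogous sbt target should work; I only tried the deb route, so take this as an assumption:

    sbt rpm:packageBin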
    

    It will take some time, but eventually it will create the required package. Now, since you have the package, you can easily ask a friend to upload it to your pipeline apt repo (this is what I did, since I don’t yet have the experience to do that) and then install and configure it via Puppet. In order to do that, I “wrote” the following manifest:

    kafkamanager.pp

    class profiles::kafkamanager {

        $zookeeper_connect = hiera('kafkamanager::zookeeperconnect')

        package {'kafka-manager':
            ensure => installed,
        }
        group { 'kafka-manager':
            ensure => 'present',
        }
        user { 'kafka-manager':
            ensure => 'present',
            groups => 'kafka-manager',
        }

        Group['kafka-manager'] -> User['kafka-manager']

        file { '/usr/share/kafka-manager':
            ensure  => directory,
            owner   => 'kafka-manager',
            group   => 'kafka-manager',
            require => [ User['kafka-manager'], Group['kafka-manager'] ],
            recurse => true,
        }

        file_line { 'config_zookeeper':
            path    => '/etc/kafka-manager/application.conf',
            match   => 'kafka-manager.zkhosts=\"kafka-manager-zookeeper:2181\"',
            line    => "kafka-manager.zkhosts=\"${zookeeper_connect}\"",
            replace => true,
        }

        file_line { 'enable_auth':
            path    => '/etc/kafka-manager/application.conf',
            match   => 'basicAuthentication.enabled=false',
            line    => 'basicAuthentication.enabled=true',
            replace => true,
        }

        service { 'kafka-manager':
            ensure  => 'running',
            enable  => true,
            require => [ File['/usr/share/kafka-manager'], File_line['config_zookeeper'] ],
        }
    }
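
    To pull this in on a node, its yaml needs the class plus the zookeeper connect string for kafka-manager (the value below is only an example):

    ---
    classes:
     - 'profiles::kafkamanager'

    kafkamanager::zookeeperconnect: 'zk1:2181,zk2:2181,zk3:2181'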

    This should be all. You are now free to log in with the user and password that you find in the application.conf file and start mapping Kafka clusters.

    Cheers!

  • Integrate Kafka with Datadog monitoring using puppet

    Hi,

    Since I was in debt with an article on how to integrate Kafka monitoring using Datadog, let me tell you a couple of things about this topic. First of all, we are using the same config of Kafka with Jolokia that was described in a previous article. From the install of the brokers on our infrastructure, JMX data is published on port 9990 (this will be needed in the Datadog config).

    The files you need to create for this task are as follows:

    datadogagent.pp

    class profiles::datadogagent {
      $_check_api_key = hiera('datadog_agent::api_key')
    
      contain 'datadog_agent'
      contain 'profiles::datadogagent_config_kafka'
      contain 'datadog_agent::integrations::zk'
    
      Class['datadog_agent'] -> Class['profiles::datadogagent_config_kafka']
      Class['datadog_agent'] -> Class['datadog_agent::integrations::zk']
    }

    datadogagent_config_kafka.pp

    class profiles::datadogagent_config_kafka (
    $servers = [{'host' => 'localhost', 'port' => '9990'}]
    ) inherits datadog_agent::params {
      include datadog_agent
    
      validate_array($servers)
    
      file { "${datadog_agent::params::conf_dir}/kafka.yaml":
        ensure  => file,
        owner   => $datadog_agent::params::dd_user,
        group   => $datadog_agent::params::dd_group,
        mode    => '0600',
        content => template("${module_name}/kafka.yaml.erb"),
        require => Package[$datadog_agent::params::package_name],
        notify  => Service[$datadog_agent::params::service_name],
      }
    }
    

    And since there isn’t yet a default integration for Kafka in the Datadog Puppet module, which you can find here:

    https://github.com/DataDog/puppet-datadog-agent

    I created the following file in the templates directory:

    kafka.yaml.erb (as you can see from the header, this is actually the template provided by Datadog for the Kafka integration, with our specific host and port)

    ##########
    # WARNING
    ##########
    # This sample works only for Kafka >= 0.8.2.
    # If you are running a version older than that, you can refer to agent 5.2.x released
    # sample files, https://raw.githubusercontent.com/DataDog/dd-agent/5.2.1/conf.d/kafka.yaml.example
    
    instances:
    <% @servers.each do |server| -%>
      - host: <%= server['host'] %>
        port: <%= server['port'] %> # This is the JMX port on which Kafka exposes its metrics (usually 9999)
        tags:
          kafka: broker
    <% end -%>

    init_config:
      is_jmx: true
    
      # Metrics collected by this check. You should not have to modify this.
      conf:
        # v0.8.2.x Producers
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.producer.request_rate
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs,clientId=.*'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.producer.request_latency_avg
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=BytesPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.producer.bytes_out
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=ProducerTopicMetrics,name=MessagesPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.producer.message_rate
        # v0.8.2.x Consumers
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MaxLag,clientId=.*'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.consumer.max_lag
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ConsumerFetcherManager,name=MinFetchRate,clientId=.*'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.consumer.fetch_rate
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=BytesPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.consumer.bytes_in
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ConsumerTopicMetrics,name=MessagesPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.consumer.messages_in
    
        # Offsets committed to ZooKeeper
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=ZooKeeperCommitsPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.consumer.zookeeper_commits
        # Offsets committed to Kafka
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=ZookeeperConsumerConnector,name=KafkaCommitsPerSec,clientId=.*'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.consumer.kafka_commits
        # v0.9.0.x Producers
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              response-rate:
                metric_type: gauge
                alias: kafka.producer.response_rate
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              request-rate:
                metric_type: gauge
                alias: kafka.producer.request_rate
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              request-latency-avg:
                metric_type: gauge
                alias: kafka.producer.request_latency_avg
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              outgoing-byte-rate:
                metric_type: gauge
                alias: kafka.producer.bytes_out
        - include:
            domain: 'kafka.producer'
            bean_regex: 'kafka\.producer:type=producer-metrics,client-id=.*'
            attribute:
              io-wait-time-ns-avg:
                metric_type: gauge
                alias: kafka.producer.io_wait
    
        # v0.9.0.x Consumers
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
            attribute:
              bytes-consumed-rate:
                metric_type: gauge
                alias: kafka.consumer.bytes_in
        - include:
            domain: 'kafka.consumer'
            bean_regex: 'kafka\.consumer:type=consumer-fetch-manager-metrics,client-id=.*'
            attribute:
              records-consumed-rate:
                metric_type: gauge
                alias: kafka.consumer.messages_in
        #
        # Aggregate cluster stats
        #
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.net.bytes_out.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.net.bytes_in.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.messages_in.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.net.bytes_rejected.rate
    
        #
        # Request timings
        #
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.fetch.failed.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.produce.failed.rate
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.produce.rate
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.produce.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.produce.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.fetch_consumer.rate
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.fetch_follower.rate
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.fetch_consumer.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.fetch_consumer.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.fetch_follower.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.fetch_follower.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=UpdateMetadata'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.update_metadata.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.update_metadata.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Metadata'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.metadata.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.metadata.time.99percentile
        - include:
            domain: 'kafka.network'
            bean: 'kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Offsets'
            attribute:
              Mean:
                metric_type: gauge
                alias: kafka.request.offsets.time.avg
              99thPercentile:
                metric_type: gauge
                alias: kafka.request.offsets.time.99percentile
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.request.handler.avg.idle.pct.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.request.producer_request_purgatory.size
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=FetchRequestPurgatory,name=PurgatorySize'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.request.fetch_request_purgatory.size
    
        #
        # Replication stats
        #
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.under_replicated_partitions
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=IsrShrinksPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.replication.isr_shrinks.rate
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=IsrExpandsPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.replication.isr_expands.rate
        - include:
            domain: 'kafka.controller'
            bean: 'kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.replication.leader_elections.rate
        - include:
            domain: 'kafka.controller'
            bean: 'kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.replication.unclean_leader_elections.rate
        - include:
            domain: 'kafka.controller'
            bean: 'kafka.controller:type=KafkaController,name=OfflinePartitionsCount'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.offline_partitions_count
        - include:
            domain: 'kafka.controller'
            bean: 'kafka.controller:type=KafkaController,name=ActiveControllerCount'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.active_controller_count
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=PartitionCount'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.partition_count
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaManager,name=LeaderCount'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.leader_count
        - include:
            domain: 'kafka.server'
            bean: 'kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica'
            attribute:
              Value:
                metric_type: gauge
                alias: kafka.replication.max_lag
    
        #
        # Log flush stats
        #
        - include:
            domain: 'kafka.log'
            bean: 'kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs'
            attribute:
              Count:
                metric_type: rate
                alias: kafka.log.flush_rate.rate
    

    To integrate all of this on a node, you need to add the class to your fqdn.yaml in the format:

    ---
    classes:
     - profiles::datadogagent
    
    datadog_agent::api_key: [your key]
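
    If your JMX port differs, it should be possible to override the class parameter from the same yaml through automatic parameter lookup (an assumption on my side, since I kept the default):

    profiles::datadogagent_config_kafka::servers:
      - host: 'localhost'
        port: '9990'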
    

    After this runs, the datadog-agent is installed; you can check it using ps -ef | grep datadog-agent. Also, if you take a look (and you should), you will find two new files added to /etc/dd-agent/conf.d, called kafka.yaml and zk.yaml.

    You are done; please feel free to log in to the Datadog portal and check your host.

    Cheers

  • Migrating Websphere MQ 7.0.1 to IBM MQ 8.0.0.x

    Hi,

    Since there aren’t that many new things I have done regarding Kafka and open-source software, I want to share this here, so that my IBM MQ migration steps and script are not lost in some distant gitlab repo.

    In order to migrate an instance from an older version (without support) to a newer one, even IBM recommends that you leave it as is: just stop it, migrate the packages, and after that you can use a script like the one I provide below.

    The first step is to stop the running queue managers, using the following command:

    for i in $(dspmq | cut -d'(' -f2 | cut -d')' -f1); do endmqm $i; done

    If it fails and connections still remain open, you can try using endmqm -i instead of endmqm. After the feedback that the queue managers are stopped is returned, just to be sure, you can double-check using a simple ps -ef | grep amq. Please check the status of the MQ listeners as well, using ps -ef | grep mqlsr. After you have made sure everything is stopped, you can contact your AIX admin to commit the new packages. This can be checked using lslpp -l | grep mqm; an example output looks like this:

    # lslpp -l | grep mqm
      mqm.base.runtime           8.0.0.6  COMMITTED  IBM WebSphere MQ Runtime for
      mqm.base.samples           8.0.0.6  COMMITTED  IBM WebSphere MQ Samples
      mqm.base.sdk               8.0.0.6  COMMITTED  IBM WebSphere MQ Base Kit for
      mqm.client.rte             8.0.0.6  COMMITTED  IBM WebSphere MQ Client for
      mqm.java.rte               8.0.0.6  COMMITTED  IBM WebSphere MQ Java Client,
      mqm.msg.en_US              8.0.0.6  COMMITTED  IBM WebSphere MQ Messages -
      mqm.server.rte             8.0.0.6  COMMITTED  IBM WebSphere MQ Server
      mqm.base.runtime           8.0.0.6  COMMITTED  IBM WebSphere MQ Runtime for
    

    You now know you can go forward with the migration, and for that purpose you can also use the script below. It’s not bulletproof, but you can change and adapt it to your needs:

    #!/usr/bin/ksh
    # make sure the MQ binaries are in PATH for this run and for future logins
    echo ". /usr/mqm/bin/setmqenv -s" >> $HOME/.profile
    env | grep PATH | grep /usr/mqm/bin > /dev/null
    if [ "$?" -ne "0" ];then
        export PATH=$PATH:/usr/mqm/bin
    fi
    # abort if mqconfig reports critical system values
    SYSTEM_CRITICAL_VAL=`mqconfig | grep CRITICAL`
    if [ "$?" -eq "0" ];then
        echo "System values for MQ installation are not OK, please check $SYSTEM_CRITICAL_VAL"
        exit
    fi
    INSTALLATION_NAME=`dspmqver | grep InstName | cut -d':' -f2 | sed 's/ //g'`
    # associate each queue manager with the new installation and restart it
    for i in $(dspmq | cut -d'(' -f2 | cut -d')' -f1);do
        echo "Queue manager ${i} is being set to installation ${INSTALLATION_NAME}"
        setmqm -m $i -n $INSTALLATION_NAME
        # return code 59 is accepted here as well
        if ([ "$?" -eq "0" ] || [ "$?" -eq "59" ]);then
            echo "Starting instance $i"
            strmqm $i
            if [ "$?" -eq "0" ];then
                sleep 10
                # temporarily relax channel security so clients can reconnect
                echo "ALTER QMGR CHLAUTH(DISABLED) " | runmqsc $i
                echo "ALTER AUTHINFO(SYSTEM.DEFAULT.AUTHINFO.IDPWOS) AUTHTYPE(IDPWOS) CHCKLOCL(NONE) CHCKCLNT (NONE)" | runmqsc $i
                echo "REFRESH SECURITY TYPE(AUTHSERV)" | runmqsc $i
                echo "ALTER CHL('SYSTEM.DEF.SVRCONN') CHLTYPE(SVRCONN) MAXMSGL(104857600) HBINT(300) MAXINST(999999999) MCAUSER('mqm') SSLCAUTH(OPTIONAL) "| runmqsc $i
                endmqm -i $i
                sleep 30
            fi
        fi
    done
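
    I run the script as the mqm user once the packages are committed; the file name below is my own:

    su - mqm
    ./mq8_migration.sh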

    Once this is done, you should be migrated to MQ 8 and able to connect to it using SYSTEM.DEF.SVRCONN, which has MCAUSER set to mqm. Please keep in mind that this is a temporary solution and that IBM MQ connection policies should be re-enabled for channel connections (you can also opt for a different user on the server connection channel and authorize just the objects that your connector needs).

    That should be all for today,

    Cheers!

  • Automigrate data contained in a topic to different Kafka brokers

    Hi,

    Let’s keep it short: you have a cluster composed of three Kafka brokers and you add another two. The details of doing that are pretty straightforward. If they are new, give them a unique broker.id and set the zookeeper.connect string so that they know which cluster to register with. If they were already registered with an id, please keep in mind that you also need to change it in meta.properties in the log.dirs location. A minimal config sketch for a new broker follows.
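
    A minimal sketch of the relevant server.properties lines for one of the new brokers (the id, hosts and path below are just examples):

    broker.id=13
    zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
    log.dirs=/var/lib/kafka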

    So, we now have the following situation:

    Connecting to localhost:2181
    Welcome to ZooKeeper!
    JLine support is disabled
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    ls /brokers/ids
    [13, 14, 1003, 1002, 1001]

    And we need to migrate one topic, which has a replication factor of two and two partitions:

    Topic:test    PartitionCount:2    ReplicationFactor:2    Configs:
        Topic: test    Partition: 0    Leader: 1003    Replicas: 1003,1001    Isr: 1001,1003
        Topic: test    Partition: 1    Leader: 1001    Replicas: 1001,1002    Isr: 1002,1001

    In order to achieve this, you will need to follow these steps:

    1. Create a file topics-to-move.json with the following content:

    {"topics": [{"topic": "test"}],
      "version":1
      }

    2. Run the following command: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "13,14" --generate
    Output:

    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1003,1001]},{"topic":"test","partition":1,"replicas":[1001,1002]}]}
    Proposed partition reassignment configuration
    
    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[14,13]},{"topic":"test","partition":1,"replicas":[13,14]}]}

    3. Create a file expand-cluster-reassignment.json with the following content:

    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[14,13]},{"topic":"test","partition":1,"replicas":[13,14]}]}

    4. Run the following command for the topic migration: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
    Output:

    Current partition replica assignment
    
    {"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[1003,1001]},{"topic":"test","partition":1,"replicas":[1001,1002]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions.

    5. Verify the status of the migration with the command: ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

    Output:
    Status of partition reassignment: 
    Reassignment of partition [test,0] completed successfully
    Reassignment of partition [test,1] completed successfully
    

    Final status:

    ./kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
    Topic:test	PartitionCount:2	ReplicationFactor:2	Configs:
    	Topic: test	Partition: 0	Leader: 13	Replicas: 13,14	Isr: 14,13
    	Topic: test	Partition: 1	Leader: 14	Replicas: 14,13	Isr: 13,14

    This is actually a follow-up of the steps described here:

    https://kafka.apache.org/documentation.html#basic_ops_automigrate

    Cheers!