Category: kafka

  • Implementing logrotate for kafka

    Hi,

    Yes, we will also need to implement log rotation if we want to keep Kafka under control. My solution was with Puppet, as you probably expected. After I took a look at the documentation for the log4j properties, I had a configuration figured out; it should look like the following ERB template:

    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    log4j.rootLogger=INFO, stdout
    
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
    log4j.appender.kafkaAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.kafkaAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
    log4j.appender.stateChangeAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.stateChangeAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
    log4j.appender.requestAppender.MaxFileSize=<%= @filesize%>
    log4j.appender.requestAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
    log4j.appender.cleanerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.cleanerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
    log4j.appender.controllerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.controllerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
    log4j.appender.authorizerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.authorizerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    # Turn on all our debugging info
    #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
    #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
    #log4j.logger.kafka.perf=DEBUG, kafkaAppender
    #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
    #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
    log4j.logger.kafka=INFO, kafkaAppender
    
    log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
    log4j.additivity.kafka.network.RequestChannel$=false
    
    #log4j.logger.kafka.network.Processor=TRACE, requestAppender
    #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
    #log4j.additivity.kafka.server.KafkaApis=false
    log4j.logger.kafka.request.logger=WARN, requestAppender
    log4j.additivity.kafka.request.logger=false
    
    log4j.logger.kafka.controller=TRACE, controllerAppender
    log4j.additivity.kafka.controller=false
    
    log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
    log4j.additivity.kafka.log.LogCleaner=false
    
    log4j.logger.state.change.logger=TRACE, stateChangeAppender
    log4j.additivity.state.change.logger=false
    
    #Change this to debug to get the actual audit log for authorizer.
    log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
    log4j.additivity.kafka.authorizer.logger=false
    
    

    It just adds the two values required for a working log rotation setup: MaxFileSize and MaxBackupIndex. The first sets the maximum size a log file may reach, and the second the number of rotated files of each type to keep.
    To use it, you need to put it in a class that, in my view, is constructed as follows:

    class profiles::kafkalogrotate {

      $filesize = hiera('kafkalogrotate::size','50MB')
      $backupindex = hiera('kafkalogrotate::backupindex',10)
      validate_string($filesize)
      validate_integer($backupindex)

      file { 'adding_log4j':
        path    => '/opt/kafka/config/log4j.properties',
        content => template("${module_name}/log4j.properties.erb"),
        replace => true,
        owner   => 'kafka',
        group   => 'kafka',
        mode    => '0644',
      }
    }
    

    I hard-coded the path because we generally install Kafka in /opt/kafka, but that can be changed. In our case a restart of the Kafka brokers is not needed immediately, so I just called the class from my Kafka manifests.
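
    As a quick sanity check after the next Puppet run, assuming Kafka writes its application logs under /opt/kafka/logs (the usual ${kafka.logs.dir} for an install in /opt/kafka), you should start seeing capped, numbered backups instead of one ever-growing file:

    # rotated copies appear once server.log exceeds MaxFileSize,
    # and their count stays at or below MaxBackupIndex
    ls -lh /opt/kafka/logs/server.log*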

    That is all; if I think of something additional to what I wrote, I will post it.

    Cheers!

  • Adding custom kafka check consumer.lag to datadog from JMX

    Hi,

    We needed to add the consumer.lag check to Datadog. Since we did not have access to the kafka.consumer domain, which as far as I know lives on the client side, I decided to connect to the Kafka node over JMX (so JConsole was the tool). On the MBeans tab you will see that there is nothing available by default for kafka.consumer.max_lag. The info you can actually grab is located under kafka.server, more precisely under FetcherLagMetrics.

    If you go all the way down the hierarchy, you will get the partition 0 details for one of the topics. I will use this example as the base for constructing the regex:

    kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=ReplicaFetcherThread-0-1001,topic=__consumer_offsets,partition=0

    Using the same template you can directly construct the block that you need to add to kafka.yaml and it should look like this:

     - include:
            domain: 'kafka.server'
            bean_regex: 'kafka\.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=.*'
            attribute:
              Value:
                metric_type: rate
                alias: kafka.consumer.lag

    After an agent restart you will see that the number of collected metrics increases and that there is a new metric in the Datadog web interface.
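
    For reference, a minimal restart-and-verify sketch, assuming the Datadog Agent v5 init script we had at the time (on Agent 6 the equivalent would be the datadog-agent status command):

    sudo /etc/init.d/datadog-agent restart
    # the info output should now list the JMX check and an increased metric count
    sudo /etc/init.d/datadog-agent info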
    After that, you can manually add a new graph to the dashboard that uses this metric, and you can also filter it by specific criteria, like host, topic or partition.

    Hope this is helpful,

    Cheers

  • Securing kafka-manager endpoints with iptables rules behind traefik

    Hi,

    One extra addition to my traefik balancing article from http://log-it.tech/2017/08/19/puppet-implementation-traefik-load-balancer-kafka-manager/ is that, even though we now have the balancing capability, we still need to restrict access to the unsecured endpoint. I wrote the code so that it is deployable on all of the nodes. With that in mind, our firewall-rule issue is easily solved by using the puppetlabs module https://github.com/puppetlabs/puppetlabs-firewall, and the code that I included looks like this:

    $hosts_count = $kafka_hosts.count

    package { 'iptables-persistent':
      name   => 'iptables-persistent',
      ensure => installed,
    }

    resources { 'firewall':
      purge => true,
    }

    $kafka_hosts.each | Integer $index, String $host | {
      firewall { "10${index} adding tcp rule kafka manager node ${index}":
        proto       => 'tcp',
        dport       => 9000,
        source      => $host,
        destination => $fqdn,
        action      => 'accept',
      }
    }

    firewall { "10${hosts_count} dropping rest of kafka manager calls":
      proto       => 'tcp',
      dport       => 9000,
      destination => $fqdn,
      action      => 'drop',
    }

    This should add rules that allow traffic on port 9000 only between the Kafka hosts that have kafka-manager installed.
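
    A quick, hedged way to confirm the rules landed after the Puppet run (puppetlabs-firewall adds the resource title as an iptables comment):

    # list the port 9000 rules and their comments
    sudo iptables -S INPUT | grep 9000
    # from a host that is NOT in $kafka_hosts the request should simply be dropped
    curl --max-time 5 http://<kafka-manager-node>:9000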
    Cheers

  • Fixing the kafka-manager puppet code

    Hi, we have a new version of the kafka-manager deployment code. I will not give many details, just that it now also has a fact for the kafka-manager password plus some minor changes. The fact looks like this:

    require 'facter'
    Facter.add(:kafka_manager_pass) do
      setcode do
        file='/etc/kafka-manager/application.conf'
        if File.exist?(file)
            kafka_manager_pass = Facter::Core::Execution.exec("cat #{file} | grep basicAuthentication.password | cut -d'=' -f2 | tr -d '\"'")
        else
            kafka_manager_pass = "undef"
        end
      end
    end
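
    Once the fact file is shipped in your module's lib/facter directory and pluginsync has run, you can check what it resolves to on a node, e.g.:

    # -p also loads custom facts distributed via Puppet
    facter -p kafka_manager_pass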
    

    And also the new puppet code for the class:

    
    class profiles::kafkamanager {
    
       $zookeeper_connect = hiera('kafkamanager::zookeeperconnect')
       $password = hiera('kafkamanager::password','password')
    	package {'kafka-manager':
    		ensure => installed,
    	}
    	group { 'kafka-manager':
    		ensure => 'present',	
    	}
    	user { 'kafka-manager':
    		ensure => 'present',
    		groups => 'kafka-manager'
    	}
    	
    	Group['kafka-manager'] -> User['kafka-manager']
    	
    	file { '/usr/share/kafka-manager' :
        		ensure    => directory,
        		owner     => 'kafka-manager',
        		group      => 'kafka-manager',
        		require     => [ User['kafka-manager'], Group['kafka-manager'], ],
        		recurse    => true,
    	}
    	file { '/etc/kafka-manager/application.conf':
    		ensure => present,
    	}
    	file_line { 'config_zookeeper':
    		path   => '/etc/kafka-manager/application.conf',
    		line => "kafka-manager.zkhosts=\"${zookeeper_connect}\"",
    		match => 'kafka-manager.zkhosts=\"\"',
        replace => true,
    	} 
    	if ($::kafka_manager_pass == "undef") {
    	file_line { 'enable_pass_default':
         path => '/etc/kafka-manager/application.conf',
         match => "basicAuthentication.password=\"password\"",
         line => "basicAuthentication.password=\"${password}\"",
         replace => true,
           } 
    	}
    	elsif ($password != $::kafka_manager_pass) {
      file_line { 'enable_pass':
         path => '/etc/kafka-manager/application.conf',
         match => "basicAuthentication.password=\"${::kafka_manager_pass}\"",
         line => "basicAuthentication.password=\"${password}\"",
         replace => true,
           }
      exec {'restart_kafkamanager':
        command => '/usr/sbin/service kafka-manager restart',
        path => ['/bin','/sbin','/usr/bin','/usr/sbin'],
        refreshonly => true,
        subscribe => File_line['enable_pass'],
    } 
            }
        file_line { 'enable_auth':
            path => '/etc/kafka-manager/application.conf',
            match => 'basicAuthentication.enabled=false',
            line => 'basicAuthentication.enabled=true',
            replace => true,
            }
               
    	service { 'kafka-manager':
    		ensure => 'running',
    		enable => true,
    		require => [ File['/usr/share/kafka-manager'], File['/etc/kafka-manager/application.conf'] ],
    		subscribe => File["/etc/kafka-manager/application.conf"],
    	}
    }
    

    Give it a try!

    Cheers!

  • Fixing the keystore/truststore distribution code

    Hi,

    There is an extra thing to be added to my article http://log-it.tech/2017/07/27/configure-kafka-truststore-keystore-using-puppet/

    As it stands, the code copies the files at each Puppet run to the other nodes, the ones that do not contain the keystore generation code. To fix this I used yet another Puppet module that shares data between nodes; you can find it here: https://github.com/WhatsARanjit/puppet-share_data

    As far as I saw it gets the job done, and in order to use it you will need to add the following pieces of code to your repo. First of all, a custom fact:

    
    require 'facter'
    
    filename = '/home/kafka/kafka.server.keystore.jks'
    Facter.add(:kafkakeystore) do
        setcode do
            if File.file?(filename)
                kafkakeystore = "enabled"
            else
            	kafkakeystore = "disabled"    
            end
        end
    end
    

    If the file is present, this means that the setup is probably active. In the Kafka manifests, if it's not the node on which the keystore is generated, we need to share the fact we just added, in the following form:

        share_data { "${fqdn}":
          data => [ $::fqdn,$::kafkakeystore ],
          label => 'keystore',
        }
    

    If it’s the node that actually generates and copies the keystore then we will need to include in the class that actually does this kafka_security_gen following piece:

    $data = share_data::retrieve('keystore')
    $data.each |$item| {
      # previously: $servers.each |String $server| {
      if (member($servers, $item[0]) and $item[1] == 'disabled') {
        exec { "copy files to ${item[0]}":
          cwd     => '/home/kafka',
          path    => '/usr/bin:/usr/sbin:/bin',
          command => "scp /home/kafka/kafka* kafka@${item[0]}:/home/kafka",
          user    => 'kafka',
        }
      }
    }
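
    To spot-check that the guard behaves, a hedged example run on one of the target brokers (paths as used above):

    # the fact flips from "disabled" to "enabled" only once the keystore exists locally
    facter -p kafkakeystore
    ls -l /home/kafka/kafka.server.keystore.jks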
    

    And this should ensure that Puppet will not try to copy the keystore to nodes that already have it. Now that I think of it, if you need to refresh the store it could be a problem, but I will think about a fix for that as well and come back.

    Cheers!

  • Puppet implementation of traefik load balancer for kafka-manager

    Hi,

    It’s time to give the puppet implementation for the traefik small case. It is related to the following article http://log-it.tech/2017/08/08/balancing-requests-kafka-manager-using-traefik/

    Starting from that, I tried to find a Puppet module that can install the package more or less accurately, and I found this: https://forge.puppet.com/praekeltfoundation/traefik

    Now, for the service install it works, but defining traefik.toml and rules.toml was a real pain. First of all, one of the function calls in the module does not work, and after fixing it, it doesn't really lay out the toml file as required, so I decided to do this in a simpler way. I put traefik.toml in a static file, since it doesn't really contain anything dynamic related to our environment. It looks like this:

    accessLogsFile = "/var/log/traefik/access.log"
    traefikLogsFile = "/var/log/traefik/traefik.log"
    logLevel = "DEBUG"
    defaultEntryPoints = ["https"]
    [entryPoints]
      [entryPoints.http]
      address = ":80"
        [entryPoints.http.redirect]
          entryPoint = "https"
      [entryPoints.https]
      address = ":443"
        [entryPoints.https.tls]
          [[entryPoints.https.tls.certificates]]
          CertFile = "/etc/traefik/traefik.crt"
          KeyFile = "/etc/traefik/traefik.key"
    
    
    [web]
    address = ":8080"
    
    [file]
    filename = "/etc/traefik/rules.toml"
    watch = true
    

    Now, the config files are stored in /etc/traefik, and I adopted the convention of storing the self-generated certificate for HTTPS in the same location. Sure, you can set it dynamically, but for a small load balancer and a cluster of a few nodes this should not be a problem.
    Ok, as you can see we have a separate rules.toml file, which in our case is created from an ERB template, and the source is:

    [backends]
      [backends.kafka-manager]
        [backends.kafka-manager.LoadBalancer]
          method = "drr"
         <% @kafka_hosts_hash.each do |value, index| %>
        [backends.kafka-manager.servers.server<%= index %>]
        url = "http://<%= value %>:9000"
        weight = 1
        <% end %>
    [frontends]
      [frontends.kafka-manager]
      entrypoints = ["http","https"]
      backend = "kafka-manager"
      passHostHeader = true
      priority = 10
    

    This is pretty straightforward and it is tied to the last piece of the puzzle, which is the Puppet class, and it actually looks like this:

    class profiles::traefikinstall {
      $version = hiera("profiles::traefik::version",'1.3.5')
    
      class {'traefik': 
        version           => $version,
      }
      exec { 'generate_cert':
        command => "openssl req -newkey rsa:4096 -nodes -sha512 -x509 -days 3650 -subj \"/CN=${fqdn}/OU=traefik/O=log-it.tech/L=Bucharest/S=Romania/C=RO\" -out /etc/traefik/traefik.crt -keyout /etc/traefik/traefik.key",
        path    => ['/usr/bin','/usr/sbin','/bin','/sbin'],
        onlyif  => 'test ! -f /etc/traefik/traefik.crt',
      } ->
      file { '/etc/traefik/traefik.toml':
        source  => 'puppet:///modules/profiles/traefik.toml',
        mode    => '0644',
        replace => false,
        notify  => Service['traefik'],
      }
      $kafka_hosts = query_nodes("role='kafka'").sort #here it should be any role or fact that indicates that it should have kafka-manager installed
      $kafka_hosts_hash = $kafka_hosts.map | $index, $value| { [$value,$index+1] }.hash
    
      file {"/etc/traefik/rules.toml":
        content => template("${module_name}/rules.toml.erb"),
        mode => '0644',
        replace => false,
      }
    }
    

    And this is all the code you need to deploy a traefik instance that is "secured" via HTTPS and load balances across all kafka-manager instances. Now, it's true that you can secure it further by adding iptables rules that restrict traffic on port 9000 (the default kafka-manager port) to just the hosts in the cluster, but I will come back with that part in the future if it gets done.
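
    As a hedged smoke test once the agent has converged (replace the placeholder with your traefik node):

    # -k because the certificate is self-signed; expect the kafka-manager page behind it
    curl -k -I https://<traefik-node>/
    # plain HTTP should answer with a redirect to https, per the entrypoint config above
    curl -I http://<traefik-node>/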

    Cheers!

  • Balancing requests to kafka-manager using traefik

    Hi,

    Just wanted to share with you a quite small and simple config to balance the traffic between three machines that have kafka-manager installed. For this I used traefik, since it was new to me and I wanted to gain a little bit of experience with it.

    It’s an interesting solution but it took me a while to get the pieces working. I will post here my config and will explain the needed part to get it working.

    logLevel = "DEBUG"
    defaultEntryPoints = ["http"]
    [entryPoints]
      [entryPoints.http]
      address = ":80"
    [web]
    address = ":8080"
    
    [file]
    watch = true
    
    [backends]
      [backends.backend1]
        [backends.backend1.LoadBalancer]
          method = "drr"
        [backends.backend1.servers.server1]
        url = "http://[kafka1.hostname]:9000"
        weight = 1
        [backends.backend1.servers.server2]
        url = "http://[kafka2.hostname]:9000"
        weight = 2
        [backends.backend1.servers.server3]
        url = "http://[kafka3.hostname]:9000"
        weight = 1
    [frontends]
      [frontends.frontend1]
      entrypoints = ["http"]
      backend = "backend1"
      passHostHeader = true
      priority = 10
    

    This is very basic as you can see but it took me a while to understand that you need the file block with watch = true in order for the daemon to see and parse the rules that are listed. You can also have a separate rules file and for that it would be best to consult the traefik documentation.

    I now have to do the redirect from HTTP to HTTPS in order to secure the connection to the frontend. The idea of traefik is that it works like entrypoint -> frontend -> backend, and as far as I saw this is done at the entrypoint level.

    Two extra notes: you need a default entry point so that your frontend is not ignored, and you should set the log level to DEBUG, because otherwise it won't log much.
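
    A minimal way I would verify the rules were actually picked up, assuming the [web] block above is reachable on :8080:

    # traefik's web provider exposes a health/status endpoint
    curl -s http://localhost:8080/health
    # the dashboard on :8080 should list frontend1/backend1 once the [file] section is parsed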

    I will keep you posted on the progress. You can also find the traefik documentation here: https://docs.traefik.io

    Cheers!

  • Configure kafka truststore and keystore using puppet

    Hi,

    Since the last article was about the template needed to generate the truststore and keystore, now it's time to give you the rest of the fragments for the deployment with Puppet.
    So, we will have the kafka_security_gen class that was called in the last post, and in its final form it should look like this:

    class profiles::kafka_security_gen {
        $pass = hiera('profiles::kafka_security_gen::password','password')
        $keystorepass = hiera('profiles::kafka_security_gen::keystorepass','password')
        
        $cluster_servers = query_nodes("role='kafka'")
        $servers = $cluster_servers.delete("${::fqdn}")
        
        file {'/home/kafka/security.sh':
            owner => kafka,
            group => kafka,
            ensure => file,
            mode => '0755',
            content => template("${module_name}/security.sh.erb"),
            replace => 'no'
    } ->
        exec {'generate_keystore':
        path   => '/usr/bin:/usr/sbin:/bin',
        command => '/home/kafka/security.sh',
        user => 'kafka',
        unless => 'test -f /home/kafka/kafka.server.keystore.jks'
        }
        $servers.each |String $server| {
            exec{"copy files to ${server}":
                cwd => '/home/kafka',
                path   => '/usr/bin:/usr/sbin:/bin',
                command => "scp /home/kafka/kafka* kafka@${server}:/home/kafka",
                user => 'kafka',
            }
            
         }
        ssh_keygen {'kafka':
        home => '/home/kafka/',
        type => rsa,
      }
      @@file {"/home/kafka/.ssh/authorized_keys":
        ensure => present,
        mode => '0600',
        owner => 'kafka',
        group => 'kafka',
        content => "${::sharedkey}",
        tag => "${::instance_tag}",
      }
      
    }

    The only thing I did not manage to fix is the copying of the stores: it will copy them on each run, but I will fix that in the near future with some custom facts.
    Ok, now let's see the integration part with the Kafka code. Unfortunately, I cannot provide you all of the code, but what I can provide is:

        
    $kafkadirs = ['/home/kafka/', '/home/kafka/.ssh/']
    file { $kafkadirs: 
        ensure => directory,
        mode => '0700',
        owner => 'kafka',
        group => 'kafka'
        }
        $security_enabled = hiera('profiles::kafka::security',false)
    if ($security_enabled == false) {
      $broker_config = {
        ....
      }
      } else {
        # if the hostname starts with kafka0 then it's the CA authority (in our case kafka servers are named kafka0....)
        if ($hostname == 'kafka0') {
        contain 'profiles::kafka_security_gen'
        Class['profiles::kafka_security_gen']
        }
        #this shares the public key on a couple of servers which are isolated by a tag
        File <<| tag == "${::instance_tag}" |>>
        #path for keystore/truststore
        $keystore_location = '/home/kafka/kafka.server.keystore.jks'
        $truststore_location = '/home/kafka/kafka.server.truststore.jks'
        #broker config
        $broker_config = {
        ....
        'security.inter.broker.protocol' => 'SSL',
        'ssl.client.auth'               => 'none',
        'ssl.enabled.protocols'         => ['TLSv1.2', 'TLSv1.1', 'TLSv1'],
        'ssl.key.password'              => hiera('profiles::kafka_security_gen::password','password'),
        'ssl.keystore.location'         => $keystore_location,
        'ssl.keystore.password'         => hiera('profiles::kafka_security_gen::keystorepass','password'),
        'ssl.keystore.type'             => 'JKS',
        'ssl.protocol'                  => 'TLS',
        'ssl.truststore.location'       => $truststore_location,
        'ssl.truststore.password'       => hiera('profiles::kafka_security_gen::keystorepass','password'),
        'ssl.truststore.type'           => 'JKS',
        'listeners'                     => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093", #both listeners are enabled
        'advertised.listeners'          => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093",
        }
      }
    class { '::kafka::broker':
        config    => $broker_config,
        opts      => hiera('profiles::kafka::jolokia', '-javaagent:/usr/share/java/jolokia-jvm-agent.jar'),
        heap_opts => "-Xmx${jvm_heap_size}M -Xms${jvm_heap_size}M",
      }
    

    This can easily be activated by setting the profiles::kafka::security option to true in the hierarchy, and it should do all the work. Once it's done, it can be tested using openssl s_client -debug -connect localhost:9093 -tls1
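
    Spelled out, the test looks roughly like this (ca-cert sits in /home/kafka on the generating node in my setup, so that path is an assumption):

    # handshake over the SSL listener; with -CAfile the chain should verify cleanly
    openssl s_client -connect localhost:9093 -tls1 -CAfile /home/kafka/ca-cert < /dev/null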

    The link to the article on generating the security script template is http://log-it.tech/2017/07/23/generate-kafka-keytrust-store-tls-activation/ and there is also quite an interesting guideline from Confluent: http://docs.confluent.io/current/kafka/ssl.html#configuring-kafka-brokers

    That’s all folks. 🙂

    Cheers

  • Generate Kafka key/trust store for TLS activation

    Morning folks,

    Just wanted to share with you the code for the script required to generate the keystore and truststore for Kafka. It is not much, but it might be helpful someday.

    So, the ERB script should look like this, and I will also show you how to integrate it with Puppet:

    #!/bin/bash
    HOST=<%= @fqdn %>
    PASSWORD=<%= @pass %>
    KEYSTOREPASS=<%= @keystorepass %>
    VALIDITY=365
    
    keytool -keystore kafka.server.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=Test, O=Test, L=Bucharest, S=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -subj "/CN=${HOST}/OU=Tests/O=Test/L=Bucharest/S=Romania/C=RO" -passout pass:$PASSWORD
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.client.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=Test, O=Test, L=Bucharest, S=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    
    <% @servers.each do |server| -%>
    # <%= server %>
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=Test, O=Test, L=Bucharest, S=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.host -out cert-signed-<%= server %>.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.host -storepass $KEYSTOREPASS -noprompt
    
    keytool -keystore kafka.client.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=Test, O=Test, L=Bucharest, S=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.client.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.client -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.client -out cert-signed-<%= server %>.client -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.client.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.client -storepass $KEYSTOREPASS -noprompt
    
    <% end -%>

    As you probably saw already, there are some variables that should come from the Puppet side, like the list of servers, the password for the PEM key, and the truststore/keystore password.

    Now let’s take a look at the puppet file that should generate this:

    class profiles::kafka_security_gen {
        $pass = hiera('profiles::kafka_security_gen::password','password')
        $keystorepass = hiera('profiles::kafka_security_gen::keystorepass','password')
        
        $cluster_servers = query_nodes("role='kafka'")
        $servers = $cluster_servers.delete("${::fqdn}")
        
        file {'/tmp/security.sh':
            ensure => file,
            mode => '0755',
            content => template("${module_name}/security.sh.erb"),
    }
    }

    This should suffice so far. The idea is that you create the truststore and keystore for the host that generates them, then generate and import the CA root certificate; once this is done, the remaining steps to build the stores are straightforward.
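
    To sanity-check what the script produced, a hedged example run in the directory where the stores were generated, using the same store password:

    # you should see the host key pair plus the imported caroot entry in the keystore,
    # and only caroot in the truststore
    keytool -list -keystore kafka.server.keystore.jks -storepass "$KEYSTOREPASS"
    keytool -list -keystore kafka.server.truststore.jks -storepass "$KEYSTOREPASS"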
    I am currently working on a solution to automatically distribute and integrate this on all machines; I will keep you posted.

    Cheers

  • Install Kafka Manager with Puppet

    Hi,

    I will continue along this line with the installation of a management tool called Kafka Manager, using the same old Puppet.

    The main source of the project is here:

    https://github.com/yahoo/kafka-manager

    If you scroll down to the packaging section you will see that you have the possibility to create a .deb package for an Ubuntu or Debian setup, or an rpm. To do this, just clone the project, go into it and run the command:

    sbt debian:packageBin
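
    For reference, a hedged sketch of where the resulting .deb usually lands (sbt-native-packager defaults; the exact file name is an assumption) and a quick local install test:

    # the package is typically written under target/
    ls target/*.deb
    sudo dpkg -i target/kafka-manager_*.deb   # or push it to your apt repo instead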
    

    It will take some time, but eventually it will create the required package. Now, since you have the package, you can easily ask a friend to upload it to your pipeline apt repo (this is what I did, since I don't yet have the experience to do that) and then install and configure it via Puppet. In order to do that, I "wrote" the following manifest:

    kafkamanager.pp

    class profiles::kafkamanager {
    	
    	$zookeeper_connect = hiera('kafkamanager::zookeeperconnect')
    	package {'kafka-manager':
    		ensure => installed,
    	}
    	group { 'kafka-manager':
    		ensure => 'present',	
    	}
    	user { 'kafka-manager':
    		ensure => 'present',
    		groups => 'kafka-manager'
    	}
    	
    	Group['kafka-manager'] -> User['kafka-manager']
    	
    	file { '/usr/share/kafka-manager' :
        		ensure    => directory,
        		owner     => 'kafka-manager',
        		group      => 'kafka-manager',
        		require     => [ User['kafka-manager'], Group['kafka-manager'], ],
        		recurse    => true,
    	}
    
    	file_line { 'config_zookeeper':
    	    path => '/etc/kafka-manager/application.conf',
    	    match => 'kafka-manager.zkhosts=\"kafka-manager-zookeeper:2181\"',
    	    line => "kafka-manager.zkhosts=\"${zookeeper_connect}\"",
    	    replace => true,
    	}
        
        file_line { 'enable_auth':
            path => '/etc/kafka-manager/application.conf',
            match => 'basicAuthentication.enabled=false',
            line => 'basicAuthentication.enabled=true',
            replace => true,
            }
    	service { 'kafka-manager':
    		ensure => 'running',
    		enable => true,
    		require => [ File['/usr/share/kafka-manager'], File_line['config_zookeeper'] ],	
    	}
    }

    This should be all; you are now free to log in with the user and password that you find in the application.conf file and start mapping Kafka clusters.
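
    A quick hedged check that the service answers on its default port 9000, using the credentials read from application.conf:

    grep basicAuthentication /etc/kafka-manager/application.conf
    # substitute the user/password you found above; expect an HTTP 200
    curl -s -o /dev/null -w "%{http_code}\n" -u <user>:<password> http://localhost:9000/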

    Cheers!