Category: kafka

  • SASL config issue on latest Kafka versions

    Hello,

    Today I want to share with you a problem that we needed to fix when we decided to activate SASL.

    Normally, the steps are pretty straightforward and you can follow either the Confluent documentation or the general Apache Kafka one.

    The main catch is that if you have a certain property in your config file, the following error will appear in a loop:

    [2021-01-11 09:17:28,052] ERROR Processor [0..n] closed connection from null (kafka.network.Processor)
    java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]
    	at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348)
    	at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
    	at org.apache.kafka.common.network.Selector.register(Selector.java:311)
    	at kafka.network.Processor.configureNewConnections(SocketServer.scala:1024)
    	at kafka.network.Processor.run(SocketServer.scala:757)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.kafka.common.KafkaException: java.lang.NullPointerException
    	at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:228)
    	at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
    	... 5 more
    Caused by: java.lang.NullPointerException
    	at java.base/java.util.Objects.requireNonNull(Objects.java:221)
    	at org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(DefaultKafkaPrincipalBuilder.java:77)
    	at org.apache.kafka.common.network.ChannelBuilders.createPrincipalBuilder(ChannelBuilders.java:216)
    	at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.<init>(SaslServerAuthenticator.java:183)
    	at org.apache.kafka.common.network.SaslChannelBuilder.buildServerAuthenticator(SaslChannelBuilder.java:262)
    	at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$0(SaslChannelBuilder.java:207)
    	at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143)
    	at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:224)
    	... 6 more
    

    The cause is the following property:

    principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder

    For the latest Apache Kafka versions (2.x.x), this property should not be set at all, so that when the process starts it resolves to:

    principal.builder.class=null
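
    If you want to double-check a broker before enabling SASL, a minimal sketch like the one below can scan the config for the offending property (the path and the helper name are just illustrative, not part of our actual setup):

    # Illustrative helper, not part of our actual setup: flag the deprecated
    # principal.builder.class entry before enabling SASL.
    DEPRECATED = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder"

    def find_deprecated_principal_builder(path="/opt/kafka/config/server.properties"):
        with open(path) as config:
            for lineno, line in enumerate(config, start=1):
                stripped = line.strip()
                if stripped.startswith("#") or "=" not in stripped:
                    continue
                key, value = stripped.split("=", 1)
                if key.strip() == "principal.builder.class" and value.strip() == DEPRECATED:
                    return lineno
        return None

    line = find_deprecated_principal_builder()
    if line:
        print("Remove principal.builder.class from line", line, "before enabling SASL")
    else:
        print("No deprecated principal.builder.class entry found")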
  • Automatic increase of Kafka LVM on GCP

    I wrote an article for my company that was published on Medium regarding the topic in the subject. Please see the link

    https://medium.com/metrosystemsro/new-ground-automatic-increase-of-kafka-lvm-on-gcp-311633b0816c

    Thanks

  • Kafka_consumer.yaml (python style) and more

    Hi,

    As a follow-up to the article I posted earlier ( https://log-it.tech/2019/03/15/get-the-info-you-need-from-consumer-group-python-style/ ), you can use that info to populate kafka_consumer.yaml for the Datadog integration.

    It’s not elegant by any means, but it works. As a piece of advice, please don’t overcomplicate things more than they need to be.

    In the last example I figured I wanted to create a list of GroupInfo objects for each line returned by the consumer-group script. Bad idea, as you shall see below.

    So, in addition to what I wrote in the last article, the code now doesn’t just print the dictionary, it also orders it by partition.

    import os

    # constructgroupdict/printgroupdict rely on getgroups() and getgroupinfo()
    # from the previous article's script.
    def constructgroupdict():
        groupagregate = {}
        group_list = getgroups()
        for group in group_list:
            groupagregate[group] = getgroupinfo(group)

        # order the GroupInfo objects of every consumer group by partition
        for v in groupagregate.values():
            v.sort(key=lambda re: int(re.partition))

        return groupagregate

    def printgroupdict():
        groupdict = constructgroupdict()
        infile = open('/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml.template', 'a')
        for k, v in groupdict.items():
            infile.write('      '+k+':\n')
            topics = []
            testdict = {}
            # collect the unique topic names for this consumer group
            for re in v:
                if re.topic not in topics:
                    topics.append(re.topic)
            # map every topic to its list of partitions
            for x in topics:
                partitions = []
                for re in v:
                    if re.topic == x:
                        partitions.append(re.partition)
                testdict[x] = partitions
            for gr, partlst in testdict.items():
                infile.write('        '+gr+': ['+', '.join(partlst)+']\n')
        infile.close()
        os.rename('/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml.template',
                  '/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml')

    printgroupdict()
    

    And after that, it’s quite hard to get only the unique values for the topic names.

    The logic I chose, grabbing all the data per consumer group, is related to the fact that querying the cluster takes a very long time, so grabbing another set of data filtered by topic would have been very costly time-wise.

    The way it is written now there are a lot of for loops, which could become a problem if there are too many records to process. Fortunately, that should not normally be the case for consumer groups.
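
    If you ever need to avoid the nested loops, here is a minimal sketch of the same topic/partition grouping done in a single pass (it reuses the GroupInfo objects from the previous article; the helper name is just illustrative):

    from collections import defaultdict

    def groupbytopic(group_info_list):
        # topic -> list of partitions, built in one pass over the GroupInfo objects
        topic_partitions = defaultdict(list)
        for re in group_info_list:
            topic_partitions[re.topic].append(re.partition)
        return topic_partitions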

    The easiest way to integrate the info into kafka_consumer.yaml, in our case, is to create a template called kafka_consumer.yaml.template:

    init_config:
      # Customize the ZooKeeper connection timeout here
      # zk_timeout: 5
      # Customize the Kafka connection timeout here
      # kafka_timeout: 5
      # Customize max number of retries per failed query to Kafka
      # kafka_retries: 3
      # Customize the number of seconds that must elapse between running this check.
      # When checking Kafka offsets stored in Zookeeper, a single run of this check
      # must stat zookeeper more than the number of consumers * topic_partitions
      # that you're monitoring. If that number is greater than 100, it's recommended
      # to increase this value to avoid hitting zookeeper too hard.
      # https://docs.datadoghq.com/agent/faq/how-do-i-change-the-frequency-of-an-agent-check/
      # min_collection_interval: 600
      #
      # Please note that to avoid blindly collecting offsets and lag for an
      # unbounded number of partitions (as could be the case after introducing
      # the self discovery of consumer groups, topics and partitions) the check
      # will collect metrics for at most 200 partitions.
    
    
    instances:
      # In a production environment, it's often useful to specify multiple
      # Kafka / Zookeper nodes for a single check instance. This way you
      # only generate a single check process, but if one host goes down,
      # KafkaClient / KazooClient will try contacting the next host.
      # Details: https://github.com/DataDog/dd-agent/issues/2943
      #
      # If you wish to only collect consumer offsets from kafka, because
      # you're using the new style consumers, you can comment out all
      # zk_* configuration elements below.
      # Please note that unlisted consumer groups are not supported at
      # the moment when zookeeper consumer offset collection is disabled.
      - kafka_connect_str:
          - localhost:9092
        zk_connect_str:
          - localhost:2181
        # zk_iteration_ival: 1  # how many seconds between ZK consumer offset
                                # collections. If kafka consumer offsets disabled
                                # this has no effect.
        # zk_prefix: /0.8
    
        # SSL Configuration
    
        # ssl_cafile: /path/to/pem/file
        # security_protocol: PLAINTEXT
        # ssl_check_hostname: True
        # ssl_certfile: /path/to/pem/file
        # ssl_keyfile: /path/to/key/file
        # ssl_password: password1
    
        # kafka_consumer_offsets: false
        consumer_groups:
    

    It’s true that I keep only one connection string each for Kafka and Zookeeper, and that things get a little more complicated once SSL is configured (but that is not our case, yet).

      - kafka_connect_str:
          - localhost:9092
        zk_connect_str:
          - localhost:2181

    The script appends the info at the bottom of the template, after which the file is renamed. Who puts the template back? Easy, that would be Puppet.

    It works, it has been tested. One last thing I wanted to warn you about:

    There is a limit on the number of metrics that can be uploaded per machine, and that is 350. Please be aware of it and think very seriously before you activate this.

    That would be all for today.

    Cheers

  • Get the info you need from consumer group (python style)

    Hi,

    This might be of help for some of you. If it’s rubbish, I truly apologize; please leave a comment with improvements 🙂

    import subprocess
    import socket

    fqdn = socket.getfqdn()

    # Each GroupInfo maps one data line of the kafka-consumer-groups.sh --describe
    # output: topic, partition, current offset, log-end offset, lag, consumer id, host, client id.
    class GroupInfo:
        def __init__(self, line):
            self.topic = line[0]
            self.partition = line[1]
            self.current_offset = line[2]
            self.logend_offset = line[3]
            self.lag = line[4]
            self.consumer_id = line[5]
            self.host = line[6]
            self.client_id = line[7]

        def __str__(self):
            return self.topic+" "+self.partition+" "+self.current_offset+" "+self.logend_offset+" "+self.lag+" "+self.consumer_id

    def getgroups():
        # list all consumer groups known to the local broker
        cmd = ['/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server '+fqdn+':9092 --list']
        result = subprocess.check_output(cmd, shell=True).splitlines()
        group_list = []
        for r in result:
            try:
                rstr = r.decode('utf-8')
            except UnicodeDecodeError:
                print('Result can not be converted to utf-8')
                continue
            group_list.append(rstr)

        return group_list

    def getgroupinfo(groupid):
        # describe one consumer group and drop the header line
        cmd = ('/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server '+fqdn+':9092 --group '+groupid+' --describe')
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
        result = subprocess.check_output(('grep -v TOPIC'), stdin=process.stdout, shell=True).splitlines()
        process.wait()
        group_info_list = []
        for r in result:
            try:
                rstr = r.decode('utf-8')
            except UnicodeDecodeError:
                print('Result can not be converted to utf-8')
                continue
            if rstr.split():
                group_info_list.append(GroupInfo(rstr.split()))

        return group_info_list

    def main():
        groupagregate = {}
        group_list = getgroups()
        for group in group_list:
            groupagregate[group] = getgroupinfo(group)

        for k, v in groupagregate.items():
            print(k)
            for re in v:
                print(re)

    main()

    I will not explain it; it should be self-explanatory.

    Cheers

  • Kafka consumer group info retrieval using Python

    Hi,

    I’ve been playing with the kafka-python module to grab the info I need in order to reconfigure the Datadog integration.

    Unfortunately, there is a catch with this method as well, and I will show it below.

    Here is a bit of not-so-elegant code:

    from kafka import BrokerConnection
    from kafka.protocol.admin import *
    import socket

    fqdn = socket.getfqdn()
    # open a blocking connection to the local broker
    bc = BrokerConnection(fqdn, 9092, socket.AF_INET)
    try:
        bc.connect_blocking()
    except Exception as e:
        print(e)
    if bc.connected():
        print("Connection to", fqdn, "established")

    def getgroup():
        # list all consumer groups known to this broker
        list_groups_request = ListGroupsRequest_v1()

        future0 = bc.send(list_groups_request)
        while not future0.is_done:
            for resp, f in bc.recv():
                f.success(resp)
        group_ids = ()
        for group in future0.value.groups:
            group_ids += (group[0],)

        print(group_ids)

        # describe all the groups found above in a single request
        description = DescribeGroupsRequest_v1(group_ids)
        future1 = bc.send(description)
        while not future1.is_done:
            for resp, f in bc.recv():
                f.success(resp)

        for groupid in future1.value.groups:
            print('For group ', groupid[1], ':\n')
            for meta in groupid[5]:
                print(meta[0], meta[2], sep="\n")
                print(meta[3])
        if future1.is_done:
            print("Group query is done")

    getgroup()
    

    As you will see, print(meta[3]) returns very ugly binary data with the topic names in it, and it is not converted if you try meta[3].decode('utf-8').

    I hope I can find a way to decode it.
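
    One possible approach, untested here and assuming the member assignment blob follows the standard consumer protocol that kafka-python models in kafka.protocol.group.MemberAssignment, would be something like:

    from kafka.protocol.group import MemberAssignment

    def decode_member_assignment(raw_assignment):
        # raw_assignment is the meta[3] blob from the DescribeGroups response above;
        # MemberAssignment.decode() should turn it into version / [(topic, [partitions])] / user_data
        assignment = MemberAssignment.decode(raw_assignment)
        for topic, partitions in assignment.assignment:
            print(topic, partitions)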

    Cheers

  • Fact for kafka_consumer hash…or kind of

    Hi,

    There is a late requirement to activate the kafka_consumer functionality of Datadog.

    Unfortunately, this is a challenge if you don’t have a fixed number of consumer groups and topics (on one client we had a couple of hundred consumer groups).

    Here is how it should look in the example file:

      #  consumer_groups:
      #    <CONSUMER_NAME_1>:
      #      <TOPIC_NAME_1>: [0, 1, 4, 12]
      #    <CONSUMER_NAME_2>:
      #      <TOPIC_NAME_2>:
      #    <CONSUMER_NAME_3>

    So, if you want to grab the data using the old kafka-consumer-groups.sh, you have to do it in one iteration. I tried it this way, but even though it works, it should not be an option for larger clusters.

    require 'facter'
    Facter.add('kafka_consumers_config') do
      setcode do
        kafka_consumers_cmd = '/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list'
        kafka_consumers_result = Facter::Core::Execution.exec(kafka_consumers_cmd)
        group_hash = {}
        kafka_consumers_result.each_line do |group|
          groupid = group.strip
          # unique topic names consumed by this group
          kafka_consumer_topic_list_cmd = "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group #{groupid} --describe | sort | grep -v TOPIC | awk {'print $1'} | uniq"
          kafka_consumer_topic_list_result = Facter::Core::Execution.exec(kafka_consumer_topic_list_cmd)
          topic_hash = {}
          kafka_consumer_topic_list_result.each_line do |topic|
            topicid = topic.strip
            next if topicid.empty?
            # partitions of this topic assigned to the group, flattened to one line
            kafka_consumer_topic_partition_cmd = "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group #{groupid} --describe | grep -v TOPIC | grep #{topicid} | awk {'print $2'} | sort"
            kafka_consumer_topic_partition_result = Facter::Core::Execution.exec(kafka_consumer_topic_partition_cmd)
            topic_hash[topicid] = kafka_consumer_topic_partition_result.gsub("\n", ' ').squeeze(' ').strip
          end
          group_hash[groupid] = topic_hash
        end
        group_hash
      end
    end
    
    

    This is posted only as a snapshot and maybe as a source of inspiration. For an actual working version, it should be done in Java or Scala in order to leverage the power of the admin libraries (from what I know, Python/Go and other programming languages have libraries only for the consumer/producer part).

    If I pursue the task of rewriting this using only one interrogation per group, or in Java/Scala, you will see it here.
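
    As a side note, newer kafka-python releases do ship an admin client; if one is available, a rough sketch like the one below (assuming kafka-python >= 2.0 and its KafkaAdminClient API) could build the same group/topic/partition mapping without shelling out:

    from kafka.admin import KafkaAdminClient

    def consumer_groups_config(bootstrap='localhost:9092'):
        # group -> { topic: [partitions] }, built from the committed offsets of each group
        admin = KafkaAdminClient(bootstrap_servers=bootstrap)
        config = {}
        for group_id, _protocol_type in admin.list_consumer_groups():
            topics = {}
            for tp in admin.list_consumer_group_offsets(group_id):
                topics.setdefault(tp.topic, []).append(tp.partition)
            config[group_id] = topics
        admin.close()
        return config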

  • Small go code example for zookeeper resource editing

    Hi,

    We have the task of “service restart coordination” for our Apache Kafka cluster. It’s still a work in progress, but if you want to use Zookeeper for some status verification and updates, something like this will work as an example:

    package main
    
    import (
    	"fmt"
    	"io"
    	"launchpad.net/gozk"
    	"os"
    	"strings"
    	"sync"
    	"time"
    )
    
    const (
    	SERVICEPATH = "/servicerestart"
    )
    
    var wg sync.WaitGroup
    
    func main() {
    	conn := "zk1:2181,zk2:2181,zk3:2181"
    	connSlice := strings.Split(conn, ",")
    	var flag bool
    	args := os.Args
    	if len(args) != 2 {
    		io.WriteString(os.Stdout, "Argument is needed for the script\n")
    		os.Exit(1)
    	}
    	switch args[1] {
    	case "hang":
    		flag = false
    	case "nohang":
    		flag = true
    	default:
    		io.WriteString(os.Stdout, "Command unrecognized\n")
    		os.Exit(1)
    	}
    	wg.Add(1)
    	go ModifyZooStat(connSlice, flag)
    	wg.Wait()
    }

    func ModifyZooStat(strconn []string, flag bool) {
    	// find the first reachable zookeeper host from the connection string
    	var zooReach string
    	for _, zoohost := range strconn {
    		zk, _, err := zookeeper.Dial(zoohost, 5e9)
    		if err != nil {
    			fmt.Println("Couldn't connect to " + zoohost)
    			continue
    		}
    		zooReach = zoohost
    		zk.Close()
    		break
    	}
    	zkf, sessionf, _ := zookeeper.Dial(zooReach, 5e9)
    	defer zkf.Close()
    	event := <-sessionf
    	if event.State != zookeeper.STATE_CONNECTED {
    		fmt.Println("Couldn't connect")
    	}
    	acl := []zookeeper.ACL{zookeeper.ACL{Perms: zookeeper.PERM_ALL, Scheme: "world", Id: "anyone"}}
    	host, _ := os.Hostname()
    	t := time.Now()
    	// create the node if it is missing, otherwise just update it
    	servicerestart, _ := zkf.Exists(SERVICEPATH)
    	if servicerestart == nil {
    		path, _ := zkf.Create(SERVICEPATH, host+" "+t.Format(time.Kitchen), zookeeper.EPHEMERAL, acl)
    		fmt.Println(path)
    	} else {
    		change, _ := zkf.Set(SERVICEPATH, host+" "+t.Format(time.Kitchen), -1)
    		fmt.Println(change.MTime().Format(time.Kitchen))
    	}
    	// with "hang" the WaitGroup is never released, so the ephemeral node stays until the process is stopped
    	if flag {
    		wg.Done()
    	}
    }
    

    Let me explain what it does. Basically it takes a Zookeeper connection string and splits it per server. This was a requirement of the zk module used; it couldn’t take more than one host:2181 entry as an argument.
    After we find a reachable server, we connect to it and write into the /servicerestart path the hostname and also the time at which the resource was edited.
    In order to create a resource, you need an ACL slice that is passed as a parameter:

    acl := []zookeeper.ACL{zookeeper.ACL{Perms: zookeeper.PERM_ALL, Scheme: "world", Id: "anyone"}}

    Once this slice is created, the next step is to check whether the resource exists. If it doesn’t, we create it; if it does, we just modify it.

    The fmt.Println instructions are there basically for two reasons:

    • To see the resource that was created. I wanted that because the zookeeper.EPHEMERAL flag keeps the resource only for as long as the connection is active; if you want persistence you have to create the node without that flag (zookeeper.SEQUENCE, by contrast, just appends a unique counter to the resource name).
    • To see the timestamp when the resource was modified.

    Even if you don’t close the Zookeeper connection explicitly with defer zkf.Close(), it gets closed automatically when the script ends. So we still need a way to keep the script alive, and we do that using WaitGroups…
    We add one goroutine to the WaitGroup and wait for it to finish, and whether it ever finishes is controlled by a command-line argument mapped to a flag.

    This is just a very small example and I am still a true beginner in the art of Go programming, but I hope it helps 🙂

    Cheers

  • Don’t delete the Kafka GC logs when they are used

    Hi,

    I made a mistake some time ago, and it’s still there to haunt me.
    Deleting the rotated GC logs together with the one that is still in use doesn’t solve anything; it just creates a more difficult situation.
    Here is my example:

    /dev/sda1                        50G   42G  5.2G  90% /
    /opt/kafka/logs# ll
    total 34M
    drwxrwxr-x 2 kafka kafka 4.0K Oct 10 19:34 ./
    drwxr-xr-x 7 kafka kafka 4.0K Mar 14  2018 ../
    -rw-rw-r-- 1 kafka kafka    0 Mar 14  2018 controller.log
    -rw-rw-r-- 1 kafka kafka    0 Mar 14  2018 kafka-authorizer.log
    -rw-rw-r-- 1 kafka kafka    0 Mar 14  2018 kafka-request.log
    -rw-rw-r-- 1 kafka kafka 2.9M Oct 11 04:44 log-cleaner.log
    -rw-rw-r-- 1 kafka kafka 6.1M Oct 11 05:24 server.log
    -rw-rw-r-- 1 kafka kafka  25M Oct  4 14:03 state-change.log
    
    lsof +L1 | grep delete
    init        1     root   13w   REG    8,1         106     0     95 /var/log/upstart/systemd-logind.log.1 (deleted)
    init        1     root   14w   REG    8,1        5794     0   2944 /var/log/upstart/kafka-manager.log.1 (deleted)
    java     1630    kafka    3w   REG    8,1 46836567522     0 524939 /opt/kafka-2.11-0.10.1.1/logs/kafkaServer-gc.log (deleted)
    java     1863 dd-agent    4r   REG    8,1     5750256     0 525428 /opt/datadog-agent/bin/agent/dist/jmx/jmxfetch-0.20.1-jar-with-dependencies.jar (deleted)
    java    10749 dd-agent    4r   REG    8,1     5750216     0 525427 /opt/datadog-agent/bin/agent/dist/jmx/jmxfetch-0.20.0-jar-with-dependencies.jar (deleted)
    bash    10928     root    0u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
    bash    10928     root    1u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
    bash    10928     root    2u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
    bash    10928     root  255u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
    tail    12378     root    0u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
    tail    12378     root    1u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
    tail    12378     root    2u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
    tail    12378     root    3r   REG    8,1    52428909     0 525512 /opt/kafka-2.11-0.10.1.1/logs/server.log.1 (deleted)
    java    14692 dd-agent    4r   REG    8,1     5750256     0 526042 /opt/datadog-agent/bin/agent/dist/jmx/jmxfetch-0.20.1-jar-with-dependencies.jar (deleted)
    java    16574 dd-agent    4r   REG    8,1     5750256     0 526041 /opt/datadog-agent/bin/agent/dist/jmx/jmxfetch-0.20.1-jar-with-dependencies.jar (deleted)
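
    The disk space is released only when the process that still holds the deleted file descriptor is restarted. If lsof is not at hand, a quick sketch like this one (just an illustrative helper that walks /proc much like lsof does) lists the deleted files that are still held open, together with their size:

    import os

    def deleted_open_files():
        """Yield (pid, path, size) for files that are deleted but still held open."""
        for pid in filter(str.isdigit, os.listdir('/proc')):
            fd_dir = os.path.join('/proc', pid, 'fd')
            try:
                fds = os.listdir(fd_dir)
            except OSError:
                continue  # process exited or we lack permissions
            for fd in fds:
                link = os.path.join(fd_dir, fd)
                try:
                    target = os.readlink(link)
                    if target.endswith('(deleted)'):
                        # stat on the fd link still reaches the open (deleted) file
                        yield pid, target, os.stat(link).st_size
                except OSError:
                    continue

    for pid, path, size in deleted_open_files():
        print(pid, path, size)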
    

    Handling GC logging in Kafka versions lower than 1.0.0 is quite tricky. It is best to remove these options from your startup script:

    -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps

    But taking into consideration that we use a standard Puppet module shared by multiple teams, this is still to be fixed. Fortunately, from 1.0.0 the startup scripts rotate and cap the GC log files by default, so they no longer grow unbounded.

    In order to fix what I showed you before, a process restart is needed, and that is what we will do.

    Cheers

  • Final version of SSL gen script for kafka

    Hi,

    I wrote a lot about this topic, but it seems that I finally ended up at the procedure specified by Confluent.
    Here is the right way to do it, at least for now:

    #!/bin/bash
    HOST=<%= @fqdn %>
    PASSWORD=<%= @pass %>
    KEYSTOREPASS=<%= @keystorepass %>
    VALIDITY=365
    
    keytool -keystore kafka.server.keystore.jks -alias ${HOST} -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -subj "/CN=${HOST}/OU=MyTeam/O=MyCompany/L=Bucharest/S=Romania/C=RO" -passout pass:$PASSWORD
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.server.keystore.jks -alias ${HOST} -certreq -file cert-file-${HOST}.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-${HOST}.host -out cert-signed-${HOST}.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias ${HOST} -import -file cert-signed-${HOST}.host -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    
    <% @servers.each do |server| 
    separate = server.split("."); host = separate[0]-%>
    # <%= server %>
    keytool -keystore <%= host %>.server.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore <%= host %>.server.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.host -out cert-signed-<%= server %>.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore <%= host %>.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore <%= host %>.server.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.host -storepass $KEYSTOREPASS -noprompt
    
    <% end -%>
    
    keytool -keystore kafka.client.keystore.jks -alias 'client' -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.client.keystore.jks -alias 'client' -certreq -file cert-file-client.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-client.host -out cert-signed-client.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.client.keystore.jks -alias 'client' -import -file cert-signed-client.host -storepass $KEYSTOREPASS -noprompt
    

    The Puppet code needs to be modified as well. You can find the initial manifest here. The difference is:

    
    if (member($servers, $item[0]) and $item[1] == "disabled") {
      $fqdn_split = split($item[0], '[.]')
      exec { "copy files to ${item[0]}":
        cwd     => '/home/kafka',
        path    => '/usr/bin:/usr/sbin:/bin',
        command => "scp /home/kafka/${fqdn_split[0]}.server.keystore.jks kafka@${item[0]}:/home/kafka/kafka.server.keystore.jks; scp /home/kafka/kafka.server.truststore.jks kafka@${item[0]}:/home/kafka/kafka.server.truststore.jks",
        user    => 'kafka',
      }
    }
    

    Enough on this topic.

    Cheers

  • Wrong again, there is no return code 0 on self signed certs

    Morning,

    It looks like I was wrong again with the SSL generation script. Here is the second article.

    Return code 0 is not good after all; it signals that the Kafka broker is closing the connection really fast.

    So:

  • There is no return code 0 on self-signed certs
  • Please make sure that you have a certificate in the chain when you test (see the small test sketch after the script below)
  • I will give you just the server side; for the client it’s still not very clear whether it works. Once I have the confirmation, I will post it.

    #!/bin/bash
    HOST=<%= @fqdn %>
    PASSWORD=<%= @pass %>
    KEYSTOREPASS=<%= @keystorepass %>
    VALIDITY=365
    
    openssl genrsa -out CA.key 2048
    openssl req -new -x509 -keyout CA.key -out ca-cert -days $VALIDITY -subj "/CN=${HOST}/OU=MyTeam/O=MyCompany/L=Bucharest/S=Romania/C=RO" -passout pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.server.keystore.jks -alias $HOST -certreq -file cert-file-${HOST}.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey CA.key -in cert-file-${HOST}.host -out cert-signed-${HOST}.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -trustcacerts -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.server.keystore.jks -alias $HOST -import -file cert-signed-${HOST}.host -storepass $KEYSTOREPASS -noprompt
    
    <% @servers.each do |server| -%>
    # <%= server %>
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey CA.key -in cert-file-<%= server %>.host -out cert-signed-<%= server %>.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.host -storepass $KEYSTOREPASS -noprompt
    <% end -%>
    
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -trustcacerts -file ca-cert -storepass $KEYSTOREPASS -noprompt
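
    Since the whole point is to fail fast when the chain is wrong, here is a minimal connectivity sketch (just an illustrative check, assuming the broker listens with SSL on port 9093 and that the ca-cert file from the script above is at hand) that verifies the broker presents a certificate chain that validates against our CA:

    import socket
    import ssl

    def check_broker_cert(host, port=9093, cafile="ca-cert"):
        # raises ssl.SSLError if the broker's certificate chain does not validate against our CA
        ctx = ssl.create_default_context(cafile=cafile)
        with socket.create_connection((host, port)) as sock:
            with ctx.wrap_socket(sock, server_hostname=host) as tls:
                print("Handshake OK, peer subject:", tls.getpeercert().get("subject"))

    check_broker_cert(socket.getfqdn())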
    

    Hope I don’t discover anything else that’s wrong. If so, I will keep you informed.

    PS: It seems that I was wrong again 😀 It’s strange that it works with Kafka up to 2.0 but will not validate on that version.
    The final right way to do it is to keep in the keystore only CARoot and the alias corresponding to that server.
    I will post it as soon as I have an implementation.

    And here it is.
    Cheers