Categories
kafka python

Kafka consumer group info retrieval using Python

Hi,

I’ve been playing with the kafka-python module to grab the info I need in order to reconfigure the Datadog integration.

Unfortunately, there is a catch with this method as well, and I will show you below.

Here is a bit of not-so-elegant code.

from kafka import BrokerConnection
from kafka.protocol.admin import *
import socket

fqdn = socket.getfqdn()
bc = BrokerConnection(fqdn, 9092, socket.AF_INET)
try:
    bc.connect_blocking()
except Exception as e:
    print(e)
if bc.connected():
    print("Connection to", fqdn, "established")

def getgroup():
    # list all consumer groups known to this broker
    list_groups_request = ListGroupsRequest_v1()

    future0 = bc.send(list_groups_request)
    while not future0.is_done:
        for resp, f in bc.recv():
            f.success(resp)
    group_ids = ()
    for group in future0.value.groups:
        group_ids += (group[0],)

    print(group_ids)

    # describe each group to get its members and their metadata/assignments
    description = DescribeGroupsRequest_v1(group_ids)
    future1 = bc.send(description)
    while not future1.is_done:
        for resp, f in bc.recv():
            f.success(resp)

    for groupid in future1.value.groups:
        print('For group ', groupid[1], ':\n')
        for meta in groupid[5]:
            print(meta[0], meta[2], sep="\n")
            print(meta[3])
    if future1.is_done:
        print("Group query is done")

getgroup()

As you will see, print(meta[3]) returns some very ugly binary data with topic names in it, which is not converted even if you try meta[3].decode('utf-8').

I hope I can find a way to decode it; one possible approach is sketched below.
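
If those bytes follow the standard consumer protocol (which kafka-python models for its own coordinator), the structs in kafka.coordinator.protocol should be able to decode them; meta[4] holds the corresponding member assignment. A sketch, under that assumption:

from kafka.coordinator.protocol import (
    ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)

# meta[3] is the member metadata (subscription), meta[4] the member assignment;
# decoding assumes the group uses the standard consumer protocol
metadata = ConsumerProtocolMemberMetadata.decode(meta[3])
print(metadata.subscription)              # list of subscribed topic names
assignment = ConsumerProtocolMemberAssignment.decode(meta[4])
for topic, partitions in assignment.assignment:
    print(topic, partitions)              # topic name and its partition list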

Cheers

Categories
kafka puppet

Fact for kafka_consumer hash…or kind of

Hi,

There is a late requirement that we activate the kafka_consumer functionality of Datadog.

Unfortunately, this is a challenge if you don’t have a fixed number of consumer groups and topics (on one client we had a couple of hundred consumer groups).

Here is how it should look in the example file

  #  consumer_groups:
  #    <CONSUMER_NAME_1>:
  #      <TOPIC_NAME_1>: [0, 1, 4, 12]
  #    <CONSUMER_NAME_2>:
  #      <TOPIC_NAME_2>:
  #    <CONSUMER_NAME_3>

So, if you want to grab the data using the old kafka-consumer-groups.sh, you will have to do this in one iteration. I tried it this way, and even though I got it done, it should not be an option for larger clusters.

require 'facter'
Facter.add('kafka_consumers_config') do
  setcode do
    kafka_consumers_cmd = '/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list'
    kafka_consumers_result = Facter::Core::Execution.exec(kafka_consumers_cmd)
    group_hash = {}
    kafka_consumers_result.to_s.each_line do |group|
      groupid = group.strip
      kafka_consumer_topic_list_cmd = "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group #{groupid} --describe | sort | grep -v TOPIC | awk {'print $1'} | uniq"
      kafka_consumer_topic_list_result = Facter::Core::Execution.exec(kafka_consumer_topic_list_cmd)
      topic_hash = {}
      kafka_consumer_topic_list_result.to_s.each_line do |topic|
        topicid = topic.strip
        next if topicid.empty?
        kafka_consumer_topic_partition_cmd = "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group #{groupid} --describe | grep -v TOPIC | grep #{topicid} | awk {'print $2'} | sort"
        kafka_consumer_topic_partition_result = Facter::Core::Execution.exec(kafka_consumer_topic_partition_cmd)
        # one space-separated string of partition numbers per topic
        topic_hash[topicid] = kafka_consumer_topic_partition_result.to_s.gsub("\n", ' ').squeeze(' ').strip
      end
      group_hash[groupid] = topic_hash
    end
    group_hash
  end
end

This is posted only as a snapshot and maybe as a source of inspiration. For an actual working version, it should probably be done in Java or Scala in order to leverage the admin libraries (as far as I know, Python, Go and other languages only have libraries for the consumer/producer part).

If I pursue the task of rewriting this with a single interrogation loop, or in Java/Scala, I will post it here.
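
For context, this is a rough sketch of how such a fact could feed a consumer_groups config from an ERB template; the file name and the @kafka_consumers_config variable are assumptions for illustration, not the actual Datadog integration:

# kafka_consumer.yaml.erb (hypothetical template fed by the fact above)
instances:
  - kafka_connect_str: localhost:9092
    consumer_groups:
<% @kafka_consumers_config.each do |group, topics| -%>
      <%= group %>:
<% topics.each do |topic, partitions| -%>
        <%= topic %>: [<%= partitions.split(' ').join(', ') %>]
<% end -%>
<% end -%>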

Categories
python

Python for opening and reading files

Since I started learning Python, and will hopefully also take a certification, I am trying to do some hands-on exercises.

Nothing too complicated, just some basic exercises, for the moment. Here is one of them.

They want to teach you to open a file and read from it like this:

from sys import argv

script, filename = argv
txt = open(filename)
print(f"Here is your file: {filename}")
print(txt.read())

But there is a thing not to be ignored, and that is exception handling.

from sys import argv

try:
    script, filename = argv
    txt = open(filename)
    print(f"Here is your file: {filename}")
    print(txt.read())
except ValueError as e:
    if "not enough" in e.__str__():
        print("not enough arguments given")
    else:
        print("too many arguments given")
except FileNotFoundError:
    print("file given, not found")

And since you can’t really discern between ValueError messages, or at least I don’t yet know how to do that more elegantly, we have a workaround.

There is also a FileNotFoundError which shouldn’t be left unchecked, and if I missed something, leave a comment.

P.S.: I know there is also the possibility that the file doesn’t have read permissions. That demonstrates that for three lines of code you need to handle even more exceptional cases.
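
For completeness, that case can be caught the same way with PermissionError (available since Python 3.3); a short sketch:

from sys import argv

try:
    script, filename = argv
    txt = open(filename)
    print(txt.read())
except FileNotFoundError:
    print("file given, not found")
except PermissionError:
    print("file found, but you are not allowed to read it")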

Cheers!

Categories
puppet

Distributing service conditionally on OS version

Hi,

Since we are in the process of migrating to 16.04, my service restart script needed to be deployed with separate builds.

For that purpose, I found a fact that would help me, so my standard file block was transformed into this:

case $facts['os']['distro']['codename'] {
    'xenial': {
        file { "/root/servicerestart":
            source  => 'puppet:///modules/profiles/servicerestart-kafka-new',
            mode    => '0755',
            replace => true,
        }
    }
    'trusty': {
        file { "/root/servicerestart":
            source  => 'puppet:///modules/profiles/servicerestart-kafka',
            mode    => '0755',
            replace => true,
        }
    }
}
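
To check what a given node will match, the fact can be queried directly on the box (assuming Facter 3 with structured facts):

facter os.distro.codename
xenial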

That should be all for now.

Categories
linux puppet

Order Linux processes by memory usage

This one is more for me, actually. We have some issues with one Puppet instance on which the processes fail, and I wanted to see if there is any way to order them by memory usage.

So I searched the net and found this link: https://unix.stackexchange.com/questions/92493/sorting-down-processes-by-memory-usage

The command looks like this:

ps aux --sort -rss | head -10

And it provides you with the following output, at least in my case:

USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
puppet    6327 70.1 25.5 3585952 1034532 ?     Sl   06:53   7:33 /usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java -XX:OnOutOfMemoryError=kill -9 %p -Djava.security.egd=/dev/urandom -javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8778 -Xms1024m -Xmx1024m -cp /opt/puppetlabs/server/apps/puppetserver/puppet-server-release.jar clojure.main -m puppetlabs.trapperkeeper.main --config /etc/puppetlabs/puppetserver/conf.d -b /etc/puppetlabs/puppetserver/bootstrap.cfg
jenkins   6776  9.6 16.6 4648236 671980 ?      Sl   06:55   0:51 /usr/bin/java -Djava.awt.headless=true -javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8780 -Xms1024m -Xmx1024m -jar /usr/share/jenkins/jenkins.war --webroot=/var/cache/jenkins/war --httpPort=8080 --httpListenAddress=127.0.0.1
puppetdb  5987 16.8 11.7 3845896 474164 ?      Sl   06:52   2:01 /usr/bin/java -XX:OnOutOfMemoryError=kill -9 %p -Djava.security.egd=/dev/urandom -Xmx192m -javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8779 -cp /opt/puppetlabs/server/apps/puppetdb/puppetdb.jar clojure.main -m puppetlabs.puppetdb.main --config /etc/puppetlabs/puppetdb/conf.d -b /etc/puppetlabs/puppetdb/bootstrap.cfg
postgres  1458  0.0  2.1 249512 88656 ?        Ss   Nov21   3:10 postgres: checkpointer process                                                                                              
postgres  6206  0.0  1.4 253448 57984 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb 127.0.0.1(36882) idle                                                                           
postgres  6209  0.0  0.7 252580 29820 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb 127.0.0.1(36886) idle                                                                           
postgres  6210  0.0  0.5 254892 22440 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb 127.0.0.1(36888) idle                                                                           
postgres  6213  0.0  0.5 254320 21416 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb 127.0.0.1(36894) idle                                                                           
postgres  6205  0.0  0.5 253524 20324 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb 127.0.0.1(36878) idle                       
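
For reference, the same listing can also be sorted on the %MEM column instead of RSS:

# sort by memory percentage, highest first, and show the top 10
ps aux --sort=-%mem | head -10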

As you can probably see, the components are slowly but surely taking more and more memory, and since the machine has only 4 GB allocated it will probably crash again.

If this happens, I will manually increase the memory by another 2 GB and see where we go from there.

Cheers!

Categories
golang linux

Golang logging using USER profile on Mint 19

Hi,

I committed to learning Golang, and as part of this task I came to play with logging examples. It seems that if you use syslog.LOG_USER, the info is stored in /var/log/syslog.

Here is the code and also the output

package main

import (
	"io"
	"log"
	"log/syslog"
	"os"
	"path/filepath"
)

func main() {
	progname := filepath.Base(os.Args[0])
	sysLog, err := syslog.New(syslog.LOG_INFO|syslog.LOG_USER, progname)
	if err != nil {
		log.Fatal(err)
	}
	log.SetOutput(sysLog)
	log.Println("LOG_INFO + LOG_USER: Logging in Go!")
	io.WriteString(os.Stdout, "Will you see this?")
}

The second line (“Will you see this?”) is output only to the console.

Oct 29 14:30:25 mintworkstation logging[4835]: 2018/10/29 14:30:25 LOG_INFO + LOG_USER: Logging in Go!

P.S.: I managed to find a config file located under /etc/rsyslog.d, called 50-default.conf.
In this file there is a commented line:

#user.*				-/var/log/user.log

If you uncomment it and restart the service with systemctl restart rsyslog, the output will be moved to /var/log/user.log:

Oct 29 14:48:32 mintworkstation NetworkManager[836]:   [1540817312.1683] connectivity: (enp0s31f6) timed out
Oct 29 14:49:37 mintworkstation gnome-terminal-[2196]: g_menu_insert_item: assertion 'G_IS_MENU_ITEM (item)' failed
Oct 29 14:49:59 mintworkstation gnome-terminal-[2196]: g_menu_insert_item: assertion 'G_IS_MENU_ITEM (item)' failed
Oct 29 14:50:28 mintworkstation gnome-terminal-[2196]: g_menu_insert_item: assertion 'G_IS_MENU_ITEM (item)' failed
Oct 29 14:50:59 mintworkstation logging[5144]: 2018/10/29 14:50:59 LOG_INFO + LOG_USER: Logging in Go!
Oct 29 14:51:14 mintworkstation gnome-terminal-[2196]: g_menu_insert_item: assertion 'G_IS_MENU_ITEM (item)' failed
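
You can verify the new routing without touching the Go code by sending a test message with the user facility:

# send a test message with facility "user", then check where it landed
logger -p user.info "routing test"
tail -1 /var/log/user.log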

Cheers

Categories
golang kafka

Small go code example for zookeeper resource editing

Hi,

We have the task of “service restart coordination” for our Apache Kafka cluster. It’s still a work in progress, but if you want to use ZooKeeper for some status verification and updates, something like this will work as an example.

package main

import (
	"fmt"
	"io"
	"launchpad.net/gozk"
	"os"
	"strings"
	"sync"
	"time"
)

const (
	SERVICEPATH = "/servicerestart"
)

var wg sync.WaitGroup

func main() {
	conn := "zk1:2181,zk2:2181,zk3:2181"
	connSlice := strings.Split(conn, ",")
	var flag bool
	args := os.Args
	if len(args) != 2 {
		io.WriteString(os.Stdout, "Argument is needed for the script\n")
		os.Exit(1)
	}
	switch args[1] {
	case "hang":
		flag = false
	case "nohang":
		flag = true
	default:
		io.WriteString(os.Stdout, "Command unrecognized\n")
		os.Exit(1)
	}
	wg.Add(1)
	go ModifyZooStat(connSlice, flag)
	wg.Wait()
}

func ModifyZooStat(strconn []string, flag bool) {
	// find the first reachable ZooKeeper host
	var zooReach string
	for _, zoohost := range strconn {
		zk, _, err := zookeeper.Dial(zoohost, 5e9)
		if err != nil {
			fmt.Println("Couldn't connect to " + zoohost)
			continue
		}
		zooReach = zoohost
		zk.Close()
		break
	}
	zkf, sessionf, _ := zookeeper.Dial(zooReach, 5e9)
	defer zkf.Close()
	event := <-sessionf
	if event.State != zookeeper.STATE_CONNECTED {
		fmt.Println("Couldn't connect")
	}
	acl := []zookeeper.ACL{{Perms: zookeeper.PERM_ALL, Scheme: "world", Id: "anyone"}}
	host, _ := os.Hostname()
	t := time.Now()
	// create the znode if it doesn't exist yet, otherwise just update it
	servicerestart, _ := zkf.Exists(SERVICEPATH)
	if servicerestart == nil {
		path, _ := zkf.Create(SERVICEPATH, host+" "+t.Format(time.Kitchen), zookeeper.EPHEMERAL, acl)
		fmt.Println(path)
	} else {
		change, _ := zkf.Set(SERVICEPATH, host+" "+t.Format(time.Kitchen), -1)
		fmt.Println(change.MTime().Format(time.Kitchen))
	}
	if flag {
		wg.Done()
	}
}

Let me explain what it does. Basically, it takes a ZooKeeper connection string and splits it per server. This was a requirement of the zk module used: it couldn’t take more than one host:2181 entry as an argument.
After we find a reachable server, we connect to it and write to the /servicerestart path the hostname, as well as the time at which the resource was edited.
In order to create a resource, you will need an ACL slice that is passed as a parameter.

acl := []zookeeper.ACL{zookeeper.ACL{Perms: zookeeper.PERM_ALL, Scheme: "world", Id: "anyone"}}

Once this slice is created, the next step is to check whether the resource exists. If it doesn’t, we create it; if it does, we just modify it.

The fmt.Println calls are there basically for two reasons:

  • To see the resource that is created. I wanted that because the zookeeper.EPHEMERAL flag only keeps the resource around as long as the connection is active. If you want persistence, you will have to use zookeeper.SEQUENCE, but it will append a unique counter to your resource name.
  • To also see the timestamp at which the resource was modified.

Even if you don’t close the ZooKeeper connection with defer zkf.Close(), it will be closed automatically when the script ends. So we still need a way to keep the session alive, and we do that using WaitGroups…
We add one function to the queue and wait for it to finish, and to control this we use a parameter that is mapped to the flag. A minimal way to try it out is shown below.
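
A rough usage sketch, assuming the file is saved as main.go and the gozk dependency is available:

go build -o servicerestart main.go
./servicerestart nohang   # writes /servicerestart and releases the WaitGroup immediately
./servicerestart hang     # writes /servicerestart and keeps the session (and the ephemeral node) alive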

This is just a very small example and I am still a true beginner in the art of Go programming, but I hope it helps 🙂

Cheers

Categories
kafka

Don’t delete the Kafka GC logs when they are used

Hi,

I made a mistake some time ago, and it’s still there to haunt me.
Deleting the normal GC logs, including the one that is still in use, doesn’t solve anything; it just creates a more difficult situation.
Here is my example:

/dev/sda1                        50G   42G  5.2G  90% /
/opt/kafka/logs# ll
total 34M
drwxrwxr-x 2 kafka kafka 4.0K Oct 10 19:34 ./
drwxr-xr-x 7 kafka kafka 4.0K Mar 14  2018 ../
-rw-rw-r-- 1 kafka kafka    0 Mar 14  2018 controller.log
-rw-rw-r-- 1 kafka kafka    0 Mar 14  2018 kafka-authorizer.log
-rw-rw-r-- 1 kafka kafka    0 Mar 14  2018 kafka-request.log
-rw-rw-r-- 1 kafka kafka 2.9M Oct 11 04:44 log-cleaner.log
-rw-rw-r-- 1 kafka kafka 6.1M Oct 11 05:24 server.log
-rw-rw-r-- 1 kafka kafka  25M Oct  4 14:03 state-change.log
lsof +L1 | grep delete
init        1     root   13w   REG    8,1         106     0     95 /var/log/upstart/systemd-logind.log.1 (deleted)
init        1     root   14w   REG    8,1        5794     0   2944 /var/log/upstart/kafka-manager.log.1 (deleted)
java     1630    kafka    3w   REG    8,1 46836567522     0 524939 /opt/kafka-2.11-0.10.1.1/logs/kafkaServer-gc.log (deleted)
java     1863 dd-agent    4r   REG    8,1     5750256     0 525428 /opt/datadog-agent/bin/agent/dist/jmx/jmxfetch-0.20.1-jar-with-dependencies.jar (deleted)
java    10749 dd-agent    4r   REG    8,1     5750216     0 525427 /opt/datadog-agent/bin/agent/dist/jmx/jmxfetch-0.20.0-jar-with-dependencies.jar (deleted)
bash    10928     root    0u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
bash    10928     root    1u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
bash    10928     root    2u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
bash    10928     root  255u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
tail    12378     root    0u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
tail    12378     root    1u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
tail    12378     root    2u   CHR  136,6         0t0     0      9 /dev/pts/6 (deleted)
tail    12378     root    3r   REG    8,1    52428909     0 525512 /opt/kafka-2.11-0.10.1.1/logs/server.log.1 (deleted)
java    14692 dd-agent    4r   REG    8,1     5750256     0 526042 /opt/datadog-agent/bin/agent/dist/jmx/jmxfetch-0.20.1-jar-with-dependencies.jar (deleted)
java    16574 dd-agent    4r   REG    8,1     5750256     0 526041 /opt/datadog-agent/bin/agent/dist/jmx/jmxfetch-0.20.1-jar-with-dependencies.jar (deleted)

Handling GC logging in versions lower than 1.0.0 is quite tricky. It is best to remove these options from your startup script:

-XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps

But taking into consideration that we use a standard Puppet module that is shared by multiple teams, this is still to be fixed. Fortunately, from 1.0.0 on, GC logging is disabled by default.

In order to fix what I showed you before, a process restart is needed, and we will do that.
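
If the restart can’t happen right away, the space held by a deleted-but-open file can usually be reclaimed by truncating it through the process’s file descriptor; with the PID and fd from the lsof output above, that would look like this:

# truncate the deleted-but-still-open GC log via /proc; the fd remains valid
: > /proc/1630/fd/3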

Cheers

Categories
kafka

Final version of SSL gen script for kafka

Hi,

I have written a lot about this topic, but it seems I have finally arrived at the procedure specified by Confluent.
Here is the right way to do it, at least for now:

#!/bin/bash
HOST=<%= @fqdn %>
PASSWORD=<%= @pass %>
KEYSTOREPASS=<%= @keystorepass %>
VALIDITY=365

keytool -keystore kafka.server.keystore.jks -alias ${HOST} -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -subj "/CN=${HOST}/OU=MyTeam/O=MyCompany/L=Bucharest/S=Romania/C=RO" -passout pass:$PASSWORD
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
keytool -keystore kafka.server.keystore.jks -alias ${HOST} -certreq -file cert-file-${HOST}.host -storepass $KEYSTOREPASS
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-${HOST}.host -out cert-signed-${HOST}.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.server.keystore.jks -alias ${HOST} -import -file cert-signed-${HOST}.host -storepass $KEYSTOREPASS -noprompt
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt

<% @servers.each do |server| 
separate = server.split("."); host = separate[0]-%>
# <%= server %>
keytool -keystore <%= host %>.server.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
keytool -keystore <%= host %>.server.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.host -storepass $KEYSTOREPASS
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.host -out cert-signed-<%= server %>.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore <%= host %>.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
keytool -keystore <%= host %>.server.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.host -storepass $KEYSTOREPASS -noprompt

<% end -%>

keytool -keystore kafka.client.keystore.jks -alias 'client' -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
keytool -keystore kafka.client.keystore.jks -alias 'client' -certreq -file cert-file-client.host -storepass $KEYSTOREPASS
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-client.host -out cert-signed-client.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client.keystore.jks -alias 'client' -import -file cert-signed-client.host -storepass $KEYSTOREPASS -noprompt
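
A quick way to sanity-check what ended up in a generated keystore:

# list the aliases and entry types stored in the server keystore
keytool -list -v -keystore kafka.server.keystore.jks -storepass $KEYSTOREPASS | grep -E 'Alias name|Entry type'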

The Puppet code also needs to be modified. You can find the initial manifest here. The difference is:


if (member($servers, $item[0]) and $item[1] == "disabled") {
    $fqdn_split = split($item[0], '[.]')
    exec { "copy files to ${item[0]}":
        cwd     => '/home/kafka',
        path    => '/usr/bin:/usr/sbin:/bin',
        command => "scp /home/kafka/${fqdn_split[0]}.server.keystore.jks kafka@${item[0]}:/home/kafka/kafka.server.keystore.jks; scp /home/kafka/kafka.server.truststore.jks kafka@${item[0]}:/home/kafka/kafka.server.truststore.jks",
        user    => 'kafka',
    }
}

Enough on this topic.

Cheers

Categories
kafka

Wrong again, there is no return code 0 on self signed certs

Morning,

It looks like I was wrong again with the SSL generation script. Here is the second article.

Return code 0 is not good after all; it signals that the Kafka broker is closing the connection really fast.

So:

  • There is no return code 0 on self-signed certs
  • Please make sure that you have a certificate chain in place when you test
  • I will give you just the server side; for the client it’s still not very clear whether it works. Once I have confirmation, I will post it.

    #!/bin/bash
    HOST=<%= @fqdn %>
    PASSWORD=<%= @pass %>
    KEYSTOREPASS=<%= @keystorepass %>
    VALIDITY=365
    
    openssl genrsa -out CA.key 2048
    openssl req -new -x509 -keyout CA.key -out ca-cert -days $VALIDITY -subj "/CN=${HOST}/OU=MyTeam/O=MyCompany/L=Bucharest/S=Romania/C=RO" -passout pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.server.keystore.jks -alias $HOST -certreq -file cert-file-${HOST}.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey CA.key -in cert-file-${HOST}.host -out cert-signed-${HOST}.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -trustcacerts -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.server.keystore.jks -alias $HOST -import -file cert-signed-${HOST}.host -storepass $KEYSTOREPASS -noprompt
    
    <% @servers.each do |server| -%>
    # <%= server %>
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=MyTeam, O=MyCompany, L=Bucharest S=Romania C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey CA.key -in cert-file-<%= server %>.host -out cert-signed-<%= server %>.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.host -storepass $KEYSTOREPASS -noprompt
    <% end -%>
    
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -trustcacerts -file ca-cert -storepass $KEYSTOREPASS -noprompt
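
For reference, a sketch of how the verify return code can be checked against the broker; the 9093 SSL listener port is an assumption:

# handshake against the SSL listener and show only the verify return code
openssl s_client -connect ${HOST}:9093 </dev/null 2>/dev/null | grep "Verify return code"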
    

Hope I don’t discover anything else that’s wrong. If so, I’ll keep you informed.

P.S.: It seems that I was wrong again 😀 It’s strange that it works with Kafka up until 2.0, but it will not validate on that version.
The final right way to do it is to keep in the keystore only the CA root and the alias corresponding to that server.
Will post as soon as I have an implementation.

And here it is.
Cheers