Category: kafka

  • Correct SSL script for Kafka deployment

    Hi,

    Some time ago I wrote a post about generating certificates in order to secure a Kafka cluster.

    Long story short, it was wrong!

    Here is the correct version, which returns 0 (the keystore is correctly generated and used):

    
    #!/bin/bash
    HOST=<%= @fqdn %>
    PASSWORD=<%= @pass %>
    KEYSTOREPASS=<%= @keystorepass %>
    VALIDITY=365
    
    keytool -keystore kafka.server.temp.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=Myteam, O=MyCompany, L=Bucharest, ST=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -subj "/CN=${HOST}/OU=Myteam/O=MyCompany/L=Bucharest/ST=Romania/C=RO" -passout pass:$PASSWORD
    keytool -keystore kafka.server.temp.keystore.jks -alias $HOST -certreq -file cert-file-${HOST}.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-${HOST}.host -out cert-signed-${HOST}.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias $HOST -import -file cert-signed-${HOST}.host -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    
    
    <% @servers.each do |server| -%>
    # <%= server %>
    keytool -keystore kafka.server.temp.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=Myteam, O=MyCompany, L=Bucharest, ST=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.server.temp.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.host -out cert-signed-<%= server %>.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.server.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.host -storepass $KEYSTOREPASS -noprompt
    <% end -%>
    
    keytool -keystore kafka.client.temp.keystore.jks -alias 'client' -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=Myteam, O=MyCompany, L=Bucharest, ST=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
    keytool -keystore kafka.client.temp.keystore.jks -alias 'client' -certreq -file cert-file-client.host -storepass $KEYSTOREPASS
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-client.host -out cert-signed-client.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
    keytool -keystore kafka.client.keystore.jks -alias $HOST -import -file cert-signed-client.host -storepass $KEYSTOREPASS -noprompt
    keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
    

    For comparison, here is also a link to the old article: wrong way to do it

    PS: It seems that this is also wrong. Please check article

  • Kernel not compatible with zookeeper version

    Morning,

    It’s important to share this situation with you. This morning I came to the office and found that a cluster that had been upgraded/restarted had an issue with its Zookeeper instances.

    The symptoms were clear: the instances wouldn’t start completely. But why?

    After a little investigation, I went to /var/log/syslog (/var/log/zookeeper did not contain any information at all) and saw that there was a bad page table in the JVM.

    Java version is:

    java version "1.8.0_111"
    Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
    Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
    

    So, the log showed the following lines:

    Aug 16 07:16:04 kafka0 kernel: [  742.349010] init: zookeeper main process ended, respawning
    Aug 16 07:16:04 kafka0 kernel: [  742.925427] java: Corrupted page table at address 7f6a81e5d100
    Aug 16 07:16:05 kafka0 kernel: [  742.926589] PGD 80000000373f4067 PUD b7852067 PMD b1c08067 PTE 80003ffffe17c225
    Aug 16 07:16:05 kafka0 kernel: [  742.928011] Bad pagetable: 000d [#1643] SMP 
    Aug 16 07:16:05 kafka0 kernel: [  742.928011] Modules linked in: dm_crypt serio_raw isofs crct10dif_pclmul crc32_pclmul ghash_clmulni_intel aesni_intel aes_x86_64 lrw gf128mul glue_helper ablk_helper cryptd psmouse floppy
    

    Why would the JVM throw a memory error? The main reason is an incompatibility with the kernel version.

    Let’s take a look at the GRUB config file.

    It looks like we are booting with:

    menuentry 'Ubuntu' --class ubuntu --class gnu-linux --class gnu --class os $menuentry_id_option 'gnulinux-simple-baf292e5-0bb6-4e58-8a71-5b912e0f09b6' {
    	recordfail
    	load_video
    	gfxmode $linux_gfx_mode
    	insmod gzio
    	insmod part_msdos
    	insmod ext2
    	if [ x$feature_platform_search_hint = xy ]; then
    	  search --no-floppy --fs-uuid --set=root  baf292e5-0bb6-4e58-8a71-5b912e0f09b6
    	else
    	  search --no-floppy --fs-uuid --set=root baf292e5-0bb6-4e58-8a71-5b912e0f09b6
    	fi
    	linux	/boot/vmlinuz-3.13.0-155-generic root=UUID=baf292e5-0bb6-4e58-8a71-5b912e0f09b6 ro  console=tty1 console=ttyS0
    	initrd	/boot/initrd.img-3.13.0-155-generic
    

    There was also an older kernel image available, 3.13.0-153.

    The short-term fix is to update the grub.cfg file to boot the old version and reboot the server.

    A proper fix is still in progress. I will post it as soon as I have it.

    P.S: I forgot to mention the Zookeeper version:

    Zookeeper version: 3.4.5--1, built on 06/10/2013 17:26 GMT

    P.S. 2: It seems that the issue affects Java processes in general, not only Zookeeper.

    Cheers

  • Kafka cluster nodes and controller using golang

    Hi,

    Using the golang library for zookeeper from here, you can very easily get the nodes that are registered in the cluster, as well as the cluster controller node.
    In order to install this module, besides setting up the GOPATH, you will also have to install the following packages from your Linux distro repo:
    bzr, gcc, libzookeeper-mt-dev
    Once all of this is installed, just go get launchpad.net/gozk 🙂

    And here is the small example:

    
    package main
    
    import (
    	"fmt"
    	"strings"
    
    	zookeeper "launchpad.net/gozk"
    )
    
    func main() {
    	GetResource()
    }
    
    // GetResource connects to the local Zookeeper and prints the brokers and the controller.
    func GetResource() {
    	zk, session, err := zookeeper.Dial("localhost:2181", 5e9)
    	if err != nil {
    		fmt.Println("Couldn't connect")
    		return
    	}
    	defer zk.Close()
    
    	// Wait for the first session event before issuing any request.
    	event := <-session
    	if event.State != zookeeper.STATE_CONNECTED {
    		fmt.Println("Couldn't connect")
    		return
    	}
    
    	GetBrokers(zk)
    	GetController(zk)
    }
    
    // GetController prints the broker id stored in the /controller znode.
    func GetController(connection *zookeeper.Conn) {
    	rs, _, err := connection.Get("/controller")
    	if err != nil {
    		fmt.Println("Couldn't get the resource")
    		return
    	}
    	// The second comma-separated field holds the broker id as key:value.
    	controller := strings.Split(rs, ",")
    	id := strings.Split(controller[1], ":")
    	fmt.Printf("\nCluster controller is: %s\n", id[1])
    }
    
    // GetBrokers prints the ids registered under /brokers/ids.
    func GetBrokers(connection *zookeeper.Conn) {
    	trs, _, err := connection.Children("/brokers/ids")
    	if err != nil {
    		fmt.Println("Couldn't get the resource")
    		return
    	}
    	fmt.Printf("List of brokers: ")
    	for _, value := range trs {
    		fmt.Printf(" %s", value)
    	}
    }
    

    Yeah, I know it's not the most elegant, but it works:

    go run zootest.go
    List of brokers:  1011 1009 1001
    Cluster controller is: 1011
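
    The string splitting in GetController depends on the order of the fields inside the /controller znode. As a rough sketch of a sturdier alternative, the payload can be decoded as JSON instead. The function name ParseControllerID and the sample payload are assumptions made for illustration; only the brokerid field is read, and the example does not talk to Zookeeper at all.

    ```go
    package main
    
    import (
    	"encoding/json"
    	"fmt"
    )
    
    // controllerInfo keeps only the field we need from the /controller payload.
    // Any other fields in the JSON document are ignored by encoding/json.
    type controllerInfo struct {
    	BrokerID int `json:"brokerid"`
    }
    
    // ParseControllerID (hypothetical helper) extracts the controller's broker id
    // from the data returned by connection.Get("/controller").
    func ParseControllerID(data string) (int, error) {
    	var info controllerInfo
    	if err := json.Unmarshal([]byte(data), &info); err != nil {
    		return 0, fmt.Errorf("parsing /controller payload: %w", err)
    	}
    	return info.BrokerID, nil
    }
    
    func main() {
    	// Sample payload, assumed for illustration.
    	sample := `{"version":1,"brokerid":1011,"timestamp":"1533799445123"}`
    	id, err := ParseControllerID(sample)
    	if err != nil {
    		fmt.Println("Couldn't parse the resource:", err)
    		return
    	}
    	fmt.Printf("Cluster controller is: %d\n", id)
    }
    ```

    Inside GetController, the value returned by connection.Get could be passed straight to this helper in place of the two strings.Split calls.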
    

    That would be all.

    Tnx and cheers!

  • Delete corrupted Kafka topic version 2.0

    Hi,

    In the past we had the situation described in this link: use-case-deleting-corrupted-kafka-topic
    This time the situation repeated itself, slightly differently. Looking at the list of topics, there were three topics marked for deletion.
    None of them had a Leader or Isr, and after a little investigation the conclusion was that they were no longer available on the filesystem.
    My assumption is that the cluster controller began the delete process but failed before sending a new metadata update to the zookeepers.
    The cluster controller was restarted in order to provide a new epoch, and after that the “deletion request” and the metadata were deleted manually from the zookeepers (you can find the commands in the link above).
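
    Spotting partitions without a leader can be scripted. The sketch below scans text in the shape printed by kafka-topics.sh --describe and returns the topic-partition pairs whose leader is missing. The function name LeaderlessPartitions, the sample output and the assumption that a missing leader is printed as -1 or none are mine, so adjust them to what your Kafka version prints.

    ```go
    package main
    
    import (
    	"fmt"
    	"strings"
    )
    
    // LeaderlessPartitions (hypothetical helper) returns "topic-partition" for
    // every partition line whose Leader field is "-1" or "none".
    func LeaderlessPartitions(describeOutput string) []string {
    	var result []string
    	for _, line := range strings.Split(describeOutput, "\n") {
    		fields := strings.Fields(line)
    		var topic, partition, leader string
    		// Each value follows its label, e.g. "Leader:" "-1".
    		for i := 0; i+1 < len(fields); i++ {
    			switch fields[i] {
    			case "Topic:":
    				topic = fields[i+1]
    			case "Partition:":
    				partition = fields[i+1]
    			case "Leader:":
    				leader = fields[i+1]
    			}
    		}
    		if partition == "" {
    			continue // summary line or empty line
    		}
    		if leader == "-1" || leader == "none" {
    			result = append(result, topic+"-"+partition)
    		}
    	}
    	return result
    }
    
    func main() {
    	// Sample describe output, assumed for illustration.
    	sample := "Topic:orders\tPartitionCount:2\tReplicationFactor:2\tConfigs:\n" +
    		"\tTopic: orders\tPartition: 0\tLeader: -1\tReplicas: 1001,1009\tIsr: \n" +
    		"\tTopic: orders\tPartition: 1\tLeader: 1011\tReplicas: 1011,1001\tIsr: 1011,1001\n"
    	fmt.Println(LeaderlessPartitions(sample))
    }
    ```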

    After listing the topics again, everything looked good.

    Cheers

  • Golang example for kafka service restart script

    Hi,

    Not much to say: a pretty decent script for restarting the Kafka service (I tried to write it for our rolling upgrade procedure) that is still a work in progress. If any changes need to be made to it, I will post them.

    Here is the script:

    package main
    
    import (
    	"fmt"
    	"io/ioutil"
    	"os/exec"
    	"strings"
    	"time"
    )
    
    const libpath = "/opt/kafka/libs"
    
    func main() {
    	StopBroker()
    	StartBroker()
    	fmt.Printf("You are running version %s\n", GetVersion(libpath))
    }
    
    // GetVersion derives the Kafka version from the first kafka_<scala>-<version> jar name.
    func GetVersion(libpath string) string {
    	files, err := ioutil.ReadDir(libpath)
    	check(err)
    	for _, f := range files {
    		if strings.HasPrefix(f.Name(), "kafka_") {
    			parts := strings.Split(f.Name(), "-")
    			if len(parts) > 1 {
    				return strings.TrimSuffix(parts[1], ".jar")
    			}
    		}
    	}
    	return "unknown"
    }
    
    // GetStatus reports whether this broker is currently registered in Zookeeper.
    func GetStatus() bool {
    	return Contain(GetBrokerId(), GetBrokerList())
    }
    
    // StopBroker stops the local broker, but only while more than two brokers are registered.
    func StopBroker() {
    	for GetStatus() {
    		if len(GetBrokerList()) > 2 {
    			stop := RunCommand("service kafka stop")
    			fmt.Println(string(stop))
    		}
    		// Wait before checking the registration again.
    		time.Sleep(60 * time.Second)
    	}
    	fmt.Println("Broker has been successfully stopped")
    }
    
    // StartBroker starts the local broker if it is not registered.
    func StartBroker() {
    	if !GetStatus() {
    		start := RunCommand("service kafka start")
    		fmt.Println(string(start))
    		time.Sleep(60 * time.Second)
    	}
    }
    
    // GetBrokerId reads broker.id from the local meta.properties file.
    func GetBrokerId() string {
    	output := RunCommand("cat /srv/kafka/meta.properties | grep broker.id | cut -d'=' -f2")
    	return strings.TrimRight(string(output), "\n")
    }
    
    // GetBrokerList returns the broker ids registered as ephemeral nodes in Zookeeper.
    func GetBrokerList() []string {
    	// "|| true" keeps the command from failing when grep matches nothing.
    	output := RunCommand("echo dump | nc localhost 2181 | grep brokers || true")
    	var brokers []string
    	for _, line := range strings.Split(string(output), "\n") {
    		id := strings.TrimPrefix(strings.TrimSpace(line), "/brokers/ids/")
    		if id != "" {
    			brokers = append(brokers, id)
    		}
    	}
    	return brokers
    }
    
    // Contain reports whether val1 is present in val2.
    func Contain(val1 string, val2 []string) bool {
    	for _, value := range val2 {
    		if value == val1 {
    			return true
    		}
    	}
    	return false
    }
    
    // RunCommand runs a shell command and returns its standard output.
    func RunCommand(command string) []byte {
    	cmd := exec.Command("/bin/sh", "-c", command)
    	result, err := cmd.Output()
    	check(err)
    	return result
    }
    
    func check(e error) {
    	if e != nil {
    		panic(e)
    	}
    }
    

    Cheers

  • Fix under replicated partitions with controller restart

    Hi,

    If you have a Kafka cluster in which only one broker reports zero under-replicated partitions while all the others report a non-zero number, then please be aware that this broker is not properly registered in the cluster.
    Taking a look at the state-change log on the instance that is the cluster controller will not show anything in particular (all of the brokers should be reported as registered).
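
    The symptom is easy to check automatically once you have the under-replicated partition count per broker (for example from the UnderReplicatedPartitions metric of each broker). The sketch below only encodes the rule described above; the function name SuspectBroker and the sample numbers are assumptions made for illustration.

    ```go
    package main
    
    import "fmt"
    
    // SuspectBroker (hypothetical helper) returns the id of the single broker
    // that reports zero under-replicated partitions while every other broker
    // reports a non-zero count. The second result is false when the pattern
    // does not apply.
    func SuspectBroker(urp map[int]int) (int, bool) {
    	if len(urp) < 2 {
    		return 0, false
    	}
    	suspect, zeros := 0, 0
    	for id, count := range urp {
    		if count == 0 {
    			suspect = id
    			zeros++
    		}
    	}
    	// Exactly one zero means all the remaining brokers are non-zero.
    	if zeros != 1 {
    		return 0, false
    	}
    	return suspect, true
    }
    
    func main() {
    	// Sample counts per broker id, assumed for illustration.
    	counts := map[int]int{1001: 12, 1009: 7, 1011: 0}
    	if id, ok := SuspectBroker(counts); ok {
    		fmt.Printf("Broker %d is probably not properly registered\n", id)
    	}
    }
    ```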

    That broker cannot take partitions because the controller, in its current epoch, does not consider it functional (this is different from reachable).

    The easiest way to fix this is a normal restart of the cluster controller. Please do not use kill or anything similar; it will make things worse.

    Now, if you run tail -50f /opt/kafka/server.log, you will see that the process is trying to stop and that all of the components are going down in a controlled fashion.
    You will also see some errors regarding the partitions hosted on the “problematic” broker, and that these errors repeat in a cycle. Please be patient with this process and wait for it to stop as it should.

    After the restart, taking a look at the state-change log of the new cluster controller, you will see from time to time that the old broker that you restarted is reported as unreachable. This is not a real problem and you can ignore those errors.

    After waiting for a while, you will see the number of under-replicated partitions begin to decrease, hopefully to zero on all brokers.

    Cheers

  • Kafka service problem on upgrade to version 1.1.0

    Hi,

    If you are using version 1.1.0 or want to upgrade to it, and you do so with the puppet module provided by voxpupuli, please be aware of this issue.

    It sits in the template used for the init script, which is located under /etc/init.d/kafka; you can also see it in the latest version below:

    https://github.com/voxpupuli/puppet-kafka/blob/master/templates/init.erb

    There are some lines that take the PID of the Kafka broker by using the command `pgrep -f "$PGREP_PATTERN"`. This isn’t a problem for earlier versions, but unfortunately for the latest one it doesn’t return anything, causing the init script to exit with return code 1 (my suspicion is that the process name changed).

    I fixed this by replacing that string with the following

    `ps -ef | grep "$PGREP_PATTERN" | grep -v grep | awk '{print $2}'`

    and it seems to work just fine.

    This doesn’t have any impact on an already configured and running cluster, and it will not restart your Kafka brokers.

    P.S.: PGREP_PATTERN resolves to kafka.Kafka, which is the string that identifies the broker instance.
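
    To make clear what the ps/grep/awk pipeline does, here is a small sketch of the same filtering: keep the lines that contain the pattern, drop the grep process itself, and take the second column as the PID. The function name MatchingPIDs and the sample ps output are assumptions made for illustration, and the match is a plain substring test, whereas grep treats the pattern as a regular expression.

    ```go
    package main
    
    import (
    	"fmt"
    	"strconv"
    	"strings"
    )
    
    // MatchingPIDs (hypothetical helper) mimics
    // ps -ef | grep PATTERN | grep -v grep | awk '{print $2}'
    // on already captured ps -ef output.
    func MatchingPIDs(psOutput, pattern string) []int {
    	var pids []int
    	for _, line := range strings.Split(psOutput, "\n") {
    		// Literal substring match; also skip the grep process itself.
    		if !strings.Contains(line, pattern) || strings.Contains(line, "grep") {
    			continue
    		}
    		fields := strings.Fields(line)
    		if len(fields) < 2 {
    			continue
    		}
    		// The second column of ps -ef is the PID.
    		pid, err := strconv.Atoi(fields[1])
    		if err != nil {
    			continue
    		}
    		pids = append(pids, pid)
    	}
    	return pids
    }
    
    func main() {
    	// Sample ps -ef output, assumed for illustration.
    	sample := "UID        PID  PPID  C STIME TTY          TIME CMD\n" +
    		"kafka     2345     1  5 07:16 ?        00:10:11 java -Xmx1G kafka.Kafka /opt/kafka/config/server.properties\n" +
    		"root      9999  2000  0 07:20 pts/0    00:00:00 grep kafka.Kafka\n"
    	fmt.Println(MatchingPIDs(sample, "kafka.Kafka"))
    }
    ```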

    Cheers

  • Non registered Zookeeper – why doesn’t it work?

    Morning,

    If you ever deploy a server that also has zookeeper installed, via puppet or another automation tool, and you already have a working cluster, please be aware of this.

    Yesterday I rebuilt a node multiple times (there were some errors to fix), and after finally getting it right, the zookeeper instance did not behave as expected.
    When I took a look in the /var/lib/zookeeper directory, there was the correct myid file, which is also present in the config file, and the version-2 directory.
    Normally version-2 should host all the data stored for the zookeeper, but there was only a currentEpoch file with 0 in it.

    Multiple restarts, no result. The log didn’t contain anything relevant. Since the server was not live yet, I rebuilt it one more time, but it behaved the same way. It looked like the node was completely out of sync, and that was the truth 😀

    I eventually figured out, by accident, that the zookeeper was not yet registered (I tried to change the id of that zookeeper and restart the whole cluster).

    In order to register it, well, you need to restart the leader. How do you find it? There are multiple methods, I guess; here are two that work. Either by running the following command

    echo stat | nc localhost 2181 | grep Mode

    Or by checking the exposed ports

    zookeeper start/running, process 15259
    root@server1:/var/lib/zookeeper/version-2# netstat -tulpen | grep 15259
    tcp6       0      0 :::42844                :::*                    LISTEN      107        1104606     15259/java      
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        1114708     15259/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        1104609     15259/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        1104607     15259/java
    
    root@server2:/var/lib/zookeeper/version-2# netstat -tulpen | grep 28068
    tcp6       0      0 :::48577                :::*                    LISTEN      107        3182780     28068/java      
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        3185668     28068/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        3184651     28068/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        3182781     28068/java   
    
    root@server3:/var/lib/zookeeper/version-2# netstat -tulpen | grep 20719
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        5365296     20719/java      
    tcp6       0      0 :::2182                 :::*                    LISTEN      107        5382604     20719/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        5374105     20719/java      
    tcp6       0      0 :::36008                :::*                    LISTEN      107        5371417     20719/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        5371418     20719/java
    

    The leader always exposes port 2182 (the follower port) so that the followers can grab the updates.
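
    The first method can also be run from a small program against every node in turn. The sketch below sends the stat four-letter command over TCP and extracts the Mode line. The function names ParseMode and QueryMode, the timeout and the sample reply are assumptions made for illustration; main only parses the sample, so it runs without a Zookeeper at hand.

    ```go
    package main
    
    import (
    	"fmt"
    	"io"
    	"net"
    	"strings"
    	"time"
    )
    
    // ParseMode (hypothetical helper) returns the value of the "Mode:" line of a
    // stat reply, for example "leader" or "follower", or "" if it is missing.
    func ParseMode(statOutput string) string {
    	for _, line := range strings.Split(statOutput, "\n") {
    		line = strings.TrimSpace(line)
    		if strings.HasPrefix(line, "Mode:") {
    			return strings.TrimSpace(strings.TrimPrefix(line, "Mode:"))
    		}
    	}
    	return ""
    }
    
    // QueryMode (hypothetical helper) does what "echo stat | nc host 2181" does
    // and returns the parsed mode.
    func QueryMode(addr string) (string, error) {
    	conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
    	if err != nil {
    		return "", err
    	}
    	defer conn.Close()
    	if err := conn.SetDeadline(time.Now().Add(5 * time.Second)); err != nil {
    		return "", err
    	}
    	if _, err := conn.Write([]byte("stat")); err != nil {
    		return "", err
    	}
    	// The server closes the connection after replying, which ends the read.
    	reply, err := io.ReadAll(conn)
    	if err != nil {
    		return "", err
    	}
    	return ParseMode(string(reply)), nil
    }
    
    func main() {
    	// Sample stat reply, assumed for illustration.
    	sample := "Zookeeper version: 3.4.5--1, built on 06/10/2013 17:26 GMT\n" +
    		"Clients:\n /127.0.0.1:51234[0](queued=0,recved=1,sent=0)\n\n" +
    		"Latency min/avg/max: 0/0/12\nZxid: 0x100000a3f\nMode: leader\nNode count: 152\n"
    	fmt.Println("Mode:", ParseMode(sample))
    }
    ```

    Calling QueryMode("server1:2181") and so on for each node would show which one answers with leader.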

    After a short restart of the leader, everything works as expected!

    Cheers

  • Use case for deleting corrupted Kafka topic

    Hi,

    A week ago we had a case in which the client could not delete a topic from the cluster (the Kafka version in this case was 1.0.0).
    When the topic was listed, there were no leaders assigned for the partitions. It was pretty clear that the topic would not be deleted until we fixed that.
    First we tried a partition reassignment, with the idea that a leader would be assigned in the process. A JSON file was generated for the specified topic and executed using kafka-reassign-partitions.sh. After verification, we concluded that the reassignment had failed.
    The next step was to delete the topic from the zookeeper metadata cache.
    We came to this conclusion by following this article:
    https://community.hortonworks.com/articles/72622/how-kafka-delete-topic-command-works.html

    The command was

    rmr /brokers/topics/[topic_name]

    run from the zookeeper-shell.sh script. Running this fixed our leader problem. It was strange, but very convenient.

    There was one extra thing we needed to do. Version 1.0.0 has a bug that affects the cluster controller. The error found in the log is: Cached zkVersion [3] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition). See https://issues.apache.org/jira/browse/KAFKA-2729

    We restarted the cluster to fix this, but since there was already an active delete request for the topic, the request needed to be refreshed.
    In order to do that you can run

    rmr /admin/delete_topics/[topic_name]

    After doing so, the topic will no longer appear as marked for deletion, but if you run the delete command again, it will be marked again and the controller will actively start the deletion process.

    That was also the case for us: after running the delete command again, the topic was removed from the brokers.

    Cheers

  • Log rotate for Kafka Garbage collect without restart

    Morning,

    If you have an Apache Kafka version below 1.0.0 and you don’t have garbage collection log rotation, as shown here:

    with:

    -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M

    without:

    -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
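
    To tell which of the two cases a running broker falls into, its command line (for example as printed by ps -ef) can be checked for the rotation flag. This is only a sketch: the function name HasGCLogRotation is an assumption, and the two sample strings are the parameter lists shown above.

    ```go
    package main
    
    import (
    	"fmt"
    	"strings"
    )
    
    // HasGCLogRotation (hypothetical helper) reports whether a JVM command line
    // contains the flag that turns on GC log file rotation.
    func HasGCLogRotation(cmdline string) bool {
    	for _, arg := range strings.Fields(cmdline) {
    		if arg == "-XX:+UseGCLogFileRotation" {
    			return true
    		}
    	}
    	return false
    }
    
    func main() {
    	withRotation := "-Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
    	withoutRotation := "-Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps"
    	fmt.Println("with:", HasGCLogRotation(withRotation))
    	fmt.Println("without:", HasGCLogRotation(withoutRotation))
    }
    ```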

    One option is to modify the parameters so that the rotation flags are included when the process starts, but this also involves restarting the services.

    However, you can also use the logrotate daemon with the following configuration. Here is the block that you need to add in hiera:

    logrotate::rule:
     'kafka_gc':
       path: '/opt/kafka/logs/kafkaServer-gc.log'
       copytruncate: true
       rotate_every: 'day'
       compress: true
       missingok: true
       su: true
       su_owner: 'kafka'
       su_group: 'kafka'
       ifempty: false
       size: '50M'
       maxsize: '50M'
       rotate: 5
    

    Or, if you want to write it in a class, it should look like this:

    $version = lookup('kafka::version')
    if ($_role =~ /\Akafka/) and ($version != '1.0.0') {
        logrotate::rule { 'kafkagc_logs':
            path         => '/opt/kafka/logs/kafkaServer-gc.log',
            copytruncate => true,
            rotate       => 5,
            rotate_every => 'daily',
            missingok    => true,
            ifempty      => false,
            su           => true,
            su_owner     => 'kafka',
            su_group     => 'kafka',
            size         => '50M',
            maxsize      => '50M',
        }
    }
    

    Cheers!