• Non-registered Zookeeper – why doesn’t it work?

    Morning,

    If you ever deploy a server via Puppet or another automation tool that also installs ZooKeeper, and you already have a working cluster, please be aware of this.

    Yesterday I rebuilt a node multiple times (there were some errors to fix), and after finally getting it right, the ZooKeeper instance did not behave as expected.
    When I took a look in the /var/lib/zookeeper directory, the correct myid file was there (matching the id referenced in the config file), along with the version-2 directory.
    Normally version-2 should hold all the data stored by ZooKeeper, but there was only a currentEpoch file with 0 in it.

    Multiple restarts, no result. The log didn’t contain anything relevant. Since the server was not live yet, I rebuilt it one more time, but it showed the same behavior. It looked like the node was completely out of sync, and that was indeed the case 😀

    I eventually figured out, by accident, that the ZooKeeper instance was not yet registered with the ensemble (I tried to change the id of that ZooKeeper and restart the whole cluster).

    In order to register it, well, you need to restart the leader. How do you find it? There are multiple methods, I guess; here are two that work. Either by running the following command:

    echo stat | nc localhost 2181 | grep Mode
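
    On the node you query, this prints something like Mode: leader or Mode: follower. To check the whole ensemble in one go you can wrap it in a small loop (just a sketch; the hostnames and the client port 2181 are the ones from my setup):

    for host in server1 server2 server3; do
      echo -n "$host: "
      # the stat four-letter command reports the role of each node
      echo stat | nc "$host" 2181 | grep Mode
    done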

    Or by checking the exposed ports

    zookeeper start/running, process 15259
    root@server1:/var/lib/zookeeper/version-2# netstat -tulpen | grep 15259
    tcp6       0      0 :::42844                :::*                    LISTEN      107        1104606     15259/java      
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        1114708     15259/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        1104609     15259/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        1104607     15259/java
    
    root@server2:/var/lib/zookeeper/version-2# netstat -tulpen | grep 28068
    tcp6       0      0 :::48577                :::*                    LISTEN      107        3182780     28068/java      
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        3185668     28068/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        3184651     28068/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        3182781     28068/java   
    
    root@server3:/var/lib/zookeeper/version-2# netstat -tulpen | grep 20719
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        5365296     20719/java      
    tcp6       0      0 :::2182                 :::*                    LISTEN      107        5382604     20719/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        5374105     20719/java      
    tcp6       0      0 :::36008                :::*                    LISTEN      107        5371417     20719/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        5371418     20719/java
    

    The leader is the only one that also exposes port 2182 (the follower/quorum port), which the followers connect to in order to grab the updates.
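
    Once you know which node is the leader, the restart itself is trivial. In my case the service is managed by upstart (hence the zookeeper start/running line above), so on the leader it was something along these lines:

    root@server3:~# service zookeeper restart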

    After a short restart of the leader, everything works as expected!

    Cheers

  • Use case for deleting a corrupted Kafka topic

    Hi,

    A week ago we had a case in which a client could not delete a topic from the cluster (the Kafka version in this case was 1.0.0).
    When the topic was listed, there were no leaders assigned for its partitions. It was pretty clear that the delete would not go through until we fixed that.
    First we tried a partition reassignment, in the hope that a leader would be assigned during the process. A JSON file was generated for the specified topic and executed using kafka-reassign-partitions.sh. After verification, we concluded that the reassignment had failed.
    The next step was to delete the topic from the zookeeper meta-data cache.
    We came to this conclusion following this article:
    https://community.hortonworks.com/articles/72622/how-kafka-delete-topic-command-works.html

    The command was

    rmr /brokers/topics/[topic_name]

    run from the zookeeper-shell.sh script. Running this fixed our leader problem. It was strange, but very convenient.
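
    For reference, the whole step boils down to something like this (the ZooKeeper connection string is an assumption, adjust it to your ensemble; you can also just type the rmr command at the interactive prompt):

    ./zookeeper-shell.sh localhost:2181 rmr /brokers/topics/[topic_name]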

    There was one extra thing we needed to do. Version 1.0.0 has a bug that affects the cluster controller – the error found in the log was Cached zkVersion [3] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) – https://issues.apache.org/jira/browse/KAFKA-2729

    We restarted the cluster to fix this, but since there was already an active request for the topic deletion, that request needed to be refreshed as well.
    In order to do that you can run:

    rmr /admin/delete_topics/[topic_name]

    After doing so, the topic will no longer appear as marked for deletion, but if you run the delete command again, it will be marked again and the controller will actively start the deletion process.
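
    The delete command itself is the standard one, something along these lines (the ZooKeeper connection string is again an assumption):

    ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic [topic_name]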

    That was also the case for us: after running the delete command again, the topic was removed from the brokers.

    Cheers

  • Log rotate for Kafka Garbage collect without restart

    Morning,

    If you run an Apache Kafka version below 1.0.0, you may not have garbage collect log rotation enabled. The difference looks like this:

    with:

    -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M

    without:

    -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
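
    Before changing anything, you can check which of the two situations you are in by looking at the arguments of the running broker, for example:

    ps -ef | grep kafka.Kafka | tr ' ' '\n' | grep GCLog

    If nothing comes back, the rotation flags are missing and the logrotate approach below applies.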

    One option is to modify the startup parameters so that the rotation flags are included when the process starts, but this also involves restarting the services.

    However, you can also use the logrotate daemon with the following configuration; here is the block that you need to add in Hiera:

    logrotate::rule:
      'kafka_gc':
        path: '/opt/kafka/logs/kafkaServer-gc.log'
        copytruncate: true
        rotate_every: 'day'
        compress: true
        missingok: true
        su: true
        su_owner: 'kafka'
        su_group: 'kafka'
        ifempty: false
        size: '50M'
        maxsize: '50M'
        rotate: 5
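
    If the rule is applied, the Puppet logrotate module should drop a file under /etc/logrotate.d/ (the exact name depends on the module), and you can dry-run it to verify the result:

    logrotate -d /etc/logrotate.d/kafka_gc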
    

    Or if you want to write it in a class, it should look like this:

    $version = lookup('kafka::version')
    if ($_role =~ /\Akafka/) and ($version != '1.0.0') {
        logrotate::rule { 'kafkagc_logs':
            path         => '/opt/kafka/logs/kafkaServer-gc.log',
            copytruncate => true,
            rotate       => 5,
            rotate_every => 'day',
            missingok    => true,
            ifempty      => false,
            su           => true,
            su_owner     => 'kafka',
            su_group     => 'kafka',
            size         => '50M',
            maxsize      => '50M',
        }
    }
    

    Cheers!

  • Consumer group coordinator in Kafka using some scala script

    Morning,

    Just a small post regarding returning the group coordinator for a specific consumer group.
    We had an issue with consumer groups constantly re-balancing, and we didn’t know if it was related to the application logic or to the group coordinator changing, with the Kafka cluster reassigning a different one each time. So, a small piece of code was needed. I used the libraries shipped with Kafka 1.0.0 for this test, so be aware of the classpath if you want to adapt it.

    In order to do the test, I started a standalone Confluent Kafka image which normally listens on port 29092. For more details please consult their documentation here.

    I also created a test topic with one partition and a replication factor of one, produced some messages to it, and after that started a console consumer:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic test --from-beginning
    test
    message
    test message
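
    For completeness, the topic creation and the produced messages were done beforehand with the standard tools, roughly like this (the ZooKeeper port of the Confluent image is an assumption on my side):

    ./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1
    ./kafka-console-producer.sh --broker-list localhost:29092 --topic test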
    

    Once the consumer is started, you can also see it using the kafka-consumer-groups command like this:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-consumer-groups.sh --bootstrap-server localhost:29092 --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    console-consumer-49198
    console-consumer-66063
    console-consumer-77631
    

    Now my console consumer is identified by console-consumer-77631 and in order to see the group coordinator you will have to run something like:

    ./getconsumercoordinator.sh localhost 29092 console-consumer-77631
    warning: there were three deprecation warnings; re-run with -deprecation for details
    one warning found
    Creating connection to: localhost 29092 
    log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Channel connected
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    GroupCoordinatorResponse(Some(BrokerEndPoint(1001,localhost,29092)),NONE,0)
    

    It’s clear that since we have only one broker, that one is also the coordinator.
    Regarding the details of the code, I used this link, and since I don’t have a Scala project, just a script, the following command was of great use for finding which jar provides a given class:

    for i in *.jar; do jar -tvf "$i" | grep -Hsi ClassName && echo "$i"; done

    Here is also the code:

    #!/bin/sh
    exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/kafka-clients-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/kafka_2.11-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/slf4j-api-1.7.25.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-core-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-databind-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-annotations-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/log4j-1.2.17.jar" "$0" "$@"
    !#
    
    import com.fasterxml.jackson.core._
    import com.fasterxml.jackson.databind.ObjectMapper
    import kafka.network.BlockingChannel
    import kafka.api.GroupCoordinatorRequest
    import kafka.api.GroupCoordinatorResponse
    import org.slf4j.LoggerFactory
    
    val hostname = args(0)
    val port = args(1).toInt
    val group = args(2)
    println("Creating connection to: " + hostname + " " + port + " ")
    
    var channel = new BlockingChannel(hostname, port, 1048576, 1048576, readTimeoutMs = 50000)
    channel.connect()
    if (channel.isConnected) {
      println("Channel connected")
      channel.send(GroupCoordinatorRequest(group))
      val metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive.payload())
      println(metadataResponse)
    }
    channel.disconnect()
    

    Regarding the code, the first part runs Scala from a shell script; you need to update the classpath with all the required libraries and pass the script arguments through (in our case there are three: host, port and group). Also, if you don’t add all of the jackson, log4j and slf4j dependencies, it won’t work.

    P.S.: It will also work by running exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/*" instead of listing each jar.

    Cheers!

  • Apache Kafka technical logs to ELK stack

    Morning,

    Just wanted to share with you the following article:

    https://www.elastic.co/blog/monitoring-kafka-with-elastic-stack-1-filebeat

    I will try to share my experience once I have everything I need to start working on it. As far as I understood from managing Kafka for a while, plain monitoring is not enough; shipping the technical logs is mandatory in order to manage your clusters in a clear and transparent manner.

    Cheers

  • Important considerations regarding Kafka MirrorMaker

    Morning folks,

    Just wanted to share with you two things that I learned when playing with Kafka MirrorMaker this morning.

    The first is that if you want to put MirrorMaker, or any other tool that uses Kafka, into debug mode, all you have to do is modify the tools-log4j.properties file from the config folder of your Apache Kafka installation. It normally looks like this:

    log4j.rootLogger=WARN, stderr
    
    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
    log4j.appender.stderr.Target=System.err
    

    but you can easily set it to

    log4j.rootLogger=DEBUG, stderr
    
    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
    log4j.appender.stderr.Target=System.err
    

    The second one is related to a property that you should put in your consumer.config file in order to grab the data from the start of the topic and not from the latest offset.

    In order to achieve this you will need to add the following option:

    auto.offset.reset=earliest

    The possible values for this setting are latest, earliest, none.
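
    For context, a minimal MirrorMaker run that picks up that consumer.config would look something like this (the file names and the whitelist are just placeholders from my test):

    ./kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist ".*"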

    Cheers

  • List differences between two sftp hosts using golang

    Hi,

    Just as an intermediate post, since I wanted to play a little bit with golang, let me show you what I managed to put together in a few days. I created a virtual machine on which I installed Docker and grabbed an sftp image. You can try the first couple of results from Docker Hub; they should work.
    So I pulled this image and started two containers as shown below:

    eaf3b93798b5        asavartzeth/sftp    "/entrypoint.sh /u..."   21 hours ago        Up About a minute         0.0.0.0:2225->22/tcp   server4
    ec7d7e1d029f        asavartzeth/sftp    "/entrypoint.sh /u..."   21 hours ago        Up About a minute         0.0.0.0:2224->22/tcp   server3
    

    The command to do this looks like:

    docker run --name server3 -v /home/sorin/sftp1:/chroot/sorin:rw -e SFTP_USER=sorin -e SFTP_PASS=pass -p 2224:22 -d asavartzeth/sftp
    docker run --name server4 -v /home/sorin/sftp2:/chroot/sorin:rw -e SFTP_USER=sorin -e SFTP_PASS=pass -p 2225:22 -d asavartzeth/sftp

    The main thing to know about these containers is that they are accessible with the user sorin, and the path where the external directories are mapped is /chroot/sorin.

    You can manually test the connection by using a simple command like:

    sftp -P 2224 sorin@localhost

    If you use the container IP address directly, I observed that you connect on the default port 22. Not really clear why, but this post is not about that.

    Once the servers are up and running, you can list the differences between their directory structures using the following code:

    
    package main
    
    import (
    	"fmt"
    
    	"github.com/pkg/sftp"
    	"golang.org/x/crypto/ssh"
    )
    
    type ServerFiles struct {
    	Name  string
    	files []string
    }
    
    func main() {
    
    	server1client := ConnectSftp("localhost:2224", "sorin", "pass")
    	server1files := ReadPath(server1client)
    	server1struct := BuildStruct("172.17.0.2", server1files)
    	server2client := ConnectSftp("localhost:2225", "sorin", "pass")
    	server2files := ReadPath(server2client)
    	server2struct := BuildStruct("172.17.0.3", server2files)
    	diffilesstruct := CompareStruct(server1struct, server2struct)
    	// print each differing path together with the server it belongs to
    	for _, values := range diffilesstruct.files {
    		fmt.Printf("%s %s\n", diffilesstruct.Name, values)
    	}
    	CloseConnection(server1client)
    	CloseConnection(server2client)
    }
    func CheckError(err error) {
    	if err != nil {
    		panic(err)
    	}
    }
    func ConnectSftp(address string, user string, password string) *sftp.Client {
    	config := &ssh.ClientConfig{
    		User: user,
    		Auth: []ssh.AuthMethod{
    			ssh.Password(password),
    		},
    		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
    	}
    	conn, err := ssh.Dial("tcp", address, config)
    	CheckError(err)
    
    	client, err := sftp.NewClient(conn)
    	CheckError(err)
    
    	return client
    }
    func ReadPath(client *sftp.Client) []string {
    	var paths []string
    	w := client.Walk("/")
    	for w.Step() {
    		if w.Err() != nil {
    			continue
    		}
    		
    		paths = append(paths, w.Path())
    	}
    	return paths
    }
    func BuildStruct(address string, files []string) *ServerFiles {
    	server := new(ServerFiles)
    	server.Name = address
    	server.files = files
    
    	return server
    }
    func CompareStruct(struct1 *ServerFiles, struct2 *ServerFiles) *ServerFiles {
    
    	diff := difference(struct1.files, struct2.files)
    	diffstruct := new(ServerFiles)
    	for _, value := range diff {
    		for _, valueP := range struct1.files {
    			if valueP == value {
    				
    				diffstruct.Name = struct1.Name
    				diffstruct.files = append(diffstruct.files, valueP)
    			}
    		}
    		for _, valueQ := range struct2.files {
    			if valueQ == value {
    				
    				diffstruct.Name = struct2.Name
    				diffstruct.files = append(diffstruct.files, valueQ)
    			}
    		}
    	}
    	return diffstruct
    }
    func difference(slice1 []string, slice2 []string) []string {
    	var diff []string
    
    	// Loop two times, first to find slice1 strings not in slice2,
    	// second loop to find slice2 strings not in slice1
    	for i := 0; i < 2; i++ {
    		for _, s1 := range slice1 {
    			found := false
    			for _, s2 := range slice2 {
    				if s1 == s2 {
    					found = true
    					break
    				}
    			}
    			// String not found. We add it to return slice
    			if !found {
    				diff = append(diff, s1)
    			}
    		}
    		// Swap the slices, only if it was the first loop
    		if i == 0 {
    			slice1, slice2 = slice2, slice1
    		}
    	}
    
    	return diff
    }
    func CloseConnection(client *sftp.Client) {
    	client.Close()
    }
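
    To try it out, this is roughly how I fetch the dependencies and run it in a classic GOPATH setup (the file name is mine):

    go get github.com/pkg/sftp golang.org/x/crypto/ssh
    go run sftpdiff.go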

    This actually connects to each server, walks the whole file tree and puts it into a structure. After this is done for both servers, there is a function that compares only the slice part of the structs and returns the differences, and from those differences another structure is constructed.
    It is true that I took the difference func from Stack Overflow, and it's far from good code, but I am working on it; this is the first draft, and I will post improved versions as it gets better.

    The output if there are differences will look like this:

    172.17.0.2 /sorin/subdirectory
    172.17.0.2 /sorin/subdirectory/subtest.file
    172.17.0.2 /sorin/test.file
    172.17.0.3 /sorin/test2
    

    If there are no differences, it will just exit.
    I am working on improving my golang experience. I'll keep you posted.

    Cheers!

  • How to change root password on Debian – after vacation

    Morning,

    Since I had a vacation and completely forgot all the passwords for my Debian VMs, I fixed it using this article. Very useful!

    https://pve.proxmox.com/wiki/Root_Password_Reset

    Cheers!

  • Observer functionality for puppet zookeeper module

    Morning,

    I know it’s been some time since I last posted, but I didn’t have the time to play that much. Today I want to share with you a use case in which we needed to modify the module used for the deployment of ZooKeeper in order to also include the observer role.

    The link that describes how this should be activated from version 3.3.0 is located here: https://zookeeper.apache.org/doc/trunk/zookeeperObservers.html

    In this situation, the module we are using for deployment is https://github.com/wikimedia/puppet-zookeeper

    It’s not a nice module, trust me, I know, but since we did not want to start the development process from scratch and impact the infrastructure that is already deployed, we had to cope with the situation by changing what we had.

    The main idea in our case is that, since the number of ZooKeeper members taking part in the election process needs to be 2n+1 in order for the quorum mechanism to work, deploying an even number of machines was pretty tricky; to fix this, the extra ZooKeeper instances beyond that requirement should be set as observers.

    A ZooKeeper observer is a node that is not included in the election process and just receives the updates from the cluster.

    My vision is that the best approach for delivery is to activate it in Hiera with a zookeeper::observer parameter per host.

    We can start by including it in the defaults.pp file as follows:

    $observer = hiera('zookeeper::observer', false)

    The zoo.conf file deployed for the configuration is written from the init.pp file, so we need to add the parameter there as well:

    $observer = $::zookeeper::defaults::observer

    OK, now how do we share the status of each node across the required domain? We will need to use another module, https://github.com/WhatsARanjit/puppet-share_data, and include something like this in our code:

    share_data { $::fqdn:
      data  => [ $::fqdn, $observer ],
      label => 'role',
    }
    $obsrole = share_data::retrieve('role')
    

    This guarantees that all servers have access to the observer flag and can use it in the ERB template.

    Jumping to the last component of this config, we need to modify the template to include the observer role.

    How do we do that? Basically by rewriting the server information in this format:

    <% if @hosts
     @hosts.sort_by { |name, id| id }.each do |host_id| -%>
    server.<%= host_id[1] %>=<%= host_id[0] %>:2182:2183<% @obsrole.each do |item| if (item[0] == host_id[0]) && item[1] -%>:observer<% end -%><% end -%> 
    <% end -%>
    <% end -%>
    

    Straightforward: this compares the values from the two lists and, if the flag is true, adds the observer suffix to the configuration.
    One last part needs to be added, and that is:

    <% if @observer == true -%>
    peerType=observer
    <% end -%>
    

    And you are done; if you add zookeeper::observer: true to your YAML file, Puppet should rewrite the config file and restart the ZooKeeper service.
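
    With that in place, the rendered config on an observer node should end up looking roughly like this (hostnames and ids are made up for the example):

    server.1=zk1.example.com:2182:2183
    server.2=zk2.example.com:2182:2183
    server.3=zk3.example.com:2182:2183
    server.4=zk4.example.com:2182:2183:observer
    peerType=observer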

    Cheers

  • Memory debug by Heroku guys on Apache Kafka – nice one

    Hi,

    I know, I should write more about my experience with Apache Kafka; have patience, it’s still building up. Until then, please check this article:

    https://blog.heroku.com/fixing-kafka-memory-leak

    Be aware of the functionalities and code that you add alongside Apache Kafka itself; it might get you into trouble.

    I am very happy that sysdig is used by more and more teams for debugging; it’s truly a great tool for this kind of situation.

    Cheers!