  • Fix under-replicated partitions with a controller restart

    Hi,

    If you have a Kafka cluster in which a single broker reports zero under-replicated partitions while all of the others report a non-zero value, be aware that this broker is most likely not properly registered with the cluster.
    Taking a look at the state-change log on the instance that acts as cluster controller will not show anything in particular (all brokers should be reported as registered).

    That broker cannot take partitions because the controller does not consider it functional (which is different from reachable).

    The easiest way to fix this is a normal restart of the cluster controller. Please do not use kill or anything similar; it will only make things worse.

    Now, if you run tail -50f /opt/kafka/server.log, you will see that the process is trying to stop and all of the components are shutting down in a controlled fashion.
    You will also see some errors regarding the partitions that are hosted on the “problematic” broker, and that these errors repeat in a cycle. Please be patient with this process and wait for it to stop as it should.

    After the restart, if you take a look at the state-change log of the new cluster controller, you will see from time to time that the broker you restarted is reported as unreachable. This is not a real problem; you can ignore those errors.

    After waiting for a while you will see the under-replicated partition count begin to decrease, hopefully down to zero on all brokers.
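
    A quick way to follow the progress is to list only the under-replicated partitions with the standard Kafka tooling (a small sketch; adjust the Zookeeper connection string to your environment):

    ./kafka-topics.sh --zookeeper localhost:2181 --describe --under-replicated-partitions

    An empty result means that every partition is back to its full replica set.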

    Cheers

  • Kafka service problem on upgrade to version 1.1.0

    Hi,

    If you are using version 1.1.0, or want to upgrade to it, and the method is the Puppet module provided by voxpupuli, please be aware of this issue.

    In the template used for the init script located at /etc/init.d/kafka, which you can also see in its latest version here:

    https://github.com/voxpupuli/puppet-kafka/blob/master/templates/init.erb

    There are some lines that retrieve the PID of the Kafka broker by running

    `pgrep -f "$PGREP_PATTERN"`

    This wasn’t a problem for earlier versions, but unfortunately for the latest one it doesn’t return anything, causing the init script to exit with return code 1 (my suspicion is that the process name changed).

    I fixed this by replacing that string with the following

    `ps -ef | grep "$PGREP_PATTERN" | grep -v grep | awk '{print $2}'`

    and it seems to work just fine.

    This change does not impact an already configured and running cluster, and it will not restart your Kafka brokers.

    P.S.: PGREP_PATTERN resolves to kafka.Kafka, which is the string used to identify the broker instance.
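
    To make the idea more concrete, here is a small standalone sketch (not the actual voxpupuli template) that combines the original lookup with the fallback described above:

    PGREP_PATTERN="kafka.Kafka"
    # original lookup from the init template; on 1.1.0 this may return nothing
    PID=$(pgrep -f "$PGREP_PATTERN")
    # fallback used in the fix above
    if [ -z "$PID" ]; then
        PID=$(ps -ef | grep "$PGREP_PATTERN" | grep -v grep | awk '{print $2}')
    fi
    echo "Kafka broker PID: $PID"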

    Cheers

  • IBM MQ crtmqm instance issue

    Morning,

    A quick note: if you are trying to create a queue manager and you receive the following error:

    AMQ8101: WebSphere MQ error (893) has occurred.

    and you also have plenty of free space in your qmgrs and log directories, don’t look any further than this.

    In some cases it happens because the installation under /var/mqm is not complete. The quick fix was to manually define /var/mqm/mqs.ini, but I advise you to reinstall the product instead.

    Before:

    $ ls -ltr /var/mqm/
    total 8
    drwxr-xr-x    2 root     system          256 Mar 21 13:14 lost+found
    drwxrwsrwt    2 mqm      mqm             256 Mar 29 15:22 trace
    drwxrwsr-x    3 mqm      mqm             256 Mar 29 15:22 qmgrs
    drwxrwsr-x    3 mqm      mqm             256 Mar 29 15:22 sockets
    drwxrwsr-x    3 mqm      mqm             256 Mar 29 15:22 shared
    drwxrwsrwt    2 mqm      mqm            4096 Apr 03 12:10 errors
    

    After:

    $ ls -ltr /var/mqm
    total 32
    drwxr-xr-x    2 root     system          256 Mar 21 13:14 lost+found
    drwxrwsrwx    2 mqm      mqm             256 Mar 29 15:22 trace
    drwxrwsrwx    2 mqm      mqm            4096 Apr 03 12:10 errors
    drwxrwsr-x    3 mqm      mqm             256 Apr 03 12:14 sockets
    drwxrwsr-x    3 mqm      mqm             256 Apr 03 12:14 qmgrs
    drwxrwsr-x    2 mqm      mqm             256 Apr 03 12:20 config
    drwxrwsr-x    3 mqm      mqm             256 Apr 03 12:20 conv
    drwxrwsr-x    2 mqm      mqm             256 Apr 03 12:20 log
    -rw-rw-r--    1 mqm      mqm            1941 Apr 03 12:20 service.env
    drwxrwsr-x    5 mqm      mqm             256 Apr 03 12:20 mqft
    drwxrwsr-x    4 mqm      mqm             256 Apr 03 12:20 shared
    -rw-rw-r--    1 mqm      mqm            1156 Apr 03 12:20 mqs.ini
    -rw-rw-r--    1 mqm      mqm             637 Apr 03 12:20 mqclient.ini
    drwxrwsr-x    3 mqm      mqm             256 Apr 03 12:20 exits
    drwxrwsr-x    3 mqm      mqm             256 Apr 03 12:20 exits64
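
    Once /var/mqm looks complete again, a quick way to retry the failing step is shown below (the queue manager name is just an example):

    # verify that the MQ data directory now contains its configuration files
    ls -l /var/mqm/mqs.ini /var/mqm/mqclient.ini

    # then retry creating and starting the queue manager
    crtmqm TEST.QM
    strmqm TEST.QM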
    

    Cheers!

  • Command to start sysdig container – redundant but useful

    Hi,

    This is more of a quick way to find the command without searching the net:

    docker run -it --rm --name=sysdig --privileged=true \
       --volume=/var/run/docker.sock:/host/var/run/docker.sock \
       --volume=/dev:/host/dev \
       --volume=/proc:/host/proc:ro \
       --volume=/boot:/host/boot:ro \
       --volume=/lib/modules:/host/lib/modules:ro \
       --volume=/usr:/host/usr:ro \
       sysdig/sysdig

    This is the actual command for starting a sysdig container. I will go more in depth later with some aggregated Kafka cluster info from this amazing tool, and also with what it takes to send it to an Elasticsearch cluster.
    It will be challenging, but that is how it goes in IT these days.
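
    As a quick usage example (just a sketch, assuming the image ships the standard chisels), once the container is running you can open a shell in it and try:

    docker exec -it sysdig bash
    # inside the container:
    sysdig -c topprocs_cpu            # top processes by CPU usage
    sysdig -c topprocs_net            # top processes by network I/O
    sysdig proc.name=java             # raw events for the Java/Kafka processes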

    Cheers

  • Non registered Zookeeper – why doesn’t it work?

    Morning,

    If you ever deploy a server that also has Zookeeper installed via Puppet or another automation tool, and you already have a working cluster, please be aware of this.

    Yesterday I rebuilt a node multiple times (there were some errors to fix), and after finally getting it right, the Zookeeper instance did not behave as expected.
    When I took a look in the /var/lib/zookeeper directory, the correct myid file was there (with the id that is also present in the config file), along with the version-2 directory.
    Normally version-2 should host all the data stored by Zookeeper, but there was only a currentEpoch file with 0 in it.

    Multiple restarts, no result. The log didn’t contain anything relevant. Since the server was not live yet, I rebuilt it one more time, but it showed the same behavior. It looked like the node was completely out of sync, and that was indeed the case 😀

    I eventually figured out, by accident, that the Zookeeper instance was not yet registered with the ensemble (I had tried to change the id of that Zookeeper and restart the whole cluster).

    In order to register it, well, you need to restart the leader. How do you find it? There are multiple methods, I guess; here are two that work. Either run the following command

    echo stat | nc localhost 2181 | grep Mode

    or check the exposed ports:

    zookeeper start/running, process 15259
    root@server1:/var/lib/zookeeper/version-2# netstat -tulpen | grep 15259
    tcp6       0      0 :::42844                :::*                    LISTEN      107        1104606     15259/java      
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        1114708     15259/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        1104609     15259/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        1104607     15259/java
    
    root@server2:/var/lib/zookeeper/version-2# netstat -tulpen | grep 28068
    tcp6       0      0 :::48577                :::*                    LISTEN      107        3182780     28068/java      
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        3185668     28068/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        3184651     28068/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        3182781     28068/java   
    
    root@server3:/var/lib/zookeeper/version-2# netstat -tulpen | grep 20719
    tcp6       0      0 :::2181                 :::*                    LISTEN      107        5365296     20719/java      
    tcp6       0      0 :::2182                 :::*                    LISTEN      107        5382604     20719/java      
    tcp6       0      0 :::2183                 :::*                    LISTEN      107        5374105     20719/java      
    tcp6       0      0 :::36008                :::*                    LISTEN      107        5371417     20719/java      
    tcp6       0      0 :::9998                 :::*                    LISTEN      107        5371418     20719/java
    

    The leader always exposes port 2182 (the follower port) so that the followers can grab the updates.
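
    If you prefer the first method, a small loop over the ensemble (host names are placeholders) finds the leader quickly:

    for host in server1 server2 server3; do
      echo -n "$host: "
      echo stat | nc "$host" 2181 | grep Mode
    done
    # the node reporting "Mode: leader" is the one to restart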

    After a short restart of the leader, everything works as expected!

    Cheers

  • Use case for deleting a corrupted Kafka topic

    Hi,

    A week ago we had a case in which the client could not delete a topic from the cluster (the Kafka version in this case was 1.0.0).
    When the topic was listed, there were no leaders assigned for its partitions. It was pretty clear that the deletion would not go through until we fixed that.
    First we tried a partition reassignment, with the idea that a leader would be assigned during this process. A JSON file was generated for the specified topic and executed using kafka-reassign-partitions.sh. After verification, we concluded that the reassignment had failed.
    The next step was to delete the topic from the Zookeeper metadata.
    We came to this conclusion following this article:
    https://community.hortonworks.com/articles/72622/how-kafka-delete-topic-command-works.html

    The command, run from the zookeeper-shell.sh script, was

    rmr /brokers/topics/[topic_name]

    Running it fixed our leader problem. It was strange, but very convenient.

    There was one extra thing we needed to do. Version 1.0.0 has a bug that affects the cluster controller – the error found in the log was Cached zkVersion [3] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition) – https://issues.apache.org/jira/browse/KAFKA-2729

    We restarted the cluster to fix this, but since there was already an active request for the topic deletion, it had to be refreshed.
    In order to do that you can run

    rmr /admin/delete_topics/[topic_name]

    After doing so, the topic will no longer appear as marked for deletion, but if you run the delete command again, it will be marked once more and the controller will actively start the deletion process.

    That was also the case for us: after running the delete command again, the topic was removed from the brokers.
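
    Putting it all together, the whole sequence looks roughly like this (a sketch; connection strings and paths depend on your setup, and [topic_name] stays a placeholder):

    cd /opt/kafka/bin
    # remove the stale topic metadata and the old delete request from Zookeeper
    ./zookeeper-shell.sh localhost:2181 rmr /brokers/topics/[topic_name]
    ./zookeeper-shell.sh localhost:2181 rmr /admin/delete_topics/[topic_name]
    # restart the brokers if the zkVersion bug is hit, then re-issue the delete
    ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic [topic_name]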

    Cheers

  • Log rotation for Kafka garbage collection logs without restart

    Morning,

    If you have an Apache Kafka version below 1.0.0 and you don’t have garbage collection log rotation, the difference in the JVM options is shown here:

    with rotation:

    -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M

    without rotation:

    -Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps

    One option is to modify the startup parameters so that the rotation flags are included when the process starts, but this also involves restarting the service.

    However, you can also use the logrotate daemon with the following configuration. Here is the block that you need to add in Hiera:

    logrotate::rule:
     'kafka_gc':
       path: '/opt/kafka/logs/kafkaServer-gc.log'
       copytruncate: true
       rotate_every: 'day'
       compress: true
       missingok: true
       su: true
       su_owner: 'kafka'
       su_group: 'kafka'
       ifempty: false
       size: '50M'
       maxsize: '50M'
       rotate: 5
    

    Or, if you want to write it in a Puppet class, it should look like this:

    $version = lookup('kafka::version')
    if ($_role =~ /\Akafka/) and ($version != '1.0.0') {
      logrotate::rule { 'kafkagc_logs':
        path         => '/opt/kafka/logs/kafkaServer-gc.log',
        copytruncate => true,
        rotate       => 5,
        rotate_every => 'daily',
        missingok    => true,
        ifempty      => false,
        su           => true,
        su_owner     => 'kafka',
        su_group     => 'kafka',
        size         => '50M',
        maxsize      => '50M',
      }
    }
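
    If you are not managing logrotate through Puppet at all, the equivalent plain logrotate entry (for example in /etc/logrotate.d/kafka_gc) would look roughly like this:

    /opt/kafka/logs/kafkaServer-gc.log {
        daily
        rotate 5
        size 50M
        maxsize 50M
        copytruncate
        compress
        missingok
        notifempty
        su kafka kafka
    }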
    

    Cheers!

  • Consumer group coordinator in Kafka using a small Scala script

    Morning,

    Just a small post about retrieving the consumer group coordinator for a specific consumer group.
    We had an issue with consumer groups rebalancing, and we didn’t know whether it was related to application logic or to the Consumer Group Coordinator changing, with the Kafka cluster reassigning a different one each time. So, a small piece of code was needed. I used the libraries shipped with Kafka 1.0.0 for this test, so be aware of the classpath if you want to modify it.

    In order to do the test, I started a standalone Confluent Kafka image, which normally listens on port 29092. For more details please consult their documentation.

    I also created a test topic with one partition and a replication factor of one, produced some messages to it, and then started a console consumer:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic test --from-beginning
    test
    message
    test message
    

    Once this is started, you can also see it using the consumer-groups command like this:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-consumer-groups.sh --bootstrap-server localhost:29092 --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    console-consumer-49198
    console-consumer-66063
    console-consumer-77631
    

    Now my console consumer is identified as console-consumer-77631, and in order to see its group coordinator you will have to run something like:

    ./getconsumercoordinator.sh localhost 29092 console-consumer-77631
    warning: there were three deprecation warnings; re-run with -deprecation for details
    one warning found
    Creating connection to: localhost 29092 
    log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Channel connected
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    GroupCoordinatorResponse(Some(BrokerEndPoint(1001,localhost,29092)),NONE,0)
    

    It’s clear that since we have only one broker, it is also the coordinator.
    Regarding the details of the code, I used this link, and also, since I don’t have a Scala project, just a script, the following command was of great use for finding which jars provide the dependencies:

    for i in *.jar; do jar -tvf "$i" | grep -Hsi ClassName && echo "$i"; done
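
    For example, to find the jar that provides BlockingChannel (it should be the core kafka_2.11 jar), you can run:

    cd /home/sorin/kafka_2.11-1.0.0/libs
    for i in *.jar; do jar -tvf "$i" | grep -qsi BlockingChannel && echo "$i"; done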

    Here is also the code:

    #!/bin/sh
    # run Scala from a shell script; the classpath must contain the Kafka, Jackson, slf4j and log4j jars
    exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/kafka-clients-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/kafka_2.11-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/slf4j-api-1.7.25.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-core-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-databind-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-annotations-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/log4j-1.2.17.jar" "$0" "$@"
    !#
    
    import com.fasterxml.jackson.core._
    import com.fasterxml.jackson.databind.ObjectMapper
    import kafka.network.BlockingChannel
    import kafka.api.GroupCoordinatorRequest
    import kafka.api.GroupCoordinatorResponse
    import org.slf4j.LoggerFactory
    
    // expected arguments: hostname, port, consumer group name
    val hostname = args(0)
    val port = args(1).toInt
    val group = args(2)
    println("Creating connection to: " + hostname + " " + port + " ")
    
    // open a blocking channel to the broker and ask for the coordinator of the given group
    var channel = new BlockingChannel(hostname, port, 1048576, 1048576, readTimeoutMs = 50000)
    channel.connect()
    if (channel.isConnected) {
      println("Channel connected")
      channel.send(GroupCoordinatorRequest(group))
      val metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive.payload())
      println(metadataResponse)
    }
    channel.disconnect()
    

    Regarding the code, the first part runs Scala from a shell script; you need to update the classpath with all the required libraries and pass the script arguments through (in our case there are three: host, port and consumer group). Also, if you don’t add all of the Jackson, log4j and slf4j dependencies, it won’t work.

    P.S.: It will also work by running exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/*"

    Cheers!

  • Apache Kafka technical logs to ELK stack

    Morning,

    Just wanted to share with you the following article:

    https://www.elastic.co/blog/monitoring-kafka-with-elastic-stack-1-filebeat

    I will try to share my experience once I have everything I need to start working on it. As far as I have understood from managing Kafka for a while, monitoring alone is not enough; this is mandatory in order to manage your clusters in a clear and transparent manner.

    Cheers

  • Important considerations regarding Kafka MirrorMaker

    Morning folks,

    Just wanted to share with you two things that I learned while playing with Kafka MirrorMaker this morning.

    The first is that if you want to put MirrorMaker, or any other tool that ships with Kafka, into debug mode, all you have to do is modify the tools-log4j.properties file in the config folder of your Apache Kafka installation. It normally looks like this:

    log4j.rootLogger=WARN, stderr
    
    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
    log4j.appender.stderr.Target=System.err
    

    but you can easily set it to:

    log4j.rootLogger=DEBUG, stderr
    
    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
    log4j.appender.stderr.Target=System.err
    

    The second one is related to a property that you should put in your consumer.config file in order to consume data from the beginning of the topic and not from the latest offset.

    In order to achieve this you will need to add the following option:

    auto.offset.reset=earliest

    The possible values for this setting are latest, earliest and none. Keep in mind that it only takes effect when the consumer group has no committed offsets yet.
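
    For reference, a typical MirrorMaker invocation using such a consumer config looks roughly like this (file names and the topic whitelist are only examples):

    ./kafka-mirror-maker.sh \
       --consumer.config ../config/consumer.properties \
       --producer.config ../config/producer.properties \
       --whitelist 'my-topic.*'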

    Cheers