Tag: kafka

  • Delete corrupted Kafka topic version 2.0

    Hi,

In the past we had the situation described in this link: use-case-deleting-corrupted-kafka-topic.
The situation repeated itself this time in a slightly different way. Looking at the list of topics, three of them were marked for deletion.
None of them had a leader or ISR, so after a little investigation the conclusion was that they were no longer present on the filesystem.
My assumption is that the cluster controller began the delete process but failed before sending a new metadata update to the ZooKeeper ensemble.
A restart of the cluster controller was performed in order to provide a new epoch, and after that the “deletion request” and the topic metadata were manually deleted from ZooKeeper (you can find the commands in the above link).
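For reference, the cleanup was along these lines, run through zookeeper-shell (the topic name is a placeholder; the znode paths assume the standard Kafka layout):

# remove the pending deletion request for the topic
./zookeeper-shell.sh zk-host:2181 rmr /admin/delete_topics/my-corrupted-topic
# remove the topic metadata itself
./zookeeper-shell.sh zk-host:2181 rmr /brokers/topics/my-corrupted-topic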

Listing the topics again, everything looks good.

    Cheers

  • Kafka service problem on upgrade to version 1.1.0

    Hi,

If you are using version 1.1.0 or want to upgrade to it, and you deploy with the Puppet module provided by Vox Pupuli, please be aware of this issue.

The issue is in the template used for the init script located under /etc/init.d/kafka, which you can also see in the latest version here:

    https://github.com/voxpupuli/puppet-kafka/blob/master/templates/init.erb

There are some lines that determine the PID of the Kafka broker by using the command

    `pgrep -f "$PGREP_PATTERN"`

This isn’t a problem for earlier versions, but unfortunately for the latest one it doesn’t return anything, causing the init script to exit with return code 1 (my suspicion is that the process name changed).

I fixed this by replacing that command with the following

`ps -ef | grep "$PGREP_PATTERN" | grep -v grep | awk '{print $2}'`

    and it seems to work just fine.
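For context, a minimal sketch of how the PID lookup ends up looking after the change (the variable value is taken from the P.S. below; the surrounding init script structure is assumed):

# PGREP_PATTERN identifies the broker JVM (see P.S. below)
PGREP_PATTERN="kafka.Kafka"
# old lookup: pgrep -f "$PGREP_PATTERN"  -- returns nothing on 1.1.0
PID=$(ps -ef | grep "$PGREP_PATTERN" | grep -v grep | awk '{print $2}')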

    This doesn’t have any impact on the already configured and running cluster, and it will not restart your Kafka brokers.

P.S.: PGREP_PATTERN resolves to kafka.Kafka, which is the string used to identify the broker instance.

    Cheers

  • Consumer group coordinator in Kafka using some scala script

    Morning,

Just a small post about how to return the group coordinator for a specific consumer group.
We had an issue where consumer groups kept rebalancing and we didn’t know whether it was related to application logic or whether the group coordinator was changing and the Kafka cluster was reassigning a different one each time. So a small piece of code was needed. I used the libraries shipped with Kafka 1.0.0 for this test, so be aware of the classpath if you want to modify this.

In order to do the test, I started a standalone Confluent Kafka image which normally listens on port 29092. For more details please consult their documentation here.

I also created a test topic with one partition and a replication factor of one, and produced some messages to it; a sketch of those steps is below.
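Something along these lines, assuming the standard Kafka 1.0 CLI and that the ZooKeeper from the Confluent setup is reachable on localhost:2181 (the topic name and broker port are the ones used in this post):

# create the test topic with one partition and replication factor 1
./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1
# produce a few messages interactively
./kafka-console-producer.sh --broker-list localhost:29092 --topic test

After that, a console consumer was started: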

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic test --from-beginning
    test
    message
    test message
    

Once this is started you can also see it using the consumer-groups command, like this:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-consumer-groups.sh --bootstrap-server localhost:29092 --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    console-consumer-49198
    console-consumer-66063
    console-consumer-77631
    

    Now my console consumer is identified by console-consumer-77631 and in order to see the group coordinator you will have to run something like:

    ./getconsumercoordinator.sh localhost 29092 console-consumer-77631
    warning: there were three deprecation warnings; re-run with -deprecation for details
    one warning found
    Creating connection to: localhost 29092 
    log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Channel connected
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    GroupCoordinatorResponse(Some(BrokerEndPoint(1001,localhost,29092)),NONE,0)
    

    It’s clear that since we have only one broker, that is also the coordinator.
For the details of the code I used this link. Also, since I don’t have a Scala project, just a script, the following command was of great use for locating the jars that contain the required classes (replace ClassName with the class you are looking for):

    for i in *.jar; do jar -tvf "$i" | grep -Hsi ClassName && echo "$i"; done

    Here is also the code:

    #!/bin/sh
exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/kafka-clients-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/kafka_2.11-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/slf4j-api-1.7.25.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-core-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-databind-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-annotations-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/log4j-1.2.17.jar" "$0" "$@"
    !#
    
    import com.fasterxml.jackson.core._
    import com.fasterxml.jackson.databind.ObjectMapper
    import kafka.network.BlockingChannel
    import kafka.api.GroupCoordinatorRequest
    import kafka.api.GroupCoordinatorResponse
    import org.slf4j.LoggerFactory
    
    val hostname = args(0)
    val port = args(1).toInt
    val group = args(2)
    println("Creating connection to: " + hostname + " " + port + " ")
    
// open a blocking channel to the broker (1 MB read/write buffers, 50 s read timeout)
var channel = new BlockingChannel(hostname, port, 1048576, 1048576, readTimeoutMs = 50000)
channel.connect()
if (channel.isConnected) {
  println("Channel connected")
  // ask the broker which node is the coordinator for this consumer group
  channel.send(GroupCoordinatorRequest(group))
  val metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive.payload())
  println(metadataResponse)
}
channel.disconnect()
    

Regarding the code, the first part runs Scala from a shell script; you need to update the classpath with all the required libraries, and the script forwards its three arguments (hostname, port and consumer group) to the Scala code. Also, if you don’t add all of the jackson, log4j and slf4j dependencies, it won’t work.

P.S.: It will also work by running exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/*"

    Cheers!

  • Important considerations regarding Kafka MirrorMaker

    Morning folks,

Just wanted to share with you two things that I learned while playing with Kafka MirrorMaker this morning.

The first is that if you want to put MirrorMaker, or any other tool shipped with Kafka, in debug mode, all you have to do is modify the tools-log4j.properties file in the config folder of your Apache Kafka installation. It normally looks like this

    log4j.rootLogger=WARN, stderr
    
    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
    log4j.appender.stderr.Target=System.err
    

    but you can easily set it to

    log4j.rootLogger=DEBUG, stderr
    
    log4j.appender.stderr=org.apache.log4j.ConsoleAppender
    log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
    log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
    log4j.appender.stderr.Target=System.err
    

The second is related to a property that you should put in your consumer.config file in order to consume data from the beginning of the topic and not from the latest offset.

    In order to achieve this you will need to add the following option:

    auto.offset.reset=earliest

    The possible values for this setting are latest, earliest, none.
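For illustration, a minimal consumer.config sketch (the bootstrap address and group id are placeholders, not from this post):

bootstrap.servers=source-cluster:9092
group.id=mirror-maker-group
auto.offset.reset=earliest

which would then be passed to MirrorMaker with something like ./kafka-mirror-maker.sh --consumer.config consumer.config --producer.config producer.config --whitelist ".*". Keep in mind that auto.offset.reset only takes effect when the consumer group has no committed offsets yet; an existing group continues from where it left off.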

    Cheers

  • Observer functionality for puppet zookeeper module

    Morning,

I know it’s been some time since I last posted, but I didn’t have much time to play around. Today I want to share with you a use case in which we needed to modify the module used for deploying ZooKeeper so that it also supports the observer role.

The link that describes how this can be activated, starting with version 3.3.0, is here: https://zookeeper.apache.org/doc/trunk/zookeeperObservers.html

In our case we are using the module https://github.com/wikimedia/puppet-zookeeper for deployment.

It’s not a nice module, trust me, I know, but since we did not want to start the development process from scratch and impact the infrastructure that is already deployed, we had to cope with the situation by changing what we had.

The main idea in our case is that since the number of ZooKeeper members participating in the election process needs to be 2n+1 for the quorum mechanism to work, deploying an even number of machines was pretty tricky. To fix this, the extra ZooKeeper instances beyond the requirement should be set as observers.

A ZooKeeper observer is a node that is not included in the election process and just receives updates from the cluster.

In my view the best approach is to activate it in Hiera with a zookeeper::observer parameter per host.

    We can start by including it in the defaults.pp file as follows:

     $observer	      = hiera('zookeeper::observer', false)

The zoo.conf file deployed for the configuration is written from the init.pp file, so we need to add the parameter there as well:

    $observer	   = $::zookeeper::defaults::observer

OK, now how do we share the status of each node across the required domain? We will need to use another module, https://github.com/WhatsARanjit/puppet-share_data, and include in our code something like:

share_data { $::fqdn:
  data  => [ $::fqdn, $observer ],
  label => 'role',
}
$obsrole = share_data::retrieve('role')
    

This guarantees that all servers have the observer flag and can use it in the ERB template.

Jumping to the last component of this configuration, we need to modify the template so that it includes the observer role.

    How do we do that? Basically by rewriting the server information in this format:

    <% if @hosts
     @hosts.sort_by { |name, id| id }.each do |host_id| -%>
    server.<%= host_id[1] %>=<%= host_id[0] %>:2182:2183<% @obsrole.each do |item| if (item[0] == host_id[0]) && item[1] -%>:observer<% end -%><% end -%> 
    <% end -%>
    <% end -%>
    

Straightforward: this compares the values from the two lists and, if the flag is true, adds the observer configuration.
One last part needs to be added, and that is:

    <% if @observer == true -%>
    peerType=observer
    <% end -%>
    

And you are done. If you add zookeeper::observer: true to your YAML file, Puppet should rewrite the config and restart the ZooKeeper service.
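For example, on a four-node ensemble with one observer, the rendered server section would end up looking roughly like this (hostnames are placeholders):

server.1=zk01.example.com:2182:2183
server.2=zk02.example.com:2182:2183
server.3=zk03.example.com:2182:2183
server.4=zk04.example.com:2182:2183:observer

and the config on zk04 itself would additionally contain the peerType=observer line.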

    Cheers

  • Memory debug by Heroku guys on Apache Kafka – nice one

    Hi,

I know, I should write more about my experience with Apache Kafka; have patience, it’s still building up. Until then, please check this article:

    https://blog.heroku.com/fixing-kafka-memory-leak

Be careful what functionality and code you add alongside Apache Kafka itself; it might get you into trouble.

I am very happy that sysdig is used by more and more teams for debugging; it’s truly a great tool for these kinds of situations.

    Cheers!

  • Kafka limits implementation using puppet

    Morning,

Keeping my promise, I provide you with the two simple blocks that are needed to implement the limits we discussed in the article http://log-it.tech/2017/10/16/ubuntu-change-ulimit-kafka-not-ignore/

    For the limits module you can use:
    https://forge.puppet.com/puppetlabs/limits

As for the actual Puppet implementation, I decided not to restart the service immediately. That being said, it’s dead simple to do:

    	 file_line {"add_pamd_record":
    	 path => '/etc/pam.d/common-session',
    	 line => 'session required pam_limits.so'
    	 }
    	 limits::fragment {
    	     "*/soft/nofile":
          		value => "100000";
        		"*/hard/nofile":
          		value => "100000";
       		 "kafka/soft/nofile":
          		value => "100000";
        		"kafka/hard/nofile":
          		value => "100000";
      }
    

    This is all you need.

    Cheers

  • Ubuntu – change ulimit for kafka, do not ignore

    Hi,

I want to share with you something that took me half a day to clarify. I just read the following article https://docs.confluent.io/current/kafka/deployment.html#file-descriptors-and-mmap
and learned that in order to optimize Kafka you also need to raise the maximum number of open files. That is fine, but our clusters are deployed on Ubuntu and the images are pretty basic. I'm not sure whether this applies to all distributions, but at least for this one it’s absolutely needed.
Before trying to set up anything in

    /etc/security/limits.conf

    make sure that you have exported in

    /etc/pam.d/common-session

    line

    session required pam_limits.so

It is needed in order for ssh and su processes to pick up the new limits for that user (in our case kafka).
Doing this allows the new values defined in the limits file to take effect. You are now free to set the nofile limit, for example like this:

*        soft    nofile    10000
*        hard    nofile    100000
kafka    soft    nofile    10000
kafka    hard    nofile    100000

After it is done, you can restart the cluster and check the value by finding the process with ps -ef | grep kafka and viewing the limits file with cat /proc/[kafka-process]/limits.
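Something like this, for example (assuming the broker’s main class kafka.Kafka identifies the process, as in the init script post above):

# print the current open-files limit of the running broker
PID=$(ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}')
grep "open files" /proc/$PID/limits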

I will come back later with a Puppet implementation for this as well.

    Cheers!

  • Kafka implementation using puppet at IMWorld Bucharest 2017

    Hi,

I recently gave a presentation on how to deploy Kafka using Puppet and what you need as a minimum in order to be successful in production.
    Here is the presentation:

    Hope it is useful.

    Cheers!

    Update:

    There is also an official version from IMWorld which you can find here:

    And also the article on medium.com that describes it in more technical detail:

    https://medium.com/@sorin.tudor/messaging-kafka-implementation-using-puppet-5438a0ed275d

  • Definitive guide to Kafka, confluent edition

    Hi,

No technical details today. Just wanted to share with you the Definitive Guide to Kafka, the book provided by our dear and esteemed colleagues from Confluent:

    https://www.confluent.io/wp-content/uploads/confluent-kafka-definitive-guide-complete.pdf

    Thank you, it should be an interesting read.

    Cheers!