Delete corrupted Kafka topic version 2.0


We have previously dealt with the situation described in this post: use-case-deleting-corrupted-kafka-topic
The situation repeated itself this time, slightly differently. Looking at the list of topics, there were three topics marked for deletion.
None of them had a leader or ISR, and after a bit of investigation we concluded that they were no longer present on the filesystem.
My assumption is that the cluster controller began the delete process but failed before sending a new metadata update to ZooKeeper.
We restarted the cluster controller to get a new controller epoch. After that, we manually deleted the "deletion request" and the topic metadata from ZooKeeper (you can find the commands in the post linked above).
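The exact commands are in the linked post. As an illustration only, here is a dry-run sketch that prints, rather than runs, zookeeper-shell.sh commands for the usual znodes of a stuck topic: the pending delete request under /admin/delete_topics, the partition assignment under /brokers/topics and the topic config under /config/topics. The zookeeper-shell.sh location and the function name are assumptions. rmr is the ZooKeeper 3.4 shell command; newer shells call it deleteall.

```shell
#!/bin/sh
# Dry-run sketch: print (do not execute) the commands that would remove the
# ZooKeeper state of a topic stuck in "marked for deletion".
# ZK_SHELL's default location is an assumption; adjust it to your install.
ZK_SHELL=${ZK_SHELL:-/opt/kafka/bin/zookeeper-shell.sh}

stuck_topic_cleanup_cmds() {
  zk=$1
  topic=$2
  # Pending delete request, partition assignment, per-topic config overrides.
  for path in "/admin/delete_topics/$topic" "/brokers/topics/$topic" "/config/topics/$topic"; do
    printf '%s %s rmr %s\n' "$ZK_SHELL" "$zk" "$path"
  done
}

stuck_topic_cleanup_cmds localhost:2181 my-stuck-topic
```

Review the printed lines before running any of them against a real ensemble.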

Listing the topics again, everything looks good.



Kafka service problem on upgrade to version 1.1.0


If you are using version 1.1.0, or want to upgrade to it, and you deploy it with the puppet module provided by voxpupuli, please be aware of this issue.

The template used for the init script, which ends up at /etc/init.d/kafka (as you can also see in the latest version of the template), has a problem:

There are some lines that retrieve the PID of the Kafka broker using the command

`pgrep -f "$PGREP_PATTERN"`

This isn't a problem for earlier versions, but unfortunately with the latest one it doesn't return anything, causing the init script to exit with return code 1 (my suspicion is that the process name changed).

I fixed this by replacing that command with the following

`ps -ef | grep "$PGREP_PATTERN" | grep -v grep | awk '{print $2}'`

and it seems to work just fine.

This doesn’t have any impact on the already configured and running cluster, and it will not restart your Kafka brokers.

P.S.: PGREP_PATTERN resolves to kafka.Kafka, which is the string that identifies the broker process.
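As an alternative sketch (not what the voxpupuli template does), the PID can also be found by reading each process's full command line from /proc/<pid>/cmdline and matching it in the shell itself, which avoids having to filter the grep process out of the results. It assumes a Linux /proc filesystem, and the function name is made up for illustration.

```shell
#!/bin/sh
# Sketch (Linux only): print the PIDs whose full command line contains a fixed
# string, read straight from /proc/<pid>/cmdline instead of pgrep or ps | grep.
find_pids_by_cmdline() {
  pat=$1
  [ -n "$pat" ] || return 1
  for dir in /proc/[0-9]*; do
    pid=${dir#/proc/}
    # Skip the shell that runs this function.
    [ "$pid" = "$$" ] && continue
    # cmdline is NUL-separated; a process may exit mid-loop, so ignore errors.
    cmd=$(tr '\0' ' ' 2>/dev/null < "$dir/cmdline") || continue
    case $cmd in
      *"$pat"*) printf '%s\n' "$pid" ;;
    esac
  done
  return 0
}
```

In the init script this would be used as PID=$(find_pids_by_cmdline "$PGREP_PATTERN"); note that it prints one line per matching process.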



Consumer group coordinator in Kafka using a Scala script


Just a short post about retrieving the coordinator of a specific consumer group.
We had an issue where consumer groups kept rebalancing, and we didn't know whether it was related to application logic or whether the group coordinator kept changing, with the Kafka cluster assigning a different one each time. So a small piece of code was needed. I used the libraries shipped with Kafka 1.0.0 for this test, so be aware that you will need to update the classpath if you want to adapt it.

To run the test, I started a standalone Confluent Kafka image, which normally listens on port 29092. For more details, please consult the Confluent documentation.

I also created a test topic with one partition and a replication factor of one, produced some messages to it, and then started a console consumer:

sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./ --bootstrap-server localhost:29092 --topic test --from-beginning
test message

Once it is started, you can also see it with the consumer groups command, like this:

sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./ --bootstrap-server localhost:29092 --list
Note: This will not show information about old Zookeeper-based consumers.


My console consumer is identified by console-consumer-77631, and to see its group coordinator you have to run something like:

./ localhost 29092 console-consumer-77631
warning: there were three deprecation warnings; re-run with -deprecation for details
one warning found
Creating connection to: localhost 29092 
log4j:WARN No appenders could be found for logger (
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See for more info.
Channel connected
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.

Since we have only one broker, it is clearly also the coordinator.
For the details of the code I used this link. Also, to search for all of the dependencies (since I don't have a Scala project, just a script), the following command was of great use:

for i in *.jar; do jar -tvf "$i" | grep -Hsi ClassName && echo "$i"; done

Here is also the code:

#!/bin/sh
exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/kafka-clients-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/kafka_2.11-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/slf4j-api-1.7.25.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-core-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-databind-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-annotations-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/log4j-1.2.17.jar" "$0" "$@"
!#

// Old Scala client API shipped with Kafka 1.0.0
import kafka.api.{GroupCoordinatorRequest, GroupCoordinatorResponse}
import kafka.network.BlockingChannel

// Arguments: broker host, broker port, consumer group id
val hostname = args(0)
val port = args(1).toInt
val group = args(2)
println("Creating connection to: " + hostname + " " + port + " ")

val channel = new BlockingChannel(hostname, port, 1048576, 1048576, readTimeoutMs = 50000)
channel.connect()
if (channel.isConnected) {
  println("Channel connected")
  // Ask the broker which broker coordinates this consumer group
  channel.send(new GroupCoordinatorRequest(group))
  val metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive.payload())
  println(metadataResponse)
}
channel.disconnect()

Regarding the code, the first part lets you run Scala from a shell script: you need to update the classpath with all of the libraries and pass the script's arguments through to it (in our case there are three: host, port and consumer group). Also, if you don't add all of the jackson, log4j and slf4j dependencies, it won't work.

P.S.: It also works with exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/*"
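The exec line hands the script path ("$0") and the command-line arguments to scala, and the script reads them as args(0), args(1) and args(2). A small sketch (the function names are made up, and show_args merely stands in for scala) shows how the arguments expand when "$1" "$2" are listed before "$@", compared with forwarding "$@" alone:

```shell
#!/bin/sh
# Illustration only: show_args stands in for scala and prints the arguments
# it receives, one per line.
show_args() { printf '%s\n' "$@"; }

# Lists "$1" "$2" and then "$@", so the first two arguments arrive twice.
forward_twice() { show_args "$1" "$2" "$@"; }

# Forwards every argument exactly once, in order.
forward_once() { show_args "$@"; }

forward_twice localhost 29092 console-consumer-77631
forward_once localhost 29092 console-consumer-77631
```

With the first form the script receives five arguments and args(2) is localhost rather than the group name; with the second, args(2) is console-consumer-77631.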



Important considerations regarding Kafka MirrorMaker

Morning folks,

I just wanted to share two things I learned while playing with Kafka MirrorMaker this morning.

First, if you want to put MirrorMaker or any other tool that uses Kafka into debug mode, all you have to do is modify the log4j properties file in the config folder of your Apache Kafka installation. It normally looks like this:

log4j.rootLogger=WARN, stderr

log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n

but you can easily set it to

log4j.rootLogger=DEBUG, stderr

log4j.appender.stderr.layout.ConversionPattern=[%d] %p %m (%c)%n
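If you switch levels often, a small sketch like the one below does the edit in place with sed and keeps a .bak copy. The function name is hypothetical, and it assumes the root logger line has the form shown above: a level followed by a comma-separated appender list.

```shell
#!/bin/sh
# Sketch: change the root logger level in a log4j properties file, keeping
# the appender list after the level intact. A backup is written as <file>.bak.
set_root_log_level() {
  file=$1
  level=$2
  case $level in
    TRACE|DEBUG|INFO|WARN|ERROR|FATAL|OFF) ;;
    *) echo "unsupported level: $level" >&2; return 1 ;;
  esac
  # Replace only the level right after "log4j.rootLogger=".
  sed -i.bak -E "s/^(log4j\.rootLogger=)[A-Z]+/\1$level/" "$file"
}
```

Restart the tool afterwards so that it reads the file again.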

The second one is related to the property you should put in your consumer.config file in order to read the data from the beginning of the topic rather than from the latest offset.

To achieve this, you need to add the following option:


The possible values for this setting are latest, earliest, none.
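As a sketch, assuming MirrorMaker uses the new Java consumer: the consumer setting that accepts latest, earliest and none is auto.offset.reset. The snippet below writes a minimal consumer.config with it set to earliest; the function name, broker address and group id are placeholders. Keep in mind that the reset policy only applies when the group has no valid committed offset, so a group that has already committed offsets resumes from them.

```shell
#!/bin/sh
# Sketch: write a minimal MirrorMaker consumer.config that starts from the
# earliest available offset. Broker list and group id are placeholders.
write_consumer_config() {
  file=$1
  bootstrap=$2
  group=$3
  cat > "$file" <<EOF
bootstrap.servers=$bootstrap
group.id=$group
# earliest | latest | none; only used when the group has no valid committed offset
auto.offset.reset=earliest
EOF
}

write_consumer_config ./consumer.config source-kafka:9092 mirror-group
```

The file is then passed to MirrorMaker with --consumer.config.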



Observer functionality for puppet zookeeper module


I know it's been some time since I last posted, but I didn't have much time to play. Today I want to share the use case in which we needed to modify the module used to deploy ZooKeeper so that it also supports the observer role.

The link that describes how observers are activated (available since version 3.3.0) is here:

In our situation, we are using the following module for deployment:

It's not a nice module, trust me, I know, but since we did not want to restart the development process from the beginning and impact the infrastructure that is already deployed, we had to cope with the situation by changing what we had.

The main idea in our case is that the number of voting ZooKeeper members should be 2n+1 for the quorum mechanism (an even-sized ensemble tolerates no more failures than the odd-sized one just below it), so deploying an even number of machines was pretty tricky. To fix this, the ZooKeeper instances beyond that requirement should be set as observers.

A ZooKeeper observer is a node that does not take part in the election process (it does not vote) and just receives the updates from the cluster.

In my view, the best delivery approach is to activate it in Hiera with a zookeeper::observer parameter per host.

We can start by including it in the defaults.pp file as follows:

 $observer	      = hiera('zookeeper::observer', false)

The zoo.conf configuration file is written from the init.pp file, so we also need to add the parameter there:

$observer	   = $::zookeeper::defaults::observer

OK, now how do we share the status of each node with the other nodes of the cluster? We need to use another module and include in our code something like:

 share_data { $::fqdn:
   data  => [ $::fqdn, $observer ],
   label => 'role',
 }
 $obsrole = share_data::retrieve('role')

This ensures that every server has the observer flags of all nodes and can use them in the ERB template.

Jumping to the last component of this config, we need to modify the template to include the observer role.

How do we do that? Basically by rewriting the server information in this format:

<% if @hosts
 @hosts.sort_by { |name, id| id }.each do |host_id| -%>
server.<%= host_id[1] %>=<%= host_id[0] %>:2182:2183<% @obsrole.each do |item| if (item[0] == host_id[0]) && item[1] -%>:observer<% end -%><% end -%> 
<% end -%>
<% end -%>

Put simply, this compares the values from the two lists and, if a host's flag is true, appends the observer suffix to its server line.
One last part needs to be added, and that is

<% if @observer == true -%>
<% end -%>

And you are done: if you add zookeeper::observer: true to your YAML file, Puppet should rewrite the file and restart the ZooKeeper service.
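To make the template logic easier to follow, and to check its output without running Puppet, here is the same idea as a POSIX shell sketch. The host:id:flag input format and the function name are assumptions standing in for @hosts and the shared role data; the 2182:2183 ports are the ones used in the template above. Per the ZooKeeper observers documentation, every server's config marks observers with :observer on their server line, and the observer itself also needs peerType=observer in its own config.

```shell
#!/bin/sh
# Sketch of the template logic. First argument: this node's fqdn. Remaining
# arguments: "fqdn:id:is_observer" entries (hypothetical input format).
render_zk_servers() {
  self=$1
  shift
  # Sort by server id, like @hosts.sort_by { |name, id| id }.
  printf '%s\n' "$@" | sort -t: -k2,2n | while IFS=: read -r host id obs; do
    [ -n "$host" ] || continue
    if [ "$obs" = true ]; then
      printf 'server.%s=%s:2182:2183:observer\n' "$id" "$host"
    else
      printf 'server.%s=%s:2182:2183\n' "$id" "$host"
    fi
  done
  # The observer itself additionally declares its own peer type.
  for entry in "$@"; do
    case $entry in
      "$self":*:true) echo "peerType=observer" ;;
    esac
  done
}

render_zk_servers zk4 zk1:1:false zk2:2:false zk3:3:false zk4:4:true
```

With three voters and one observer, the voting members stay at 2n+1 while the fourth machine still serves the cluster.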



Memory debug by Heroku guys on Apache Kafka – nice one


I know, I should write more about my experience with Apache Kafka. Have patience, it's still building, but until then please check this article:

Be careful about the functionality and code you add alongside Apache Kafka's own; it might get you into trouble.

I am very happy that sysdig is used by more and more teams for debugging; it's truly a great tool for this kind of situation.



Kafka limits implementation using puppet


As promised, here are the two simple blocks needed to implement the limits we discussed in the article

For the limits module you can use:

As for the actual Puppet implementation, I decided not to restart the service immediately. That being said, it's dead simple to do:

	 file_line { 'add_pamd_record':
	   path => '/etc/pam.d/common-session',
	   line => 'session required',
	 }
	 limits::fragment {
	   value => "100000";
	   value => "100000";
	   value => "100000";
	   value => "100000";
	 }

This is all you need.



Ubuntu – change ulimit for kafka, do not ignore


I want to share something that took me half a day to clarify. I just read the following article
and learned that, in order to optimize Kafka, you also need to change the maximum number of open files. That's nice, but our clusters are deployed on Ubuntu and the images are pretty basic. I'm not really sure whether this applies to all distributions, but at least for this one it's absolutely needed.
Before trying to set up anything in


make sure that you have added the following line in



session required

It is needed so that sessions opened via ssh or su pick up the new limits for that user (in our case, kafka).
With this in place, the values you define in the "limits" file take effect. You can now set the nofile limit, for example like this:

*               soft    nofile          10000
*               hard    nofile          100000
kafka           soft    nofile          10000
kafka           hard    nofile          100000

After that, you can restart the cluster and check the value: find the broker process with ps -ef | grep kafka and view its limits with cat /proc/[kafka-pid]/limits.
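To double-check the setup, here is a small sketch with two helpers (names are made up for illustration). It assumes that the PAM session line mentioned above is the one loading pam_limits.so, the module that applies /etc/security/limits.conf when a session starts, and that you pass in the broker PID found with ps -ef | grep kafka.

```shell
#!/bin/sh
# Sketch (Linux only) for checking the ulimit setup.

# Succeeds if a PAM session file loads pam_limits.so on a non-commented line.
# The default path is the Debian/Ubuntu one; pass another file to override.
pam_limits_enabled() {
  grep -Eq '^[[:space:]]*session[[:space:]]+[^[:space:]]+[[:space:]]+pam_limits\.so' \
    "${1:-/etc/pam.d/common-session}"
}

# Prints the soft and hard "Max open files" limits of a running process, read
# from the row "Max open files  <soft>  <hard>  files" in /proc/<pid>/limits.
max_open_files() {
  awk '/^Max open files/ { print $4, $5 }' "/proc/$1/limits"
}
```

For a broker started as the kafka user through a session that applied the limits above, you would expect max_open_files to report 10000 100000.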

I will come back later with a Puppet implementation for this as well.



Kafka implementation using puppet at IMWorld Bucharest 2017


I recently gave a presentation on how to deploy Kafka using Puppet and what you need, at a minimum, to be successful in production.
Here is the presentation:

Hope it is useful.



There is also an official version from IMWorld which you can find here:

And here is also the article that describes it in more technical detail:


Definitive guide to Kafka, confluent edition


No technical details today. I just wanted to share the Definitive Guide to Kafka, a book provided by our dear and esteemed colleagues from Confluent.

Thank you, it should be an interesting read.