cloud puppet

Install zookeeper using puppet without module


For this post, I was given the task of providing a standalone ZooKeeper cluster with basic auth, running the latest version.

The reason is that we are using a very old module on our Kafka clusters, and a new requirement appeared to install the latest version, 3.5.5.

The old module could only install the package from the apt repo, which was not an option since the latest version available on Ubuntu Xenial is at least two years old.

To complete this task, a different method was required. I would have to grab it with wget and add the rest of the files to make it functional.

Let us start with the puppet manifest and from that, I will add the rest.

class zookeeperstd {
    $version          = hiera('zookeeperstd::version', '3.5.5')
    $authenabled      = hiera('zookeeperstd::authenabled', false)
    $server_jvm_flags = hiera('zookeeperstd::jvm_flags', undef)

    group { 'zookeeper':
        ensure => 'present',
    }
    user { 'zookeeper':
        ensure => 'present',
        home   => '/var/lib/zookeeper',
        shell  => '/bin/false',
    }
    wget::fetch { 'zookeeper':
        source      => "${version}-bin.tar.gz",
        destination => "/opt/apache-zookeeper-${version}-bin.tar.gz",
    } ->
    archive { "/opt/apache-zookeeper-${version}-bin.tar.gz":
        ensure       => present,
        creates      => "/opt/apache-zookeeper-${version}-bin",
        extract      => true,
        extract_path => '/opt',
        cleanup      => true,
    } ->
    file { "/opt/apache-zookeeper-${version}-bin":
        ensure  => directory,
        owner   => 'zookeeper',
        group   => 'zookeeper',
        require => [ User['zookeeper'], Group['zookeeper'], ],
        recurse => true,
    } ->
    file { '/opt/zookeeper/':
        ensure  => link,
        target  => "/opt/apache-zookeeper-${version}-bin",
        owner   => 'zookeeper',
        group   => 'zookeeper',
        require => [ User['zookeeper'], Group['zookeeper'], ],
    }
    file { '/var/lib/zookeeper':
        ensure  => directory,
        owner   => 'zookeeper',
        group   => 'zookeeper',
        require => [ User['zookeeper'], Group['zookeeper'], ],
        recurse => true,
    }

    # In order to know which servers are in the cluster, a role fact needs to be defined on each machine.
    $hostshash = query_nodes(" v1_role='zookeeperstd'").sort
    # Give every host a pseudo-random but stable ID between 1 and 254.
    $hosts_hash = $hostshash.map |$value| { [$value, seeded_rand(254, $value)+1] }.hash
    $overide_hosts_hash = hiera_hash('profiles_opqs::kafka_hosts_hash', $hosts_hash)
    $overide_hosts = $overide_hosts_hash.keys.sort
    if $overide_hosts_hash.size() != $overide_hosts_hash.values.unique.size() {
        #notify {"Duplicate IDs detected! ${overide_hosts_hash}": }
        # Two hosts got the same ID, so fall back to the position in the sorted list.
        $overide_hosts_hash2 = $overide_hosts.map |$index, $value| { [$value, $index+1] }.hash
    } else {
        $overide_hosts_hash2 = $overide_hosts_hash
    }
    $hosts      = $overide_hosts_hash2
    $data_dir   = '/var/lib/zookeeper'
    $tick_time  = 2000
    $init_limit = 10
    $sync_limit = 5

    $myid = $hosts[$::fqdn]
    file { '/var/lib/zookeeper/myid':
        content => "${myid}",
    }

    file { '/opt/zookeeper/conf/zoo.cfg':
        content => template("${module_name}/zoo.cfg.erb"),
    }
    if $authenabled {
        $superpass  = hiera('zookeeperstd::super_pass', 'super-admin')
        $zoopass    = hiera('zookeeperstd::zookeeper_pass', 'zookeeper-admin')
        $clientpass = hiera('zookeeperstd::client_pass', 'client-admin')
        file { '/opt/zookeeper/conf/zoo_jaas.config':
            content => template("${module_name}/zoo_jaas.config.erb"),
        }
    }
    file { '/opt/zookeeper/conf/java.env':
        content => template("${module_name}/java.zookeeper.env.erb"),
        mode    => '0755',
    }
    file { '/opt/zookeeper/conf/':
        content => template("${module_name}/"),
    }
    file { '/etc/systemd/system/zookeeper.service':
        source => 'puppet:///modules/work/zookeeper.service',
        mode   => '0644',
    } ->
    service { 'zookeeper':
        ensure   => running,
        enable   => true,
        provider => systemd,
    }
}
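
The myid assignment in the manifest is easier to follow outside of Puppet. Below is a minimal Go sketch of the same idea: give every host a stable ID between 1 and 254 and, if two hosts collide, fall back to the position in the sorted host list. The names hashID and AssignIDs and the example hostnames are my own, and FNV-1a is only a stand-in, since seeded_rand in Puppet uses a different algorithm and will produce different numbers.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hashID maps a hostname to a stable ID in the range 1..254.
// Assumption: FNV-1a stands in for Puppet's seeded_rand, which
// produces different values for the same hostname.
func hashID(host string) int {
	h := fnv.New32a()
	h.Write([]byte(host))
	return int(h.Sum32()%254) + 1
}

// AssignIDs returns a host -> myid map. Hash-based IDs are preferred;
// if any two hosts collide, every host gets index+1 from the sorted list.
func AssignIDs(hosts []string) map[string]int {
	sorted := append([]string(nil), hosts...)
	sort.Strings(sorted)

	ids := make(map[string]int, len(sorted))
	seen := make(map[int]bool, len(sorted))
	collision := false
	for _, h := range sorted {
		id := hashID(h)
		if seen[id] {
			collision = true
		}
		seen[id] = true
		ids[h] = id
	}
	if collision {
		for i, h := range sorted {
			ids[h] = i + 1
		}
	}
	return ids
}

func main() {
	// Hypothetical hostnames, for illustration only.
	ids := AssignIDs([]string{"zk3.example.com", "zk1.example.com", "zk2.example.com"})
	for host, id := range ids {
		fmt.Printf("%s -> %d\n", host, id)
	}
}
```

The fallback renumbers every host, not only the colliding ones, which is also what the manifest does. That means adding a host that causes a collision can change the IDs of existing nodes, so it is worth pinning the IDs through the hiera override if that matters to you.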

Since I managed to adapt some files from the existing module, here are the rest of the details.

# Note: This file is managed by Puppet.


# specify all zookeeper servers
# The first port is used by followers to connect to the leader
# The second one is used for leader election
<% if @hosts
# sort hosts by myid and output a server config
# for each host and myid.  (sort_by returns an array of key,value tuples)
@hosts.sort_by { |name, id| id }.each do |host_id| -%>
server.<%= host_id[1] %>=<%= host_id[0] %>:2182:2183
<% if @authenabled -%>
authProvider.<%= host_id[1] %>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
<% end -%>
<% end -%>
<% end -%>

# the port at which the clients will connect

# the directory where the snapshot is stored.
dataDir=<%= @data_dir %>

# Place the dataLogDir to a separate physical disc for better performance
<%= @data_log_dir ? "dataLogDir=#{@data_log_dir}" : '# dataLogDir=/disk2/zookeeper' %>

# The number of milliseconds of each tick.
tickTime=<%= @tick_time %>

# The number of ticks that the initial
# synchronization phase can take.
initLimit=<%= @init_limit %>

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=<%= @sync_limit %>

# To avoid seeks ZooKeeper allocates space in the transaction log file in
# blocks of preAllocSize kilobytes. The default block size is 64M. One reason
# for changing the size of the blocks is to reduce the block size if snapshots
# are taken more often. (Also, see snapCount).

# Clients can submit requests faster than ZooKeeper can process them,
# especially if there are a lot of clients. To prevent ZooKeeper from running
# out of memory due to queued requests, ZooKeeper will throttle clients so that
# there is no more than globalOutstandingLimit outstanding requests in the
# system. The default limit is 1,000.

# ZooKeeper logs transactions to a transaction log. After snapCount
# transactions are written to a log file a snapshot is started and a new
# transaction log file is started. The default snapCount is 10,000.

# If this option is defined, requests will be logged to a trace file named

# Leader accepts client connections. Default value is "yes". The leader machine
# coordinates updates. For higher update throughput at the slight expense of
# read throughput the leader can be configured to not accept clients and focus
# on coordination.

<% if @authenabled -%>


<% end -%> 

QuorumServer {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_zookeeper="<%= @zoopass %>";
};
QuorumLearner {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="zookeeper"
       password="<%= @zoopass %>";
};

Server {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_super="<%= @superpass %>"
       user_client="<%= @clientpass %>";
};

SERVER_JVMFLAGS="<%= @server_jvm_flags %>"

# Note: This file is managed by Puppet.

# ZooKeeper Logging Configuration

# Format is "<default threshold> (, <appender>)+

log4j.rootLogger=${zookeeper.root.logger}, ROLLINGFILE

# Log INFO level and above messages to the console
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n

# Add ROLLINGFILE to rootLogger to get log file output
#    Log INFO level and above messages to a log file

# Max log file size of 10MB
# Keep only 10 files
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n

And last but not least, the systemd unit file.

Description=ZooKeeper Service

ExecStart=/opt/zookeeper/bin/ start /opt/zookeeper/conf/zoo.cfg
ExecStop=/opt/zookeeper/bin/ stop /opt/zookeeper/conf/zoo.cfg
ExecReload=/opt/zookeeper/bin/ restart /opt/zookeeper/conf/zoo.cfg


Also, if you want to enable simple MD5 authentication, in hiera you will need to add the following two lines.

zookeeperstd::authenabled: true
zookeeperstd::jvm_flags: ""

If there is a simpler approach, feel free to leave me a message on LinkedIn or Twitter.


golang kafka

Small go code example for zookeeper resource editing


We have the task of “service restart coordination” for our Apache Kafka cluster. It’s still a work in progress, but if you want to use ZooKeeper for some status verification and updates, something like this will work as an example.

package main

import (
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"time"

	"launchpad.net/gozk/zookeeper" // assumed import path for the gozk bindings
)

const (
	SERVICEPATH = "/servicerestart"
)

var wg sync.WaitGroup

func main() {
	conn := "zk1:2181,zk2:2181,zk3:2181"
	connSlice := strings.Split(conn, ",")
	var flag bool
	args := os.Args
	if len(args) != 2 {
		io.WriteString(os.Stdout, "Argument is needed for the script\n")
	} else {
		switch args[1] {
		case "hang":
			flag = false
		case "nohang":
			flag = true
		default:
			io.WriteString(os.Stdout, "Command unrecognized\n")
		}
	}
	// Register one goroutine and block until it reports that it is done.
	wg.Add(1)
	go ModifyZooStat(connSlice, flag)
	wg.Wait()
}

func ModifyZooStat(strconn []string, flag bool) {
	var zooReach string
	// Try every server and remember the last one that accepted a connection.
	for _, zoohost := range strconn {
		zk, _, err := zookeeper.Dial(zoohost, 5e9)
		if err != nil {
			fmt.Println("Couldn't connect to " + zoohost)
		} else {
			zooReach = zoohost
			zk.Close()
		}
	}
	zkf, sessionf, _ := zookeeper.Dial(zooReach, 5e9)
	defer zkf.Close()
	event := <-sessionf
	if event.State != zookeeper.STATE_CONNECTED {
		fmt.Println("Couldn't connect")
	}
	acl := []zookeeper.ACL{zookeeper.ACL{Perms: zookeeper.PERM_ALL, Scheme: "world", Id: "anyone"}}
	host, _ := os.Hostname()
	t := time.Now()
	servicerestart, _ := zkf.Exists(SERVICEPATH)
	if servicerestart == nil {
		// The resource does not exist yet, so create it and print its path.
		path, _ := zkf.Create(SERVICEPATH, host+" "+t.Format(time.Kitchen), zookeeper.EPHEMERAL, acl)
		fmt.Println(path)
	} else {
		// The resource exists, so update it and print the modification time.
		change, _ := zkf.Set(SERVICEPATH, host+" "+t.Format(time.Kitchen), -1)
		fmt.Println(change.MTime())
	}
	// With "nohang" we release the WaitGroup; with "hang" main keeps waiting.
	if flag {
		wg.Done()
	}
}


Let me explain what it does. Basically, it takes a ZooKeeper connection string and splits it per server. This was a requirement of the zk module used: it couldn’t take more than one host:2181 entry as argument.
After we have found an active server, we can connect to it and put in the /servicerestart path the hostname and the time at which the resource was edited.
In order to create a resource, you will need an ACL slice that will be passed as a parameter.

acl := []zookeeper.ACL{zookeeper.ACL{Perms: zookeeper.PERM_ALL, Scheme: "world", Id: "anyone"}}

Once this slice is created we will get in the next step and check if the resource exists. If it doesn’t then we will create it and if it does, we will just modify it.

The fmt.Println instructions are put basically for two reasons.

  • To see the resource that is created. I wanted that because with the zookeeper.EPHEMERAL flag the resource only exists as long as the connection is active. If you want persistence, pass 0 as the flags value instead; zookeeper.SEQUENCE does not make a resource persistent by itself, it only appends a unique counter to the resource name.
  • Also see the timestamp when the resource was modified.

Even if you don’t close the ZooKeeper connection with defer zkf.Close(), it will be closed automatically as soon as the script ends. So we still need a way to keep the script alive, and we will do that using WaitGroups…
We add one goroutine to the WaitGroup and wait for it to finish. To control this, we use a command-line argument that is mapped to a flag: with nohang the goroutine reports that it is done, with hang it never does and the script stays alive.
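
If WaitGroups are new to you, here is the pattern on its own, without any ZooKeeper dependency. It is only a sketch: RunAndWait is a name I made up for illustration, and the work function stands in for the ZooKeeper update.

```go
package main

import (
	"fmt"
	"sync"
)

// RunAndWait starts work in a goroutine and blocks until the goroutine
// calls Done. RunAndWait is a hypothetical helper, not part of any library.
func RunAndWait(work func() string) string {
	var wg sync.WaitGroup
	var result string

	wg.Add(1) // one goroutine to wait for
	go func() {
		defer wg.Done() // skipping this call would block Wait forever
		result = work()
	}()
	wg.Wait() // without Wait, the caller could return before work finishes

	return result
}

func main() {
	fmt.Println(RunAndWait(func() string { return "znode updated" }))
}
```

In the script above, the Done call is the part guarded by the flag, which is why one argument lets the program exit and the other keeps it waiting.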

This is just a very small example and I am still a true beginner in the art of Go programming, but I hope it helps 🙂


kafka newtools

Kafka cluster nodes and controller using golang


Using the Go library for ZooKeeper from here, you can very easily get the brokers that are registered in the cluster and the controller node.
To install this module, besides setting up the GOPATH you will also have to install the following packages from your Linux distro repo:
bzr, gcc, libzookeeper-mt-dev
Once all of this is installed, just go get 🙂

And here is the small example:

package main

import (
	"fmt"
	"strings"

	"launchpad.net/gozk/zookeeper" // assumed import path for the gozk bindings
)

func main() {
	GetResource()
}

func GetResource() {
	zk, session, err := zookeeper.Dial("localhost:2181", 5e9)
	if err != nil {
		fmt.Println("Couldn't connect")
		return
	}
	defer zk.Close()

	// Wait for the first session event and make sure we are connected.
	event := <-session
	if event.State != zookeeper.STATE_CONNECTED {
		fmt.Println("Couldn't connect")
		return
	}

	GetBrokers(zk)
	GetController(zk)
}

func GetController(connection *zookeeper.Conn) {
	rs, _, err := connection.Get("/controller")
	if err != nil {
		fmt.Println("Couldn't get the resource")
		return
	}
	// The second comma-separated field holds the broker id as "key":value.
	controller := strings.Split(rs, ",")
	// fmt.Println(controller[1])
	id := strings.Split(controller[1], ":")
	fmt.Printf("\nCluster controller is: %s\n", id[1])
}

func GetBrokers(connection *zookeeper.Conn) {
	trs, _, err := connection.Children("/brokers/ids")
	if err != nil {
		fmt.Println("Couldn't get the resource")
		return
	}
	fmt.Printf("List of brokers: ")
	for _, value := range trs {
		fmt.Printf(" %s", value)
	}
}

Yeah, I know it's not the most elegant, but it works:

go run zootest.go
List of brokers:  1011 1009 1001
Cluster controller is: 1011
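
Splitting the /controller content on commas and colons works, but it depends on the field order. Since the znode content is JSON, a slightly more robust variant is to decode it. This is a sketch that runs without ZooKeeper: ParseControllerID and controllerInfo are names I made up, and the sample payload is an illustrative value shaped like what Kafka stores under /controller, not output from our cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// controllerInfo mirrors the JSON stored in the /controller znode.
type controllerInfo struct {
	Version   int    `json:"version"`
	BrokerID  int    `json:"brokerid"`
	Timestamp string `json:"timestamp"`
}

// ParseControllerID extracts the broker id from the znode content.
// Hypothetical helper, for illustration only.
func ParseControllerID(data string) (int, error) {
	var info controllerInfo
	if err := json.Unmarshal([]byte(data), &info); err != nil {
		return 0, err
	}
	return info.BrokerID, nil
}

func main() {
	// Illustrative payload; in the example above this string would be
	// the value returned by connection.Get("/controller").
	sample := `{"version":1,"brokerid":1011,"timestamp":"1500000000000"}`
	id, err := ParseControllerID(sample)
	if err != nil {
		fmt.Println("Couldn't parse the resource:", err)
		return
	}
	fmt.Printf("Cluster controller is: %d\n", id)
}
```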

That would be all.

Tnx and cheers!


Use case for deleting corrupted Kafka topic


A week ago we had a case in which the client could not delete a topic from the cluster (the Kafka version in this case was 1.0.0).
When the topic was listed, there were no leaders assigned for the partitions. It was pretty clear that the topic would not be deleted until we fixed that.
First we tried a partition reassignment, with the idea that a leader would be assigned in the process. A JSON file was generated for the specified topic and executed. After verification, we concluded that the reassignment had failed.
The next step was to delete the topic from the zookeeper meta-data cache.
We came to this conclusion following article:

The command was

rmr /brokers/topics/[topic_name]

run from the ZooKeeper command-line shell. Running this fixed our leader problem. It was strange, but very convenient.

There was one extra thing we needed to do. Version 1.0.0 has a bug that affects the cluster controller. The error found in the log was: Cached zkVersion [3] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

We restarted the cluster to fix this, but since there was already an active request for the topic delete, a refresh of this was required.
In order to do that you can run

rmr /admin/delete_topics/[topic_name]

After doing so, the topic won’t appear as marked for deletion, but if you run the delete command again, it will be marked and the controller will actively start the deletion process.

That was also the case for us: after running the delete command again, the topic was removed from the brokers.


kafka puppet

Observer functionality for puppet zookeeper module


I know it’s been some time since I last posted, but I didn’t have the time to play that much. Today I want to share with you a use case in which we needed to modify the module used for the deployment of ZooKeeper in order to also include the observer role.

The link that describes how this should be activated from version 3.3.0 is located here:

Taking this situation we are using for deployment module

It’s not a nice module, trust me, I know, but since we did not want to start the development process from the beginning and impact the infrastructure that is already deployed, we had to cope with this situation by changing what we had.

The main idea in our case is that the number of ZooKeeper members taking part in the election process needs to be 2n+1 for the quorum mechanism to work. Deploying an even number of machines was therefore pretty tricky, so to fix this, the extra ZooKeeper instances over that requirement should be set as observers.

A ZooKeeper observer is a node that is not included in the election process and just receives the updates from the cluster.
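
To make the 2n+1 rule concrete, here is a small Go sketch of the arithmetic. SplitRoles and Majority are names I made up for illustration, and making exactly one machine an observer when the count is even is my assumption about the simplest split, not something the module enforces.

```go
package main

import "fmt"

// SplitRoles decides how many of n machines vote and how many observe,
// keeping the number of voters odd. Hypothetical helper: with an even n,
// one machine becomes an observer.
func SplitRoles(n int) (voters, observers int) {
	if n <= 0 {
		return 0, 0
	}
	if n%2 == 0 {
		return n - 1, 1
	}
	return n, 0
}

// Majority returns how many voters must agree to form a quorum.
func Majority(voters int) int {
	return voters/2 + 1
}

func main() {
	for _, n := range []int{3, 4, 5, 6} {
		v, o := SplitRoles(n)
		fmt.Printf("%d machines: %d voters, %d observers, quorum of %d\n", n, v, o, Majority(v))
	}
}
```

With four machines, for example, this gives three voters and one observer, and the cluster keeps working as long as two of the three voters are up.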

In my view, the best approach is to activate it in Hiera with a zookeeper::observer parameter per host.

We can start by including it in the defaults.pp file as follows:

 $observer	      = hiera('zookeeper::observer', false)

The zoo.cfg file deployed for the configuration is written from the init.pp file, so we need to add it there as a parameter as well:

$observer	   = $::zookeeper::defaults::observer

Ok, now how do we share the status of each node in the required domain? We will need to use another module and include in our code something like:

 share_data { $::fqdn:
     data  => [ $::fqdn, $observer ],
     label => 'role',
 }
 $obsrole = share_data::retrieve('role')

This guarantees us that all servers have and can use the observer flag in the erb template.

Jumping to the last component of this config, we need to modify the template to have it with the added observer role.

How do we do that? Basically by rewriting the server information in this format:

<% if @hosts
 @hosts.sort_by { |name, id| id }.each do |host_id| -%>
server.<%= host_id[1] %>=<%= host_id[0] %>:2182:2183<% @obsrole.each do |item| if (item[0] == host_id[0]) && item[1] -%>:observer<% end -%><% end -%> 
<% end -%>
<% end -%>

Put simply, this compares the values from the two lists and, if the flag is true for a host, appends the observer suffix to its server line.
One last part needs to be added, and that is:

<% if @observer == true -%>
peerType=observer
<% end -%>

And you are done. If you add zookeeper::observer: true to your yaml file, Puppet should rewrite the file and restart the ZooKeeper service.
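
If the ERB one-liner is hard to read, the same server-line logic looks like this in Go. It is a sketch only: the Server type, the ServerLines function and the hostnames are made up for illustration, while the ports are the ones used in the template.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Server is a hypothetical record combining the host/myid pair
// with the shared observer flag.
type Server struct {
	Host     string
	ID       int
	Observer bool
}

// ServerLines renders one "server.N=host:2182:2183" line per server,
// sorted by ID, and appends ":observer" for observer nodes.
func ServerLines(servers []Server) string {
	sorted := append([]Server(nil), servers...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].ID < sorted[j].ID })

	var b strings.Builder
	for _, s := range sorted {
		fmt.Fprintf(&b, "server.%d=%s:2182:2183", s.ID, s.Host)
		if s.Observer {
			b.WriteString(":observer")
		}
		b.WriteString("\n")
	}
	return b.String()
}

func main() {
	// Hypothetical hosts, for illustration only.
	fmt.Print(ServerLines([]Server{
		{Host: "zk3", ID: 3, Observer: true},
		{Host: "zk1", ID: 1},
		{Host: "zk2", ID: 2},
	}))
}
```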