cloud kafka puppet python

Automatic increase of Kafka LVM on GCP

I wrote an article on this topic for my company, and it was published on Medium. Please see the link.


cloud puppet

Install ZooKeeper using Puppet without a module


I was given the task of providing a standalone ZooKeeper cluster with basic authentication, running the latest version.

This came up because we are using a very old module on our Kafka clusters, and a new requirement appeared to install the latest version, 3.5.5.

The old module could only install the package from the apt repository, which was not an option, since the latest version available on Ubuntu Xenial is at least two years old.

To complete this task, a different method was required: download the archive with wget and add the rest of the files needed to make it functional.

Let us start with the Puppet manifest, and I will add the rest from there.

class zookeeperstd {
  $version          = hiera("zookeeperstd::version", "3.5.5")
  $authenabled      = hiera("zookeeperstd::authenabled", false)
  $server_jvm_flags = hiera('zookeeperstd::jvm_flags', undef)

  group { 'zookeeper':
    ensure => 'present',
  }
  user { 'zookeeper':
    ensure => 'present',
    home   => '/var/lib/zookeeper',
    shell  => '/bin/false',
  }
  wget::fetch { 'zookeeper':
    source      => "${version}-bin.tar.gz",
    destination => "/opt/apache-zookeeper-${version}-bin.tar.gz",
  } ->
  archive { "/opt/apache-zookeeper-${version}-bin.tar.gz":
    ensure       => present,
    creates      => "/opt/apache-zookeeper-${version}-bin",
    extract      => true,
    extract_path => '/opt',
    cleanup      => true,
  } ->
  file { "/opt/apache-zookeeper-${version}-bin":
    ensure  => directory,
    owner   => 'zookeeper',
    group   => 'zookeeper',
    require => [ User['zookeeper'], Group['zookeeper'], ],
    recurse => true,
  } ->
  file { '/opt/zookeeper':
    ensure  => link,
    target  => "/opt/apache-zookeeper-${version}-bin",
    owner   => 'zookeeper',
    group   => 'zookeeper',
    require => [ User['zookeeper'], Group['zookeeper'], ],
  }
  file { '/var/lib/zookeeper':
    ensure  => directory,
    owner   => 'zookeeper',
    group   => 'zookeeper',
    require => [ User['zookeeper'], Group['zookeeper'], ],
    recurse => true,
  }

  # In order to know which servers are in the cluster, a role fact needs to be defined on each machine
  $hostshash = query_nodes(" v1_role='zookeeperstd'").sort
  # Derive a pseudo-random but stable myid (1..254) from each hostname
  $hosts_hash = $hostshash.map |$value| { [$value, seeded_rand(254, $value)+1] }.hash
  $overide_hosts_hash = hiera_hash('profiles_opqs::kafka_hosts_hash', $hosts_hash)
  $overide_hosts = $overide_hosts_hash.keys.sort
  if $overide_hosts_hash.size() != $overide_hosts_hash.values.unique.size() {
    # Duplicate IDs detected: fall back to sequential IDs based on the sorted host list
    #notify {"Duplicate IDs detected! ${overide_hosts_hash}": }
    $overide_hosts_hash2 = $overide_hosts.map |$index, $value| { [$value, $index+1] }.hash
  } else {
    $overide_hosts_hash2 = $overide_hosts_hash
  }
  $hosts      = $overide_hosts_hash2
  $data_dir   = "/var/lib/zookeeper"
  $tick_time  = 2000
  $init_limit = 10
  $sync_limit = 5

  $myid = $hosts[$::fqdn]
  file { '/var/lib/zookeeper/myid':
    content => "${myid}",
  }

  file { '/opt/zookeeper/conf/zoo.cfg':
    content => template("${module_name}/zoo.cfg.erb"),
  }

  if $authenabled {
    $superpass  = hiera("zookeeperstd::super_pass", 'super-admin')
    $zoopass    = hiera("zookeeperstd::zookeeper_pass", 'zookeeper-admin')
    $clientpass = hiera("zookeeperstd::client_pass", 'client-admin')
    file { '/opt/zookeeper/conf/zoo_jaas.config':
      content => template("${module_name}/zoo_jaas.config.erb"),
    }
  }

  file { '/opt/zookeeper/conf/java.env':
    content => template("${module_name}/java.zookeeper.env.erb"),
    mode    => '0755',
  }
  file { '/opt/zookeeper/conf/':
    content => template("${module_name}/"),
  }
  file { '/etc/systemd/system/zookeeper.service':
    source => 'puppet:///modules/work/zookeeper.service',
    mode   => '0644',
  } ->
  service { 'zookeeper':
    ensure   => running,
    enable   => true,
    provider => systemd,
  }
}
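The ID logic in the manifest (hash-derived IDs, with a sequential fallback when two hosts collide) can be tried outside Puppet. Below is a minimal Ruby sketch of the same idea. It swaps Puppet's seeded_rand for a CRC32-based stand-in (an assumption, so the IDs will not match what seeded_rand produces), skips the Hiera override step, and the helper names and hostnames are hypothetical.

```ruby
require 'zlib'

# Hypothetical stand-in for seeded_rand(254, $value)+1 from the manifest:
# a stable pseudo-random ID in 1..254 derived from the hostname via CRC32.
def seeded_id(host)
  (Zlib.crc32(host) % 254) + 1
end

# Hash-based IDs first; if any two hosts collide, fall back to
# 1-based positions in the sorted host list (same as the manifest).
def assign_myids(hosts, id_fn = method(:seeded_id))
  sorted = hosts.sort
  ids = sorted.map { |h| [h, id_fn.call(h)] }.to_h
  if ids.values.uniq.size != ids.size
    ids = sorted.each_with_index.map { |h, i| [h, i + 1] }.to_h
  end
  ids
end

p assign_myids(%w[zk-b.example zk-a.example zk-c.example])
```

Passing a custom id_fn (for example one that always returns the same number) is an easy way to see the fallback branch kick in.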

I adapted some of the files from the existing module as far as I could. Here are the remaining details, starting with the zoo.cfg.erb template:

# Note: This file is managed by Puppet.


# specify all zookeeper servers
# The first port is used by followers to connect to the leader
# The second one is used for leader election
<% if @hosts -%>
# sort hosts by myid and output a server config
# for each host and myid.  (sort_by returns an array of key,value tuples)
<% @hosts.sort_by { |name, id| id }.each do |host_id| -%>
server.<%= host_id[1] %>=<%= host_id[0] %>:2182:2183
<% if @authenabled -%>
authProvider.<%= host_id[1] %>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
<% end -%>
<% end -%>
<% end -%>

# the port at which the clients will connect

# the directory where the snapshot is stored.
dataDir=<%= @data_dir %>

# Place the dataLogDir to a separate physical disc for better performance
<%= @data_log_dir ? "dataLogDir=#{@data_log_dir}" : '# dataLogDir=/disk2/zookeeper' %>

# The number of milliseconds of each tick.
tickTime=<%= @tick_time %>

# The number of ticks that the initial
# synchronization phase can take.
initLimit=<%= @init_limit %>

# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=<%= @sync_limit %>

# To avoid seeks ZooKeeper allocates space in the transaction log file in
# blocks of preAllocSize kilobytes. The default block size is 64M. One reason
# for changing the size of the blocks is to reduce the block size if snapshots
# are taken more often. (Also, see snapCount).

# Clients can submit requests faster than ZooKeeper can process them,
# especially if there are a lot of clients. To prevent ZooKeeper from running
# out of memory due to queued requests, ZooKeeper will throttle clients so that
# there is no more than globalOutstandingLimit outstanding requests in the
# system. The default limit is 1,000. ZooKeeper logs transactions to a
# transaction log. After snapCount transactions are written to a log file a
# snapshot is started and a new transaction log file is started. The default
# snapCount is 10,000.

# If this option is defined, requests will be logged to a trace file named

# Leader accepts client connections. Default value is "yes". The leader machine
# coordinates updates. For higher update throughput at the slight expense of
# read throughput the leader can be configured to not accept clients and focus
# on coordination.

<% if @authenabled -%>


<% end -%> 
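To check what the server loop in zoo.cfg.erb renders, here is a minimal Ruby sketch that evaluates a trimmed-down copy of that loop with ERB. The ZooCfgContext class and the hostnames are hypothetical; Puppet also evaluates templates with the '-' trim mode, so -%> lines disappear from the output the same way.

```ruby
require 'erb'

# Trimmed-down copy of the server loop from zoo.cfg.erb.
ZOO_SERVERS = <<~'TPL'
  <% @hosts.sort_by { |_name, id| id }.each do |name, id| -%>
  server.<%= id %>=<%= name %>:2182:2183
  <% if @authenabled -%>
  authProvider.<%= id %>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  <% end -%>
  <% end -%>
TPL

# Hypothetical holder for the variables the template reads (@hosts, @authenabled).
class ZooCfgContext
  def initialize(hosts, authenabled)
    @hosts = hosts
    @authenabled = authenabled
  end

  def render(template)
    ERB.new(template, trim_mode: '-').result(binding)
  end
end

puts ZooCfgContext.new({ 'zk-b.example' => 2, 'zk-a.example' => 1 }, false).render(ZOO_SERVERS)
```

With authenabled set to false this prints one server.N line per host, ordered by ID; with true, each server line is followed by its authProvider line.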
The zoo_jaas.config.erb template:

QuorumServer {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_zookeeper="<%= @zoopass %>";
};
QuorumLearner {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       username="zookeeper"
       password="<%= @zoopass %>";
};

Server {
       org.apache.zookeeper.server.auth.DigestLoginModule required
       user_super="<%= @superpass %>"
       user_client="<%= @clientpass %>";
};

The java.zookeeper.env.erb template:

SERVER_JVMFLAGS="<%= @server_jvm_flags %>"

The log4j properties template:

# Note: This file is managed by Puppet.

# ZooKeeper Logging Configuration

# Format is "<default threshold> (, <appender>)+

log4j.rootLogger=${zookeeper.root.logger}, ROLLINGFILE

# Log INFO level and above messages to the console
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n

# Add ROLLINGFILE to rootLogger to get log file output
#    Log INFO level and above messages to a log file

# Max log file size of 10MB
# Keep only 10 files
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n

And last but not least, the zookeeper.service unit file:

[Unit]
Description=ZooKeeper Service

[Service]
ExecStart=/opt/zookeeper/bin/ start /opt/zookeeper/conf/zoo.cfg
ExecStop=/opt/zookeeper/bin/ stop /opt/zookeeper/conf/zoo.cfg
ExecReload=/opt/zookeeper/bin/ restart /opt/zookeeper/conf/zoo.cfg

[Install]
WantedBy=multi-user.target


Also, if you want to enable simple DIGEST-MD5 authentication, you will need to add the following two lines in Hiera:

zookeeperstd::authenabled: true
zookeeperstd::jvm_flags: ""

If there is a simpler approach, feel free to leave me a message on LinkedIn or Twitter.



Jolokia particular case using custom facts in Hiera


This is for me and also for everyone else searching for how to use facts inside Hiera values.

In my case, I wanted to activate the Jolokia HTTP endpoint using a custom hostname and the standard port. For that, it was sufficient to add the following line to my host YAML:

profiles::kafka::jolokia: "-javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8778,host=%{::networking.fqdn}"

This uses the standard fact called networking, which is a hash, and the key inside it called fqdn.

And it works.
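The %{::networking.fqdn} reference works because each dot walks one level into the nested networking hash. The Ruby sketch below is a simplified, hypothetical resolver that only illustrates that lookup; it is not how Hiera implements interpolation, and the FQDN value is made up.

```ruby
# Replace each %{...} reference with the value found by following the
# dotted path through a nested facts hash (leading "::" = top scope).
def interpolate(value, facts)
  value.gsub(/%\{([^}]+)\}/) do
    path = Regexp.last_match(1).sub(/\A::/, '').split('.')
    facts.dig(*path).to_s
  end
end

facts = { 'networking' => { 'fqdn' => 'kafka01.example.com' } }
puts interpolate('-javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8778,host=%{::networking.fqdn}', facts)
```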


kafka puppet

Fact for kafka_consumer hash…or kind of


A late requirement came in to activate the kafka_consumer check in Datadog.

Unfortunately, this is a challenge if you don't have a fixed number of consumer groups and topics (on one client we had a couple of hundred consumer groups).

Here is how it should look, according to the example configuration file:

  #  consumer_groups:
  #    <CONSUMER_NAME_1>:
  #      <TOPIC_NAME_1>: [0, 1, 4, 12]
  #    <CONSUMER_NAME_2>:
  #      <TOPIC_NAME_2>:
  #    <CONSUMER_NAME_3>

So, if you want to grab the data using the old command-line tool, you will have to collect it all in one run of the fact, group by group and topic by topic. I tried it this way, but even though it works, it should not be an option for larger clusters.

require 'facter'

Facter.add('kafka_consumers_config') do
  setcode do
    kafka_consumers_cmd = '/opt/kafka/bin/ --bootstrap-server localhost:9092 --list'
    kafka_consumers_result = Facter::Core::Execution.exec(kafka_consumers_cmd)
    group_hash = {}
    kafka_consumers_result.each_line do |group|
      groupid = group.strip
      next if groupid.empty?
      # Distinct topics consumed by this group (column 1 of --describe)
      kafka_consumer_topic_list_cmd = "/opt/kafka/bin/ --bootstrap-server localhost:9092 --group #{groupid} --describe | grep -v TOPIC | awk '{print $1}' | sort | uniq"
      kafka_consumer_topic_list_result = Facter::Core::Execution.exec(kafka_consumer_topic_list_cmd)
      topic_hash = {}
      kafka_consumer_topic_list_result.split("\n").reject(&:empty?).each do |topic|
        topicid = topic.strip
        # Partitions of this topic consumed by the group (column 2 of --describe)
        kafka_consumer_topic_partition_cmd = "/opt/kafka/bin/ --bootstrap-server localhost:9092 --group #{groupid} --describe | grep -v TOPIC | awk '$1 == \"#{topicid}\" {print $2}'"
        kafka_consumer_topic_partition_result = Facter::Core::Execution.exec(kafka_consumer_topic_partition_cmd)
        # Store as a sorted list of integers, e.g. [0, 1, 4, 12]
        topic_hash[topicid] = kafka_consumer_topic_partition_result.split.map(&:to_i).sort
      end
      group_hash[groupid] = topic_hash
    end
    group_hash
  end
end

This is posted only as a snapshot, and maybe as a source of inspiration. A proper working version should be written in Java or Scala, in order to leverage the power of the Kafka libraries (from what I know, Python, Go and other programming languages only have libraries for the consumer/producer part).

If I pursue the task of rewriting this using only one interrogation loop, or in Java/Scala, you will see it here.
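One way to cut down the interrogations is to call --describe once per group and build the whole topic/partition map from that single output. Below is a minimal Ruby sketch of the parsing step, assuming the older describe layout that the awk commands in the fact rely on (topic in column 1, partition in column 2, header line starting with TOPIC). The function names and the sample output are hypothetical; in a real fact the string would come from Facter::Core::Execution.exec, as above.

```ruby
# Parse one `--describe` output into { topic => [partitions] }.
def parse_describe(output)
  topics = Hash.new { |h, k| h[k] = [] }
  output.each_line do |line|
    fields = line.split
    next if fields.size < 2 || fields[0] == 'TOPIC' # header or short line
    next unless fields[1] =~ /\A\d+\z/              # keep only partition rows
    topics[fields[0]] << fields[1].to_i
  end
  topics.transform_values { |parts| parts.uniq.sort }
end

# { group => describe_output } becomes the Datadog-style consumer_groups structure.
def consumer_groups(describe_by_group)
  describe_by_group.transform_values { |out| parse_describe(out) }
end

sample = <<~OUT
  TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST       CLIENT-ID
  orders   4          7               7               0    c1           /10.0.0.1  app
  orders   0          10              12              2    c1           /10.0.0.1  app
  payments 12         5               9               4    c2           /10.0.0.2  app
OUT
p consumer_groups('billing' => sample)
```

This keeps the number of CLI calls at one per group plus the initial --list, instead of one per topic.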


Distributing service conditionally on OS version


Since we are in the process of migrating to Ubuntu 16.04, my service restart script needed to be deployed as separate builds.

For that purpose, I found a fact that helps, so my standard file block turned into this:

case $facts['os']['distro']['codename'] {
  'xenial': {
    file { '/root/servicerestart':
      source  => 'puppet:///modules/profiles/servicerestart-kafka-new',
      mode    => '0755',
      replace => true,
    }
  }
  'trusty': {
    file { '/root/servicerestart':
      source  => 'puppet:///modules/profiles/servicerestart-kafka',
      mode    => '0755',
      replace => true,
    }
  }
}

That should be all for now.

linux puppet

Order Linux processes by memory usage

This one is more for me, actually. We have some issues with one Puppet instance on which the processes keep failing, and I wanted to see if there is a way to order them by memory usage.

So I searched the net and found this link.

The command is:

ps aux --sort -rss | head -10

And it gives the following output, at least in my case:

puppet    6327 70.1 25.5 3585952 1034532 ?     Sl   06:53   7:33 /usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java -XX:OnOutOfMemoryError=kill -9 %p -javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8778 -Xms1024m -Xmx1024m -cp /opt/puppetlabs/server/apps/puppetserver/puppet-server-release.jar clojure.main -m puppetlabs.trapperkeeper.main --config /etc/puppetlabs/puppetserver/conf.d -b /etc/puppetlabs/puppetserver/bootstrap.cfg
jenkins   6776  9.6 16.6 4648236 671980 ?      Sl   06:55   0:51 /usr/bin/java -Djava.awt.headless=true -javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8780 -Xms1024m -Xmx1024m -jar /usr/share/jenkins/jenkins.war --webroot=/var/cache/jenkins/war --httpPort=8080 --httpListenAddress=
puppetdb  5987 16.8 11.7 3845896 474164 ?      Sl   06:52   2:01 /usr/bin/java -XX:OnOutOfMemoryError=kill -9 %p -Xmx192m -javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8779 -cp /opt/puppetlabs/server/apps/puppetdb/puppetdb.jar clojure.main -m puppetlabs.puppetdb.main --config /etc/puppetlabs/puppetdb/conf.d -b /etc/puppetlabs/puppetdb/bootstrap.cfg
postgres  1458  0.0  2.1 249512 88656 ?        Ss   Nov21   3:10 postgres: checkpointer process                                                                                              
postgres  6206  0.0  1.4 253448 57984 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb idle                                                                           
postgres  6209  0.0  0.7 252580 29820 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb idle                                                                           
postgres  6210  0.0  0.5 254892 22440 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb idle                                                                           
postgres  6213  0.0  0.5 254320 21416 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb idle                                                                           
postgres  6205  0.0  0.5 253524 20324 ?        Ss   06:53   0:00 postgres: puppetdb puppetdb idle                       

As you can probably see, the components are slowly but surely taking more and more memory, and since the machine has only 4GB allocated, it will probably crash again.

If this happens, I will manually add another 2GB of memory and see where we go from there.
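If you want the same ordering from a script, for example to log the biggest processes periodically, here is a minimal Ruby sketch that sorts ps aux lines by the RSS column (the sixth field, in KiB), the same key that --sort -rss uses. The helper name top_by_rss is hypothetical, and the sample lines are shortened versions of the output above.

```ruby
# Sort `ps aux` lines by RSS (field 6, KiB), largest first.
def top_by_rss(ps_lines, count = 10)
  ps_lines
    .map { |line| line.split(' ', 11) } # USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
    .select { |f| f.size >= 11 && f[5] =~ /\A\d+\z/ } # drops the header line
    .sort_by { |f| -f[5].to_i }
    .first(count)
    .map { |f| { user: f[0], pid: f[1].to_i, rss_mib: (f[5].to_i / 1024.0).round(1), command: f[10].strip } }
end

lines = [
  'postgres  1458  0.0  2.1 249512 88656 ?        Ss   Nov21   3:10 postgres: checkpointer process',
  'puppet    6327 70.1 25.5 3585952 1034532 ?     Sl   06:53   7:33 /usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java',
  'jenkins   6776  9.6 16.6 4648236 671980 ?      Sl   06:55   0:51 /usr/bin/java'
]
top_by_rss(lines, 2).each { |p| puts "#{p[:user]} #{p[:pid]} #{p[:rss_mib]} MiB" }
```

On a live host, the lines could come from `ps aux` output read by the script; on the machine itself, the ps command above remains the simplest option.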



Puppet gems install workaround after TLS 1.0 switchoff


It seems that since rubygems.org disabled the TLS 1.0 protocol, there is an issue with installing custom gems on the Puppet server.

If you run puppetserver gem environment, you will probably see the following output:

/opt/puppetlabs/bin/puppetserver gem environment
RubyGems Environment:
  - RUBY VERSION: 1.9.3 (2015-06-10 patchlevel 551) [java]
  - INSTALLATION DIRECTORY: /opt/puppetlabs/server/data/puppetserver/jruby-gems
  - RUBY EXECUTABLE: java -jar /opt/puppetlabs/server/apps/puppetserver/puppet-server-release.jar
  - EXECUTABLE DIRECTORY: /opt/puppetlabs/server/data/puppetserver/jruby-gems/bin
  - SPEC CACHE DIRECTORY: /root/.gem/specs
  - SYSTEM CONFIGURATION DIRECTORY: file:/opt/puppetlabs/server/apps/puppetserver/puppet-server-release.jar!/META-INF/jruby.home/etc
    - ruby
    - universal-java-1.7
     - /opt/puppetlabs/server/data/puppetserver/jruby-gems
     - /root/.gem/jruby/1.9
     - file:/opt/puppetlabs/server/apps/puppetserver/puppet-server-release.jar!/META-INF/jruby.home/lib/ruby/gems/shared
     - :update_sources => true
     - :verbose => true
     - :backtrace => false
     - :bulk_threshold => 1000
     - "install" => "--no-rdoc --no-ri --env-shebang"
     - "update" => "--no-rdoc --no-ri --env-shebang"
     - /usr/local/sbin
     - /usr/local/bin
     - /usr/sbin
     - /usr/bin
     - /sbin
     - /bin
     - /usr/games
     - /usr/local/games
     - /opt/puppetlabs/bin

Also, if you try to install a gem, you will receive:

/opt/puppetlabs/bin/puppetserver gem install toml-rb
ERROR:  Could not find a valid gem 'toml-rb' (>= 0), here is why:
          Unable to download data from - Received fatal alert: protocol_version (

A short but unsafe fix for this is:

/opt/puppetlabs/bin/puppetserver gem install --source "" toml-rb
Fetching: toml-rb-1.1.1.gem (100%)
Successfully installed toml-rb-1.1.1
WARNING:  Unable to pull data from '': Received fatal alert: protocol_version (
1 gem installed

It's not that elegant, but it does the trick. You can also include this in a Puppet exec block.


newtools puppet

Error 127, not related to Puppet or Golang


Something from my experience playing with Golang and Puppet code this morning.
I wrote a very simple script to restart a service, which you can find here.
Today I wanted to put it on the machine and run it with Puppet, so I wrote a very small class that looked like this:

class profiles_test::updatekafka {

  package { 'golang':
    ensure => installed,
    name   => 'golang',
  }
  file { '/root/servicerestart.go':
    source  => 'puppet:///modules/profiles_test/servicerestart.go',
    mode    => '0644',
    replace => true,
  }
  exec { 'go build servicerestart.go':
    cwd     => '/root',
    creates => '/root/servicerestart',
    path    => ['/usr/bin', '/usr/sbin'],
  } ->
  exec { '/root/servicerestart':
    cwd    => '/root',
    path   => ['/usr/bin', '/usr/sbin', '/root'],
    onlyif => 'ps -u kafka',
  }
}

On execution, surprise: it kept throwing this output:

08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/File[/root/check_cluster_state.go]/ensure: defined content as '{md5}0817dbf82b74072e125f8b71ee914150'
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: panic: exit status 127
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: goroutine 1 [running]:
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: runtime.panic(0x4c2100, 0xc2100000b8)
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/usr/lib/go/src/pkg/runtime/panic.c:266 +0xb6
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.check(0x7f45b08ed150, 0xc2100000b8)
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:94 +0x4f
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.RunCommand(0x4ea7f0, 0x2c, 0x7f4500000000, 0x409928, 0x5d9f38)
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:87 +0x112
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.GetBrokerList(0x7f45b08ecf00, 0xc21003fa00, 0xe)
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:66 +0x3c
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.GetStatus(0xc21000a170)
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:33 +0x1e
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.StopBroker()
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:39 +0x21
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.main()
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:15 +0x1e
08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: exit status 2
08:19:44 Error: go run servicerestart.go returned 1 instead of one of [0]

The clue is in "panic: exit status 127", and it turns out it is related neither to Golang nor to Puppet, but to the shell.
In my script, the way to get the broker list is:

output1 := RunCommand("echo dump | nc localhost 2181 | grep brokers")

Exit status 127 is what the shell returns when it cannot find a command, and here it comes from not having the required binaries in the path. For example, if you run:

whereis echo
echo: /bin/echo /usr/share/man/man1/echo.1.gz

So the correct exec block, with /bin and /sbin added to the path, is actually:

exec { '/root/servicerestart':
  cwd    => '/root',
  path   => ['/usr/bin', '/usr/sbin', '/root', '/bin', '/sbin'],
  onlyif => 'ps -u kafka',
}

And then it will work.
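The exit code is easy to reproduce outside Puppet. This minimal Ruby sketch runs a command through /bin/sh with an explicit PATH, roughly what happens when Puppet runs an exec with a restricted path, and returns the exit status. The helper run_with_path is hypothetical, and ls stands in for the real pipeline.

```ruby
require 'open3'

# Run `command` via /bin/sh with only `path` as PATH; return the exit status.
def run_with_path(path, command)
  _output, status = Open3.capture2e({ 'PATH' => path }, '/bin/sh', '-c', command)
  status.exitstatus
end

# 127 means "command not found": sh cannot locate ls in /nonexistent.
puts run_with_path('/nonexistent', 'ls /')   # 127
puts run_with_path('/usr/bin:/bin', 'ls /')  # 0
```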



Multiple classes block declaration in hiera will not work


Do not split the class list into multiple blocks in Hiera, like this:

  - profiles::datadogagent
  - profiles::updatekafka

kafka::security: true
kafka::security_default: true
kafka::heap_size: 2048
 - profiles::pybackuplogs
 - profiles::group_coordinator

Class updatekafka will not be executed.

The structure should look like:

  - profiles::datadogagent
  - profiles::updatekafka
  - profiles::pybackuplogs
  - profiles::group_coordinator
kafka::security: true
kafka::security_default: true
kafka::heap_size: 2048
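A likely explanation is how YAML handles a key that appears twice in the same document. Assuming the class list key is called classes (the key itself is not visible in the snippets above), this minimal Ruby sketch shows that Ruby's YAML parser, Psych, silently keeps only the last occurrence; in this parse the whole first list, including profiles::updatekafka, is dropped.

```ruby
require 'yaml'

# The same key twice in one document: only the last value survives.
doc = <<~YAML
  classes:
    - profiles::datadogagent
    - profiles::updatekafka
  kafka::heap_size: 2048
  classes:
    - profiles::pybackuplogs
    - profiles::group_coordinator
YAML

data = YAML.safe_load(doc)
p data['classes'] # ["profiles::pybackuplogs", "profiles::group_coordinator"]
```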


kafka puppet

Log rotate for Kafka Garbage collect without restart


If you have an Apache Kafka version below 1.0.0 and you don't have garbage collection log rotation, your broker starts with the second set of GC flags below instead of the first one, which includes the rotation options:

-Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M

-Xloggc:/opt/kafka/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps

One option is to add the rotation flags to the broker's startup parameters, but this also means restarting the services.

However, you can also use the logrotate daemon with copytruncate. Here is the block that you need to add in Hiera:

   path: '/opt/kafka/logs/kafkaServer-gc.log'
   copytruncate: true
   rotate_every: 'day'
   compress: true
   missingok: true
   su: true
   su_owner: 'kafka'
   su_group: 'kafka'
   ifempty: false
   size: '50M'
   maxsize: '50M'
   rotate: 5

Or, if you want to write it in a class, it should look like this:

$version = lookup('kafka::version')
# Only brokers older than 1.0.0 need the rule
if ($_role =~ /\Akafka/) and (versioncmp($version, '1.0.0') < 0) {
  logrotate::rule { 'kafkagc_logs':
    path         => '/opt/kafka/logs/kafkaServer-gc.log',
    copytruncate => true,
    rotate       => 5,
    rotate_every => 'daily',
    missingok    => true,
    ifempty      => false,
    su           => true,
    su_owner     => 'kafka',
    su_group     => 'kafka',
    size         => '50M',
    maxsize      => '50M',
  }
}
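The rule is meant for brokers below 1.0.0, and deciding that needs a version-aware comparison rather than a string one. Here is a minimal Ruby sketch with RubyGems' Gem::Version (inside a manifest, Puppet's versioncmp function serves a similar purpose); the helper name needs_gc_logrotate? is hypothetical.

```ruby
require 'rubygems'

# True when the broker is older than 1.0.0 and therefore lacks GC log rotation.
def needs_gc_logrotate?(kafka_version)
  Gem::Version.new(kafka_version) < Gem::Version.new('1.0.0')
end

p needs_gc_logrotate?('0.10.2.1') # true
p needs_gc_logrotate?('1.1.0')    # false
# A plain string comparison gets the ordering wrong: 0.9 is older than 0.10.
p '0.9.0.1' > '0.10.2.1'          # true
```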