• Install zookeeper using puppet without module

    Hi,

    In this post I will describe how I handled the task of providing a standalone ZooKeeper cluster, with basic auth, on the latest version.

    The reason this came up is that we are using a very old module on our Kafka clusters, and a new requirement appeared to install the latest version, 3.5.5.

    The old module could only install the package from the apt repo, which was not an option since the latest version available on Ubuntu Xenial is at least two years old.

    To complete this task a different method was required: grab the archive with wget and add the rest of the files needed to make it functional.

    Let us start with the Puppet manifest; from that, I will add the rest.

    class zookeeperstd {
      $version = hiera("zookeeperstd::version","3.5.5")
      $authenabled = hiera("zookeeperstd::authenabled",false)
      $server_jvm_flags = hiera('zookeeperstd::jvm_flags', undef)
        group { 'zookeeper':
            ensure => 'present',
        }
        user { 'zookeeper':
            ensure => 'present',
            home   => '/var/lib/zookeeper',
            shell  => '/bin/false',
        }
        wget::fetch { 'zookeeper':
            source      => "https://www-eu.apache.org/dist/zookeeper/stable/apache-zookeeper-${version}-bin.tar.gz",
            destination => "/opt/apache-zookeeper-${version}-bin.tar.gz",
        } ->
        archive { "/opt/apache-zookeeper-${version}-bin.tar.gz":
            ensure       => present,
            creates      => "/opt/apache-zookeeper-${version}-bin",
            extract      => true,
            extract_path => '/opt',
            cleanup      => true,
        } ->
        file { "/opt/apache-zookeeper-${version}-bin":
            ensure  => directory,
            owner   => 'zookeeper',
            group   => 'zookeeper',
            recurse => true,
            require => [ User['zookeeper'], Group['zookeeper'] ],
        } ->
        file { '/opt/zookeeper/':
            ensure  => link,
            target  => "/opt/apache-zookeeper-${version}-bin",
            owner   => 'zookeeper',
            group   => 'zookeeper',
            require => [ User['zookeeper'], Group['zookeeper'] ],
        }
        file { '/var/lib/zookeeper':
            ensure  => directory,
            owner   => 'zookeeper',
            group   => 'zookeeper',
            recurse => true,
            require => [ User['zookeeper'], Group['zookeeper'] ],
        }
    # In order to know which servers are in the cluster, a role fact needs to be defined on each machine.
        $hostshash = query_nodes(" v1_role='zookeeperstd'").sort
        $hosts_hash = $hostshash.map |$value| { [$value, seeded_rand(254, $value)+1] }.hash
        $overide_hosts_hash = hiera_hash('profiles_opqs::kafka_hosts_hash', $hosts_hash)
        $overide_hosts = $overide_hosts_hash.keys.sort
        if $overide_hosts_hash.size() != $overide_hosts_hash.values.unique.size() {
            # notify { "Duplicate IDs detected! ${overide_hosts_hash}": }
            $overide_hosts_hash2 = $overide_hosts.map |$index, $value| { [$value, $index+1] }.hash
        } else {
            $overide_hosts_hash2 = $overide_hosts_hash
        }
        $hosts      = $overide_hosts_hash2
        $data_dir   = '/var/lib/zookeeper'
        $tick_time  = 2000
        $init_limit = 10
        $sync_limit = 5

        $myid = $hosts[$::fqdn]
        file { '/var/lib/zookeeper/myid':
            content => "${myid}",
        }
    
        file { '/opt/zookeeper/conf/zoo.cfg':
            content => template("${module_name}/zoo.cfg.erb"),
        }

        if $authenabled {
            $superpass  = hiera("zookeeperstd::super_pass", 'super-admin')
            $zoopass    = hiera("zookeeperstd::zookeeper_pass", 'zookeeper-admin')
            $clientpass = hiera("zookeeperstd::client_pass", 'client-admin')

            file { '/opt/zookeeper/conf/zoo_jaas.config':
                content => template("${module_name}/zoo_jaas.config.erb"),
            }
        }

        file { '/opt/zookeeper/conf/java.env':
            content => template("${module_name}/java.zookeeper.env.erb"),
            mode    => '0755',
        }
        file { '/opt/zookeeper/conf/log4j.properties':
            content => template("${module_name}/log4j.zookeeper.properties.erb"),
        }

        file { '/etc/systemd/system/zookeeper.service':
            source => 'puppet:///modules/work/zookeeper.service',
            mode   => '0644',
        } ->
        service { 'zookeeper':
            ensure   => running,
            enable   => true,
            provider => systemd,
        }
    }
    

    I managed to adapt some files from the existing module, so here are the rest of the additional details: the templates and the systemd unit file.

    #zoo.cfg.erb
    # Note: This file is managed by Puppet.
    
    # http://hadoop.apache.org/zookeeper/docs/current/zookeeperAdmin.html
    
    # specify all zookeeper servers
    # The first port is used by followers to connect to the leader
    # The second one is used for leader election
    <%
    if @hosts
    # sort hosts by myid and output a server config
    # for each host and myid.  (sort_by returns an array of key,value tuples)
    @hosts.sort_by { |name, id| id }.each do |host_id|
    -%>
    server.<%= host_id[1] %>=<%= host_id[0] %>:2182:2183
    <% if @authenabled -%>
    authProvider.<%= host_id[1] %>=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    <% end -%>
    <% end -%>
    <% end -%>
    
    # the port at which the clients will connect
    clientPort=2181
    
    # the directory where the snapshot is stored.
    dataDir=<%= @data_dir %>
    
    # Place the dataLogDir to a separate physical disc for better performance
    <%= @data_log_dir ? "dataLogDir=#{@data_log_dir}" : '# dataLogDir=/disk2/zookeeper' %>
    
    
    # The number of milliseconds of each tick.
    tickTime=<%= @tick_time %>
    
    # The number of ticks that the initial
    # synchronization phase can take.
    initLimit=<%= @init_limit %>
    
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=<%= @sync_limit %>
    
    # To avoid seeks ZooKeeper allocates space in the transaction log file in
    # blocks of preAllocSize kilobytes. The default block size is 64M. One reason
    # for changing the size of the blocks is to reduce the block size if snapshots
    # are taken more often. (Also, see snapCount).
    #preAllocSize=65536
    
    # Clients can submit requests faster than ZooKeeper can process them,
    # especially if there are a lot of clients. To prevent ZooKeeper from running
    # out of memory due to queued requests, ZooKeeper will throttle clients so that
    # there is no more than globalOutstandingLimit outstanding requests in the
    # system. The default limit is 1,000. ZooKeeper logs transactions to a
    # transaction log. After snapCount transactions are written to a log file a
    # snapshot is started and a new transaction log file is started. The default
    # snapCount is 10,000.
    #snapCount=1000
    
    # If this option is defined, requests will be will logged to a trace file named
    # traceFile.year.month.day.
    #traceFile=
    
    # Leader accepts client connections. Default value is "yes". The leader machine
    # coordinates updates. For higher update throughput at the slight expense of
    # read throughput the leader can be configured to not accept clients and focus
    # on coordination.
    #leaderServes=yes
    
    <% if @authenabled -%>
    
    requireClientAuthScheme=sasl
    quorum.auth.enableSasl=true
    quorum.auth.learnerRequireSasl=true
    quorum.auth.serverRequireSasl=true
    quorum.auth.learner.loginContext=QuorumLearner
    quorum.auth.server.loginContext=QuorumServer
    quorum.cnxn.threads.size=20
    
    <% end -%> 
    
    #zoo_jaas.config.erb
    QuorumServer {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           user_zookeeper="<%= @zoopass %>";
    };
     
    QuorumLearner {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           username="zookeeper"
           password="<%= @zoopass %>";
    };
    
    Server {
           org.apache.zookeeper.server.auth.DigestLoginModule required
           user_super="<%= @superpass %>"
           user_client="<%= @clientpass %>";
    };
    
    #java.zookeeper.env.erb
    ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
    SERVER_JVMFLAGS="<%= @server_jvm_flags %>"
    
    #log4j.zookeeper.properties.erb
    # Note: This file is managed by Puppet.
    
    #
    # ZooKeeper Logging Configuration
    #
    
    # Format is "<default threshold> (, <appender>)+
    
    log4j.rootLogger=${zookeeper.root.logger}, ROLLINGFILE
    
    #
    # Log INFO level and above messages to the console
    #
    log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
    log4j.appender.CONSOLE.Threshold=INFO
    log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
    log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
    
    #
    # Add ROLLINGFILE to rootLogger to get log file output
    #    Log INFO level and above messages to a log file
    log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
    log4j.appender.ROLLINGFILE.Threshold=INFO
    log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/zookeeper.log
    
    # Max log file size of 10MB
    log4j.appender.ROLLINGFILE.MaxFileSize=10MB
    # Keep only 10 files
    log4j.appender.ROLLINGFILE.MaxBackupIndex=10
    log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
    log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
    

    And last but not least, the systemd unit file.

    [Unit]
    Description=ZooKeeper Service
    Documentation=http://zookeeper.apache.org
    Requires=network.target
    After=network.target
    
    [Service]
    Type=forking
    User=zookeeper
    Group=zookeeper
    ExecStart=/opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zoo.cfg
    ExecStop=/opt/zookeeper/bin/zkServer.sh stop /opt/zookeeper/conf/zoo.cfg
    ExecReload=/opt/zookeeper/bin/zkServer.sh restart /opt/zookeeper/conf/zoo.cfg
    WorkingDirectory=/var/lib/zookeeper
    
    [Install]
    WantedBy=default.target
    

    Also, if you want to enable the simple MD5 digest authentication, you will need to add the following two lines in Hiera.

    zookeeperstd::authenabled: true
    zookeeperstd::jvm_flags: "-Djava.security.auth.login.config=/opt/zookeeper/conf/zoo_jaas.config"
    
    

    If there is a simpler approach, feel free to leave me a message on LinkedIn or Twitter.

    Cheers

  • Logs check without ELK 🙂

    Hi,

    We didn’t have the time to implement an ELK stack for the Kafka logs, so if an issue appears it has to be investigated the old-fashioned way.

    For that purpose, here are two commands that should help you surf the logs more easily.

    First of all, there is the grep command, which shows you the whole matching line together with its number.

    A simple example looks like this:

    grep -nw "2019-06-03" server.log

    This should show you all the lines with the date 03.06 from the Kafka broker log. The catch is that you cannot use it with the usual construct cat server.log | grep -nw "[string]"; it has to be run in this exact form.

    Once you have found the line number (and it could look just like 95138:java.lang.OutOfMemoryError: Java heap space), there is the less command that we can use.

    less +95138 server.log

    And that should give you the line.

    Thanks all folks!

  • Editing Windows registry entries(hive) from Linux

    Hi,

    I want to share this with you, and it is also very useful for me in case I run into this problem again.

    The main reason for this is that my Windows installation would not boot anymore. We thought it was related to a registry entry, so I started to look into how registries can be modified from Linux (I later saw that it was much easier to do it with a bootable Windows stick, a cmd window and regedit).

    From what I managed to research on the net, it seems that the only tool for this purpose is chntpw.

    I will not go into the details of installing this tool, you can find that in posts on other sites. What I consider important is how you find the right “hive” to edit (a hive is basically one of the registry files, each containing a directory-like tree).

    So, in order to have access to the registry, you will need to mount the Windows partition on Linux.

    It should be relatively easy: find the partition using sudo fdisk -l and then mount it, for example mkdir /mnt/windows; sudo mount /dev/sda2 /mnt/windows (sda2 in this example, since sda1 should be the boot partition).

    After it is mounted, the easiest way to see the trees is by listing the content of /mnt/windows/Windows/System32/config

    And it should look similar to this:

    -rwxrwxrwx 1 root root 5505024 May 9 10:50 DRIVERS
    -rwxrwxrwx 1 root root 2776 May 14 15:35 netlogon.ftl
    -rwxrwxrwx 1 root root 18612224 May 15 2019 SYSTEM
    -rwxrwxrwx 1 root root 96206848 May 15 2019 SOFTWARE
    -rwxrwxrwx 1 root root 786432 May 15 2019 DEFAULT
    -rwxrwxrwx 2 root root 53215232 May 15 2019 COMPONENTS

    And of course a lot of other files.

    To edit one of the trees, it is as simple as running

    me@mintworkstation:/mnt/windows/Windows/System32/config$ chntpw -e SYSTEM
    chntpw version 1.00 140201, (c) Petter N Hagen
    Hive name (from header): <\Windows\system32\config\SYSTEM>
    ROOT KEY at offset: 0x001020 * Subkey indexing type is: 686c
    File size 18612224 [11c0000] bytes, containing 3776 pages (+ 1 headerpage)
    Used for data: 257785/18420128 blocks/bytes, unused: 13/1632 blocks/bytes.

    Simple registry editor. ? for help.

    And the simplest way to navigate is by using ls to list and cd to change to a smaller tree (please put the name of the tree without “<” and “>”, like cd Software for example).

    Once you have arrived at the record you want to edit, just call ed [record_name]. It will show you the actual value and ask you for the updated one.

    Once the changes are done, just press q and it will ask you whether to save the registry hive. After it is saved, you are all done.

    That would be all. Cheers.

  • Jolokia particular case using custom facts in Hiera

    Hi,

    This is for me and also for all the other people who are searching for how to use custom facts in Hiera.

    In my case I wanted to activate the HTTP endpoint of Jolokia using a custom hostname and the standard port. For that it was sufficient to add the following line to my host yaml:

    profiles::kafka::jolokia: "-javaagent:/usr/share/java/jolokia-jvm-agent.jar=port=8778,host=%{::networking.fqdn}"

    This uses the standard fact called networking, which is a hash, and I am using the key called fqdn.

    And it works.

    Cheers

  • Kafka_consumer.yaml (python style) and more

    Hi,

    As a follow-up to the article I posted earlier (https://log-it.tech/2019/03/15/get-the-info-you-need-from-consumer-group-python-style/), you can use that info to put into kafka_consumer.yaml for the Datadog integration.

    It’s not elegant by any means, but it works. As a piece of advice, please don’t overcomplicate things more than they need to be.

    In the last example I figured I wanted to create a list of GroupInfo objects for each line that was returned by the consumer-group script. Not the best idea, as you shall see below.

    So, in addition to what I wrote in the last article, now it does not just print the dictionary but also orders it by partition.

    import os  # needed for the rename at the end; getgroups() and getgroupinfo() come from the previous post's script

    def constructgroupdict():
     groupagregate = {}
     group_list = getgroups()
     for group in group_list:
        groupagregate[group] = getgroupinfo(group)
     
     for v in groupagregate.values():
        v.sort(key = lambda re: int(re.partition))
     
     return groupagregate
    
    def printgroupdict():
     groupdict = constructgroupdict()
     infile = open('/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml.template','a')
     for k,v in groupdict.items():
        infile.write('      '+k+':\n')
        topics = []
        testdict = {}
        for re in v:
            if re.topic not in topics:
               topics.append(re.topic)
        for x in topics:
            partitions = []
            for re in v:
               if (re.topic == x):
                  partitions.append(re.partition)
            testdict[x] = partitions
        for gr,partlst in testdict.items():
            infile.write('        '+gr+': ['+', '.join(partlst)+']\n')
     infile.close()
     os.rename('/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml.template','/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml')
      
    printgroupdict()
    

    And after that, it takes a bit of work to get only the unique values for the topic names.

    The logic I chose, grabbing all the data per consumer group in one go, is related to the fact that querying the cluster takes a very long time, so if I wanted to grab another set of data filtered by topic it would have been very costly time-wise.

    In the way it is written now there are a lot of for loops, which could become a problem if there are too many records to process. Fortunately, that should not normally be the case for consumer groups. A more compact way of building the topic-to-partition mapping is sketched below.
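
    Just as an illustration, the same grouping can be done in a single pass with a dictionary of lists; a minimal sketch (the function name is mine, and it assumes the same GroupInfo objects as in the previous post):

    from collections import defaultdict

    def group_partitions_by_topic(records):
        # records is the list of GroupInfo objects for one consumer group
        by_topic = defaultdict(list)
        for record in records:
            by_topic[record.topic].append(record.partition)
        return dict(by_topic)

    That would collapse the topics/testdict part of printgroupdict into one loop, but the end result is the same.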

    The easiest way to integrate the info into kafka_consumer.yaml, in our case, is to create a template called kafka_consumer.yaml.template:

    init_config:
      # Customize the ZooKeeper connection timeout here
      # zk_timeout: 5
      # Customize the Kafka connection timeout here
      # kafka_timeout: 5
      # Customize max number of retries per failed query to Kafka
      # kafka_retries: 3
      # Customize the number of seconds that must elapse between running this check.
      # When checking Kafka offsets stored in Zookeeper, a single run of this check
      # must stat zookeeper more than the number of consumers * topic_partitions
      # that you're monitoring. If that number is greater than 100, it's recommended
      # to increase this value to avoid hitting zookeeper too hard.
      # https://docs.datadoghq.com/agent/faq/how-do-i-change-the-frequency-of-an-agent-check/
      # min_collection_interval: 600
      #
      # Please note that to avoid blindly collecting offsets and lag for an
      # unbounded number of partitions (as could be the case after introducing
      # the self discovery of consumer groups, topics and partitions) the check
      # will collect at metrics for at most 200 partitions.
    
    
    instances:
      # In a production environment, it's often useful to specify multiple
      # Kafka / Zookeper nodes for a single check instance. This way you
      # only generate a single check process, but if one host goes down,
      # KafkaClient / KazooClient will try contacting the next host.
      # Details: https://github.com/DataDog/dd-agent/issues/2943
      #
      # If you wish to only collect consumer offsets from kafka, because
      # you're using the new style consumers, you can comment out all
      # zk_* configuration elements below.
      # Please note that unlisted consumer groups are not supported at
      # the moment when zookeeper consumer offset collection is disabled.
      - kafka_connect_str:
          - localhost:9092
        zk_connect_str:
          - localhost:2181
        # zk_iteration_ival: 1  # how many seconds between ZK consumer offset
                                # collections. If kafka consumer offsets disabled
                                # this has no effect.
        # zk_prefix: /0.8
    
        # SSL Configuration
    
        # ssl_cafile: /path/to/pem/file
        # security_protocol: PLAINTEXT
        # ssl_check_hostname: True
        # ssl_certfile: /path/to/pem/file
        # ssl_keyfile: /path/to/key/file
        # ssl_password: password1
    
        # kafka_consumer_offsets: false
        consumer_groups:
    

    It’s true that I keep only one connection string each for Kafka and Zookeeper, and that things get a little bit more complicated once SSL is configured (but this is not our case, yet).

      - kafka_connect_str:
          - localhost:9092
        zk_connect_str:
          - localhost:2181

    The script appends the consumer-group info at the bottom of the template, after which the file is renamed to kafka_consumer.yaml. Who puts the template back for the next run? Easy, that would be Puppet.

    It works, it has been tested. One last thing that I wanted to warn you about.

    There is a limit on the number of metrics that can be uploaded per machine, and that is 350. Please be aware of it and think very seriously about whether you want to activate this.

    That would be all for today.

    Cheers

  • Get the info you need from consumer group (python style)

    Hi,

    For some of you this might be of help. If it’s rubbish, I truly apologize; please leave a comment with improvements 🙂

    import subprocess
    import socket
    
    fqdn = socket.getfqdn()
    
    class GroupInfo:
     def __init__(self, line):
         self.topic = line[0]
         self.partition = line[1]
         self.current_offset = line[2]
         self.logend_offset = line[3]
         self.lag = line[4]
         self.consumer_id = line[5]
         self.host = line[6]
         self.client_id = line[7]
     def __str__(self):
        return self.topic+" "+self.partition+" "+self.current_offset+" "+self.logend_offset+" "+self.lag+" "+self.consumer_id
    
    def getgroups():
    
     cmd = ['/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server '+fqdn+':9092 --list']
     result = subprocess.check_output(cmd, shell=True).splitlines()
     group_list = []
     for r in result:
        try:
           rstr = r.decode('utf-8')
        except UnicodeDecodeError:
           print('Result can not be converted to utf-8')
           continue
        group_list.append(rstr)
    
     return group_list
    
    def getgroupinfo(groupid):
     
     cmd = ('/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server '+fqdn+':9092 --group '+groupid+' --describe')
     process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
     result = subprocess.check_output(('grep -v TOPIC'),stdin = process.stdout, shell=True).splitlines()
     process.wait()
     group_info_list = []
     for r in result:
         try:
           rstr = r.decode('utf-8')
         except UnicodeDecodeError:
           print('Result can not be converted to utf-8')
           continue
         if len(rstr.split()) == 0:
            pass
         else:
            group_info = GroupInfo(rstr.split())
            group_info_list.append(group_info)
     
     return group_info_list
    
    def main():
     groupagregate = {}
     group_list = getgroups()
     for group in group_list:
        groupagregate[group] = getgroupinfo(group)
     
     for k, v in groupagregate.items():
        print(k)
        for re in v:
          print(re.__str__())
    
    main()

    I will not explain it; it should be self-explanatory.

    Cheers

  • Kafka consumer group info retrieval using Python

    Hi,

    I’ve been playing with the kafka-python module to grab the info I need in order to reconfigure the Datadog integration.

    Unfortunately, there is a catch with this method as well, and I will show it below.

    Here is a little bit of not so elegant code.

    from kafka import BrokerConnection
    from kafka.protocol.admin import *
    import socket
    
    fqdn = socket.getfqdn()
    bc = BrokerConnection(fqdn,9092,socket.AF_INET)
    try:
      bc.connect_blocking()
    except Exception as e:
      print(e)
    if bc.connected():
       print("Connection to", fqdn, " established")
    
    def getgroup():
     list_groups_request = ListGroupsRequest_v1()
    
     future0 = bc.send(list_groups_request)
     while not future0.is_done:
         for resp, f in bc.recv():
             f.success(resp)
     group_ids = ()
     for group in future0.value.groups:
         group_ids += (group[0],)
    
     print(group_ids)
         
     description = DescribeGroupsRequest_v1(group_ids)
     future1 = bc.send(description)
     while not future1.is_done:
        for resp, f in bc.recv():
            f.success(resp)
    
     for groupid in future1.value.groups:
         print('For group ',groupid[1],':\n')
         for meta in groupid[5]:
             print(meta[0],meta[2],sep="\n")
             print(meta[3])
     if future1.is_done:
        print("Group query is done")
    
    getgroup()
    

    As you will see, print(meta[3]) will return some very ugly binary data with the topic names in it, which is not converted if you try meta[3].decode('utf-8').

    I hope I can find a way to decode it.
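
    One idea worth trying: that field looks like the serialized member assignment, so it might be parseable with kafka-python’s own protocol structs instead of being decoded as UTF-8. A rough, untested sketch of that idea, assuming the class is exposed as kafka.coordinator.protocol.ConsumerProtocolMemberAssignment:

    from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment

    def print_assignment(raw_bytes):
        # raw_bytes would be the meta[3] value from the DescribeGroups response
        assignment = ConsumerProtocolMemberAssignment.decode(raw_bytes)
        for topic, partitions in assignment.assignment:
            print(topic, list(partitions))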

    Cheers

  • Fact for kafka_consumer hash…or kind of

    Hi,

    There is a late requirement that we activate the kafka_consumer functionality of Datadog.

    Unfortunately this is a challenge if you don’t have a fixed number of consumer groups and topics (on one client we had a couple of hundred consumer groups).

    Here is how it should look in the example file

      #  consumer_groups:
      #    <CONSUMER_NAME_1>:
      #      <TOPIC_NAME_1>: [0, 1, 4, 12]
      #    <CONSUMER_NAME_2>:
      #      <TOPIC_NAME_2>:
      #    <CONSUMER_NAME_3>

    So, if you want to grab the data using the old kafka-consumer-groups.sh, you have to do it in one iteration. I tried it this way and, even though I got it done, it should not be an option for larger clusters.

    require 'facter'
    Facter.add('kafka_consumers_config') do
      setcode do
          kafka_consumers_cmd = '/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list'
          kafka_consumers_result = Facter::Core::Execution.exec(kafka_consumers_cmd)
          group_hash = {}
          kafka_consumers_result.to_s.split(/\n/).reject { |g| g.empty? }.each do |group|
            groupid = group.strip
            kafka_consumer_topic_list_cmd = "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group #{groupid} --describe | sort | grep -v TOPIC | awk {\'print $1\'} | uniq"
            kafka_consumer_topic_list_result = Facter::Core::Execution.exec(kafka_consumer_topic_list_cmd)
            topic_hash = {}
            kafka_consumer_topic_list_result.to_s.split(/\n/).reject { |c| c.empty? }.each do |topic|
               topicid = topic.strip
               kafka_consumer_topic_partition_cmd = "/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group #{groupid} --describe | grep -v TOPIC | grep #{topicid} | awk {\'print $2\'} | sort"
               kafka_consumer_topic_partition_result = Facter::Core::Execution.exec(kafka_consumer_topic_partition_cmd)
               # collapse the partition column into one space separated string
               topic_hash[topicid] = kafka_consumer_topic_partition_result.to_s.gsub("\n", ' ').squeeze(' ').strip
            end
            group_hash[groupid] = topic_hash
          end
          group_hash
      end
    end
    
    

    This is posted only as a snapshot and maybe as a source of inspiration. An actual working version should be done in Java or Scala, in order to leverage the power of the libraries (from what I know, Python/Go and other programming languages only have libraries for the consumer/producer part).

    If I pursue the task of rewriting this using only one query loop, or in Java/Scala, you will see it here.

  • Python for opening and reading files

    Since I started learning Python, and hopefully will also take a certification, I am trying to do some hands-on exercises.

    Nothing too complicated, just some basic exercises, for the moment. Here is one of them.

    The exercise wants to teach you to open a file and read from it, like:

    from sys import argv
    
    script, filename = argv
    txt = open(filename)
    print(f"Here is your file: {filename}")
    print(txt.read())

    But there is a thing not to be ignored, and that is exception handling.

    from sys import argv
    
    try:
        script, filename = argv
        txt = open(filename)
        print(f"Here is your file: {filename}")
        print(txt.read())
    except ValueError as e:
        if "not enough" in e.__str__():
            print("not enough arguments given")
        else:
            print("too many arguments given")
    except FileNotFoundError:
        print("file given, not found")

    And since you can’t really discern between ValueError messages, or at least I don’t know how to do that more elegantly yet, we have a workaround.

    There is also a FileNotFoundError which shouldn’t be left unchecked, and if I missed something, leave a comment.

    P.S.: I know there is also the possibility that the file does not have read permissions. That demonstrates that for three lines of code you need to treat even more exceptional cases, as in the sketch below.
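
    For reference, here is a variant (just a sketch, not necessarily the most elegant one) that avoids parsing the ValueError message by checking the argument count up front, and that also covers the permission case:

    from sys import argv

    if len(argv) != 2:
        # check the count explicitly instead of relying on the ValueError text
        print("usage: script.py <filename>")
    else:
        filename = argv[1]
        try:
            with open(filename) as txt:
                print(f"Here is your file: {filename}")
                print(txt.read())
        except FileNotFoundError:
            print("file given, not found")
        except PermissionError:
            print("file given, but no permission to read it")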

    Cheers!

  • Distributing service conditionally on OS version

    Hi,

    Since we are in the process of migrating to 16.04, my service restart script needed to be deployed with separate builds.

    For that purpose, I found a fact that would help me, so my standard file block transformed into this:

    case $facts['os']['distro']['codename'] {
        'xenial': {
            file { '/root/servicerestart':
                source  => 'puppet:///modules/profiles/servicerestart-kafka-new',
                mode    => '0755',
                replace => true,
            }
        }
        'trusty': {
            file { '/root/servicerestart':
                source  => 'puppet:///modules/profiles/servicerestart-kafka',
                mode    => '0755',
                replace => true,
            }
        }
    }
    

    That should be all for now.