kafka python

Kafka_consumer.yaml (python style) and more


As a followup to the article i posted earlier ( ) , you can use that info to put in into kafka_consumer.yaml for Datadog integration.

It’s not elegant by any means, but it works. As an advise, please don’t over complicate thinks more than they need.

In the last example i figured i wanted to create a list of GroupInfo objects for each line that was returned from consumer group script. Bad idea as you shall see below

So, in addition to what i wrote in the last article, now it’s not just printing the dictionary but order it, by partition.

def constructgroupdict():
 groupagregate = {}
 group_list = getgroups()
 for group in group_list:
    groupagregate[group] = getgroupinfo(group)
 for v in groupagregate.values():
    v.sort(key = lambda re: int(re.partition))
 return groupagregate

def printgroupdict():
 groupdict = constructgroupdict()
 infile = open('/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml.template','a')
 for k,v in groupdict.items():
    infile.write('      '+k+':\n')
    topics = []
    testdict = {}
    for re in v:
        if re.topic not in topics:
    for x in topics:
        partitions = []
        for re in v:
           if (re.topic == x):
        testdict[x] = partitions
    for gr,partlst in testdict.items():
        infile.write('        '+gr+': ['+', '.join(partlst)+']\n')

And after that, it’s quite hard to get only the unique value for the topic name.

The logic i chose to grab all the data per consumer group is related to the fact that querying the cluster takes a very long time, so if i wanted to grab another set of data filtered by topic, i would have been very time costly.

In the way that is written now, there are a lot of for loop, that should become challenging in care there are too many records to process. Fortunately, this should not be the case for consumer groups in a normal case.

The easiest way to integrate the info in kafka_consumer.yaml, in our case is to create a template called kafka_consumer.yaml.template

  # Customize the ZooKeeper connection timeout here
  # zk_timeout: 5
  # Customize the Kafka connection timeout here
  # kafka_timeout: 5
  # Customize max number of retries per failed query to Kafka
  # kafka_retries: 3
  # Customize the number of seconds that must elapse between running this check.
  # When checking Kafka offsets stored in Zookeeper, a single run of this check
  # must stat zookeeper more than the number of consumers * topic_partitions
  # that you're monitoring. If that number is greater than 100, it's recommended
  # to increase this value to avoid hitting zookeeper too hard.
  # min_collection_interval: 600
  # Please note that to avoid blindly collecting offsets and lag for an
  # unbounded number of partitions (as could be the case after introducing
  # the self discovery of consumer groups, topics and partitions) the check
  # will collect at metrics for at most 200 partitions.

  # In a production environment, it's often useful to specify multiple
  # Kafka / Zookeper nodes for a single check instance. This way you
  # only generate a single check process, but if one host goes down,
  # KafkaClient / KazooClient will try contacting the next host.
  # Details:
  # If you wish to only collect consumer offsets from kafka, because
  # you're using the new style consumers, you can comment out all
  # zk_* configuration elements below.
  # Please note that unlisted consumer groups are not supported at
  # the moment when zookeeper consumer offset collection is disabled.
  - kafka_connect_str:
      - localhost:9092
      - localhost:2181
    # zk_iteration_ival: 1  # how many seconds between ZK consumer offset
                            # collections. If kafka consumer offsets disabled
                            # this has no effect.
    # zk_prefix: /0.8

    # SSL Configuration

    # ssl_cafile: /path/to/pem/file
    # security_protocol: PLAINTEXT
    # ssl_check_hostname: True
    # ssl_certfile: /path/to/pem/file
    # ssl_keyfile: /path/to/key/file
    # ssl_password: password1

    # kafka_consumer_offsets: false

It’s true that i keep only one string for connectivity on Kafka and Zookeeper, and that things are a little bit more complicated once SSL is configured (but this is not our case, yet).

  - kafka_connect_str:
      - localhost:9092
      - localhost:2181

And append the info at the bottom of it after which it is renamed. Who is putting that template back? Easy, that would be puppet.

It works, it has been tested. One last thing that i wanted to warn you about.

There is a limit of metrics that can be uploaded per machine, and that is 350. Please be aware and think very serious if you want to activate it.

Than would be all for today.



Python dictionary construction from process list


This is out of my expertise but i wanted to shared it anyways. One colleague wanted to help him with the creation of a pair key:value from one command that lists the processes, in python. With a little bit of testing i came to the following form:

import os
import subprocess
from subprocess import Popen, PIPE
username = subprocess.Popen(['/bin/ps','-eo','pid,uname'], stdout=PIPE, stderr=PIPE)
firstlist ='\n')
dict = {}
for str in firstlist:
  if (str != ''):
    secondlist = str.split()
    key = secondlist[0]
    value = secondlist[1]

Now, i think there are better ways to write this but it works also in this way.
If you find better ways, please leave a message 🙂


linux newtools

Installing Jupyter Notebook on Raspberry PI 2


Just want to share you that i managed to install the Jupyter Notebook( on a Raspberry PI 2 without any real problems. Beside a microSD card and a Raspberry you need to read this and that would be all.
So, you will need a image of Raspbian from (i selected the lite version without the GUI, you really don’t need that actually). In installed it on the card with Linux so i executed a command similar with dd if=[path_to_image]/[image_name] of=[sd_device_name taken from fdisk -l without partition id usually /dev/mmcblk0] bs=4MB; sync. The sync command is added just to be sure that all files are syncronized to card before remove it. We have now a working image that we can use on raspberry, it’s fair to try boot it.
Once it’s booted login with user pi and password raspberry. I am a fan of running the resize steps which you can find here
Ok, so we are good to go on installing Jupyter Notebook, at first we need to check what Python version we have installed and in my case it was 2.7.13 (it should be shown by running python –version). In this case then we need to use pip for this task, and it’s not present by default on the image.
Run sudo apt-get install python-pip, after this is done please run pip install jupyter. It will take some time, but when it is done you will have a fresh installation in pi homedir(/home/pi/.local).
It is true that we need also a service, and in order to do that, please create following path with following file:

Description=Jupyter Notebook

# Step 1 and Step 2 details are here..
# ------------------------------------
ExecStart=/home/pi/.local/bin/jupyter-notebook --config=/home/pi/.jupyter/


You are probably wondering from where do you get the config file. This will be easy, just run /home/pi/.local/bin/jupyter notebook –generate-config

After the file is created, in order to activate the service and enable it there are sudo systemctl enable jupyter.service and sudo systemctl start jupyter.service

You have now a fresh and auto managed jupyter service. It will be started only on the localhost by default, but in the next article i will tell you also the modifications to be executed in order to run it remotely and also install scala kernel.