Categories
kafka python

Get the info you need from consumer group (python style)

Hi,

This might be of help to some of you. If it’s rubbish, I truly apologize; please leave a comment with improvements 🙂

import subprocess
import socket

fqdn = socket.getfqdn()

class GroupInfo:
    # One row of the "--describe" output, already split on whitespace.
    def __init__(self, line):
        self.topic = line[0]
        self.partition = line[1]
        self.current_offset = line[2]
        self.logend_offset = line[3]
        self.lag = line[4]
        self.consumer_id = line[5]
        self.host = line[6]
        self.client_id = line[7]

    def __str__(self):
        return " ".join([self.topic, self.partition, self.current_offset,
                         self.logend_offset, self.lag, self.consumer_id])

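def getgroups():
    # List all consumer groups; an argument list avoids going through the shell.
    cmd = ['/opt/kafka/bin/kafka-consumer-groups.sh',
           '--bootstrap-server', fqdn + ':9092', '--list']
    result = subprocess.check_output(cmd).splitlines()
    group_list = []
    for r in result:
        try:
            rstr = r.decode('utf-8')
        except UnicodeDecodeError:
            print('Result can not be converted to utf-8')
            continue  # skip this line instead of appending a stale value
        if rstr.strip():
            group_list.append(rstr)

    return group_list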

def getgroupinfo(groupid):
    # Describe one group; an argument list keeps the group id out of the shell.
    cmd = ['/opt/kafka/bin/kafka-consumer-groups.sh',
           '--bootstrap-server', fqdn + ':9092',
           '--group', groupid, '--describe']
    result = subprocess.check_output(cmd).splitlines()
    group_info_list = []
    for r in result:
        try:
            rstr = r.decode('utf-8')
        except UnicodeDecodeError:
            print('Result can not be converted to utf-8')
            continue
        fields = rstr.split()
        # Skip the header (this replaces "grep -v TOPIC") and any line
        # that does not have the 8 columns GroupInfo expects.
        if 'TOPIC' in rstr or len(fields) < 8:
            continue
        group_info_list.append(GroupInfo(fields))

    return group_info_list

def main():
    groupaggregate = {}
    for group in getgroups():
        groupaggregate[group] = getgroupinfo(group)

    for k, v in groupaggregate.items():
        print(k)
        for info in v:
            print(info)  # print() calls __str__ for us

if __name__ == '__main__':
    main()

I will not explain it; it should be self-explanatory.
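For anyone who wants to try the parsing part without a broker at hand, here is a minimal sketch. It assumes the same 8-column layout the script above relies on; the sample text is hand-written for illustration (not real output), and `Row`, `parse_describe` and `total_lag` are names made up for this example.

```python
from collections import namedtuple

Row = namedtuple(
    'Row',
    'topic partition current_offset logend_offset lag consumer_id host client_id')

# Hand-written sample in the 8-column layout assumed above (not real output).
SAMPLE = """\
TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST       CLIENT-ID
orders  0          10              15              5    c-1          /10.0.0.1  app
orders  1          20              20              0    c-2          /10.0.0.2  app

"""

def parse_describe(text):
    """Turn describe-style text into a list of Row tuples."""
    rows = []
    for line in text.splitlines():
        fields = line.split()
        # Skip blank lines, the header and anything with an unexpected shape.
        if len(fields) != 8 or fields[0] == 'TOPIC':
            continue
        rows.append(Row(*fields))
    return rows

def total_lag(rows):
    """Sum the lag column, ignoring values that are not plain numbers."""
    return sum(int(r.lag) for r in rows if r.lag.isdigit())

rows = parse_describe(SAMPLE)
print(len(rows), total_lag(rows))
```

Keeping the parsing separate from the subprocess call makes it easy to check against saved output before pointing it at a live cluster.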

Cheers

Categories
kafka python

Kafka consumer group info retrieval using Python

Hi,

I’ve been playing with the kafka-python module to grab the info I need in order to reconfigure the Datadog integration.

Unfortunately, this method also has a catch, which I will show you below.

Here is a bit of not-so-elegant code.

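from kafka import BrokerConnection
from kafka.protocol.admin import ListGroupsRequest_v1, DescribeGroupsRequest_v1
import socket

fqdn = socket.getfqdn()
bc = BrokerConnection(fqdn, 9092, socket.AF_INET)
try:
    bc.connect_blocking()
except Exception as e:
    print(e)
if bc.connected():
    print("Connection to", fqdn, "established")
else:
    # Nothing below can work without a connection, so stop here.
    raise SystemExit("Could not connect to " + fqdn)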

def getgroup():
    list_groups_request = ListGroupsRequest_v1()

    # Send the request and pump the connection until the reply arrives.
    future0 = bc.send(list_groups_request)
    while not future0.is_done:
        for resp, f in bc.recv():
            f.success(resp)
    # Each entry is (group, protocol_type); keep only the group name.
    group_ids = tuple(group[0] for group in future0.value.groups)

    print(group_ids)

    description = DescribeGroupsRequest_v1(group_ids)
    future1 = bc.send(description)
    while not future1.is_done:
        for resp, f in bc.recv():
            f.success(resp)

    for group in future1.value.groups:
        # group[1] is the group name, group[5] the list of members.
        print('For group ', group[1], ':\n')
        for meta in group[5]:
            # meta[0] is the member id, meta[2] the client host.
            print(meta[0], meta[2], sep="\n")
            # meta[3] is the raw member metadata (bytes).
            print(meta[3])
    print("Group query is done")

getgroup()

As you will see, print(meta[3]) returns very ugly binary data with the topic names in it, and meta[3].decode('utf-8') does not convert it.

I hope I can find a way to decode it.
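One possible way to read those bytes, as a rough sketch: assuming the group uses the standard Kafka consumer protocol, the member metadata is not plain text but a serialized structure that starts with a 16-bit version, followed by a 32-bit topic count and then each topic name prefixed by its 16-bit length (all big-endian). The helper name `decode_subscription` and the sample bytes below are made up for illustration; the sample is built by hand, not captured from a broker.

```python
import struct

def decode_subscription(member_metadata):
    """Return (version, topics) parsed from consumer-protocol member metadata."""
    # int16 version followed by an int32 count of topic names (big-endian).
    version, topic_count = struct.unpack_from('>hi', member_metadata, 0)
    offset = 6
    topics = []
    for _ in range(topic_count):
        # Each topic name is an int16 length followed by UTF-8 bytes.
        (length,) = struct.unpack_from('>h', member_metadata, offset)
        offset += 2
        topics.append(member_metadata[offset:offset + length].decode('utf-8'))
        offset += length
    # Whatever follows (user data) is ignored in this sketch.
    return version, topics

# Hand-built sample: version 0, topics "orders" and "logs", null user data.
sample = (struct.pack('>hi', 0, 2)
          + struct.pack('>h', 6) + b'orders'
          + struct.pack('>h', 4) + b'logs'
          + struct.pack('>i', -1))

print(decode_subscription(sample))
```

In the script above this would be called as decode_subscription(meta[3]). Groups that use another protocol type may lay out these bytes differently, so treat the result with care.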

Cheers

Categories
python

Python for opening and reading files

Since I started learning Python, and hopefully will also take a certification, I am trying to do some hands-on exercises.

Nothing too complicated, just some basic exercises, for the moment. Here is one of them.

They teach you to open a file and read from it like this:

from sys import argv

script, filename = argv
txt = open(filename)
print(f"Here is your file: {filename}")
print(txt.read())

But there is a thing not to be ignored, and that is exception handling.

from sys import argv

try:
    script, filename = argv
    txt = open(filename)
    print(f"Here is your file: {filename}")
    print(txt.read())
except ValueError as e:
    if "not enough" in str(e):
        print("not enough arguments given")
    else:
        print("too many arguments given")
except FileNotFoundError:
    print("file given, not found")

And since you can’t really tell the two ValueError cases apart other than by their messages, or at least I don’t know how to do that more elegantly yet, we have a workaround.

There is also a FileNotFoundError which shouldn’t be left unchecked, and if I missed something, leave a comment.

P.S.: I know the file might also lack read permissions. That shows that even for three lines of code you need to treat quite a few exceptional cases.
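One possibly cleaner variant, as a sketch rather than the one true way: check the number of arguments up front instead of looking inside the ValueError message, and catch PermissionError next to FileNotFoundError. The function name `read_from_args` and the returned messages are made up for this example, and the demo uses a temporary file so it can run anywhere.

```python
import os
import tempfile

def read_from_args(args):
    """Return the file content, or a message describing what went wrong."""
    # Check the argument count directly instead of parsing ValueError text.
    if len(args) < 2:
        return "not enough arguments given"
    if len(args) > 2:
        return "too many arguments given"
    filename = args[1]
    try:
        # "with" closes the file even if reading fails.
        with open(filename) as txt:
            return txt.read()
    except FileNotFoundError:
        return "file given, not found"
    except PermissionError:
        return "file given, but not readable"

# Small demo with a temporary file instead of real command-line arguments.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.txt")
    with open(path, "w") as f:
        f.write("hello")
    print(read_from_args(["script.py"]))
    print(read_from_args(["script.py", path]))
    print(read_from_args(["script.py", os.path.join(tmp, "missing.txt")]))
```

In a real script you would pass sys.argv to the function and print whatever it returns.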

Cheers!