Get the info you need from Kafka consumer groups (Python style)

Hi,

This might be of help to some of you. If it’s rubbish, I truly apologize; please leave a comment with improvements 🙂

import subprocess
import socket

# Fully qualified domain name of this machine; the functions below append
# ':9092' to it to form the --bootstrap-server address.
fqdn = socket.getfqdn()

class GroupInfo:
    """Holds the fields of one whitespace-split consumer-group row.

    The constructor reads the first eight items of ``line`` by position
    and stores them under the attribute names listed in ``_COLUMNS``.
    Items beyond the eighth are ignored; a shorter ``line`` raises
    ``IndexError``.
    """

    # Attribute names, in the positional order they are read from ``line``.
    _COLUMNS = (
        "topic",
        "partition",
        "current_offset",
        "logend_offset",
        "lag",
        "consumer_id",
        "host",
        "client_id",
    )

    def __init__(self, line):
        for position, attribute in enumerate(self._COLUMNS):
            setattr(self, attribute, line[position])

    def __str__(self):
        # host and client_id are stored but deliberately left out of the
        # printed form, matching the six fields shown originally.
        shown = (
            self.topic,
            self.partition,
            self.current_offset,
            self.logend_offset,
            self.lag,
            self.consumer_id,
        )
        return " ".join(shown)

def getgroups():
    """List the consumer groups reported by the broker at ``fqdn:9092``.

    Runs ``kafka-consumer-groups.sh --list`` and returns one entry per
    non-blank line of its standard output.

    Returns:
        list[str]: Decoded output lines, in the order the tool printed them.

    Raises:
        subprocess.CalledProcessError: If the tool exits with a non-zero status.
        OSError: If the tool cannot be started (e.g. the script is missing).
    """
    # Pass an argument list without a shell: a list combined with
    # shell=True only works by accident and re-parses the host name.
    cmd = [
        '/opt/kafka/bin/kafka-consumer-groups.sh',
        '--bootstrap-server', fqdn + ':9092',
        '--list',
    ]

    group_list = []
    for raw_line in subprocess.check_output(cmd).splitlines():
        try:
            name = raw_line.decode('utf-8')
        except UnicodeDecodeError:
            # Skip the line: there is no decoded value to append, and
            # falling through would reuse the previous line's name.
            print('Result can not be converted to utf-8')
            continue
        # A blank line is not a group name; do not hand it to callers.
        if name.strip():
            group_list.append(name)

    return group_list

def getgroupinfo(groupid):
    """Describe one consumer group and return its rows as ``GroupInfo`` objects.

    Runs ``kafka-consumer-groups.sh --describe`` for ``groupid`` against
    the broker at ``fqdn:9092``, drops the column-header line and blank
    lines, and wraps every remaining whitespace-split line in ``GroupInfo``.

    Args:
        groupid: Name of the consumer group to describe.

    Returns:
        list[GroupInfo]: One object per parsable data row; empty when the
        tool printed no data rows.

    Raises:
        OSError: If the tool cannot be started (e.g. the script is missing).
    """
    # Argument list, no shell: the group name is passed verbatim, so shell
    # metacharacters in it can neither break nor hijack the command.
    cmd = [
        '/opt/kafka/bin/kafka-consumer-groups.sh',
        '--bootstrap-server', fqdn + ':9092',
        '--group', groupid,
        '--describe',
    ]
    process = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    # communicate() reads all output, waits for exit and closes the pipe.
    # The exit status is intentionally not checked, as before.
    output, _ = process.communicate()

    group_info_list = []
    for raw_line in output.splitlines():
        try:
            text = raw_line.decode('utf-8')
        except UnicodeDecodeError:
            print('Result can not be converted to utf-8')
            continue

        fields = text.split()
        if not fields:
            continue
        # Header filtering is done here instead of piping through
        # `grep -v TOPIC`: grep exits 1 on empty output (which made
        # check_output raise) and also removed any data row that merely
        # contained the substring "TOPIC".
        if 'TOPIC' in fields and 'PARTITION' in fields:
            continue

        try:
            group_info_list.append(GroupInfo(fields))
        except IndexError:
            # Too few columns for GroupInfo (e.g. an informational message
            # from the tool); report it and keep the remaining rows.
            print('Skipping line with too few columns: ' + text)

    return group_info_list

def main():
    """Print every consumer group name followed by its described rows.

    The details of all groups are collected first; printing only starts
    once every lookup has finished.
    """
    rows_by_group = {name: getgroupinfo(name) for name in getgroups()}

    for name, rows in rows_by_group.items():
        print(name)
        for row in rows:
            print(str(row))

# Run the report only when executed as a script, so that importing this
# module does not immediately query Kafka and print.
if __name__ == "__main__":
    main()

I will not explain it. It should be self-explanatory.

Cheers