Hi,
For some of you might be of help. If it’s rubbish, i truly apologize, please leave a comment with improvements 🙂
import subprocess
import socket
fqdn = socket.getfqdn()
class GroupInfo:
def __init__(self, line):
self.topic = line[0]
self.partition = line[1]
self.current_offset = line[2]
self.logend_offset = line[3]
self.lag = line[4]
self.consumer_id = line[5]
self.host = line[6]
self.client_id = line[7]
def __str__(self):
return self.topic+" "+self.partition+" "+self.current_offset+" "+self.logend_offset+" "+self.lag+" "+self.consumer_id
def getgroups():
cmd = ['/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server '+fqdn+':9092 --list']
result = subprocess.check_output(cmd, shell=True).splitlines()
group_list = []
for r in result:
try:
rstr = r.decode('utf-8')
except:
print('Result can not be converted to utf-8')
group_list.append(rstr)
return group_list
def getgroupinfo(groupid):
cmd = ('/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server '+fqdn+':9092 --group '+groupid+' --describe')
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
result = subprocess.check_output(('grep -v TOPIC'),stdin = process.stdout, shell=True).splitlines()
process.wait()
group_info_list = []
for r in result:
try:
rstr = r.decode('utf-8')
except:
print('Result can not be converted to utf-8')
print(rstr.split())
if len(rstr.split()) == 0:
pass
else:
group_info = GroupInfo(rstr.split())
group_info_list.append(group_info)
return group_info_list
def main():
groupagregate = {}
group_list = getgroups()
for group in group_list:
groupagregate[group] = getgroupinfo(group)
for k, v in groupagregate.items():
print(k)
for re in v:
print(re.__str__())
main()
I will not explain it. It should be self explanatory.
Cheers