Category: python

  • Automatic increase of Kafka LVM on GCP

    I wrote an article for my company on this topic, which was published on Medium. Please see the link:

    https://medium.com/metrosystemsro/new-ground-automatic-increase-of-kafka-lvm-on-gcp-311633b0816c

    Thanks

  • Using GCP recommender API for Compute engine

    Let’s keep it short. If you want to use the Python client library for the Recommender API, this is how you connect to your project.

    from google.cloud.recommender_v1beta1 import RecommenderClient
    from google.oauth2 import service_account

    def main():
        # Authenticate using a service account key file
        credential = service_account.Credentials.from_service_account_file('account.json')
        project = "internal-project"
        location = "europe-west1-b"
        recommender = 'google.compute.instance.MachineTypeRecommender'
        client = RecommenderClient(credentials=credential)
        # Build the fully qualified recommender resource name
        name = client.recommender_path(project, location, recommender)

        elements = client.list_recommendations(name, page_size=4)
        for element in elements:
            print(element)

    main()

    Be aware that credential = RecommenderClient.from_service_account_file('account.json') will not return any error; the calls will simply hang, so use the service_account credentials as shown above.

    That’s all folks!

  • ELK query using Python with time range

    A short post, sharing how to run an ELK query from Python that also filters on a time range:

    from elasticsearch import Elasticsearch
    from pandas.io.json import json_normalize

    # elk_host and elk_port hold your cluster connection details
    es = Elasticsearch([{'host': elk_host, 'port': elk_port}])

    query_body_mem = {
        "query": {
            "bool": {
                "must": [
                    {
                        "query_string": {
                            "query": "metricset.module:system AND metricset.name:memory AND tags:test AND host.name:[hostname]"
                        }
                    },
                    {
                        "range": {
                            "@timestamp": {
                                "gte": "now-2d",
                                "lt": "now"
                            }
                        }
                    }
                ]
            }
        }
    }

    res_mem = es.search(index="metricbeat-*", body=query_body_mem, size=500)
    df_mem = json_normalize(res_mem['hits']['hits'])
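
    If you then want to work with just a few of the flattened columns, something like the snippet below can help. The column names here are only an assumption (they depend on your metricbeat version and mapping), so adjust them to whatever json_normalize actually produced:

    # Hypothetical column names - check df_mem.columns for the real ones
    df_mem_small = df_mem[['_source.@timestamp', '_source.system.memory.used.pct']]
    print(df_mem_small.head())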
    

    And that’s all!

    Cheers

  • Multiple field query in ELK from Python

    Morning,

    There are a lot of pages on how to query the ELK stack from the Python client library; however, it’s still hard to find a useful pattern.

    What I wanted was to translate a simple Kibana query like redis.info.replication.role:master AND beat.hostname:*test AND tags:test into a usable Query DSL JSON.

    It’s worth mentioning that the Python library uses this DSL. Once you have this info, things get much simpler.

    Well, if you search hard enough, you will find a solution, and it should look like this:

    another_query_body = {
        "query": {
            "query_string" : {
                "query": "(master) AND (*test) AND (test)",
                "fields": ["redis.info.replication.role", "beat.hostname" , "tags"]
            }
        }
    }

    As you probably guessed, each term in the query corresponds to one of the listed fields, although query_string will actually try every term against every field in the list.
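
    If you would rather be explicit about which term targets which field, the Kibana-style field:value syntax also works inside query_string; a small sketch of my own:

    explicit_query_body = {
        "query": {
            "query_string": {
                "query": "redis.info.replication.role:master AND beat.hostname:*test AND tags:test"
            }
        }
    }

    Both forms can be passed to es.search() as the body parameter.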

    Cheers

  • My introduction to Linear Regression, so far

    Morning,

    Here are some first steps that I want to share with you from my experience with regressions.

    I am new to this and took it step by step, so don’t expect anything groundbreaking or very complex.

    First things first: we started working on some prediction “algorithms” that should work with data available in the operations domain.

    We needed to have the data stored in a centralized location, and it happens that it is already sent to ELK. So, the first step is to query it from that location.

    To do that, there is a Python client library with a lot of options that I am still beginning to explore. Cutting to the point: to build a regression you need a correlation between the dependent and independent variables, so we first thought about the link between the number of connections and the memory usage of a specific service (for example Redis). This takes just a few lines of code in Jupyter:

    from elasticsearch import Elasticsearch
    import matplotlib.pyplot as plt
    from pandas.io.json import json_normalize

    es = Elasticsearch([{'host': 'ELK_IP', 'port': 'ELK_PORT'}])
    # Grab the Redis "info" metricset documents and flatten them into a DataFrame
    res_redis = es.search(index="metricbeat-redis-*", body={"query": {"match": {'metricset.name': "info"}}}, size=1000)
    df_redis = json_normalize(res_redis['hits']['hits'])
    # Keep only connected clients and used memory, convert bytes to MB, drop values over 300 MB
    df_redis_filtered = df_redis[['_source.redis.info.clients.connected', '_source.redis.info.memory.used.value']].copy()
    df_redis_filtered['_source.redis.info.memory.used.value'] = df_redis_filtered['_source.redis.info.memory.used.value'] / 10**6
    df_redis_final = df_redis_filtered[df_redis_filtered['_source.redis.info.memory.used.value'] < 300]
    df_redis_final.corr()

    To explain a little: the used memory is divided by ten to the sixth power to convert it from bytes to megabytes, and I also wanted to exclude memory values over 300 MB. All good; unfortunately, if you plot the correlation “matrix” between these parameters, this happens:

    As we all should know, a correlation coefficient should be as close as possible to 1 or -1, but that is simply not the case here.

    And if you want to start plotting, it will look something like:
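
    For reference, this is roughly how such a correlation matrix and a raw scatter plot can be rendered, assuming the df_redis_final DataFrame built above:

    import matplotlib.pyplot as plt

    # Visualise the 2x2 correlation matrix computed above
    plt.matshow(df_redis_final.corr())
    plt.colorbar()
    plt.show()

    # Raw scatter of connected clients vs used memory (MB)
    plt.scatter(df_redis_final['_source.redis.info.clients.connected'],
                df_redis_final['_source.redis.info.memory.used.value'],
                color='black')
    plt.xlabel('Connected clients')
    plt.ylabel('Used memory (MB)')
    plt.show()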

    So, back to the drawing board: we now know that we have no clue which columns are correlated. Let’s not pre-filter the columns and instead just remove those that are non-numeric or completely filled with zeros.

    I used this to manipulate the data as simply as possible:

    df_redis_numeric = df_redis.select_dtypes(['number'])
    df_redis_cleaned = df_redis_numeric.loc[:, '_source.redis.info.clients.connected': '_source.redis.info.stats.net.output.bytes' ]
    df_redis_final = df_redis_cleaned.loc[:, (df_redis_cleaned != 0).any(axis=0)]
    df_redis_final.corr()

    And it will give you a very large matrix with a lot of rows and columns. From that matrix you can choose two columns that are more strongly correlated; in my example ['_source.redis.info.cpu.used.user', '_source.redis.info.cpu.used.sys'].
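
    If you don’t want to eyeball the whole matrix, a small helper can rank the column pairs by absolute correlation; a quick sketch of my own, assuming the df_redis_final DataFrame from above:

    # Rank column pairs by absolute correlation, strongest first
    corr = df_redis_final.corr().abs()
    pairs = corr.unstack().sort_values(ascending=False)
    # Keep only distinct column pairs (drop the diagonal self-correlations)
    pairs = pairs[pairs.index.get_level_values(0) != pairs.index.get_level_values(1)]
    print(pairs.head(10))   # each pair shows up twice, as (A, B) and (B, A)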

    If we plot the correlation matrix just for those two columns, we are in much better shape than at the start.

    So we are better off than before, and we can now start thinking about plotting a regression; here is the code for that.

    import matplotlib.pyplot as plt
    import numpy as np
    from sklearn import linear_model
    import pandas as pd

    # The two strongly correlated CPU columns selected from the big matrix above
    df_redis_cpu = df_redis_final[['_source.redis.info.cpu.used.user', '_source.redis.info.cpu.used.sys']]

    x = df_redis_cpu['_source.redis.info.cpu.used.user']
    y = df_redis_cpu['_source.redis.info.cpu.used.sys']

    x = x.values.reshape(-1, 1)
    y = y.values.reshape(-1, 1)

    # 750 records for training, the last 250 for testing
    x_train = x[:-250]
    x_test = x[-250:]

    y_train = y[:-250]
    y_test = y[-250:]

    # Create linear regression object
    regr = linear_model.LinearRegression()

    # Train the model using the training sets
    regr.fit(x_train, y_train)

    # Plot outputs
    plt.plot(x_test, regr.predict(x_test), color='red', linewidth=3)
    plt.scatter(x_test, y_test, color='black')
    plt.title('Test Data')
    plt.xlabel('User')
    plt.ylabel('Sys')
    plt.xticks(())
    plt.yticks(())

    plt.show()

    Our DataFrame contains 1000 records, out of which I used 750 to “train” and the other 250 to “test”. The output looked like this:

    It looks more like a regression now; however, what concerns me is the mean squared error, which is a bit high.

    So we will need to work further on the DataFrame 🙂
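
    For reference, the mean squared error itself can be computed with scikit-learn; a minimal sketch, assuming the regr, x_test and y_test objects from the block above:

    from sklearn.metrics import mean_squared_error

    # Predictions on the held-out 250 test samples
    y_pred = regr.predict(x_test)
    print("Mean squared error: %.2f" % mean_squared_error(y_test, y_pred))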

    In order to apply the linear model with scikit-learn, the input and output data are reshaped into single-column vectors. If you want to switch back, for example to create a DataFrame from the regression output and the actual samples from ELK, it can be done this way:

    # y_pred comes from regr.predict(x_test), as computed above
    data = np.append(np.array(y_test), np.array(y_pred), axis=1)
    dataset = pd.DataFrame({'test': data[:, 0], 'pred': data[:, 1]})
    dataset['pred'] = dataset['pred'].map(lambda x: '%.2f' % x)

    That is all.

    Cheers!

  • Kafka_consumer.yaml (python style) and more

    Hi,

    As a follow-up to the article I posted earlier ( https://log-it.tech/2019/03/15/get-the-info-you-need-from-consumer-group-python-style/ ), you can use that info to populate kafka_consumer.yaml for the Datadog integration.

    It’s not elegant by any means, but it works. As a word of advice, please don’t overcomplicate things more than they need to be.

    In the last example I figured I wanted to create a list of GroupInfo objects for each line returned by the consumer group script. Bad idea, as you shall see below.

    So, in addition to what I wrote in the last article, the code now doesn’t just print the dictionary but also orders it by partition.

    import os

    # getgroups() and getgroupinfo() come from the previous article's script
    def constructgroupdict():
        groupagregate = {}
        group_list = getgroups()
        for group in group_list:
            groupagregate[group] = getgroupinfo(group)

        # Sort the records of each consumer group by partition number
        for v in groupagregate.values():
            v.sort(key=lambda re: int(re.partition))

        return groupagregate

    def printgroupdict():
        groupdict = constructgroupdict()
        infile = open('/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml.template', 'a')
        for k, v in groupdict.items():
            infile.write('      ' + k + ':\n')
            topics = []
            testdict = {}
            # Collect the unique topic names for this consumer group
            for re in v:
                if re.topic not in topics:
                    topics.append(re.topic)
            # Map each topic to its list of partitions
            for x in topics:
                partitions = []
                for re in v:
                    if re.topic == x:
                        partitions.append(re.partition)
                testdict[x] = partitions
            for gr, partlst in testdict.items():
                infile.write('        ' + gr + ': [' + ', '.join(partlst) + ']\n')
        infile.close()
        os.rename('/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml.template',
                  '/etc/datadog-agent/conf.d/kafka_consumer.d/kafka_consumer.yaml')

    printgroupdict()
    

    And after that, it’s quite hard to get only the unique values for the topic names.

    The reason I chose to grab all the data per consumer group is that querying the cluster takes a very long time, so grabbing another set of data filtered by topic would have been very costly time-wise.

    As it is written now, there are a lot of for loops, which could become a problem if there are too many records to process. Fortunately, that should not normally be the case for consumer groups.
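
    If the nested loops ever do become a problem, the same topic-to-partitions mapping can be built in a single pass; a small sketch of my own, assuming the same GroupInfo objects as before:

    from collections import defaultdict

    def group_partitions_by_topic(records):
        # records: the list of GroupInfo objects returned by getgroupinfo() for one group
        by_topic = defaultdict(list)
        for rec in records:
            by_topic[rec.topic].append(rec.partition)
        return dict(by_topic)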

    The easiest way to integrate the info into kafka_consumer.yaml, in our case, is to create a template called kafka_consumer.yaml.template:

    init_config:
      # Customize the ZooKeeper connection timeout here
      # zk_timeout: 5
      # Customize the Kafka connection timeout here
      # kafka_timeout: 5
      # Customize max number of retries per failed query to Kafka
      # kafka_retries: 3
      # Customize the number of seconds that must elapse between running this check.
      # When checking Kafka offsets stored in Zookeeper, a single run of this check
      # must stat zookeeper more than the number of consumers * topic_partitions
      # that you're monitoring. If that number is greater than 100, it's recommended
      # to increase this value to avoid hitting zookeeper too hard.
      # https://docs.datadoghq.com/agent/faq/how-do-i-change-the-frequency-of-an-agent-check/
      # min_collection_interval: 600
      #
      # Please note that to avoid blindly collecting offsets and lag for an
      # unbounded number of partitions (as could be the case after introducing
      # the self discovery of consumer groups, topics and partitions) the check
      # will collect at metrics for at most 200 partitions.
    
    
    instances:
      # In a production environment, it's often useful to specify multiple
      # Kafka / Zookeper nodes for a single check instance. This way you
      # only generate a single check process, but if one host goes down,
      # KafkaClient / KazooClient will try contacting the next host.
      # Details: https://github.com/DataDog/dd-agent/issues/2943
      #
      # If you wish to only collect consumer offsets from kafka, because
      # you're using the new style consumers, you can comment out all
      # zk_* configuration elements below.
      # Please note that unlisted consumer groups are not supported at
      # the moment when zookeeper consumer offset collection is disabled.
      - kafka_connect_str:
          - localhost:9092
        zk_connect_str:
          - localhost:2181
        # zk_iteration_ival: 1  # how many seconds between ZK consumer offset
                                # collections. If kafka consumer offsets disabled
                                # this has no effect.
        # zk_prefix: /0.8
    
        # SSL Configuration
    
        # ssl_cafile: /path/to/pem/file
        # security_protocol: PLAINTEXT
        # ssl_check_hostname: True
        # ssl_certfile: /path/to/pem/file
        # ssl_keyfile: /path/to/key/file
        # ssl_password: password1
    
        # kafka_consumer_offsets: false
        consumer_groups:
    

    It’s true that I keep only one connection string each for Kafka and ZooKeeper, and that things get a little more complicated once SSL is configured (but that is not our case, yet).

      - kafka_connect_str:
          - localhost:9092
        zk_connect_str:
          - localhost:2181

    The script appends the consumer group info at the bottom of the template, after which the file is renamed. Who puts the template back in place? Easy, that would be Puppet.

    It works and it has been tested. One last thing I wanted to warn you about:

    There is a limit on the number of metrics that can be uploaded per machine, and that is 350. Please be aware of it and think very seriously before you activate this.

    That would be all for today.

    Cheers

  • Get the info you need from consumer group (python style)

    Hi,

    This might be of help to some of you. If it’s rubbish, I truly apologize; please leave a comment with improvements 🙂

    import subprocess
    import socket

    fqdn = socket.getfqdn()

    class GroupInfo:
        def __init__(self, line):
            # line is the whitespace-split output of kafka-consumer-groups.sh --describe
            self.topic = line[0]
            self.partition = line[1]
            self.current_offset = line[2]
            self.logend_offset = line[3]
            self.lag = line[4]
            self.consumer_id = line[5]
            self.host = line[6]
            self.client_id = line[7]

        def __str__(self):
            return self.topic + " " + self.partition + " " + self.current_offset + " " + \
                self.logend_offset + " " + self.lag + " " + self.consumer_id

    def getgroups():
        # List all consumer groups known to the local broker
        cmd = ['/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ' + fqdn + ':9092 --list']
        result = subprocess.check_output(cmd, shell=True).splitlines()
        group_list = []
        for r in result:
            try:
                rstr = r.decode('utf-8')
            except UnicodeDecodeError:
                print('Result can not be converted to utf-8')
                continue
            group_list.append(rstr)

        return group_list

    def getgroupinfo(groupid):
        # Describe one consumer group and drop the header line containing TOPIC
        cmd = ('/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server ' + fqdn + ':9092 --group ' + groupid + ' --describe')
        process = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
        result = subprocess.check_output('grep -v TOPIC', stdin=process.stdout, shell=True).splitlines()
        process.wait()
        group_info_list = []
        for r in result:
            try:
                rstr = r.decode('utf-8')
            except UnicodeDecodeError:
                print('Result can not be converted to utf-8')
                continue
            if len(rstr.split()) == 0:
                continue
            group_info_list.append(GroupInfo(rstr.split()))

        return group_info_list

    def main():
        groupagregate = {}
        group_list = getgroups()
        for group in group_list:
            groupagregate[group] = getgroupinfo(group)

        for k, v in groupagregate.items():
            print(k)
            for re in v:
                print(re)

    main()

    I will not explain it; it should be self-explanatory.

    Cheers

  • Kafka consumer group info retrieval using Python

    Hi,

    I’ve been playing with the kafka-python module to grab the info I need in order to reconfigure the Datadog integration.

    Unfortunately, there is a catch with this method as well, and I will show it below.

    Here is a little bit of not-so-elegant code.

    from kafka import BrokerConnection
    from kafka.protocol.admin import *
    import socket

    fqdn = socket.getfqdn()
    bc = BrokerConnection(fqdn, 9092, socket.AF_INET)
    try:
        bc.connect_blocking()
    except Exception as e:
        print(e)
    if bc.connected():
        print("Connection to", fqdn, "established")

    def getgroup():
        # List all consumer groups known to the broker
        list_groups_request = ListGroupsRequest_v1()

        future0 = bc.send(list_groups_request)
        while not future0.is_done:
            for resp, f in bc.recv():
                f.success(resp)
        group_ids = ()
        for group in future0.value.groups:
            group_ids += (group[0],)

        print(group_ids)

        # Describe every group we found
        description = DescribeGroupsRequest_v1(group_ids)
        future1 = bc.send(description)
        while not future1.is_done:
            for resp, f in bc.recv():
                f.success(resp)

        for groupid in future1.value.groups:
            print('For group ', groupid[1], ':\n')
            for meta in groupid[5]:
                print(meta[0], meta[2], sep="\n")
                print(meta[3])
        if future1.is_done:
            print("Group query is done")

    getgroup()
    

    As you will see, print(meta[3]) returns very ugly binary data with topic names in it, and it is not converted if you try meta[3].decode('utf-8').

    I hope I can find a way to decode it.
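
    One avenue that might work is to parse those blobs with the consumer protocol structs that ship with kafka-python, since (assuming the standard consumer protocol) meta[3] should be the member metadata and meta[4] the member assignment. I have not verified this against the output above, so treat it as a sketch:

    # Assumption: the members follow the standard consumer protocol encoding
    from kafka.coordinator.protocol import (
        ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)

    metadata = ConsumerProtocolMemberMetadata.decode(meta[3])
    print(metadata.subscription)            # list of subscribed topic names

    assignment = ConsumerProtocolMemberAssignment.decode(meta[4])
    for topic, partitions in assignment.assignment:
        print(topic, partitions)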

    Cheers

  • Python for opening and reading files

    Since I started learning Python, and hopefully will also take a certification, I am trying to do some hands-on practice.

    Nothing too complicated, just some basic exercises, for the moment. Here is one of them.

    They want to teach you how to open a file and read from it like this:

    from sys import argv
    
    script, filename = argv
    txt = open(filename)
    print(f"Here is your file: {filename}")
    print(txt.read())

    But there is a thing not to be ignored, and that is exception handling.

    from sys import argv
    
    try:
        script, filename = argv
        txt = open(filename)
        print(f"Here is your file: {filename}")
        print(txt.read())
    except ValueError as e:
        if "not enough" in e.__str__():
            print("not enough arguments given")
        else:
            print("too many arguments given")
    except FileNotFoundError:
        print("file given, not found")

    And since you can’t really discern between ValueError messages, or at least I don’t know how to do that more elegantly yet, we have a workaround.

    There is also a FileNotFoundError which shouldn’t be left unchecked, and if I missed something, leave a comment.

    P.S.: I know there is also the possibility that the file does not have read permissions. That demonstrates that even for three lines of code you need to treat even more exceptional cases.
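
    For example, a variation that also catches missing read permissions and lets a context manager close the file (just a sketch, extending the snippet above):

    from sys import argv

    try:
        script, filename = argv
        with open(filename) as txt:
            print(f"Here is your file: {filename}")
            print(txt.read())
    except ValueError as e:
        if "not enough" in str(e):
            print("not enough arguments given")
        else:
            print("too many arguments given")
    except FileNotFoundError:
        print("file given, not found")
    except PermissionError:
        print("file given, but no read permission")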

    Cheers!