• Python dictionary construction from process list

    Hi,

    This is outside my expertise but I wanted to share it anyway. A colleague asked me to help him build a key:value dictionary in Python from the output of a command that lists the processes. With a little bit of testing I came to the following form:

    
    import subprocess
    from subprocess import PIPE

    # list every process with its PID and user name
    ps = subprocess.Popen(['/bin/ps', '-eo', 'pid,uname'], stdout=PIPE, stderr=PIPE)
    lines = ps.stdout.read().decode().split('\n')
    processes = {}
    for line in lines[1:]:  # skip the "PID UNAME" header line
        if line != '':
            fields = line.split()
            processes[fields[0]] = fields[1]  # key is the PID, value is the user name
    print(processes)
    

    Now, I think there are better ways to write this, but it works this way as well.
    If you find better ways, please leave a message πŸ™‚
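
    If it helps, here is one shorter variant (a minimal sketch, assuming Python 3) that uses subprocess.check_output and a dict comprehension instead of the explicit loop:

    import subprocess

    # capture the whole ps output at once and skip the "PID UNAME" header line
    output = subprocess.check_output(['/bin/ps', '-eo', 'pid,uname']).decode()
    processes = {
        fields[0]: fields[1]
        for fields in (line.split() for line in output.splitlines()[1:])
        if len(fields) >= 2
    }
    print(processes)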

    Cheers

  • Kafka problem that wasn’t a problem after all

    Hi,

    Do not make the mistake I made over the last couple of weeks while trying to connect to a “secured” Kafka cluster using TLS. I wrote the following article http://log-it.tech/2017/07/27/configure-kafka-truststore-keystore-using-puppet/ some time ago, and I know it's far from bulletproof, but it does the job.
    Now let's get to the subject: once this is activated, you can no longer use localhost to connect to the node. The way I figured it out was by testing the port with the openssl command.
    The config in server.properties (set here through Puppet) is:

    'listeners'                     => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093", #both listeners are enabled
    'advertised.listeners'          => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093",

    So, please keep in mind that it's configured to listen on the FQDN, which normally means the external interface is the target, not the loopback adapter.
    Now, if you try to test it using localhost you will surely get this output:

    /opt/kafka/bin# openssl s_client -debug -connect localhost:9093 -tls1
    connect: Connection refused
    connect:errno=111

    There is no need to check whether the firewall blocks the port or whether the port is open; you can easily verify that using iptables -L or netstat -tulpen | grep 9093. The problem is that instead of localhost you should be using the FQDN, like openssl s_client -debug -connect ${fqdn}:9093 -tls1, and then you will see a lot of keys/certificates.
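
    If you prefer to check this from Python rather than openssl, here is a minimal sketch using only the standard library (the FQDN below is a placeholder, adjust it to your broker; this only mimics the handshake test, it is not how the Kafka clients themselves connect):

    import ssl

    def check_tls(host, port=9093):
        # roughly equivalent to `openssl s_client -connect host:port`
        try:
            cert = ssl.get_server_certificate((host, port))
            print("%s:%d -> TLS handshake OK, got a certificate (%d bytes of PEM)" % (host, port, len(cert)))
        except OSError as err:
            print("%s:%d -> %s" % (host, port, err))

    check_tls('localhost')           # expect: connection refused, the listener is bound to the FQDN
    check_tls('kafka0.example.com')  # placeholder: use your broker's FQDN here
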
    Now, if you want, for example, to use the standard .sh scripts that are delivered with the Kafka installation, you should create a file called config.properties (for example) and pass it as a parameter. When connecting through ZooKeeper (with the --zookeeper parameter) this is not needed, but if you want to start a console consumer or producer, or check the consumer groups, it will be. Let me give you an example:

    /opt/kafka/bin# ./kafka-consumer-groups.sh --command-config /root/config.properties --bootstrap-server ${fqdn}:9093 --list
    Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
    
    console-consumer-30514
    KMOffsetCache-kafka2
    KMOffsetCache-kafka0
    KMOffsetCache-kafka1
    

    Otherwise, it will not work. My config file looks like this:

    security.protocol=SSL
    ssl.truststore.location=/home/kafka/kafka.client.truststore.jks
    ssl.truststore.password=password
    ssl.keystore.location=/home/kafka/kafka.client.keystore.jks
    ssl.keystore.password=password
    ssl.key.password=password
    

    I cannot give you all the details for all the commands, but at least I am confident I have put you on the right track.

    Cheers

  • Configure Jupyter Notebook on Raspberry PI 2 for remote access and scala kernel install

    Hi,

    This is a continuation of the previous article regarding Jupyter Notebook (http://log-it.tech/2017/09/02/installing-jupyter-notebook-raspberry-pi-2/). Let's start with my modifications to allow a remote connection to it. It first needs a password in the form of a password hash. To generate this hash, run the Python CLI and execute from IPython.lib import passwd; passwd("your_custom_password"). Once you have the password hash, these are the fields I uncommented to activate minimal remote access:

    c.NotebookApp.open_browser = False #do not open a browser on notebook start, you will access it remotely
    c.NotebookApp.ip = '*' #permit access on every interface of the server
    c.NotebookApp.password = u'[your_pass_hash]' #set a password to access the notebook, otherwise the token from the server is required (if you prefer that, you can get the token by running sudo systemctl status jupyter.service)

    You can also add them at the bottom of the file. For the changes to take effect you will also need to restart the service with sudo systemctl restart jupyter.service.
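
    As a side note, here is a minimal sketch of how the password hash mentioned above can be generated (assuming an older IPython/notebook where IPython.lib exposes passwd; newer notebook releases provide the same function as notebook.auth.passwd):

    from IPython.lib import passwd

    # prints something like 'sha1:...:...' - paste it into c.NotebookApp.password
    print(passwd("your_custom_password"))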

    You now have the basic steps to run Jupyter Notebook with the IPython 2 kernel. Now let's get to the next step: installing the Scala kernel (https://www.scala-lang.org).

    The steps are pretty straightforward and they are taken from this link https://www.packtpub.com/mapt/book/big_data_and_business_intelligence/9781785884870/9/ch09lvl1sec65/installing-the-scala-kernel; what I tried to do is put them end to end. I am not 100% sure whether you also need Java 8, but I installed it anyway; you will find the steps here https://www.raspinews.com/installing-oracle-java-jdk-8-on-raspberry-pi/. What you really need to install is sbt.

    The catch here is that you don't need to search for a Raspberry-specific sbt build; the default one will do the job. The steps are listed here http://www.scala-sbt.org/release/docs/Installing-sbt-on-Linux.html. Once it is installed, you can return to the link listed above and just run the steps:

    apt-get install git
    git clone https://github.com/alexarchambault/jupyter-scala.git
    cd jupyter-scala
    sbt cli/packArchive

    Sbt will grab a lot of dependencies; if you work behind a proxy I am not aware of the settings you need, but you can search for them and will probably find a solution. Have patience, it will take a while, but once it is done you can run ./jupyter-scala to install the kernel and then check that it is registered with jupyter kernelspec list.

    Restart the Jupyter Notebook service to pick it up, although I am not convinced it's necessary πŸ™‚
    In my case I have a dynamic DNS service from my internet provider, but I think you can do it with a free DNS provider on your router as well. An extra forward or NAT of port 8888 will be needed, but once this is done you should have a playground in your browser that knows Python and Scala. Cool, isn't it?

    Cheers

  • Installing Jupyter Notebook on Raspberry PI 2

    Morning,

    I just want to share with you that I managed to install Jupyter Notebook (http://jupyter.org) on a Raspberry PI 2 without any real problems. Besides a microSD card and a Raspberry, you only need to read this and that would be all.
    So, you will need an image of Raspbian from https://www.raspberrypi.org/downloads/raspbian/ (I selected the lite version without the GUI, you really don't need that). I wrote it to the card from Linux, so I executed a command similar to dd if=[path_to_image]/[image_name] of=[sd_device_name taken from fdisk -l without partition id, usually /dev/mmcblk0] bs=4MB; sync. The sync command is added just to be sure that all files are synchronized to the card before removing it. We now have a working image that we can use on the Raspberry, so it's time to try booting it.
    Once it's booted, log in with user pi and password raspberry. I am a fan of running the resize steps, which you can find here https://coderwall.com/p/mhj8jw/raspbian-how-to-resize-the-root-partition-to-fill-sd-card.
    OK, so we are good to go on installing Jupyter Notebook. First we need to check what Python version we have installed; in my case it was 2.7.13 (shown by running python --version). That means we need pip for this task, and it's not present by default on the image.
    Run sudo apt-get install python-pip; after this is done, please run pip install jupyter. It will take some time, but when it is done you will have a fresh installation in the pi home directory (/home/pi/.local).
    We also need a service, and in order to have one, please create the following file at the following path:
    /usr/lib/systemd/system/jupyter.service

    [Unit]
    Description=Jupyter Notebook
    
    [Service]
    Type=simple
    PIDFile=/run/jupyter.pid
    ExecStart=/home/pi/.local/bin/jupyter-notebook --config=/home/pi/.jupyter/jupyter_notebook_config.py
    User=pi
    Group=pi
    WorkingDirectory=/home/pi/notebooks
    Restart=always
    RestartSec=10
    #KillMode=mixed
    
    [Install]
    WantedBy=multi-user.target

    You are probably wondering where you get the config file from. This is easy: just run /home/pi/.local/bin/jupyter notebook --generate-config

    After the file is created, enable and start the service with sudo systemctl enable jupyter.service and sudo systemctl start jupyter.service

    You now have a fresh and automatically managed Jupyter service. It will listen only on localhost by default, but in the next article I will walk you through the modifications needed to access it remotely and also install the Scala kernel.

    Cheers!

  • Implementing logrotate for kafka

    Hi,

    Yes, we will also need to implement log rotation if we want to keep Kafka under control. My solution was with Puppet, as you probably expected. After I took a look at the documentation related to the log4j properties, I think I had a configuration figured out; it should look like the following erb template:

    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    log4j.rootLogger=INFO, stdout
    
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.kafkaAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
    log4j.appender.kafkaAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.kafkaAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.stateChangeAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
    log4j.appender.stateChangeAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.stateChangeAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.requestAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
    log4j.appender.requestAppender.MaxFileSize=<%= @filesize%>
    log4j.appender.requestAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.cleanerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
    log4j.appender.cleanerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.cleanerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.controllerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
    log4j.appender.controllerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.controllerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    log4j.appender.authorizerAppender=org.apache.log4j.RollingFileAppender
    #log4j.appender.authorizerAppender=org.apache.log4j.DailyRollingFileAppender
    #log4j.appender.authorizerAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.authorizerAppender.File=${kafka.logs.dir}/kafka-authorizer.log
    log4j.appender.authorizerAppender.MaxFileSize=<%= @filesize %>
    log4j.appender.authorizerAppender.MaxBackupIndex=<%= @backupindex %>
    log4j.appender.authorizerAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.authorizerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
    
    # Turn on all our debugging info
    #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
    #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
    #log4j.logger.kafka.perf=DEBUG, kafkaAppender
    #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
    #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
    log4j.logger.kafka=INFO, kafkaAppender
    
    log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
    log4j.additivity.kafka.network.RequestChannel$=false
    
    #log4j.logger.kafka.network.Processor=TRACE, requestAppender
    #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
    #log4j.additivity.kafka.server.KafkaApis=false
    log4j.logger.kafka.request.logger=WARN, requestAppender
    log4j.additivity.kafka.request.logger=false
    
    log4j.logger.kafka.controller=TRACE, controllerAppender
    log4j.additivity.kafka.controller=false
    
    log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
    log4j.additivity.kafka.log.LogCleaner=false
    
    log4j.logger.state.change.logger=TRACE, stateChangeAppender
    log4j.additivity.state.change.logger=false
    
    #Change this to debug to get the actual audit log for authorizer.
    log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
    log4j.additivity.kafka.authorizer.logger=false
    
    

    It just adds the two values required for a working log rotation setup: MaxFileSize and MaxBackupIndex. The first sets the maximum size of a log file and the second the number of rotated files of each type to keep.
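
    To get a feeling for what these values mean in terms of disk usage, here is a small back-of-the-envelope sketch (assuming the hiera defaults used in the class below, 50MB and 10 backups); each appender keeps the live file plus MaxBackupIndex rotated copies:

    # worst-case disk usage for the rotated kafka logs, per appender and in total
    filesize_mb = 50      # MaxFileSize, the hiera default below
    backupindex = 10      # MaxBackupIndex, the hiera default below
    appenders = ['server', 'state-change', 'kafka-request',
                 'log-cleaner', 'controller', 'kafka-authorizer']

    per_appender_mb = (backupindex + 1) * filesize_mb
    print("max per log type: %d MB" % per_appender_mb)                     # 550 MB
    print("max for all logs: %d MB" % (per_appender_mb * len(appenders)))  # 3300 MB
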
    To use it you need to put it in a class, which in my view is constructed as follows:

    class profiles::kafkalogrotate {

      $filesize = hiera('kafkalogrotate::size', '50MB')
      $backupindex = hiera('kafkalogrotate::backupindex', 10)
      validate_string($filesize)
      validate_integer($backupindex)

      file { 'adding_log4j':
        path    => '/opt/kafka/config/log4j.properties',
        content => template("${module_name}/log4j.properties.erb"),
        replace => true,
        owner   => 'kafka',
        group   => 'kafka',
        mode    => '0644',
      }
    }
    

    I used that path because we generally install Kafka in /opt/kafka, but it can be changed. In our case a restart of the Kafka brokers is not needed immediately, so I just called the class in my Kafka manifests.

    That is all; if I think of something to add to what I wrote, I will post it.

    Cheers!

  • Adding custom kafka check consumer.lag to datadog from JMX

    Hi,

    We needed to add the consumer.lag check to Datadog. Since we did not have access to the kafka.consumer domain, which as far as I can tell lives on the client side, I decided to connect to the Kafka node using JMX (so JConsole was the tool). In the MBeans tab you will see that there is nothing there by default for kafka.consumer.max_lag. The actual info you can grab is located under kafka.server, and more precisely under FetcherLagMetrics, as shown.

    If you go all the way down the hierarchy, you will get the partition 0 details for one of the topics. I will use this example as the base for constructing the regex:

    kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=ReplicaFetcherThread-0-1001,topic=__consumer_offsets,partition=0

    Using the same template you can directly construct the block you need to add to kafka.yaml, and it should look like this:

     - include:
            domain: 'kafka.server'
            bean_regex: 'kafka\.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=.*'
            attribute:
              Value:
                metric_type: rate
                alias: kafka.consumer.lag
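
    Before restarting the agent, a quick way to sanity-check that the bean_regex really matches the MBean name seen in JConsole is a few lines of Python:

    import re

    bean_regex = r'kafka\.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=.*'
    bean = ('kafka.server:type=FetcherLagMetrics,name=ConsumerLag,'
            'clientId=ReplicaFetcherThread-0-1001,topic=__consumer_offsets,partition=0')

    print(bool(re.match(bean_regex, bean)))  # True, so the include block will pick it up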

    After an agent restart you will see that the number of collected metrics increases and you have a new check in the Datadog web interface.
    After that, you can manually add a new graph to the dashboard that uses this check, and you can also filter it on specific criteria, like host, topic or partition.

    Hope this is helpful,

    Cheers

  • Interesting insight in docker networking mechanism

    Hi,

    This one is not mine, but it's worth mentioning. There are always interesting articles in the Docker newsletter, but I enjoyed this series very much and I highly recommend you read it and try it:

    http://techblog.d2-si.eu/2017/04/25/deep-dive-into-docker-overlay-networks-part-1.html

    I would really like to work more with Docker and hope that I get the chance in the future, but for now I limit myself to posting this kind of interesting “stuff”.

    Cheers!

  • Securing kafka-manager endpoints with iptables rules behind traefik

    Hi,

    One extra addition to my Traefik balancing article from http://log-it.tech/2017/08/19/puppet-implementation-traefik-load-balancer-kafka-manager/: even though we now have the balancing capability, we still need to restrict access to the unsecured endpoint. I wanted all the code to be deployable on all of the nodes. With that in mind, our firewall rules issue is easily solved using the puppetlabs module https://github.com/puppetlabs/puppetlabs-firewall, and the code I included looks like this:

    $hosts_count = $kafka_hosts.count

    package { 'iptables-persistent':
      name   => 'iptables-persistent',
      ensure => installed,
    }

    resources { 'firewall':
      purge => true,
    }

    $kafka_hosts.each | Integer $index, String $host | {
      firewall { "10${index} adding tcp rule kafka manager node ${index}":
        proto       => 'tcp',
        dport       => 9000,
        source      => "${host}",
        destination => "${fqdn}",
        action      => 'accept',
      }
    }

    firewall { "10${hosts_count} dropping rest of kafka manager calls":
      proto       => 'tcp',
      dport       => 9000,
      destination => "${fqdn}",
      action      => 'drop',
    }

    This should add rules that allow traffic on port 9000 only between the Kafka hosts that have Kafka Manager installed; everything else hitting that port is dropped.
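
    Just to illustrate why the numbering in the resource titles matters (puppetlabs-firewall uses the leading number in the rule name for ordering, so the per-host accept rules have to sort before the final drop rule), here is a rough sketch of the titles the manifest generates; the host names are placeholders:

    kafka_hosts = ['kafka0.example.com', 'kafka1.example.com', 'kafka2.example.com']

    rules = ['10%d adding tcp rule kafka manager node %d' % (i, i)
             for i, host in enumerate(kafka_hosts)]
    rules.append('10%d dropping rest of kafka manager calls' % len(kafka_hosts))

    for rule in rules:
        print(rule)   # 100 ..., 101 ..., 102 ..., and finally 103 dropping the rest
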
    Cheers

  • Fixing the kafka-manager puppet code

    Hi, we have a new version of the kafka-manager deployment code. I will not go into many details, just that it now also has a fact for the kafka-manager password plus some minor changes. The fact looks like this:

    require 'facter'
    Facter.add(:kafka_manager_pass) do
      setcode do
        file = '/etc/kafka-manager/application.conf'
        if File.exist?(file)
          kafka_manager_pass = Facter::Core::Execution.exec("cat #{file} | grep basicAuthentication.password | cut -d'=' -f2 | tr -d '\"'")
        else
          kafka_manager_pass = "undef"
        end
      end
    end
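
    For clarity, the shell pipeline in the fact does roughly the equivalent of this Python sketch (not part of the module, just to show what value ends up in $::kafka_manager_pass):

    def read_kafka_manager_pass(path='/etc/kafka-manager/application.conf'):
        # mirrors: grep basicAuthentication.password | cut -d'=' -f2 | tr -d '"'
        try:
            with open(path) as conf:
                for line in conf:
                    if 'basicAuthentication.password' in line:
                        return line.split('=', 1)[1].strip().replace('"', '')
        except IOError:
            pass
        return 'undef'

    print(read_kafka_manager_pass())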
    

    And here is the new Puppet code for the class:

    
    class profiles::kafkamanager {

      $zookeeper_connect = hiera('kafkamanager::zookeeperconnect')
      $password = hiera('kafkamanager::password', 'password')

      package { 'kafka-manager':
        ensure => installed,
      }

      group { 'kafka-manager':
        ensure => 'present',
      }

      user { 'kafka-manager':
        ensure => 'present',
        groups => 'kafka-manager',
      }

      Group['kafka-manager'] -> User['kafka-manager']

      file { '/usr/share/kafka-manager':
        ensure  => directory,
        owner   => 'kafka-manager',
        group   => 'kafka-manager',
        require => [ User['kafka-manager'], Group['kafka-manager'] ],
        recurse => true,
      }

      file { '/etc/kafka-manager/application.conf':
        ensure => present,
      }

      file_line { 'config_zookeeper':
        path    => '/etc/kafka-manager/application.conf',
        line    => "kafka-manager.zkhosts=\"${zookeeper_connect}\"",
        match   => 'kafka-manager.zkhosts=\"\"',
        replace => true,
      }

      if ($::kafka_manager_pass == "undef") {
        file_line { 'enable_pass_default':
          path    => '/etc/kafka-manager/application.conf',
          match   => "basicAuthentication.password=\"password\"",
          line    => "basicAuthentication.password=\"${password}\"",
          replace => true,
        }
      }
      elsif ($password != $::kafka_manager_pass) {
        file_line { 'enable_pass':
          path    => '/etc/kafka-manager/application.conf',
          match   => "basicAuthentication.password=\"${::kafka_manager_pass}\"",
          line    => "basicAuthentication.password=\"${password}\"",
          replace => true,
        }
        exec { 'restart_kafkamanager':
          command     => '/usr/sbin/service kafka-manager restart',
          path        => ['/bin', '/sbin', '/usr/bin', '/usr/sbin'],
          refreshonly => true,
          subscribe   => File_line['enable_pass'],
        }
      }

      file_line { 'enable_auth':
        path    => '/etc/kafka-manager/application.conf',
        match   => 'basicAuthentication.enabled=false',
        line    => 'basicAuthentication.enabled=true',
        replace => true,
      }

      service { 'kafka-manager':
        ensure    => 'running',
        enable    => true,
        require   => [ File['/usr/share/kafka-manager'], File['/etc/kafka-manager/application.conf'] ],
        subscribe => File['/etc/kafka-manager/application.conf'],
      }
    }
    

    Give it a try!

    Cheers!

  • Fixing the keystore/truststore distribution code

    Hi,

    There is an extra thing to be added to my article http://log-it.tech/2017/07/27/configure-kafka-truststore-keystore-using-puppet/

    As it is, the code copies the files on each Puppet run to the other nodes, which do not contain the keystore generation code. To fix this I used yet another Puppet module that shares data between nodes; you can find it here https://github.com/WhatsARanjit/puppet-share_data

    As far as I can tell it gets the job done, and in order to use it you will need to include the following pieces of code in your repo. First of all, a custom fact:

    
    require 'facter'

    filename = '/home/kafka/kafka.server.keystore.jks'
    Facter.add(:kafkakeystore) do
      setcode do
        if File.file?(filename)
          kafkakeystore = "enabled"
        else
          kafkakeystore = "disabled"
        end
      end
    end
    

    If the file is present, this means that the setup is probably activated. In the Kafka manifests, if it's not the node on which the keystore is generated, we need to share the fact that we just added, in the form:

        share_data { "${fqdn}":
          data => [ $::fqdn,$::kafkakeystore ],
          label => 'keystore',
        }
    

    If it's the node that actually generates and copies the keystore, then we need to include the following piece in the class that does this (kafka_security_gen):

    $data = share_data::retrieve('keystore')
    $data.each |$item| {
      # $servers.each |String $server| {
      if (member($servers, $item[0]) and $item[1] == "disabled") {
        exec { "copy files to ${item[0]}":
          cwd     => '/home/kafka',
          path    => '/usr/bin:/usr/sbin:/bin',
          command => "scp /home/kafka/kafka* kafka@${item[0]}:/home/kafka",
          user    => 'kafka',
        }
      }
    }
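
    To make the intent of that loop clearer, here is the same decision in plain Python (a sketch only; the server names and the shared data are placeholders):

    servers = ['kafka0.example.com', 'kafka1.example.com', 'kafka2.example.com']
    shared_data = [
        ('kafka0.example.com', 'enabled'),    # the node that generated the keystore
        ('kafka1.example.com', 'disabled'),   # still missing the keystore
        ('kafka2.example.com', 'enabled'),
    ]

    for fqdn, keystore in shared_data:
        if fqdn in servers and keystore == 'disabled':
            print('would copy the keystore to %s' % fqdn)   # only kafka1 here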
    

    This should assure you that Puppet will not try to copy the keystore to nodes that already have it. Now that I come to think of it, if you need to refresh the store this could be a problem, but I will think about a fix for that and come back.

    Cheers!