Category: newtools

  • Multiple field query in ELK from Python

    Morning,

    There are a lot of pages on how to query the ELK stack from the Python client library; however, it's still hard to find a useful pattern.

    What I wanted was to translate a simple Kibana query like redis.info.replication.role:master AND beat.hostname:*test AND tags:test into usable Query DSL JSON.

    It’s worth mentioning that the Python library uses this DSL. Once you have this info, things get much simpler.

    Well, if you search hard enough, you will find a solution, and it should look like this:

    another_query_body = {
        "query": {
            "query_string" : {
                "query": "(master) AND (*test) AND (test)",
                "fields": ["redis.info.replication.role", "beat.hostname" , "tags"]
            }
        }
    }

    As you probably guessed, each value from the Kibana query goes into the query string and each field into the fields list. Note that query_string matches every term against all of the listed fields, not just the field it was originally paired with, so the result is close to, but not exactly, the original per-field query.
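
    To put it end to end, here is a minimal sketch of running that body with the official elasticsearch Python client (the host and the metricbeat-* index pattern are assumptions; adjust them to your cluster):

    from elasticsearch import Elasticsearch

    # Connect to the cluster; host and port are assumptions for this sketch.
    es = Elasticsearch(["http://localhost:9200"])

    # The query body from above.
    another_query_body = {
        "query": {
            "query_string": {
                "query": "(master) AND (*test) AND (test)",
                "fields": ["redis.info.replication.role", "beat.hostname", "tags"]
            }
        }
    }

    # Run the search against an assumed Beats index pattern and print the hits.
    result = es.search(index="metricbeat-*", body=another_query_body)
    for hit in result["hits"]["hits"]:
        print(hit["_source"])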

    Cheers

  • Kafka cluster nodes and controller using golang

    Hi,

    Using the Go library for ZooKeeper from here, you can very easily get the nodes that are registered in the cluster as well as the controller node.
    In order to install this module, besides setting up the GOPATH, you will also have to install some packages from your Linux distro's repositories: bzr, gcc and libzookeeper-mt-dev.
    Once all of this is installed, just go get launchpad.net/gozk 🙂

    And here is a small example:

    
    package main
    
    import (
    	"fmt"
    	"strings"
    
    	"launchpad.net/gozk"
    )
    
    func main() {
    	GetResource()
    }
    
    func GetResource() {
    	// Dial the local ZooKeeper with a 5 second session timeout.
    	zk, session, err := zookeeper.Dial("localhost:2181", 5e9)
    	if err != nil {
    		fmt.Println("Couldn't connect")
    		return
    	}
    	defer zk.Close()
    
    	// Wait for the session event and make sure we are really connected.
    	event := <-session
    	if event.State != zookeeper.STATE_CONNECTED {
    		fmt.Println("Couldn't connect")
    		return
    	}
    
    	GetBrokers(zk)
    	GetController(zk)
    }
    
    func GetController(connection *zookeeper.Conn) {
    	// /controller holds a small JSON payload; its second field is "brokerid":<id>.
    	rs, _, err := connection.Get("/controller")
    	if err != nil {
    		fmt.Println("Couldn't get the resource")
    		return
    	}
    	// Crude parsing: split on "," and ":" to pull out the broker id value.
    	controller := strings.Split(rs, ",")
    	id := strings.Split(controller[1], ":")
    	fmt.Printf("\nCluster controller is: %s\n", id[1])
    }
    
    func GetBrokers(connection *zookeeper.Conn) {
    	trs, _, err := connection.Children("/brokers/ids")
    	if err != nil {
    		fmt.Println("Couldn't get the resource")
    		return
    	}
    	fmt.Printf("List of brokers: ")
    	for _, value := range trs {
    		fmt.Printf(" %s", value)
    	}
    }
    

    Yeah, I know it's not the most elegant, but it works:

    go run zootest.go
    List of brokers:  1011 1009 1001
    Cluster controller is: 1011
    

    That would be all.

    Tnx and cheers!

  • Error 127, not related to Puppet or Golang

    Hi,

    Something from my experience playing with Golang and Puppet code this morning.
    I wrote a very, very simple script to restart a service, which you can find here.
    Today I wanted to put it on the machine and run it with Puppet, so I wrote a very small class that looked like this:

    class profiles_test::updatekafka {
    
      package { 'golang':
        ensure => installed,
        name   => 'golang',
      }
      file { '/root/servicerestart.go':
        source  => 'puppet:///modules/profiles_test/servicerestart.go',
        mode    => '0644',
        replace => true,
      }
      exec { 'go build servicerestart.go':
        cwd     => '/root',
        creates => '/root/servicerestart',
        path    => ['/usr/bin', '/usr/sbin'],
      } ->
      exec { '/root/servicerestart':
        cwd    => '/root',
        path   => ['/usr/bin', '/usr/sbin', '/root'],
        onlyif => 'ps -u kafka',
      }
    }
    

    On execution, surprise: it kept throwing this error:

    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/File[/root/check_cluster_state.go]/ensure: defined content as '{md5}0817dbf82b74072e125f8b71ee914150'
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: panic: exit status 127
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: goroutine 1 [running]:
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: runtime.panic(0x4c2100, 0xc2100000b8)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/usr/lib/go/src/pkg/runtime/panic.c:266 +0xb6
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.check(0x7f45b08ed150, 0xc2100000b8)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:94 +0x4f
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.RunCommand(0x4ea7f0, 0x2c, 0x7f4500000000, 0x409928, 0x5d9f38)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:87 +0x112
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.GetBrokerList(0x7f45b08ecf00, 0xc21003fa00, 0xe)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:66 +0x3c
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.GetStatus(0xc21000a170)
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:33 +0x1e
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.StopBroker()
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:39 +0x21
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: main.main()
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: 	/root/servicerestart.go:15 +0x1e
    08:19:44 Notice: /Stage[main]/Profiles_test::Updatekafka/Exec[go run servicerestart.go]/returns: exit status 2
    08:19:44 Error: go run servicerestart.go returned 1 instead of one of [0]

    The secret is in panic: exit status 127, and it turns out to be related neither to Golang nor to Puppet, but to the shell.
    In my script, the way to get the broker list is:

    output1 := RunCommand("echo dump | nc localhost 2181 | grep brokers")

    and the error is caused by not having the required binaries in your PATH; exit code 127 is the shell's code for "command not found". For example, if you run:

    whereis echo
    echo: /bin/echo /usr/share/man/man1/echo.1.gz
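
    A quick way to see this for yourself is to run the same pipeline with a PATH stripped down to what the exec block originally had. Here is a minimal sketch (assuming a system like the one above, where binaries such as grep and nc live under /bin):

    import subprocess

    # Run the broker-list pipeline with a PATH that lacks /bin and /sbin,
    # mimicking the original Puppet exec environment.
    proc = subprocess.run(
        "echo dump | nc localhost 2181 | grep brokers",
        shell=True,
        env={"PATH": "/usr/bin:/usr/sbin"},
    )

    # When the shell cannot find a binary, it reports "command not found"
    # and the pipeline exits with status 127.
    print(proc.returncode)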

    So the right form for the exec block is actually:

    exec { '/root/servicerestart':
      cwd    => '/root',
      path   => ['/usr/bin', '/usr/sbin', '/root', '/bin', '/sbin'],
      onlyif => 'ps -u kafka',
    }

    And then it will work.

    Cheers!

  • Golang example for kafka service restart script

    Hi,

    Not much to say: a pretty decent script for Kafka service restart (I wrote it for our rolling upgrade procedure) that is still work in progress. If any changes need to be made to it, I will post them.

    Here is the script:

    package main
    
    import (
    	"fmt"
    	"io/ioutil"
    	"os/exec"
    	"strings"
    	"time"
    )
    
    const (
    	libpath = "/opt/kafka/libs"
    )
    
    func main() {
    	StopBroker()
    	StartBroker()
    	fmt.Printf("You are running version %s\n", GetVersion(libpath))
    }
    
    // GetVersion extracts the Kafka version from the kafka_* jar name in libpath.
    func GetVersion(libpath string) string {
    	var KafkaVersion []string
    	files, err := ioutil.ReadDir(libpath)
    	check(err)
    	for _, f := range files {
    		if strings.HasPrefix(f.Name(), "kafka_") {
    			KafkaVersion = strings.Split(f.Name(), "-")
    			break
    		}
    	}
    	return KafkaVersion[1]
    }
    
    // GetStatus reports whether this broker's id is still registered in ZooKeeper.
    func GetStatus() bool {
    	brokers := GetBrokerList()
    	id := GetBrokerId()
    	return Contain(id, brokers)
    }
    
    func StopBroker() {
    	status := GetStatus()
    	brokers := GetBrokerList()
    	// Only stop the service if the broker is registered and enough
    	// brokers remain in the cluster.
    	if status && len(brokers) > 2 {
    		stop := RunCommand("service kafka stop")
    		fmt.Println(string(stop))
    		time.Sleep(60000 * time.Millisecond)
    	}
    	if !status {
    		fmt.Println("Broker has been successfully stopped")
    	} else {
    		// Broker is still registered; retry until it is gone.
    		StopBroker()
    	}
    }
    
    func StartBroker() {
    	if !GetStatus() {
    		start := RunCommand("service kafka start")
    		fmt.Println(string(start))
    		time.Sleep(60000 * time.Millisecond)
    	}
    }
    
    // GetBrokerId reads this broker's id from the Kafka metadata file.
    func GetBrokerId() string {
    	output := RunCommand("cat /srv/kafka/meta.properties | grep broker.id | cut -d'=' -f2")
    	return strings.TrimRight(string(output), "\n")
    }
    
    // GetBrokerList asks the local ZooKeeper for the registered broker ids.
    func GetBrokerList() []string {
    	output1 := RunCommand("echo dump | nc localhost 2181 | grep brokers")
    	var brokers []string
    	lines := strings.Split(string(output1), "\n")
    	for _, line := range lines {
    		trimString := strings.TrimSpace(line)
    		finalString := strings.TrimPrefix(trimString, "/brokers/ids/")
    		brokers = append(brokers, finalString)
    	}
    	return brokers
    }
    
    func Contain(val1 string, val2 []string) bool {
    	for _, value := range val2 {
    		if value == val1 {
    			return true
    		}
    	}
    	return false
    }
    
    func RunCommand(command string) []byte {
    	cmd := exec.Command("/bin/sh", "-c", command)
    	result, err := cmd.Output()
    	check(err)
    	return result
    }
    
    func check(e error) {
    	if e != nil {
    		panic(e)
    	}
    }
    

    Cheers

  • Command to start sysdig container – redundant but useful

    Hi,

    This is more of a quick way to find the command without searching the net:

    docker run -it --rm --name=sysdig --privileged=true \
       --volume=/var/run/docker.sock:/host/var/run/docker.sock \
       --volume=/dev:/host/dev \
       --volume=/proc:/host/proc:ro \
       --volume=/boot:/host/boot:ro \
       --volume=/lib/modules:/host/lib/modules:ro \
       --volume=/usr:/host/usr:ro \
       sysdig/sysdig

    This is the actual command for starting a sysdig container. I will go more in depth on aggregating Kafka cluster info with this amazing tool and also on what it takes to send it to an Elasticsearch cluster.
    It will be challenging, but this is how it goes in IT these days.

    Cheers

  • Consumer group coordinator in Kafka using some scala script

    Morning,

    Just a small post about returning the consumer group coordinator for a specific consumer group.
    We had an issue where consumer groups were rebalancing and we didn't know whether it was related to application logic or whether the group coordinator was changing and the Kafka cluster was reassigning a different one each time. So, a small piece of code was needed. I was using the libraries shipped with Kafka 1.0.0 for this test, so be aware of the classpath update if you want to modify this.

    In order to do the test, I started a standalone Confluent Kafka image, which normally listens on port 29092. For more details please consult their documentation here.

    I also created a test topic with one partition and a replication factor of one, produced some messages into the topic and after that started a console consumer:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic test --from-beginning
    test
    message
    test message
    

    Once this is started you can also see it with the consumer-groups command, like this:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-consumer-groups.sh --bootstrap-server localhost:29092 --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    console-consumer-49198
    console-consumer-66063
    console-consumer-77631
    

    Now my console consumer is identified as console-consumer-77631, and in order to see its group coordinator you will have to run something like:

    ./getconsumercoordinator.sh localhost 29092 console-consumer-77631
    warning: there were three deprecation warnings; re-run with -deprecation for details
    one warning found
    Creating connection to: localhost 29092 
    log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Channel connected
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    GroupCoordinatorResponse(Some(BrokerEndPoint(1001,localhost,29092)),NONE,0)
    

    It's clear that since we have only one broker, it is also the coordinator.
    Regarding the details of the code, I used this link, and since I don't have a Scala project, just a script, the following command was of great use in finding which jar contains a needed class:

    for i in *.jar; do jar -tvf "$i" | grep -Hsi ClassName && echo "$i"; done

    Here is also the code:

    #!/bin/sh
    exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/kafka-clients-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/kafka_2.11-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/slf4j-api-1.7.25.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-core-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-databind-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-annotations-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/log4j-1.2.17.jar" "$0" "$@"
    !#
    
    import com.fasterxml.jackson.core._
    import com.fasterxml.jackson.databind.ObjectMapper
    import kafka.network.BlockingChannel
    import kafka.api.GroupCoordinatorRequest
    import kafka.api.GroupCoordinatorResponse
    import org.slf4j.LoggerFactory
    
    val hostname = args(0)
    val port = args(1).toInt
    val group = args(2)
    println("Creating connection to: " + hostname + " " + port + " ")
    
    var channel = new BlockingChannel(hostname, port, 1048576, 1048576, readTimeoutMs = 50000)
    channel.connect()
    if (channel.isConnected) {
      println("Channel connected")
      channel.send(GroupCoordinatorRequest(group))
      val metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive.payload())
      println(metadataResponse)
    }
    channel.disconnect()
    

    Regarding the code: the first part runs Scala from a shell script; you need to update the classpath with all of the libraries and forward the script's arguments (hostname, port and consumer group in our case). Also, if you don't add all of the jackson, log4j and slf4j dependencies, it won't work.

    P.S.: It will also work by running exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/*"

    Cheers!

  • Apache Kafka technical logs to ELK stack

    Morning,

    Just wanted to share with you the following article:

    https://www.elastic.co/blog/monitoring-kafka-with-elastic-stack-1-filebeat

    I will try to share my experience once I have all I need to start working on it. As far as I understood from managing Kafka for a while, monitoring alone is not enough; this kind of log collection is mandatory in order to manage your clusters in a clear and transparent manner.

    Cheers

  • List differences between two sftp hosts using golang

    Hi,

    Just an intermediate post, as I wanted to play a little bit with Golang; let me show you what I managed to put together in a few days. I created a virtual machine on which I installed Docker and grabbed an SFTP image. You can try the first couple of results from Docker Hub; they should work.
    So I pulled this image and started two containers as shown below:

    eaf3b93798b5        asavartzeth/sftp    "/entrypoint.sh /u..."   21 hours ago        Up About a minute         0.0.0.0:2225->22/tcp   server4
    ec7d7e1d029f        asavartzeth/sftp    "/entrypoint.sh /u..."   21 hours ago        Up About a minute         0.0.0.0:2224->22/tcp   server3
    

    The command to do this looks like:

    docker run --name server3 -v /home/sorin/sftp1:/chroot/sorin:rw -e SFTP_USER=sorin -e SFTP_PASS=pass -p 2224:22 -d asavartzeth/sftp
    docker run --name server4 -v /home/sorin/sftp2:/chroot/sorin:rw -e SFTP_USER=sorin -e SFTP_PASS=pass -p 2225:22 -d asavartzeth/sftp

    The main things to know about these containers are that they should be accessible with the user sorin and that the external directories are mapped under /chroot/sorin.

    You can manually test the connection by using a simple command like:

    sftp -P 2224 sorin@localhost

    I noticed that if you use the container IP address directly, you connect on the default port 22 instead. Not really clear why, but that is not the point here.

    Once the servers are up and running you can list the differences between their directory structures using the following code:

    
    package main
    
    import (
    	"fmt"
    
    	"github.com/pkg/sftp"
    	"golang.org/x/crypto/ssh"
    )
    
    type ServerFiles struct {
    	Name  string
    	files []string
    }
    
    func main() {
    
    	server1client := ConnectSftp("localhost:2224", "sorin", "pass")
    	server1files := ReadPath(server1client)
    	server1struct := BuildStruct("172.17.0.2", server1files)
    	server2client := ConnectSftp("localhost:2225", "sorin", "pass")
    	server2files := ReadPath(server2client)
    	server2struct := BuildStruct("172.17.0.3", server2files)
    	diffilesstruct := CompareStruct(server1struct, server2struct)
    	for _, value := range diffilesstruct.files {
    		fmt.Printf("%s %s\n", diffilesstruct.Name, value)
    	}
    	CloseConnection(server1client)
    	CloseConnection(server2client)
    }
    func CheckError(err error) {
    	if err != nil {
    		panic(err)
    	}
    }
    func ConnectSftp(address string, user string, password string) *sftp.Client {
    	config := &ssh.ClientConfig{
    		User: user,
    		Auth: []ssh.AuthMethod{
    			ssh.Password(password),
    		},
    		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
    	}
    	conn, err := ssh.Dial("tcp", address, config)
    	CheckError(err)
    
    	client, err := sftp.NewClient(conn)
    	CheckError(err)
    
    	return client
    }
    func ReadPath(client *sftp.Client) []string {
    	var paths []string
    	w := client.Walk("/")
    	for w.Step() {
    		if w.Err() != nil {
    			continue
    		}
    		
    		paths = append(paths, w.Path())
    	}
    	return paths
    }
    func BuildStruct(address string, files []string) *ServerFiles {
    	server := new(ServerFiles)
    	server.Name = address
    	server.files = files
    
    	return server
    }
    func CompareStruct(struct1 *ServerFiles, struct2 *ServerFiles) *ServerFiles {
    
    	diff := difference(struct1.files, struct2.files)
    	diffstruct := new(ServerFiles)
    	for _, value := range diff {
    		for _, valueP := range struct1.files {
    			if valueP == value {
    				
    				diffstruct.Name = struct1.Name
    				diffstruct.files = append(diffstruct.files, valueP)
    			}
    		}
    		for _, valueQ := range struct2.files {
    			if valueQ == value {
    				
    				diffstruct.Name = struct2.Name
    				diffstruct.files = append(diffstruct.files, valueQ)
    			}
    		}
    	}
    	return diffstruct
    }
    func difference(slice1 []string, slice2 []string) []string {
    	var diff []string
    
    	// Loop two times, first to find slice1 strings not in slice2,
    	// second loop to find slice2 strings not in slice1
    	for i := 0; i < 2; i++ {
    		for _, s1 := range slice1 {
    			found := false
    			for _, s2 := range slice2 {
    				if s1 == s2 {
    					found = true
    					break
    				}
    			}
    			// String not found. We add it to return slice
    			if !found {
    				diff = append(diff, s1)
    			}
    		}
    		// Swap the slices, only if it was the first loop
    		if i == 0 {
    			slice1, slice2 = slice2, slice1
    		}
    	}
    
    	return diff
    }
    func CloseConnection(client *sftp.Client) {
    	client.Close()
    }

    This actually connects to each server, walks the whole file tree and puts it into a structure. After this is done for both servers, a function compares only the slice part of the two structs and returns the differences, and from those differences another structure is built.
    It is true that I took the difference func from Stack Overflow, and it's far from good code, but I am working on it; this is the first draft, and I will post improved versions as it gets better.

    The output if there are differences will look like this:

    172.17.0.2 /sorin/subdirectory
    172.17.0.2 /sorin/subdirectory/subtest.file
    172.17.0.2 /sorin/test.file
    172.17.0.3 /sorin/test2
    

    If there are no differences, it will just exit.
    I am working on improving my Golang experience. I'll keep you posted.

    Cheers!

  • Memory debug by Heroku guys on Apache Kafka – nice one

    Hi,

    I know, I should write more about my experience with Apache Kafka; have patience, it's still building up. Until then, please check this article:

    https://blog.heroku.com/fixing-kafka-memory-leak

    Be aware of the functionality and code that you write beside Apache Kafka's own; it might get you into trouble.

    I am very happy that sysdig is used by more and more teams for debugging; it's truly a great tool for these kinds of situations.

    Cheers!

  • Configure Jupyter Notebook on Raspberry PI 2 for remote access and scala kernel install

    Hi,

    This is a continuation of the previous article regarding Jupyter Notebook (http://log-it.tech/2017/09/02/installing-jupyter-notebook-raspberry-pi-2/). Let's start with my modifications for getting remote access to it. It first needs a password, in the form of a password hash.
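
    You can generate the hash from the Python CLI; here is that step as a small snippet (IPython ships with Jupyter, so the import should be available on the Pi):

    # Run this in a Python shell on the Raspberry PI.
    from IPython.lib import passwd

    # Prints a hash like 'sha1:...' that you paste into the notebook config.
    print(passwd("your_custom_password"))

    Once you have the password hash, these are the fields that I uncommented to enable minimal remote access: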

    c.NotebookApp.open_browser = False #do not open a browser on notebook start; you will access it remotely
    c.NotebookApp.ip = '*' #permit access on every interface of the server
    c.NotebookApp.password = u'[your_pass_hash]' #set the password required to access the notebook; otherwise the token from the server is needed (you can get it by running sudo systemctl status jupyter.service)

    You can also add them at the bottom of the file. In order for the changes to take effect, you will also need to restart the service with sudo systemctl restart jupyter.service.

    You now have the basic steps to run Jupyter Notebook with the IPython 2 kernel. Now let's get to the next step of installing the Scala kernel (https://www.scala-lang.org).

    The steps are pretty straightforward and they are taken from this link https://www.packtpub.com/mapt/book/big_data_and_business_intelligence/9781785884870/9/ch09lvl1sec65/installing-the-scala-kernel , and what I tried is to put them end to end. I am not 100% sure you also need Java 8, but I installed it anyway; you will find the steps here https://www.raspinews.com/installing-oracle-java-jdk-8-on-raspberry-pi/ but what you really need to install is sbt.

    The catch here is that you don't need to hunt for a Raspberry-specific sbt; just take the default one, it will do the job. The steps are listed here http://www.scala-sbt.org/release/docs/Installing-sbt-on-Linux.html. Once it is installed you can return to the link listed above and just run the steps:

    apt-get install git
    git clone https://github.com/alexarchambault/jupyter-scala.git
    cd jupyter-scala
    sbt cli/packArchive

    Sbt will grab a lot of dependencies; if you work behind a proxy I am not aware of the settings you need, but you can search for them and probably find a solution. Have patience, it will take a while, but once it is done you can run ./jupyter-scala to install the kernel and check that it works with jupyter kernelspec list.

    Restart Jupyter Notebook to pick up the new kernel, although I am not convinced it's necessary 🙂
    In my case I have a dynamic DNS service from my internet provider, but I think you can do it with a free DNS provider on your router as well. An extra forward or NAT of port 8888 will be needed, but once this is done you should have a playground in your browser that knows Python and Scala. Cool, isn't it?

    Cheers