Tag: scala

  • Consumer group coordinator in Kafka using some scala script

    Morning,

    Just a short post about finding the consumer group coordinator for a specific consumer group.
    Our consumer groups were rebalancing, and we didn’t know whether that was caused by application logic or by the Kafka cluster assigning a different Consumer Group Coordinator each time. So a small piece of code was needed. I used the libraries shipped with Kafka 1.0.0 for this test, so remember to update the classpath if you want to adapt it.

    To run the test, I started a standalone Confluent Kafka image, which normally listens on port 29092. For more details, please consult the Confluent documentation.

    I also created a test topic with one partition and a replication factor of one, produced some messages to the topic, and after that started a console consumer:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic test --from-beginning
    test
    message
    test message
    

    Once it is started, you can also see it with the kafka-consumer-groups command, like this:

    sorin@debian-test:~/kafka_2.11-1.0.0/bin$ ./kafka-consumer-groups.sh --bootstrap-server localhost:29092 --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    console-consumer-49198
    console-consumer-66063
    console-consumer-77631
    

    My console consumer is identified by console-consumer-77631, and in order to see its group coordinator you have to run something like:

    ./getconsumercoordinator.sh localhost 29092 console-consumer-77631
    warning: there were three deprecation warnings; re-run with -deprecation for details
    one warning found
    Creating connection to: localhost 29092 
    log4j:WARN No appenders could be found for logger (kafka.network.BlockingChannel).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Channel connected
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    GroupCoordinatorResponse(Some(BrokerEndPoint(1001,localhost,29092)),NONE,0)
    

    It’s clear that, since we have only one broker, it is also the coordinator.
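
    To tell whether the coordinator really changes between rebalances, the last line of the script output can be compared across several runs. Below is a minimal Python sketch of that idea. The regular expression is modelled only on the single output line shown above, so treat the format as an assumption, and the names Coordinator, parse_coordinator and coordinator_changed are made up for this illustration.

```python
import re
from typing import Iterable, NamedTuple, Optional


class Coordinator(NamedTuple):
    broker_id: int
    host: str
    port: int


# Assumption: the response is printed as in the sample output above, e.g.
# GroupCoordinatorResponse(Some(BrokerEndPoint(1001,localhost,29092)),NONE,0)
_ENDPOINT = re.compile(r"BrokerEndPoint\((\d+),([^,]+),(\d+)\)")


def parse_coordinator(line: str) -> Optional[Coordinator]:
    """Extract broker id, host and port from one line of script output."""
    match = _ENDPOINT.search(line)
    if match is None:
        return None
    return Coordinator(int(match.group(1)), match.group(2), int(match.group(3)))


def coordinator_changed(lines: Iterable[str]) -> bool:
    """Return True if the lines name more than one distinct coordinator."""
    seen = {c for c in map(parse_coordinator, lines) if c is not None}
    return len(seen) > 1
```

    Feeding it the response lines collected from repeated runs of the script is enough to see whether the broker id, host or port ever differs.
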
    For the details of the code I used this link. Also, since I don’t have a Scala project, just a script, the following command was of great use for finding which jar provides each dependency:

    for i in *.jar; do jar -tvf "$i" | grep -Hsi ClassName && echo "$i"; done
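
    The same search can be sketched in Python with the standard zipfile module, which avoids needing the jar tool on the PATH. This is only an illustration: find_class is a hypothetical helper name, and the match is a simple case-insensitive substring test on the entry path, similar in spirit to grep -i above.

```python
import zipfile
from pathlib import Path


def find_class(libs_dir, class_name):
    """Return (jar name, entry path) pairs whose entry path contains class_name."""
    needle = class_name.lower()
    hits = []
    for jar in sorted(Path(libs_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                for entry in archive.namelist():
                    # Case-insensitive substring match on the path inside the jar
                    if needle in entry.lower():
                        hits.append((jar.name, entry))
        except zipfile.BadZipFile:
            # Skip files that end in .jar but are not valid archives
            continue
    return hits
```

    For example, find_class("/home/sorin/kafka_2.11-1.0.0/libs", "BlockingChannel") would list every jar in that directory that contains a matching entry.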

    Here is also the code:

    #!/bin/sh
    exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/kafka-clients-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/kafka_2.11-1.0.0.jar:/home/sorin/kafka_2.11-1.0.0/libs/slf4j-api-1.7.25.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-core-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-databind-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/jackson-annotations-2.9.1.jar:/home/sorin/kafka_2.11-1.0.0/libs/log4j-1.2.17.jar" "$0" "$1" "$2" "$3"
    !#
    
    import com.fasterxml.jackson.core._
    import com.fasterxml.jackson.databind.ObjectMapper
    import kafka.network.BlockingChannel
    import kafka.api.GroupCoordinatorRequest
    import kafka.api.GroupCoordinatorResponse
    import org.slf4j.LoggerFactory
    
    // Arguments: broker host, broker port, consumer group id
    val hostname = args(0)
    val port = args(1).toInt
    val group = args(2)
    println("Creating connection to: " + hostname + " " + port + " ")
    
    // 1 MB read and write buffers, 50 second read timeout
    val channel = new BlockingChannel(hostname, port, 1048576, 1048576, readTimeoutMs = 50000)
    channel.connect()
    if (channel.isConnected) {
      println("Channel connected")
      // Ask the broker which node coordinates this group and print the reply
      channel.send(GroupCoordinatorRequest(group))
      val metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive.payload())
      println(metadataResponse)
    }
    channel.disconnect()
    

    Regarding the code: the first part is what lets the shell run the script with scala. You need to update the classpath with all the libraries and also list the parameters to pass through to the script; in our case there are three (host, port and consumer group). Also, if you don’t add all of the jackson, log4j and slf4j dependencies, it won’t work.

    P.S.: It also works if you run exec scala -classpath "/home/sorin/kafka_2.11-1.0.0/libs/*"

    Cheers!

  • Configure Jupyter Notebook on Raspberry PI 2 for remote access and scala kernel install

    Hi,

    This is a continuation of the previous article about Jupyter Notebook (http://log-it.tech/2017/09/02/installing-jupyter-notebook-raspberry-pi-2/). Let’s start with the changes I made in order to allow remote connections to it. It first needs a password in the form of a password hash. To generate it, open the Python CLI and run from IPython.lib import passwd; passwd("your_custom_password"). Once you have the password hash, here are the fields that I uncommented to enable minimal remote access:

    c.NotebookApp.open_browser = False #do not open a browser on notebook start, you will access it remotely as a daemon
    c.NotebookApp.ip = '*' #permit access on every interface of the server
    c.NotebookApp.password = u'[your_pass_hash]' #set a password in order to access the notebook, otherwise the token from the server is required (if you want it this way you can get the token by running sudo systemctl status jupyter.service)

    You can also add them at the bottom of the file. For the changes to take effect you will also need to restart the service with sudo systemctl restart jupyter.service.
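
    For a feel of what the password hash mentioned earlier looks like, here is a small standard-library sketch. It assumes the algorithm:salt:digest layout that, as far as I know, the passwd helper produced at the time (sha1 with a 12-character hex salt). The functions make_hash and check_hash are illustrative names only and are not part of IPython or Jupyter; for a real setup, use the value returned by passwd itself.

```python
import hashlib
import hmac
import secrets


def make_hash(passphrase, algorithm="sha1", salt_len=12):
    """Build an 'algorithm:salt:hexdigest' string (assumed layout, see above)."""
    salt = secrets.token_hex(salt_len // 2)  # salt_len hex characters
    digest = hashlib.new(algorithm)
    digest.update(passphrase.encode("utf-8") + salt.encode("ascii"))
    return ":".join((algorithm, salt, digest.hexdigest()))


def check_hash(hashed, passphrase):
    """Return True if passphrase matches a string produced by make_hash."""
    try:
        algorithm, salt, expected = hashed.split(":", 2)
        digest = hashlib.new(algorithm)
    except ValueError:
        # Wrong number of fields or unknown hash algorithm
        return False
    digest.update(passphrase.encode("utf-8") + salt.encode("ascii"))
    return hmac.compare_digest(digest.hexdigest(), expected)
```

    The point of storing this form rather than the plain password is that the configuration file never contains the password itself, only a salted digest of it.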

    You now have the basic steps to run Jupyter Notebook with the IPython 2 kernel. Now let’s get to the next step: installing the Scala kernel (https://www.scala-lang.org).

    The steps are pretty straightforward and are taken from this link: https://www.packtpub.com/mapt/book/big_data_and_business_intelligence/9781785884870/9/ch09lvl1sec65/installing-the-scala-kernel ; what I tried to do is put them together end to end. I am not 100% sure whether you also need Java 8, but I installed it anyway; you will find the steps here: https://www.raspinews.com/installing-oracle-java-jdk-8-on-raspberry-pi/ . What you really need to install is sbt.

    The catch here is that you don’t need to search for an sbt build specific to the Raspberry Pi; just use the default one, it will do the job. The steps are listed here http://www.scala-sbt.org/release/docs/Installing-sbt-on-Linux.html. Once it is installed, you can return to the link listed above and just run the steps:

    apt-get install git
    git clone https://github.com/alexarchambault/jupyter-scala.git
    cd jupyter-scala
    sbt cli/packArchive

    Sbt will grab a lot of dependencies. If you work behind a proxy, I am not aware of the settings you need, but you can search for them and will probably find a solution. Have patience, it will take a while until it is done. Once it is done, you can run ./jupyter-scala in order to install the kernel, and check that it works with jupyter kernelspec list.

    Restart the Jupyter Notebook to update it, although I am not convinced that it’s necessary 🙂
    In my case I have a dynamic DNS service from my internet provider, but I think you can also do it with a free DNS provider on your router. An extra forward or NAT of port 8888 will be needed, but once this is done you should have a playground in your browser that knows Python and Scala. Cool, isn’t it?
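
    Once the port forward is in place, a quick way to confirm that the notebook is reachable from outside is a plain TCP connection test. The sketch below uses only the Python standard library; port_is_open is a hypothetical helper name, and the host you pass in would be whatever name your dynamic DNS service gives you.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures
        return False
```

    Calling port_is_open with your public hostname and 8888 from a machine outside your network tells you whether the forward works; it only checks that the port accepts connections, not that the notebook login succeeds.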

    Cheers