Small go code example for zookeeper resource editing

Hi,

We have the task of “service restart coordination” for our Apache Kafka cluster. It’s still a work in progress but if you want to use the zookeeper for some status verification and update, something like this will work as an example.

package main

import (
	"fmt"
	"io"
	"launchpad.net/gozk"
	"os"
	"strings"
	"sync"
	"time"
)

const (
	SERVICEPATH = "/servicerestart"
)

var wg sync.WaitGroup

func main() {
	conn := "zk1:2181,zk2:2181,zk3:2181"
	connSlice := strings.Split(string(conn), ",")
	var flag bool
	args := os.Args
	if len(args) != 2 {
		io.WriteString(os.Stdout, "Argument is needed for the script\n")
		os.Exit(1)
	} else {
		switch args[1] {
		case "hang":
			flag = false
		case "nohang":
			flag = true		
		default:
			io.WriteString(os.Stdout, "Command unrecognized\n")	
	}
		
	}
	wg.Add(1)
	go ModifyZooStat(connSlice, flag)
	wg.Wait()
}
func ModifyZooStat(strconn []string, flag bool) {
	var zooReach string
	for _, zoohost := range strconn {
		zk, _, err := zookeeper.Dial(zoohost, 5e9)
		if err != nil {
			fmt.Println("Couldn't connect to " + zoohost)
			continue
		} else {
			zooReach = zoohost
			zk.Close()
			break
		}
	}
	zkf, sessionf, _ := zookeeper.Dial(zooReach, 5e9)
defer zkf.Close()
	event := <-sessionf
	if event.State != zookeeper.STATE_CONNECTED {
		fmt.Println("Couldn't connect")
	}
	acl := []zookeeper.ACL{zookeeper.ACL{Perms: zookeeper.PERM_ALL, Scheme: "world", Id: "anyone"}}
	host, _ := os.Hostname()
	t := time.Now()
	servicerestart, _ := zkf.Exists(SERVICEPATH)
	if servicerestart == nil {
		path, _ := zkf.Create(SERVICEPATH, host+" "+t.Format(time.Kitchen), zookeeper.EPHEMERAL, acl)
		fmt.Println(path)
	} else {
		change, _ := zkf.Set(SERVICEPATH, host+" "+t.Format(time.Kitchen), -1)
		fmt.Println(change.MTime().Format(time.Kitchen))
	}
	if flag {
		wg.Done()
	}

}

Let me explain what it does. Basically it takes a zookeeper connection string and it splits it per server. This was a requirement from the zk module used. It could’n take as argument more than one case of host:2181.
After we found the active server, we can connect to it and put in the /servicerestart path the hostname and also the time on which the resource was edited.
In order to create a resource, you will need an ACL slice that will be passed as parameter.

acl := []zookeeper.ACL{zookeeper.ACL{Perms: zookeeper.PERM_ALL, Scheme: "world", Id: "anyone"}}

Once this slice is created we will get in the next step and check if the resource exists. If it doesn’t then we will create it and if it does, we will just modify it.

The fmt.Println instructions are put basically for two reasons.

  • In order to see the resource that it’s created. And i wanted to do that because zookeeper.EPHEMERAL parameter only creates this resource as long as the connection is active. If you want persistence, you will have to use zookeeper.SEQUENCE but it will add to your resource name a unique counter.
  • Also see the timestamp when the resource was modified.

Even if you don’t close the zookeeper connection with defer zkf.Close(), it will close it automatically and end the script. So, we still need a way to keep it alive, and we will do that using WaitGroups…
We will add one function in the queue and wait for it to finish. And to control this we can use a parameter that is mapped to a flag.

This is just a very small example and i am still a true beginner in the art of Go programming, but hope it helps 🙂

Cheers