Hi,
Not much to say: this is a pretty decent script for restarting the Kafka service (I wrote it for our rolling upgrade procedure), and it’s still a work in progress. If any changes need to be made to it, I will post them.
Here is the script:
package main
import (
"os/exec"
"strings"
"fmt"
"time"
"io/ioutil"
)
const (
// libpath is the directory that main hands to GetVersion, which lists it
// looking for a file whose name starts with "kafka_".
libpath = "/opt/kafka/libs"
)
// main restarts the local broker: it runs StopBroker, then StartBroker, and
// finally prints the version that GetVersion finds under libpath.
func main() {
	StopBroker()
	StartBroker()

	version := GetVersion(libpath)
	fmt.Printf("You are running version %s", version)
}
// GetVersion reports the Kafka version found in the directory libpath.
//
// The directory listing is searched for the first entry whose name starts
// with "kafka_" and contains a "-" (for example "kafka_2.11-0.10.1.0.jar").
// The text between the first and second "-" is the version; a trailing
// ".jar" is stripped, so the example yields "0.10.1.0".
//
// Entries that have the prefix but no usable version component are skipped.
// When no entry qualifies, "unknown" is returned rather than indexing into
// an empty slice. An error while listing libpath is handed to check, which
// panics.
func GetVersion(libpath string) string {
	files, err := ioutil.ReadDir(libpath)
	check(err)

	for _, f := range files {
		name := f.Name()
		if !strings.HasPrefix(name, "kafka_") {
			continue
		}
		parts := strings.Split(name, "-")
		if len(parts) < 2 {
			// Prefix matches but there is nothing after a "-" to read.
			continue
		}
		// For "<prefix>-<version>.jar" the version still carries the
		// extension; for "<prefix>-<version>-sources.jar" it does not.
		if version := strings.TrimSuffix(parts[1], ".jar"); version != "" {
			return version
		}
	}
	return "unknown"
}
// GetStatus reports whether the id returned by GetBrokerId appears in the
// list returned by GetBrokerList.
func GetStatus() bool {
	registered := GetBrokerList()
	return Contain(GetBrokerId(), registered)
}
func StopBroker() {
status := GetStatus()
brokers := GetBrokerList()
if (status == true) && (len(brokers) > 2) {
stop := RunCommand("service kafka stop")
fmt.Println(string(stop))
time.Sleep(60000 * time.Millisecond)
}
if (status == false) {
fmt.Println("Broker has been successful stopped")
} else {
StopBroker()
}
}
// StartBroker runs "service kafka start" when GetStatus reports false,
// prints the command output and then sleeps for one minute. When GetStatus
// reports true it does nothing.
func StartBroker() {
	if GetStatus() {
		return
	}

	out := RunCommand("service kafka start")
	fmt.Println(string(out))
	time.Sleep(60 * time.Second)
}
// GetBrokerId returns the output of a shell pipeline that greps the
// broker.id line from /srv/kafka/meta.properties and cuts out the part
// after "=", with trailing newlines removed.
func GetBrokerId() string {
	raw := RunCommand("cat /srv/kafka/meta.properties | grep broker.id | cut -d'=' -f2")
	id := string(raw)
	return strings.TrimRight(id, "\n")
}
// GetBrokerList returns the broker ids found in the output of the "dump"
// command sent to localhost:2181.
//
// Only lines of the form "/brokers/ids/<id>" (surrounding whitespace
// ignored) contribute an entry, so the result holds exactly one element per
// listed broker and is empty (nil) when none is listed.
//
// The lines are filtered here rather than with "| grep brokers": grep exits
// non-zero when nothing matches, and RunCommand panics on a non-zero exit,
// so an empty broker list could never be returned.
func GetBrokerList() []string {
	dump := RunCommand("echo dump | nc localhost 2181")
	return parseBrokerIDs(string(dump))
}

// parseBrokerIDs extracts the <id> part of every "/brokers/ids/<id>" line
// in dump, skipping blank lines and anything else.
func parseBrokerIDs(dump string) []string {
	const prefix = "/brokers/ids/"

	var brokers []string
	for _, line := range strings.Split(dump, "\n") {
		entry := strings.TrimSpace(line)
		if !strings.HasPrefix(entry, prefix) {
			continue
		}
		if id := strings.TrimPrefix(entry, prefix); id != "" {
			brokers = append(brokers, id)
		}
	}
	return brokers
}
// Contain reports whether val1 is equal to at least one element of val2.
func Contain(val1 string, val2 []string) bool {
	for i := 0; i < len(val2); i++ {
		if val2[i] == val1 {
			return true
		}
	}
	return false
}
// RunCommand executes command through "/bin/sh -c" and returns what the
// command wrote to standard output. It panics with the error from the
// execution if there is one.
func RunCommand(command string) []byte {
	out, err := exec.Command("/bin/sh", "-c", command).Output()
	if err != nil {
		panic(err)
	}
	return out
}
// check panics with e when e is non-nil and returns normally otherwise.
func check(e error) {
	if e == nil {
		return
	}
	panic(e)
}
Cheers