Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read brokers and Partition size infomation from input json file instead of Zookeeper #257

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion cmd/topicmappr/commands/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"os"
"time"
"io/ioutil"
"encoding/json"

"github.com/DataDog/kafka-kit/kafkazk"

Expand Down Expand Up @@ -42,6 +44,35 @@ func getBrokerMeta(cmd *cobra.Command, zk kafkazk.Handler, m bool) kafkazk.Broke
}
os.Exit(1)
}
// Get a broker map of the brokers in the current partition map.
// If meta data isn't being looked up, brokerMeta will be empty.
bmif, _ := cmd.Flags().GetString("brokers-storage-in-file")
if bmif != "" {
jsonFile, err := os.Open(bmif)
// if we os.Open returns an error then handle it
if err != nil {
fmt.Printf("Error on %s",err)
os.Exit(1)
}
// defer the closing of our jsonFile so that we can parse it later on
defer jsonFile.Close()
data, _ := ioutil.ReadAll(jsonFile)
bmm := kafkazk.BrokerMetricsMap{}
err = json.Unmarshal(data, &bmm)
if err != nil {
fmt.Errorf("Error unmarshalling broker metrics: %s", err.Error())
os.Exit(1)
}
// Populate each broker with
// metric data.
for bid := range brokerMeta {
m, exists := bmm[bid]
if exists {
brokerMeta[bid].StorageFree = m.StorageFree
brokerMeta[bid].MetricsIncomplete = false
}
}
}

return brokerMeta
}
Expand Down Expand Up @@ -70,6 +101,24 @@ func getPartitionMeta(cmd *cobra.Command, zk kafkazk.Handler) kafkazk.PartitionM
fmt.Println(err)
os.Exit(1)
}

// Get a the partitionMetaMap from input file
psif, _ := cmd.Flags().GetString("partitions-size-in-file")
if psif != "" {
jsonFile, err := os.Open(psif)
// if we os.Open returns an error then handle it
if err != nil {
fmt.Printf("Error on %s", err)
os.Exit(1)
}
// defer the closing of our jsonFile so that we can parse it later on
defer jsonFile.Close()
data, _ := ioutil.ReadAll(jsonFile)
err = json.Unmarshal(data, &partitionMeta)
if err != nil {
fmt.Errorf("Error unmarshalling broker metrics: %s", err.Error())
os.Exit(1)
}
}
//fmt.Println(partitionMeta)
return partitionMeta
}
9 changes: 7 additions & 2 deletions cmd/topicmappr/commands/rebuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func init() {
rebuildCmd.Flags().Bool("use-meta", true, "Use broker metadata in placement constraints")
rebuildCmd.Flags().String("out-path", "", "Path to write output map files to")
rebuildCmd.Flags().String("out-file", "", "If defined, write a combined map of all topics to a file")
rebuildCmd.Flags().String("partitions-size-in-file", "", "Read Topics partitions sizes from a file")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the changes in this file to the rebalance.go file too. That will allow file input for the rebalance option too.

rebuildCmd.Flags().String("brokers-storage-in-file", "", "Read Brokers free storage from a file")
rebuildCmd.Flags().Bool("force-rebuild", false, "Forces a complete map rebuild")
rebuildCmd.Flags().Int("replication", 0, "Normalize the topic replication factor across all replica sets (0 results in a no-op)")
rebuildCmd.Flags().Bool("sub-affinity", false, "Replacement broker substitution affinity")
Expand All @@ -54,7 +56,8 @@ func rebuild(cmd *cobra.Command, _ []string) {
fr, _ := cmd.Flags().GetBool("force-rebuild")
sa, _ := cmd.Flags().GetBool("sub-affinity")
m, _ := cmd.Flags().GetBool("use-meta")

bsif, _ := cmd.Flags().GetString("brokers-storage-in-file")
psif, _ := cmd.Flags().GetString("partitions-size-in-file")
switch {
case ms == "" && t == "":
fmt.Println("\n[ERROR] must specify either --topics or --map-string")
Expand Down Expand Up @@ -103,7 +106,9 @@ func rebuild(cmd *cobra.Command, _ []string) {
// Fetch broker metadata.
var withMetrics bool
if cmd.Flag("placement").Value.String() == "storage" {
checkMetaAge(cmd, zk)
if bsif == "" || psif == "" {
checkMetaAge(cmd, zk)
}
withMetrics = true
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/topicmappr/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package main

import "github.com/DataDog/kafka-kit/cmd/topicmappr/commands"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not needed.

import "kafka-kit/cmd/topicmappr/commands"

func main() {
commands.Execute()
Expand Down