From 9819b906415875e60b412628a721e58c9980d4b9 Mon Sep 17 00:00:00 2001 From: sunhongtao Date: Wed, 6 Dec 2023 17:29:09 +0800 Subject: [PATCH] perf: format kafka topic --- blockchain/service/http_handler.go | 2 +- collect/service/cmd/cmd.go | 6 ++--- common/chain/chain.go | 36 +++++++++++++++--------------- store/service/store_service.go | 10 ++++----- store/service/ws_service.go | 2 +- 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/blockchain/service/http_handler.go b/blockchain/service/http_handler.go index 82e80aa..a715e51 100644 --- a/blockchain/service/http_handler.go +++ b/blockchain/service/http_handler.go @@ -434,7 +434,7 @@ func (h *HttpHandler) SendRawTx(ctx *gin.Context) { defer func(backup map[string]any, chainCode int64) { bs, _ := json.Marshal(backup) - msg := &kafka.Message{Topic: fmt.Sprintf("%v_%v", h.kafkaCfg.Topic, chainCode), Partition: h.kafkaCfg.Partition, Key: []byte(fmt.Sprintf("%v", time.Now().UnixNano())), Value: bs} + msg := &kafka.Message{Topic: fmt.Sprintf("%v-%v", h.kafkaCfg.Topic, chainCode), Partition: h.kafkaCfg.Partition, Key: []byte(fmt.Sprintf("%v", time.Now().UnixNano())), Value: bs} h.sendCh <- []*kafka.Message{msg} }(backup, blockChainCode) diff --git a/collect/service/cmd/cmd.go b/collect/service/cmd/cmd.go index 4cc65a1..f506496 100644 --- a/collect/service/cmd/cmd.go +++ b/collect/service/cmd/cmd.go @@ -785,7 +785,7 @@ func (c *Cmd) HandlerBlock(block *collect.BlockInterface) (*kafka.Message, error } r = b } - return &kafka.Message{Topic: k.Topic, Partition: k.Partition, Time: time.Now(), Key: []byte(block.BlockHash), Value: r}, nil + return &kafka.Message{Topic: fmt.Sprintf("%v-%v", c.chain.BlockChainCode, k.Topic), Partition: k.Partition, Time: time.Now(), Key: []byte(block.BlockHash), Value: r}, nil } func (c *Cmd) HandlerTx(tx *collect.TxInterface) (*kafka.Message, error) { @@ -808,7 +808,7 @@ func (c *Cmd) HandlerTx(tx *collect.TxInterface) (*kafka.Message, error) { r = b } - return &kafka.Message{Topic: k.Topic, Partition: k.Partition, Time: time.Now(), Key: []byte(tx.TxHash), Value: r}, nil + return &kafka.Message{Topic: fmt.Sprintf("%v-%v", c.chain.BlockChainCode, k.Topic), Partition: k.Partition, Time: time.Now(), Key: []byte(tx.TxHash), Value: r}, nil } func (c *Cmd) HandlerReceipt(receipt *collect.ReceiptInterface) (*kafka.Message, error) { @@ -835,7 +835,7 @@ func (c *Cmd) HandlerReceipt(receipt *collect.ReceiptInterface) (*kafka.Message, } r = b } - return &kafka.Message{Topic: k.Topic, Partition: k.Partition, Time: time.Now(), Key: []byte(receipt.TransactionHash), Value: r}, nil + return &kafka.Message{Topic: fmt.Sprintf("%v-%v", c.chain.BlockChainCode, k.Topic), Partition: k.Partition, Time: time.Now(), Key: []byte(receipt.TransactionHash), Value: r}, nil } func (c *Cmd) Stop() { diff --git a/common/chain/chain.go b/common/chain/chain.go index c497a50..6e4060d 100644 --- a/common/chain/chain.go +++ b/common/chain/chain.go @@ -9,27 +9,27 @@ import ( ) // dev -var defaultChainCode = map[string]map[int64]int8{ - "ETH": {200: 1, 2001: 1}, - "POLYGON": {201: 1, 2011: 1}, - "BSC": {202: 1}, - "TRON": {205: 1}, - "BTC": {300: 1}, - "FIL": {301: 1}, - "XRP": {310: 1}, -} - -// main //var defaultChainCode = map[string]map[int64]int8{ -// "ETH": {60: 1, 6001: 1}, -// "POLYGON": {62: 1, 6201: 1}, -// "BSC": {2510: 1, 2610: 1}, -// "TRON": {195: 1, 198: 1}, -// "BTC": {0: 1, 1: 1}, -// "FIL": {2307: 1}, -// "XRP": {144: 1}, +// "ETH": {200: 1, 2001: 1}, +// "POLYGON": {201: 1, 2011: 1}, +// "BSC": {202: 1}, +// "TRON": {205: 1}, +// "BTC": {300: 1}, +// "FIL": {301: 1}, +// "XRP": {310: 1}, //} +// main +var defaultChainCode = map[string]map[int64]int8{ + "ETH": {60: 1, 6001: 1}, + "POLYGON": {62: 1, 6201: 1}, + "BSC": {2510: 1, 2610: 1}, + "TRON": {195: 1, 198: 1}, + "BTC": {0: 1, 1: 1}, + "FIL": {2307: 1}, + "XRP": {144: 1}, +} + func LoadConfig(path string) (string, error) { f, err := os.OpenFile(path, os.O_RDONLY, os.ModeAppend) if err != nil { diff --git a/store/service/store_service.go b/store/service/store_service.go index 98b6500..63d4e91 100644 --- a/store/service/store_service.go +++ b/store/service/store_service.go @@ -70,7 +70,7 @@ func (s *StoreHandler) ReadBackupTxFromKafka(blockChain int64, kafkaCfg map[stri Kafka := kafkaCfg["BackupTx"] broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port) group := fmt.Sprintf("gr_store_backuptx_%v", Kafka.Group) - s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v_%v", Kafka.Topic, blockChain), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) + s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", Kafka.Topic, blockChain), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) }() list := make([]*store.BackupTx, 0, 20) @@ -120,7 +120,7 @@ func (s *StoreHandler) ReadSubTxFromKafka(blockChain int64, kafkaCfg map[string] Kafka := kafkaCfg["SubTx"] broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port) group := fmt.Sprintf("gr_store_subtx_%v", Kafka.Group) - s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: Kafka.Topic, Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) + s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", blockChain, Kafka.Topic), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) }() list := make([]*store.SubTx, 0, 20) @@ -170,7 +170,7 @@ func (s *StoreHandler) ReadTxFromKafka(blockChain int64, kafkaCfg map[string]*co Kafka := kafkaCfg["Tx"] broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port) group := fmt.Sprintf("gr_store_tx_%v", Kafka.Group) - s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: Kafka.Topic, Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) + s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", blockChain, Kafka.Topic), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) }() list := make([]*store.Tx, 0, 20) @@ -221,7 +221,7 @@ func (s *StoreHandler) ReadBlockFromKafka(blockChain int64, kafkaCfg map[string] Kafka := kafkaCfg["Block"] broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port) group := fmt.Sprintf("gr_store_block_%v", Kafka.Group) - s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: Kafka.Topic, Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) + s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", blockChain, Kafka.Topic), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) }() list := make([]*store.Block, 0, 20) @@ -269,7 +269,7 @@ func (s *StoreHandler) ReadReceiptFromKafka(blockChain int64, kafkaCfg map[strin Kafka := kafkaCfg["Receipt"] broker := fmt.Sprintf("%v:%v", Kafka.Host, Kafka.Port) group := fmt.Sprintf("gr_store_receipt_%v", Kafka.Group) - s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: Kafka.Topic, Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) + s.kafka.Read(&kafkaClient.Config{Brokers: []string{broker}, Topic: fmt.Sprintf("%v-%v", blockChain, Kafka.Topic), Group: group, Partition: Kafka.Partition, StartOffset: Kafka.StartOffset}, receiver, ctx2) }() list := make([]*store.Receipt, 0, 20) diff --git a/store/service/ws_service.go b/store/service/ws_service.go index 0cb36a6..090ae09 100644 --- a/store/service/ws_service.go +++ b/store/service/ws_service.go @@ -427,7 +427,7 @@ func (ws *WsHandler) sendMessage(SubKafkaConfig *config.KafkaConfig, kafkaConfig //save to kafka if len(pushMp) > 0 && SubKafkaConfig != nil { r, _ := json.Marshal(tx) - m := &kafka.Message{Topic: SubKafkaConfig.Topic, Key: []byte(uuid.New().String()), Value: r} + m := &kafka.Message{Topic: fmt.Sprintf("%v-%v", blockChain, SubKafkaConfig.Topic), Key: []byte(uuid.New().String()), Value: r} bufferMessage = append(bufferMessage, m) }