Skip to content

Commit

Permalink
Merge pull request #104 from 0xcregis/hotfix/main-240306
Browse files Browse the repository at this point in the history
fix: cancel logs for monitor
  • Loading branch information
sunjiangjun authored Mar 6, 2024
2 parents a8f41d8 + 11ab103 commit 63610ce
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions collect/service/monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,15 @@ func (s *Service) CheckNodeTask() {
continue
}

tempList := make([]*kafka.Message, 0, 10)
log := s.log.WithField("Id", time.Now().UnixNano())
log.Printf("NodeTask|len:%v,chainCode:%v", len(list), chainCode)

tempList := make([]*kafka.Message, 0, 10)
expireCount := 0
for _, hash := range list {
count, task, err := store.GetNodeTask(chainCode, hash)
if err != nil || count >= 5 || task == nil {
expireCount++
continue
}

Expand All @@ -148,7 +152,7 @@ func (s *Service) CheckNodeTask() {
task.Id = time.Now().UnixNano()
task.TaskStatus = 0
task.NodeId = s.nodeId
s.log.Printf("NodeTask|task:%+v", task)
//s.log.Printf("NodeTask|task:%+v", task)

bs, _ := json.Marshal(task)
msg := &kafka.Message{Topic: fmt.Sprintf(NodeTaskTopic, chainCode), Partition: 0, Key: []byte(task.NodeId), Value: bs}
Expand All @@ -161,6 +165,8 @@ func (s *Service) CheckNodeTask() {
}

s.kafkaSender[chainCode] <- tempList

log.Printf("--------------NodeTask|End:%v,chainCode:%v,len:%v,expireCount:%v---------------", time.Now(), chainCode, len(list), expireCount)
}

}
Expand Down Expand Up @@ -225,11 +231,15 @@ func (s *Service) CheckErrTx() {
continue
}

tempList := make([]*kafka.Message, 0, 10)
log := s.log.WithField("Id", time.Now().UnixNano())
log.Printf("ErrTx|len:%v,chainCode:%v", len(list), chainCode)

tempList := make([]*kafka.Message, 0, 10)
expireCount := 0
for _, hash := range list {
count, task, err := store.GetErrTxNodeTask(chainCode, hash)
if err != nil || count >= 5 {
expireCount++
continue
}

Expand All @@ -251,7 +261,7 @@ func (s *Service) CheckErrTx() {
}
task.Id = time.Now().UnixNano()
task.TaskStatus = 0
s.log.Printf("ErrTx|task:%+v", task)
//s.log.Printf("ErrTx|task:%+v", task)

bs, _ := json.Marshal(task)
msg := &kafka.Message{Topic: fmt.Sprintf(NodeTaskTopic, chainCode), Partition: 0, Key: []byte(task.NodeId), Value: bs}
Expand All @@ -264,6 +274,8 @@ func (s *Service) CheckErrTx() {
}

s.kafkaSender[chainCode] <- tempList

log.Printf("--------------ErrTx|End:%v,chainCode:%v,len:%v,expireCount:%v---------------", time.Now(), chainCode, len(list), expireCount)
}

}
Expand Down

0 comments on commit 63610ce

Please sign in to comment.