diff --git a/internal/nsqconsumer/outputs/elasticsearch/client.go b/internal/nsqconsumer/outputs/elasticsearch/client.go index 4513a7c..4bd91bb 100644 --- a/internal/nsqconsumer/outputs/elasticsearch/client.go +++ b/internal/nsqconsumer/outputs/elasticsearch/client.go @@ -3,10 +3,12 @@ package elasticsearch import ( "context" "encoding/json" + "fmt" "net/http" "sync" "time" + "github.com/jehiah/go-strftime" "github.com/marmotedu/iam/pkg/log" "github.com/olivere/elastic/v7" @@ -100,6 +102,11 @@ func (c *Client) Run(msgChan <-chan *message.Message) { } } +func (c *Client) indexName(topic string) string { + now := time.Now() + return strftime.Format(fmt.Sprintf("%s-%%&.%%m.%%d", topic), now) +} + func (c *Client) Publish(msgList []*message.Message) { bulkReq := elastic.NewBulkService(c.client) defer bulkReq.Reset() @@ -112,7 +119,7 @@ func (c *Client) Publish(msgList []*message.Message) { removeList = append(removeList, i) continue } - req := elastic.NewBulkIndexRequest().Index(m.GetTopic()).Doc(data) + req := elastic.NewBulkIndexRequest().Index(c.indexName(m.GetTopic())).Doc(data) bulkReq = bulkReq.Add(req) }