-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathentries_holder.go
218 lines (183 loc) · 6.18 KB
/
entries_holder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package be_indexer
import (
"fmt"
"sort"
"strings"
"github.com/echoface/be_indexer/codegen/cache"
"github.com/echoface/be_indexer/parser"
"github.com/echoface/be_indexer/util"
"google.golang.org/protobuf/proto"
)
type (
	// EntriesHolder stores the PostingList data of the index.
	// Three typical implementations are envisaged:
	//  1. In-memory KV: stores the EntryID list (PostingList) for every field value.
	//  2. Aho-Corasick automaton: builds an AC automaton from all values and
	//     finds the PostingLists matched by an input sentence.
	//  3. An extension of (1) backed by network/disk storage that keeps an
	//     internal LRU/LFU cache to reduce memory pressure.
	//
	// Indexing is two-phase: IndexingBETx prepares TxData for one expression,
	// and CommitIndexingBETx applies it once every expression of the
	// conjunction has been prepared successfully.
	EntriesHolder interface {
		// EnableDebug toggles the holder's debug mode.
		EnableDebug(debug bool)
		// DumpInfo writes a human-readable summary of the holder into buffer.
		DumpInfo(buffer *strings.Builder)
		// DumpEntries writes the holder's posting lists into buffer.
		DumpEntries(buffer *strings.Builder)
		// GetEntries retrieves cursors over every PostingList in the holder
		// that is satisfied by the assigned values of field.
		GetEntries(field *FieldDesc, assigns Values) (EntriesCursors, error)
		// IndexingBETx tokenizes/parses the values into whatever data the
		// holder needs, without modifying the holder. The IndexerBuilder then
		// calls CommitIndexingBETx to apply 'Data' once every expression of a
		// conjunction has been prepared successfully.
		IndexingBETx(field *FieldDesc, bv *BoolValues) (TxData, error)
		// CommitIndexingBETx applies a prepared transaction to the holder.
		// NOTE: the builder panics when an error is returned, because a
		// partially committed conjunction would leave the index logically
		// inconsistent.
		CommitIndexingBETx(tx IndexingBETx) error
		// DecodeTxData decodes data previously produced by TxData.Encode;
		// used for the building-progress cache.
		DecodeTxData(data []byte) (TxData, error)
		// CompileEntries finalizes the entries for querying (build them or
		// sort them). The underlying algorithm requires sorted entries.
		CompileEntries() error
	}

	// TxData is the holder-specific payload prepared by IndexingBETx.
	TxData interface {
		// BetterToCache reports whether this TxData is large enough that
		// caching it is preferable. The builder inspects all expressions of a
		// conjunction and decides whether to cache it or not.
		BetterToCache() bool
		// Encode serializes the TxData for caching.
		Encode() ([]byte, error)
	}

	// IndexingBETx is a pending indexing transaction passed to
	// CommitIndexingBETx: Data is what the holder prepared for field, and EID
	// is the entry ID to record for it.
	IndexingBETx struct {
		field *FieldDesc
		// NOTE(review): presumably the EntriesHolder that produced Data —
		// confirm against the builder.
		holder EntriesHolder
		EID    EntryID
		Data   TxData
	}

	// Term is the key of a posting list: a field ID plus one parsed value ID
	// of that field.
	Term struct {
		FieldID uint64
		IDValue uint64
	}

	// DefaultEntriesHolder is an EntriesHolder backed by a hash map of
	// map<Term, Entries>. It is the default container, and multiple fields may
	// share one instance because every key carries its field ID.
	// The original note said it supports expressions with at most 256 fields
	// ("see: key encoding logic").
	// NOTE(review): keys are Term{FieldID, IDValue} (two uint64 values), so a
	// 256-field limit does not follow from this struct — confirm whether the
	// limit still applies.
	// Implement a custom holder if you need to go beyond what this one supports.
	DefaultEntriesHolder struct {
		debug     bool
		maxLen    int64 // max length of Entries; computed by CompileEntries
		avgLen    int64 // avg length of Entries; computed by CompileEntries
		plEntries map[Term]Entries
		// Parser is the fallback parser for fields without a FieldParser entry.
		Parser parser.FieldValueParser
		// FieldParser holds per-field parser overrides.
		FieldParser map[BEField]parser.FieldValueParser
	}

	// Uint64TxData is the TxData produced by DefaultEntriesHolder: the value
	// IDs parsed from one expression, held as the protobuf message
	// cache.Uint64ListValues so it can be encoded for caching.
	Uint64TxData cache.Uint64ListValues
)
// BetterToCacheMaxItemsCount is the threshold used by
// Uint64TxData.BetterToCache: transaction data holding strictly more value IDs
// than this is considered worth caching. It is a variable so callers can tune it.
var BetterToCacheMaxItemsCount = 512
// NewTerm builds the posting-list key for value ID idValue of field fid.
func NewTerm(fid, idValue uint64) Term {
	var term Term
	term.FieldID = fid
	term.IDValue = idValue
	return term
}
// String renders the term as "<FieldID,IDValue>" in decimal, e.g. "<3,42>".
func (t Term) String() string {
	return fmt.Sprintf("<%v,%v>", t.FieldID, t.IDValue)
}
// NewDefaultEntriesHolder returns an empty DefaultEntriesHolder. It uses
// parser.NewCommonParser() as the fallback parser and has no per-field parser
// overrides. Debug mode is off, and the length statistics stay zero until
// CompileEntries runs.
func NewDefaultEntriesHolder() *DefaultEntriesHolder {
	holder := &DefaultEntriesHolder{}
	holder.plEntries = make(map[Term]Entries)
	holder.Parser = parser.NewCommonParser()
	holder.FieldParser = make(map[BEField]parser.FieldValueParser)
	return holder
}
// BetterToCache reports whether this transaction data is large enough to be
// worth caching, i.e. it holds more than BetterToCacheMaxItemsCount value IDs.
func (txd *Uint64TxData) BetterToCache() bool {
	itemCount := len(txd.Values)
	return itemCount > BetterToCacheMaxItemsCount
}
// Encode serializes the value IDs with protobuf by viewing the data as its
// underlying cache.Uint64ListValues message.
func (txd *Uint64TxData) Encode() ([]byte, error) {
	return proto.Marshal((*cache.Uint64ListValues)(txd))
}
// DecodeTxData is the inverse of Uint64TxData.Encode. It unmarshals a
// protobuf-encoded cache.Uint64ListValues payload into a *Uint64TxData and is
// used for the building-progress cache.
//
// Empty input decodes to an empty value list without invoking proto.
// If decoding fails, the returned TxData is nil, so callers cannot consume a
// partially decoded value. The underlying proto error is wrapped and remains
// reachable via errors.Is / errors.As.
func (h *DefaultEntriesHolder) DecodeTxData(data []byte) (TxData, error) {
	if len(data) == 0 {
		return &Uint64TxData{Values: nil}, nil
	}
	txData := &Uint64TxData{}
	if err := proto.Unmarshal(data, (*cache.Uint64ListValues)(txData)); err != nil {
		return nil, fmt.Errorf("decode Uint64TxData: %w", err)
	}
	return txData, nil
}
// EnableDebug sets the holder's debug flag to the given value; passing false
// turns debug mode back off.
func (h *DefaultEntriesHolder) EnableDebug(debug bool) {
	h.debug = debug
}
// DumpInfo writes a summary of the holder, formatted by util.JSONPretty, into
// buffer. The summary contains these keys:
//   - "name": HolderNameDefault
//   - "termCnt": number of distinct terms (posting lists)
//   - "maxEntriesLen" / "avgEntriesLen": posting-list length statistics,
//     which are only populated after CompileEntries
//   - "field#<field>#parser": the parser name, one key per per-field override
func (h *DefaultEntriesHolder) DumpInfo(buffer *strings.Builder) {
	summary := make(map[string]interface{}, 4+len(h.FieldParser))
	summary["name"] = HolderNameDefault
	summary["termCnt"] = len(h.plEntries)
	summary["maxEntriesLen"] = h.maxLen
	summary["avgEntriesLen"] = h.avgLen
	for fieldName, fieldParser := range h.FieldParser {
		parserKey := fmt.Sprintf("field#%s#parser", fieldName)
		summary[parserKey] = fieldParser.Name()
	}
	buffer.WriteString(util.JSONPretty(summary))
}
// DumpEntries writes every posting list into buffer.
//
// Output starts with the header "DefaultEntriesHolder entries:". Each term
// then follows on its own line as "<FieldID,IDValue>:" plus the strings from
// Entries.DocString joined by commas.
//
// Terms are emitted in ascending (FieldID, IDValue) order so the dump is
// deterministic across runs; ranging over the map directly would give a random
// order every time.
func (h *DefaultEntriesHolder) DumpEntries(buffer *strings.Builder) {
	buffer.WriteString("DefaultEntriesHolder entries:")

	terms := make([]Term, 0, len(h.plEntries))
	for term := range h.plEntries {
		terms = append(terms, term)
	}
	sort.Slice(terms, func(i, j int) bool {
		if terms[i].FieldID != terms[j].FieldID {
			return terms[i].FieldID < terms[j].FieldID
		}
		return terms[i].IDValue < terms[j].IDValue
	})

	for _, term := range terms {
		// copy into a local so DocString is callable regardless of receiver kind
		entries := h.plEntries[term]
		buffer.WriteString("\n")
		buffer.WriteString(term.String())
		buffer.WriteString(":")
		buffer.WriteString(strings.Join(entries.DocString(), ","))
	}
}
// GetParser returns the parser registered for field in FieldParser. Fields
// without an override fall back to the holder-wide Parser.
func (h *DefaultEntriesHolder) GetParser(field BEField) parser.FieldValueParser {
	fieldParser, found := h.FieldParser[field]
	if !found {
		return h.Parser
	}
	return fieldParser
}
// CompileEntries prepares the holder for querying. It sorts every posting list,
// as the matching algorithm requires sorted entries, and refreshes the max/avg
// length statistics. It always returns nil.
func (h *DefaultEntriesHolder) CompileEntries() error {
	h.makeEntriesSorted()
	return nil
}
// GetEntries turns the query values assigned to field into value IDs using the
// field's parser. It returns one cursor, keyed by NewQKey(field.Field, id), for
// each ID that has a non-empty posting list. IDs without entries are skipped,
// and a parse error is returned unchanged.
func (h *DefaultEntriesHolder) GetEntries(field *FieldDesc, assigns Values) (r EntriesCursors, e error) {
	ids, err := h.GetParser(field.Field).ParseAssign(assigns)
	if err != nil {
		return nil, err
	}
	for _, valueID := range ids {
		entries := h.plEntries[NewTerm(field.ID, valueID)]
		if len(entries) == 0 {
			continue
		}
		r = append(r, NewEntriesCursor(NewQKey(field.Field, valueID), entries))
	}
	return r, nil
}
// IndexingBETx parses one boolean-expression value of field into value IDs and
// returns them as *Uint64TxData. The holder itself is not modified; the IDs
// are applied later by CommitIndexingBETx.
//
// The default holder supports only ValueOptEQ; any other operator panics.
// A parse failure is returned as an error that wraps the parser's error, so
// callers can inspect it with errors.Is / errors.As.
func (h *DefaultEntriesHolder) IndexingBETx(field *FieldDesc, bv *BoolValues) (TxData, error) {
	util.PanicIf(bv.Operator != ValueOptEQ, "default container support EQ operator only")
	// NOTE: ids may contain duplicates when the expression has cross
	// conditions; they are de-duplicated at commit time.
	ids, err := h.GetParser(field.Field).ParseValue(bv.Value)
	if err != nil {
		return nil, fmt.Errorf("field:%s value:%+v parse fail, err:%w", field.Field, bv, err)
	}
	return &Uint64TxData{Values: ids}, nil
}
// CommitIndexingBETx applies a transaction prepared by IndexingBETx. For each
// distinct value ID in tx.Data, it appends tx.EID to the posting list keyed by
// (tx.field.ID, valueID). Posting lists are not sorted here; CompileEntries
// sorts them afterwards.
//
// A nil tx.Data is a no-op, whether it is a nil interface or a typed nil
// *Uint64TxData. Any TxData other than *Uint64TxData is a programming error
// and panics. The method currently always returns nil.
func (h *DefaultEntriesHolder) CommitIndexingBETx(tx IndexingBETx) error {
	if tx.Data == nil {
		return nil
	}
	data, ok := tx.Data.(*Uint64TxData)
	util.PanicIf(!ok, "bad TxData need *Uint64TxData, oops")
	if data == nil {
		// A typed nil pointer inside a non-nil interface: nothing to commit.
		// Without this guard, data.Values would dereference nil and panic.
		return nil
	}
	for _, id := range util.DistinctInteger(data.Values) {
		key := NewTerm(tx.field.ID, id)
		h.plEntries[key] = append(h.plEntries[key], tx.EID)
	}
	return nil
}
// makeEntriesSorted sorts every posting list in place. It also updates the
// statistics:
//   - maxLen is raised to the longest list seen; it never decreases.
//   - avgLen is the integer mean list length, recomputed only when the holder
//     has at least one term.
func (h *DefaultEntriesHolder) makeEntriesSorted() {
	termCount := int64(len(h.plEntries))
	var sum int64
	for _, pl := range h.plEntries {
		sort.Sort(pl)
		length := int64(len(pl))
		sum += length
		if length > h.maxLen {
			h.maxLen = length
		}
	}
	if termCount == 0 {
		return
	}
	h.avgLen = sum / termCount
}