Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Make mapping per partition #681

Merged
merged 1 commit into from
Apr 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 54 additions & 16 deletions sql/index/pilosa/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ const (
// ProcessingFileName is the extension of the lock/processing index file.
ProcessingFileName = ".processing"

// MappingFileName is the extension of the mapping file.
MappingFileName = "mapping.db"
// MappingFileNamePrefix is the prefix in mapping file <prefix>-<mappingKey><extension>
MappingFileNamePrefix = "map"
// MappingFileNameExtension is the extension in mapping file <prefix>-<mappingKey><extension>
MappingFileNameExtension = ".db"
)

const (
Expand Down Expand Up @@ -124,7 +126,6 @@ func (d *Driver) Create(
return nil, err
}

mapping := newMapping(d.mappingFilePath(db, table, id))
processingFile := d.processingFilePath(db, table, id)
if err := index.WriteProcessingFile(
processingFile,
Expand All @@ -133,7 +134,7 @@ func (d *Driver) Create(
return nil, err
}

return newPilosaIndex(idx, mapping, cfg), nil
return newPilosaIndex(idx, cfg), nil
}

// LoadAll loads all indexes for given db and table
Expand Down Expand Up @@ -187,7 +188,6 @@ func (d *Driver) loadIndex(db, table, id string) (*pilosaIndex, error) {
return nil, errCorruptedIndex.New(dir)
}

mapping := d.mappingFilePath(db, table, id)
processing := d.processingFilePath(db, table, id)
ok, err := index.ExistsProcessingFile(processing)
if err != nil {
Expand All @@ -213,11 +213,23 @@ func (d *Driver) loadIndex(db, table, id string) (*pilosaIndex, error) {
if err != nil {
return nil, err
}
if cfg.Driver(DriverID) == nil {
cfgDriver := cfg.Driver(DriverID)
if cfgDriver == nil {
return nil, errCorruptedIndex.New(dir)
}

return newPilosaIndex(idx, newMapping(mapping), cfg), nil
pilosaIndex := newPilosaIndex(idx, cfg)
for k, v := range cfgDriver {
if strings.HasPrefix(v, MappingFileNamePrefix) && strings.HasSuffix(v, MappingFileNameExtension) {
path := d.mappingFilePath(db, table, id, k)
if _, err := os.Stat(path); err != nil {
continue
}
pilosaIndex.mapping[k] = newMapping(path)
}
}

return pilosaIndex, nil
}

func (d *Driver) savePartition(
Expand Down Expand Up @@ -245,28 +257,33 @@ func (d *Driver) savePartition(
}

rollback := true
if err := idx.mapping.openCreate(true); err != nil {
mk := mappingKey(p)
mapping, ok := idx.mapping[mk]
if !ok {
return 0, errMappingNotFound.New(mk)
}
if err := mapping.openCreate(true); err != nil {
return 0, err
}

defer func() {
if rollback {
idx.mapping.rollback()
mapping.rollback()
} else {
e := d.saveMapping(ctx, idx.mapping, colID, false, b)
e := d.saveMapping(ctx, mapping, colID, false, b)
if e != nil && err == nil {
err = e
}
}

idx.mapping.close()
mapping.close()
kviter.Close()
}()

for colID = 0; err == nil; colID++ {
// commit each batch of objects (pilosa and boltdb)
if colID%sql.IndexBatchSize == 0 && colID != 0 {
if err = d.saveBatch(ctx, idx.mapping, colID, b); err != nil {
if err = d.saveBatch(ctx, mapping, colID, b); err != nil {
return 0, err
}
}
Expand All @@ -287,15 +304,15 @@ func (d *Driver) savePartition(
continue
}

rowID, err := idx.mapping.getRowID(field.Name(), values[i])
rowID, err := mapping.getRowID(field.Name(), values[i])
if err != nil {
return 0, err
}

b.bitBatches[i].Add(rowID, colID)
}

err = idx.mapping.putLocation(pilosaIndex.Name(), p, colID, location)
err = mapping.putLocation(pilosaIndex.Name(), colID, location)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -352,6 +369,13 @@ func (d *Driver) Save(
return err
}

cfgPath := d.configFilePath(i.Database(), i.Table(), i.ID())
cfg, err := index.ReadConfigFile(cfgPath)
if err != nil {
return err
}
driverCfg := cfg.Driver(DriverID)

defer iter.Close()
pilosaIndex := idx.index

Expand Down Expand Up @@ -382,6 +406,10 @@ func (d *Driver) Save(
wg.Wait()
return err
}
mk := mappingKey(p)
driverCfg[mk] = mappingFileName(mk)
mapping := newMapping(d.mappingFilePath(idx.Database(), idx.Table(), idx.ID(), mk))
idx.mapping[mk] = mapping

wg.Add(1)

Expand Down Expand Up @@ -417,6 +445,9 @@ func (d *Driver) Save(
if len(errors) > 0 {
return errors[0]
}
if err = index.WriteConfigFile(cfgPath, cfg); err != nil {
return err
}

logrus.WithFields(logrus.Fields{
"duration": time.Since(start),
Expand Down Expand Up @@ -469,6 +500,8 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
return err
}
}
mk := mappingKey(p)
delete(idx.mapping, mk)
}

return partitions.Close()
Expand Down Expand Up @@ -581,8 +614,13 @@ func (d *Driver) processingFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, ProcessingFileName)
}

func (d *Driver) mappingFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, MappingFileName)
func mappingFileName(key string) string {
h := sha1.New()
io.WriteString(h, key)
return fmt.Sprintf("%s-%x%s", MappingFileNamePrefix, h.Sum(nil), MappingFileNameExtension)
}
func (d *Driver) mappingFilePath(db, table, id string, key string) string {
return filepath.Join(d.root, db, table, id, mappingFileName(key))
}

func (d *Driver) newPilosaIndex(db, table string) (*pilosa.Index, error) {
Expand Down
18 changes: 12 additions & 6 deletions sql/index/pilosa/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
// pilosaIndex is an pilosa implementation of sql.Index interface
type pilosaIndex struct {
index *concurrentPilosaIndex
mapping *mapping
mapping map[string]*mapping
cancel context.CancelFunc
wg sync.WaitGroup

Expand All @@ -70,7 +70,7 @@ type pilosaIndex struct {
checksum string
}

func newPilosaIndex(idx *pilosa.Index, mapping *mapping, cfg *index.Config) *pilosaIndex {
func newPilosaIndex(idx *pilosa.Index, cfg *index.Config) *pilosaIndex {
var checksum string
for _, c := range cfg.Drivers {
if ch, ok := c[sql.ChecksumKey]; ok {
Expand All @@ -85,7 +85,7 @@ func newPilosaIndex(idx *pilosa.Index, mapping *mapping, cfg *index.Config) *pil
table: cfg.Table,
id: cfg.ID,
expressions: cfg.Expressions,
mapping: mapping,
mapping: make(map[string]*mapping),
checksum: checksum,
}
}
Expand Down Expand Up @@ -116,15 +116,21 @@ func (idx *pilosaIndex) Get(keys ...interface{}) (sql.IndexLookup, error) {

// Has checks if the given key is present in the index mapping
func (idx *pilosaIndex) Has(p sql.Partition, key ...interface{}) (bool, error) {
if err := idx.mapping.open(); err != nil {
mk := mappingKey(p)
m, ok := idx.mapping[mk]
if !ok {
return false, errMappingNotFound.New(mk)
}

if err := m.open(); err != nil {
return false, err
}
defer idx.mapping.close()
defer m.close()

for i, expr := range idx.expressions {
name := fieldName(idx.ID(), expr, p)

val, err := idx.mapping.get(name, key[i])
val, err := m.get(name, key[i])
if err != nil || val == nil {
return false, err
}
Expand Down
4 changes: 1 addition & 3 deletions sql/index/pilosa/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"gopkg.in/src-d/go-mysql-server.v0/sql"
)

type locationValueIter struct {
Expand All @@ -32,7 +31,6 @@ type indexValueIter struct {
total uint64
bits []uint64
mapping *mapping
partition sql.Partition
indexName string

// share transaction and bucket on all getLocation calls
Expand All @@ -47,7 +45,7 @@ func (it *indexValueIter) Next() ([]byte, error) {
return nil, err
}

bucket, err := it.mapping.getBucket(it.indexName, it.partition, false)
bucket, err := it.mapping.getBucket(it.indexName, false)
if err != nil {
_ = it.Close()
return nil, err
Expand Down
Loading