Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Commit

Permalink
Make mapping per partition (#681)
Browse files Browse the repository at this point in the history
Make mapping per partition
  • Loading branch information
ajnavarro authored Apr 23, 2019
2 parents e98fa12 + 409e0be commit 634630f
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 71 deletions.
70 changes: 54 additions & 16 deletions sql/index/pilosa/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@ const (
// ProcessingFileName is the extension of the lock/processing index file.
ProcessingFileName = ".processing"

// MappingFileName is the name of the mapping file.
MappingFileName = "mapping.db"
// MappingFileNamePrefix is the prefix of a mapping file name: <prefix>-<sha1(mappingKey)><extension>
MappingFileNamePrefix = "map"
// MappingFileNameExtension is the extension of a mapping file name: <prefix>-<sha1(mappingKey)><extension>
MappingFileNameExtension = ".db"
)

const (
Expand Down Expand Up @@ -124,7 +126,6 @@ func (d *Driver) Create(
return nil, err
}

mapping := newMapping(d.mappingFilePath(db, table, id))
processingFile := d.processingFilePath(db, table, id)
if err := index.WriteProcessingFile(
processingFile,
Expand All @@ -133,7 +134,7 @@ func (d *Driver) Create(
return nil, err
}

return newPilosaIndex(idx, mapping, cfg), nil
return newPilosaIndex(idx, cfg), nil
}

// LoadAll loads all indexes for given db and table
Expand Down Expand Up @@ -187,7 +188,6 @@ func (d *Driver) loadIndex(db, table, id string) (*pilosaIndex, error) {
return nil, errCorruptedIndex.New(dir)
}

mapping := d.mappingFilePath(db, table, id)
processing := d.processingFilePath(db, table, id)
ok, err := index.ExistsProcessingFile(processing)
if err != nil {
Expand All @@ -213,11 +213,23 @@ func (d *Driver) loadIndex(db, table, id string) (*pilosaIndex, error) {
if err != nil {
return nil, err
}
if cfg.Driver(DriverID) == nil {
cfgDriver := cfg.Driver(DriverID)
if cfgDriver == nil {
return nil, errCorruptedIndex.New(dir)
}

return newPilosaIndex(idx, newMapping(mapping), cfg), nil
pilosaIndex := newPilosaIndex(idx, cfg)
for k, v := range cfgDriver {
if strings.HasPrefix(v, MappingFileNamePrefix) && strings.HasSuffix(v, MappingFileNameExtension) {
path := d.mappingFilePath(db, table, id, k)
if _, err := os.Stat(path); err != nil {
continue
}
pilosaIndex.mapping[k] = newMapping(path)
}
}

return pilosaIndex, nil
}

func (d *Driver) savePartition(
Expand Down Expand Up @@ -245,28 +257,33 @@ func (d *Driver) savePartition(
}

rollback := true
if err := idx.mapping.openCreate(true); err != nil {
mk := mappingKey(p)
mapping, ok := idx.mapping[mk]
if !ok {
return 0, errMappingNotFound.New(mk)
}
if err := mapping.openCreate(true); err != nil {
return 0, err
}

defer func() {
if rollback {
idx.mapping.rollback()
mapping.rollback()
} else {
e := d.saveMapping(ctx, idx.mapping, colID, false, b)
e := d.saveMapping(ctx, mapping, colID, false, b)
if e != nil && err == nil {
err = e
}
}

idx.mapping.close()
mapping.close()
kviter.Close()
}()

for colID = 0; err == nil; colID++ {
// commit each batch of objects (pilosa and boltdb)
if colID%sql.IndexBatchSize == 0 && colID != 0 {
if err = d.saveBatch(ctx, idx.mapping, colID, b); err != nil {
if err = d.saveBatch(ctx, mapping, colID, b); err != nil {
return 0, err
}
}
Expand All @@ -287,15 +304,15 @@ func (d *Driver) savePartition(
continue
}

rowID, err := idx.mapping.getRowID(field.Name(), values[i])
rowID, err := mapping.getRowID(field.Name(), values[i])
if err != nil {
return 0, err
}

b.bitBatches[i].Add(rowID, colID)
}

err = idx.mapping.putLocation(pilosaIndex.Name(), p, colID, location)
err = mapping.putLocation(pilosaIndex.Name(), colID, location)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -352,6 +369,13 @@ func (d *Driver) Save(
return err
}

cfgPath := d.configFilePath(i.Database(), i.Table(), i.ID())
cfg, err := index.ReadConfigFile(cfgPath)
if err != nil {
return err
}
driverCfg := cfg.Driver(DriverID)

defer iter.Close()
pilosaIndex := idx.index

Expand Down Expand Up @@ -382,6 +406,10 @@ func (d *Driver) Save(
wg.Wait()
return err
}
mk := mappingKey(p)
driverCfg[mk] = mappingFileName(mk)
mapping := newMapping(d.mappingFilePath(idx.Database(), idx.Table(), idx.ID(), mk))
idx.mapping[mk] = mapping

wg.Add(1)

Expand Down Expand Up @@ -417,6 +445,9 @@ func (d *Driver) Save(
if len(errors) > 0 {
return errors[0]
}
if err = index.WriteConfigFile(cfgPath, cfg); err != nil {
return err
}

logrus.WithFields(logrus.Fields{
"duration": time.Since(start),
Expand Down Expand Up @@ -469,6 +500,8 @@ func (d *Driver) Delete(i sql.Index, partitions sql.PartitionIter) error {
return err
}
}
mk := mappingKey(p)
delete(idx.mapping, mk)
}

return partitions.Close()
Expand Down Expand Up @@ -581,8 +614,13 @@ func (d *Driver) processingFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, ProcessingFileName)
}

func (d *Driver) mappingFilePath(db, table, id string) string {
return filepath.Join(d.root, db, table, id, MappingFileName)
// mappingFileName returns the file name used for the mapping identified by
// key, in the form <prefix>-<hex SHA-1 of key><extension>.
func mappingFileName(key string) string {
	hasher := sha1.New()
	// The byte count and error are deliberately discarded: per the hash
	// package documentation, a Hash's Write never returns an error.
	_, _ = io.WriteString(hasher, key)
	digest := hasher.Sum(nil)

	return fmt.Sprintf(
		"%s-%x%s",
		MappingFileNamePrefix,
		digest,
		MappingFileNameExtension,
	)
}
// mappingFilePath returns the path of the mapping file for the given mapping
// key, located under <root>/<db>/<table>/<id>/.
func (d *Driver) mappingFilePath(db, table, id string, key string) string {
	name := mappingFileName(key)
	return filepath.Join(d.root, db, table, id, name)
}

func (d *Driver) newPilosaIndex(db, table string) (*pilosa.Index, error) {
Expand Down
18 changes: 12 additions & 6 deletions sql/index/pilosa/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ var (
// pilosaIndex is a pilosa implementation of the sql.Index interface
type pilosaIndex struct {
index *concurrentPilosaIndex
mapping *mapping
mapping map[string]*mapping
cancel context.CancelFunc
wg sync.WaitGroup

Expand All @@ -70,7 +70,7 @@ type pilosaIndex struct {
checksum string
}

func newPilosaIndex(idx *pilosa.Index, mapping *mapping, cfg *index.Config) *pilosaIndex {
func newPilosaIndex(idx *pilosa.Index, cfg *index.Config) *pilosaIndex {
var checksum string
for _, c := range cfg.Drivers {
if ch, ok := c[sql.ChecksumKey]; ok {
Expand All @@ -85,7 +85,7 @@ func newPilosaIndex(idx *pilosa.Index, mapping *mapping, cfg *index.Config) *pil
table: cfg.Table,
id: cfg.ID,
expressions: cfg.Expressions,
mapping: mapping,
mapping: make(map[string]*mapping),
checksum: checksum,
}
}
Expand Down Expand Up @@ -116,15 +116,21 @@ func (idx *pilosaIndex) Get(keys ...interface{}) (sql.IndexLookup, error) {

// Has checks if the given key is present in the index mapping
func (idx *pilosaIndex) Has(p sql.Partition, key ...interface{}) (bool, error) {
if err := idx.mapping.open(); err != nil {
mk := mappingKey(p)
m, ok := idx.mapping[mk]
if !ok {
return false, errMappingNotFound.New(mk)
}

if err := m.open(); err != nil {
return false, err
}
defer idx.mapping.close()
defer m.close()

for i, expr := range idx.expressions {
name := fieldName(idx.ID(), expr, p)

val, err := idx.mapping.get(name, key[i])
val, err := m.get(name, key[i])
if err != nil || val == nil {
return false, err
}
Expand Down
4 changes: 1 addition & 3 deletions sql/index/pilosa/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"gopkg.in/src-d/go-mysql-server.v0/sql"
)

type locationValueIter struct {
Expand All @@ -32,7 +31,6 @@ type indexValueIter struct {
total uint64
bits []uint64
mapping *mapping
partition sql.Partition
indexName string

// share transaction and bucket on all getLocation calls
Expand All @@ -47,7 +45,7 @@ func (it *indexValueIter) Next() ([]byte, error) {
return nil, err
}

bucket, err := it.mapping.getBucket(it.indexName, it.partition, false)
bucket, err := it.mapping.getBucket(it.indexName, false)
if err != nil {
_ = it.Close()
return nil, err
Expand Down
Loading

0 comments on commit 634630f

Please sign in to comment.