Skip to content

Commit

Permalink
Merge pull request #2 from butonic/index-fixes
Browse files Browse the repository at this point in the history
index fixes
  • Loading branch information
butonic authored Jul 22, 2020
2 parents f967059 + 99e2677 commit 8fa9c07
Showing 1 changed file with 110 additions and 15 deletions.
125 changes: 110 additions & 15 deletions pkg/service/v0/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io/ioutil"
"os"
"path/filepath"
"strings"

"github.com/blevesearch/bleve"
"github.com/blevesearch/bleve/analysis/analyzer/keyword"
Expand All @@ -19,9 +18,9 @@ import (

// BleveDocument wraps the generated Record.Metadata and adds a property that is used to distinguish documents in the index.
type BleveDocument struct {
Metadata map[string]*proto.Field
Database string `json:"database"`
Table string `json:"table"`
Metadata map[string]*proto.Field `json:"metadata"`
Database string `json:"database"`
Table string `json:"table"`
}

// New returns a new instance of Service
Expand Down Expand Up @@ -63,9 +62,9 @@ func New(opts ...Option) (s *Service, err error) {
if s.index, err = bleve.New(indexDir, indexMapping); err != nil {
return
}
// if err = s.indexRecords(recordsDir); err != nil {
// return nil, err
// }
if err = s.indexRecords(recordsDir); err != nil {
return nil, err
}
return
}

Expand All @@ -80,7 +79,8 @@ type Service struct {
// Read implements the StoreHandler interface.
func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.ReadResponse) error {
if len(rreq.Key) != 0 {
file := filepath.Join(s.Config.Datapath, "databases", rreq.Options.Database, rreq.Options.Table, rreq.Key)
id := getID(rreq.Options.Database, rreq.Options.Table, rreq.Key)
file := filepath.Join(s.Config.Datapath, "databases", id)

var data []byte
rec := &proto.Record{}
Expand All @@ -97,7 +97,7 @@ func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.R
return nil
}

s.log.Info().Interface("requeest", rreq).Msg("read request")
s.log.Info().Interface("request", rreq).Msg("read request")
if rreq.Options.Where != nil {
// build bleve query
// execute search
Expand Down Expand Up @@ -149,9 +149,8 @@ func (s *Service) Read(c context.Context, rreq *proto.ReadRequest, rres *proto.R

// Write implements the StoreHandler interface.
func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto.WriteResponse) error {
// TODO sanitize key. As it may contain invalid characters, such as slashes.
// file: /var/tmp/ocis-store/databases/{database}/{table}/{record.key}.
file := filepath.Join(s.Config.Datapath, "databases", wreq.Options.Database, wreq.Options.Table, wreq.Record.Key)
id := getID(wreq.Options.Database, wreq.Options.Table, wreq.Record.Key)
file := filepath.Join(s.Config.Datapath, "databases", id)

var bytes []byte
bytes, err := protojson.Marshal(wreq.Record)
Expand All @@ -173,8 +172,7 @@ func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto
Database: wreq.Options.Database,
Table: wreq.Options.Table,
}
// TODO sanitize input.
if err := s.index.Index(strings.Join([]string{wreq.Options.Database, wreq.Options.Table, wreq.Record.Key}, "/"), doc); err != nil {
if err := s.index.Index(id, doc); err != nil {
s.log.Error().Err(err).Interface("document", doc).Msg("could not index record metadata")
return err
}
Expand All @@ -184,14 +182,21 @@ func (s *Service) Write(c context.Context, wreq *proto.WriteRequest, wres *proto

// Delete implements the StoreHandler interface.
func (s *Service) Delete(c context.Context, dreq *proto.DeleteRequest, dres *proto.DeleteResponse) error {
file := filepath.Join(s.Config.Datapath, "databases", dreq.Options.Database, dreq.Options.Table, dreq.Key)
id := getID(dreq.Options.Database, dreq.Options.Table, dreq.Key)
file := filepath.Join(s.Config.Datapath, "databases", id)
if err := os.Remove(file); err != nil {
if os.IsNotExist(err) {
return merrors.NotFound(s.id, "could not find record")
}

return merrors.InternalServerError(s.id, "could not delete record")
}

if err := s.index.Delete(id); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not remove record from index")
return merrors.InternalServerError(s.id, "could not remove record from index")
}

return nil
}

Expand Down Expand Up @@ -235,3 +240,93 @@ func (s *Service) Tables(ctx context.Context, in *proto.TablesRequest, out *prot
out.Tables = tnames
return nil
}

// TODO sanitize key. As it may contain invalid characters, such as slashes.
// file: /var/tmp/ocis-store/databases/{database}/{table}/{record.key}.
func getID(database string, table string, key string) string {
// TODO sanitize input.
return filepath.Join(database, table, key)
}

func (s Service) indexRecords(recordsDir string) (err error) {

// TODO use filepath.Walk to clean up code
rh, err := os.Open(recordsDir)
if err != nil {
return merrors.InternalServerError(s.id, "could not open database directory")
}
defer rh.Close()

dbs, err := rh.Readdirnames(0)
if err != nil {
return merrors.InternalServerError(s.id, "could not read databases directory")
}

for i := range dbs {
tp := filepath.Join(s.Config.Datapath, "databases", dbs[i])
th, err := os.Open(tp)
if err != nil {
s.log.Error().Err(err).Str("database", dbs[i]).Msg("could not open database directory")
continue
}
defer th.Close()

tables, err := th.Readdirnames(0)
if err != nil {
s.log.Error().Err(err).Str("database", dbs[i]).Msg("could not read database directory")
continue
}

for j := range tables {

tp := filepath.Join(s.Config.Datapath, "databases", dbs[i], tables[j])
kh, err := os.Open(tp)
if err != nil {
s.log.Error().Err(err).Str("database", dbs[i]).Str("table", tables[i]).Msg("could not open table directory")
continue
}
defer kh.Close()

keys, err := kh.Readdirnames(0)
if err != nil {
s.log.Error().Err(err).Str("database", dbs[i]).Str("table", tables[i]).Msg("could not read table directory")
continue
}

for k := range keys {

id := getID(dbs[i], tables[j], keys[k])
kp := filepath.Join(s.Config.Datapath, "databases", id)

// read record
var data []byte
rec := &proto.Record{}
data, err = ioutil.ReadFile(kp)
if err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not read record")
continue
}

if err = protojson.Unmarshal(data, rec); err != nil {
s.log.Error().Err(err).Str("id", id).Msg("could not unmarshal record")
continue
}

// index record
doc := BleveDocument{
Metadata: rec.Metadata,
Database: dbs[i],
Table: tables[j],
}
if err := s.index.Index(id, doc); err != nil {
s.log.Error().Err(err).Interface("document", doc).Str("id", id).Msg("could not index record metadata")
continue
}

s.log.Debug().Str("id", id).Msg("indexed record")
}
}
}

return
}

0 comments on commit 8fa9c07

Please sign in to comment.