Skip to content

Commit

Permalink
Fixed review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sr464e committed Oct 10, 2019
1 parent a971a27 commit a2b232c
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func main() {

Example for streaming the result
==========
Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it.In ExecuteAsync method it takes a channel to provide the Response as request parmeter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Response are provided the channel is closed.
Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it.In ExecuteAsync method it takes a channel to provide the Response as request parameter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Responses are provided the channel is closed.
go test -v -run ExecuteBulkDataAsync is the cmd to run the testcase)
```go
package main
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Client struct {
responses chan []byte
results *sync.Map
responseNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request
responseStatusNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request with the code
responseStatusNotifier *sync.Map // responseStatusNotifier notifies the requester that a response has arrived for the request with the code
sync.RWMutex
Errored bool
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/rsrinathr/gremtune
module github.com/schwartzmx/gremtune

require (
github.com/gofrs/uuid v3.2.0+incompatible
Expand Down
15 changes: 1 addition & 14 deletions gremtune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ type BulkResponse struct {
}

func truncateData(t *testing.T) {
log.Println("Removing all data from gremlin server strated...")
log.Println("Removing all data from gremlin server started...")
_, err := g.Execute(`g.V('1234').drop()`)
if err != nil {
t.Fatal(err)
}
//t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err)
_, err = g.Execute(`g.V('2145').drop()`)
if err != nil {
t.Fatal(err)
}
//t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err)
log.Println("Removing all data from gremlin server completed...")
}

Expand All @@ -56,7 +54,6 @@ func seedData(t *testing.T) {
if err != nil {
t.Fatal(err)
}
//t.Logf("Added two vertices and one edge, response: %v+ \n err: %s", string(r[0].Result.Data), err)
log.Println("Seeding data completed...")
}

Expand All @@ -66,12 +63,10 @@ func truncateBulkData(t *testing.T) {
if err != nil {
t.Fatal(err)
}
//t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err)
_, err = g.Execute(`g.V().hasLabel('EmployerBulkData').drop()`)
if err != nil {
t.Fatal(err)
}
//t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err)
log.Println("Removing bulk data from gremlin server completed...")
}

Expand All @@ -85,14 +80,12 @@ func seedBulkData(t *testing.T) {
if err != nil {
t.Fatal(err)
}
//t.Logf("Added EmployerBulkData vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err)

for i := 9001; i < 9641; i++ {
_, err = g.Execute("g.addV('EmployeeBulkData').property(id, '" + strconv.Itoa(i) + "').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree').as('y').addE('employes').from(V('1234567890')).to('y')")
if err != nil {
t.Fatal(err)
}
//t.Logf("Added two vertices and one edge, response: %v+ \n err: %s", string(r[0].Result.Data), err)
}
log.Println("Seeding bulk data completed...")
}
Expand Down Expand Up @@ -130,7 +123,6 @@ func TestExecuteBulkData(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error returned from server err: %v", err.Error())
} else {
//t.Logf("Execute get vertex, response: %v \n err: %v", string(r[0].Result.Data), err)
nl := new(BulkResponse)
datastr := strings.Replace(string(r[0].Result.Data), "@type", "type", -1)
datastr = strings.Replace(datastr, "@value", "value", -1)
Expand All @@ -146,7 +138,6 @@ func TestExecuteBulkData(t *testing.T) {

func TestExecuteBulkDataAsync(t *testing.T) {
seedBulkData(t)
//defer truncateBulkData(t)
start := time.Now()
responseChannel := make(chan AsyncResponse, 2)
err := g.ExecuteAsync(`g.V().hasLabel('EmployerBulkData').both('employes').hasLabel('EmployeeBulkData').valueMap(true)`, responseChannel)
Expand All @@ -160,7 +151,6 @@ func TestExecuteBulkDataAsync(t *testing.T) {
for asyncResponse = range responseChannel {
log.Println(fmt.Sprintf("Time it took to get async response: %s response status: %v (206 means partial and 200 final response)", time.Since(start), asyncResponse.Response.Status.Code))
count++
//t.Logf("Execute get vertex, response: %v \n err: %v", string(r[0].Result.Data), err)
nl := new(BulkResponse)
datastr := strings.Replace(string(asyncResponse.Response.Result.Data), "@type", "type", -1)
datastr = strings.Replace(datastr, "@value", "value", -1)
Expand All @@ -169,9 +159,6 @@ func TestExecuteBulkDataAsync(t *testing.T) {
t.Errorf("There should only be 64 value, got: %v+", len(nl.Value))
}
start = time.Now()
// if count ==1 {
// time.Sleep(2 * time.Second)
// }
}
if count != 10 {
t.Errorf("There should only be 10 value, got: %v+", count)
Expand Down
2 changes: 1 addition & 1 deletion response.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *Client) saveResponse(resp Response, err error) {
}
}

// retrieveResponse retrieves the response saved by saveResponse and send the retrieved reponse to the channel .
// retrieveResponseAsync retrieves the response saved by saveResponse and send the retrieved reponse to the channel .
func (c *Client) retrieveResponseAsync(id string, responseChannel chan AsyncResponse) {
var responseProcessedIndex int
responseNotifier, _ := c.responseNotifier.Load(id)
Expand Down

0 comments on commit a2b232c

Please sign in to comment.