From fe13daf3e48b2bf7dd4b4e3a6b3d9fd787fd5a6f Mon Sep 17 00:00:00 2001 From: sr464e Date: Sat, 14 Sep 2019 18:18:13 -0700 Subject: [PATCH 1/6] * Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it. I have added the ExecuteAsync method that takes a channel to provide the Response as request parmeter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Response are provided the channel is closed. * Added TestExecuteBulkData and TestExecuteBulkDataAsync so can compare using Execute and ExecuteAsync * Fixed the existing test cases --- README.md | 57 +++++++++++ client.go | 46 ++++++++- gremtune_test.go | 247 ++++++++++++++++++++++++++++++++++++----------- response.go | 65 +++++++++++++ 4 files changed, 354 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index 0906076e..577a939a 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,63 @@ func main() { } ``` +Example for streaming the result (Neptune streams 64 rows at a time. go test -v -run ExecuteBulkDataAsync is cmd to run the testcase) +========== +```go +package main + +import ( + "fmt" + "log" + "time" + "strings" + "github.com/schwartzmx/gremtune" +) + +func main() { + errs := make(chan error) + go func(chan error) { + err := <-errs + log.Fatal("Lost connection to the database: " + err.Error()) + }(errs) // Example of connection error handling logic + + dialer := gremtune.NewDialer("ws://127.0.0.1:8182") // Returns a WebSocket dialer to connect to Gremlin Server + g, err := gremtune.Dial(dialer, errs) // Returns a gremtune client to interact with + if err != nil { + fmt.Println(err) + return + } + start := time.Now() + responseChannel := make(chan AsyncResponse, 10) + err := g.ExecuteAsync( // Sends a query to Gremlin Server + "g.V().hasLabel('Employee').valueMap(true)", responseChannel + ) + log.Println(fmt.Sprintf("Time it took to execute query %s", time.Since(start))) + if err != nil { + fmt.Println(err) + return + } + count := 0 + asyncResponse := AsyncResponse{} + start = time.Now() + for asyncResponse = range responseChannel { + log.Println(fmt.Sprintf("Time it took to get async response: %s response status: %v", time.Since(start), asyncResponse.Response.Status.Code)) + count++ + + nl := new(BulkResponse) + datastr := strings.Replace(string(asyncResponse.Response.Result.Data), "@type", "type", -1) + datastr = strings.Replace(datastr, "@value", "value", -1) + err = json.Unmarshal([]byte(datastr), &nl) + if err != nil { + fmt.Println(err) + return nil, err + } + log.Println(fmt.Sprintf("No of rows retrieved: %v", len(nl.Value))) + start = time.Now() + } +} +``` + Authentication ========== The plugin accepts authentication creating a secure dialer where credentials are setted. diff --git a/client.go b/client.go index 3362adcd..b48a0773 100644 --- a/client.go +++ b/client.go @@ -11,11 +11,12 @@ import ( // Client is a container for the gremtune client. type Client struct { - conn dialer - requests chan []byte - responses chan []byte - results *sync.Map - responseNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request + conn dialer + requests chan []byte + responses chan []byte + results *sync.Map + responseNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request + responseStatusNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request with the code sync.RWMutex Errored bool } @@ -44,6 +45,7 @@ func newClient() (c Client) { c.responses = make(chan []byte, 3) // c.responses takes raw responses from ReadWorker and delivers it for sorting to handelResponse c.results = &sync.Map{} c.responseNotifier = &sync.Map{} + c.responseStatusNotifier = &sync.Map{} return } @@ -85,6 +87,7 @@ func (c *Client) executeRequest(query string, bindings, rebindings *map[string]s return } c.responseNotifier.Store(id, make(chan error, 1)) + c.responseStatusNotifier.Store(id, make(chan int, 1)) c.dispatchRequest(msg) resp, err = c.retrieveResponse(id) if err != nil { @@ -93,6 +96,30 @@ func (c *Client) executeRequest(query string, bindings, rebindings *map[string]s return } +func (c *Client) executeAsync(query string, bindings, rebindings *map[string]string, responseChannel chan AsyncResponse) (err error) { + var req request + var id string + if bindings != nil && rebindings != nil { + req, id, err = prepareRequestWithBindings(query, *bindings, *rebindings) + } else { + req, id, err = prepareRequest(query) + } + if err != nil { + return + } + + msg, err := packageRequest(req) + if err != nil { + log.Println(err) + return + } + c.responseNotifier.Store(id, make(chan error, 1)) + c.responseStatusNotifier.Store(id, make(chan int, 1)) + c.dispatchRequest(msg) + go c.retrieveResponseAsync(id, responseChannel) + return +} + func (c *Client) authenticate(requestID string) (err error) { auth := c.conn.getAuth() req, err := prepareAuthRequest(requestID, auth.username, auth.password) @@ -128,6 +155,15 @@ func (c *Client) Execute(query string) (resp []Response, err error) { return } +// Execute formats a raw Gremlin query, sends it to Gremlin Server, and the results are streamed to channel provided in method paramater. +func (c *Client) ExecuteAsync(query string, responseChannel chan AsyncResponse) (err error) { + if c.conn.IsDisposed() { + return errors.New("you cannot write on disposed connection") + } + err = c.executeAsync(query, nil, nil, responseChannel) + return +} + // ExecuteFileWithBindings takes a file path to a Gremlin script, sends it to Gremlin Server with bindings, and returns the result. func (c *Client) ExecuteFileWithBindings(path string, bindings, rebindings map[string]string) (resp []Response, err error) { if c.conn.IsDisposed() { diff --git a/gremtune_test.go b/gremtune_test.go index fd66e983..dcff26c1 100644 --- a/gremtune_test.go +++ b/gremtune_test.go @@ -2,26 +2,45 @@ package gremtune import ( "encoding/json" + "fmt" + "log" + "strconv" + "strings" "testing" + "time" ) func init() { InitGremlinClients() } +type BulkResponse struct { + Type string `json:"type"` + Value []struct { + Type string `json:"type"` + Value []interface{} `json:"value"` + } `json:"value"` +} + func truncateData(t *testing.T) { - t.Logf("Removing all data from gremlin server") - r, err := g.Execute(`g.V().drop().iterate()`) - t.Logf("Removed all vertices, response: %v+ \n err: %s", r, err) + log.Println("Removing all data from gremlin server strated...") + _, err := g.Execute(`g.V('1234').drop()`) + if err != nil { + t.Fatal(err) + } + //t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) + _, err = g.Execute(`g.V('2145').drop()`) if err != nil { t.Fatal(err) } + //t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) + log.Println("Removing all data from gremlin server completed...") } func seedData(t *testing.T) { truncateData(t) - t.Logf("Seeding data...") - r, err := g.Execute(` + log.Println("Seeding data started...") + _, err := g.Execute(` g.addV('Phil').property(id, '1234'). property('timestamp', '2018-07-01T13:37:45-05:00'). property('source', 'tree'). @@ -34,10 +53,48 @@ func seedData(t *testing.T) { from('x'). to('y') `) - t.Logf("Added two vertices and one edge, response: %v+ \n err: %s", r, err) if err != nil { t.Fatal(err) } + //t.Logf("Added two vertices and one edge, response: %v+ \n err: %s", string(r[0].Result.Data), err) + log.Println("Seeding data completed...") +} + +func truncateBulkData(t *testing.T) { + log.Println("Removing bulk data from gremlin server strated...") + _, err := g.Execute(`g.V().hasLabel('EmployeeBulkData').drop().iterate()`) + if err != nil { + t.Fatal(err) + } + //t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) + _, err = g.Execute(`g.V().hasLabel('EmployerBulkData').drop()`) + if err != nil { + t.Fatal(err) + } + //t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) + log.Println("Removing bulk data from gremlin server completed...") +} + +func seedBulkData(t *testing.T) { + truncateBulkData(t) + log.Println("Seeding bulk data started...") + + _, err := g.Execute(` + g.addV('EmployerBulkData').property(id, '1234_EmployerBulkData').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree') + `) + if err != nil { + t.Fatal(err) + } + //t.Logf("Added EmployerBulkData vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) + + for i := 1; i < 641; i++ { + _, err = g.Execute("g.addV('EmployeeBulkData').property(id, '" + strconv.Itoa(i) + "_EmployeeBulkData').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree').as('y').addE('employes').from(V('1234_EmployerBulkData')).to('y')") + if err != nil { + t.Fatal(err) + } + //t.Logf("Added two vertices and one edge, response: %v+ \n err: %s", string(r[0].Result.Data), err) + } + log.Println("Seeding bulk data completed...") } type nodeLabels struct { @@ -47,52 +104,122 @@ type nodeLabels struct { func TestExecute(t *testing.T) { seedData(t) r, err := g.Execute(`g.V('1234').label()`) - t.Logf("Execute get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("Execute get vertex, response: %v \n err: %v", string(r[0].Result.Data), err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Phil" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } - expected := "Phil" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) +} + +func TestExecuteBulkData(t *testing.T) { + seedBulkData(t) + defer truncateBulkData(t) + start := time.Now() + r, err := g.Execute(`g.V().hasLabel('EmployerBulkData').both('employes').hasLabel('EmployeeBulkData').valueMap(true)`) + log.Println(fmt.Sprintf("Execution time it took to execute query %s", time.Since(start))) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + //t.Logf("Execute get vertex, response: %v \n err: %v", string(r[0].Result.Data), err) + nl := new(BulkResponse) + datastr := strings.Replace(string(r[0].Result.Data), "@type", "type", -1) + datastr = strings.Replace(datastr, "@value", "value", -1) + err = json.Unmarshal([]byte(datastr), &nl) + if len(nl.Value) != 64 { + t.Errorf("There should only be 64 value, got: %v+", len(nl.Value)) + } + if len(r) != 10 { + t.Errorf("There should only be 10 value, got: %v+", len(r)) + } + } +} + +func TestExecuteBulkDataAsync(t *testing.T) { + seedBulkData(t) + //defer truncateBulkData(t) + start := time.Now() + responseChannel := make(chan AsyncResponse, 2) + err := g.ExecuteAsync(`g.V().hasLabel('EmployerBulkData').both('employes').hasLabel('EmployeeBulkData').valueMap(true)`, responseChannel) + log.Println(fmt.Sprintf("Time it took to execute query %s", time.Since(start))) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + count := 0 + asyncResponse := AsyncResponse{} + start = time.Now() + for asyncResponse = range responseChannel { + log.Println(fmt.Sprintf("Time it took to get async response: %s response status: %v (206 means partial and 200 final response)", time.Since(start), asyncResponse.Response.Status.Code)) + count++ + //t.Logf("Execute get vertex, response: %v \n err: %v", string(r[0].Result.Data), err) + nl := new(BulkResponse) + datastr := strings.Replace(string(asyncResponse.Response.Result.Data), "@type", "type", -1) + datastr = strings.Replace(datastr, "@value", "value", -1) + err = json.Unmarshal([]byte(datastr), &nl) + if len(nl.Value) != 64 { + t.Errorf("There should only be 64 value, got: %v+", len(nl.Value)) + } + start = time.Now() + // if count ==1 { + // time.Sleep(2 * time.Second) + // } + } + if count != 10 { + t.Errorf("There should only be 10 value, got: %v+", count) + } } } func TestExecuteWithBindings(t *testing.T) { seedData(t) r, err := g.ExecuteWithBindings( - `g.V(x).label()`, + "g.V(x).label()", map[string]string{"x": "1234"}, map[string]string{}, ) - t.Logf("Execute with bindings get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) - } - expected := "Phil" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("Execute with bindings get vertex, response: %s \n err: %s", string(r[0].Result.Data), err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Phil" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } } func TestExecuteFile(t *testing.T) { seedData(t) r, err := g.ExecuteFile("scripts/test.groovy") - t.Logf("ExecuteFile get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) - } - expected := "Vincent" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("ExecuteFile get vertex, response: %s \n err: %s", string(r[0].Result.Data), err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Vincent" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } } @@ -103,31 +230,39 @@ func TestExecuteFileWithBindings(t *testing.T) { map[string]string{"x": "2145"}, map[string]string{}, ) - t.Logf("ExecuteFileWithBindings get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) - } - expected := "Vincent" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("ExecuteFileWithBindings get vertex, response: %s \n err: %s", r[0].Result.Data, err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Vincent" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } } func TestPoolExecute(t *testing.T) { seedData(t) r, err := gp.Execute(`g.V('1234').label()`) - t.Logf("PoolExecute get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) - } - expected := "Phil" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("PoolExecute get vertex, response: %s \n err: %s", r[0].Result.Data, err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Phil" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } } diff --git a/response.go b/response.go index 3638a398..ed35baff 100644 --- a/response.go +++ b/response.go @@ -33,6 +33,12 @@ type Result struct { Meta map[string]interface{} `json:"meta"` } +// Response structs holds the entire response from requests to the gremlin server +type AsyncResponse struct { + Response Response `json:"response"` //Partial Response object + ErrorMessage string `json:"errorMessage"` // Error message if there was an error +} + // Response structs holds the entire response from requests to the gremlin server type Response struct { RequestID string `json:"requestId"` @@ -79,15 +85,72 @@ func (c *Client) saveResponse(resp Response, err error) { newdata := append(container, resp) // Create new data container with new data c.results.Store(resp.RequestID, newdata) // Add new data to buffer for future retrieval respNotifier, load := c.responseNotifier.LoadOrStore(resp.RequestID, make(chan error, 1)) + responseStatusNotifier, load := c.responseStatusNotifier.LoadOrStore(resp.RequestID, make(chan int, 1)) _ = load + if cap(responseStatusNotifier.(chan int)) > len(responseStatusNotifier.(chan int)) { + // Channel is not full so adding the response status to the channel else it will cause the method to wait till the response is read by requester + responseStatusNotifier.(chan int) <- resp.Status.Code + } if resp.Status.Code != statusPartialContent { respNotifier.(chan error) <- err } } +// retrieveResponse retrieves the response saved by saveResponse and send the retrieved reponse to the channel . +func (c *Client) retrieveResponseAsync(id string, responseChannel chan AsyncResponse) { + var responseProcessedIndex int + responseNotifier, _ := c.responseNotifier.Load(id) + responseStatusNotifier, _ := c.responseStatusNotifier.Load(id) + for status := range responseStatusNotifier.(chan int) { + _ = status + if dataI, ok := c.results.Load(id); ok { + d := dataI.([]interface{}) + // Only retrieve all but one from the partial responses saved in results Map that are not sent to responseChannel + for i := responseProcessedIndex; i < len(d)-1; i++ { + responseProcessedIndex++ + asyncResponse := AsyncResponse{} + asyncResponse.Response = d[i].(Response) + // Send the Partial response object to the responseChannel + responseChannel <- asyncResponse + } + } + //Checks to see If there was an Error or full response has been provided by Neptune + if len(responseNotifier.(chan error)) > 0 { + //Checks to see If there was an Error or will get nil when final reponse has been provided by Neptune + err := <-responseNotifier.(chan error) + if dataI, ok := c.results.Load(id); ok { + d := dataI.([]interface{}) + // Retrieve all the partial responses that are not sent to responseChannel + for i := responseProcessedIndex; i < len(d); i++ { + responseProcessedIndex++ + asyncResponse := AsyncResponse{} + asyncResponse.Response = d[i].(Response) + //when final partial response it sent it also sends the error message if there was an error on the last partial response retrival + if responseProcessedIndex == len(d) && err != nil { + asyncResponse.ErrorMessage = err.Error() + } + // Send the Partial response object to the responseChannel + responseChannel <- asyncResponse + } + } + // All the Partial response object including the final one has been sent to the responseChannel + break + } + } + // All the Partial response object including the final one has been sent to the responseChannel so closing responseStatusNotifier, responseNotifier, responseChannel and removing all the reponse stored + defer close(responseStatusNotifier.(chan int)) + defer close(responseNotifier.(chan error)) + defer c.responseNotifier.Delete(id) + defer c.responseStatusNotifier.Delete(id) + defer c.deleteResponse(id) + defer close(responseChannel) + return +} + // retrieveResponse retrieves the response saved by saveResponse. func (c *Client) retrieveResponse(id string) (data []Response, err error) { resp, _ := c.responseNotifier.Load(id) + responseStatusNotifier, _ := c.responseStatusNotifier.Load(id) err = <-resp.(chan error) if err == nil { if dataI, ok := c.results.Load(id); ok { @@ -97,7 +160,9 @@ func (c *Client) retrieveResponse(id string) (data []Response, err error) { data[i] = d[i].(Response) } close(resp.(chan error)) + close(responseStatusNotifier.(chan int)) c.responseNotifier.Delete(id) + c.responseStatusNotifier.Delete(id) c.deleteResponse(id) } } From 25abbddbebd5561ddb161a759d687f866d252fc4 Mon Sep 17 00:00:00 2001 From: sr464e Date: Sat, 14 Sep 2019 18:23:13 -0700 Subject: [PATCH 2/6] * Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it. I have added the ExecuteAsync method that takes a channel to provide the Response as request parmeter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Response are provided the channel is closed. * Added TestExecuteBulkData and TestExecuteBulkDataAsync so can compare using Execute and ExecuteAsync * Fixed the existing test cases --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 577a939a..14b4ddd1 100644 --- a/README.md +++ b/README.md @@ -61,8 +61,10 @@ func main() { } ``` -Example for streaming the result (Neptune streams 64 rows at a time. go test -v -run ExecuteBulkDataAsync is cmd to run the testcase) +Example for streaming the result ========== +Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it.In ExecuteAsync method it takes a channel to provide the Response as request parmeter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Response are provided the channel is closed. +go test -v -run ExecuteBulkDataAsync is the cmd to run the testcase) ```go package main From 28ada6541f91c243fdf40b335b9c48ef716099d4 Mon Sep 17 00:00:00 2001 From: sr464e Date: Sat, 14 Sep 2019 19:23:25 -0700 Subject: [PATCH 3/6] * Fixed review comments --- response.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/response.go b/response.go index ed35baff..5f272aad 100644 --- a/response.go +++ b/response.go @@ -33,7 +33,7 @@ type Result struct { Meta map[string]interface{} `json:"meta"` } -// Response structs holds the entire response from requests to the gremlin server +// AsyncResponse structs holds the entire response from requests to the gremlin server type AsyncResponse struct { Response Response `json:"response"` //Partial Response object ErrorMessage string `json:"errorMessage"` // Error message if there was an error @@ -108,7 +108,7 @@ func (c *Client) retrieveResponseAsync(id string, responseChannel chan AsyncResp // Only retrieve all but one from the partial responses saved in results Map that are not sent to responseChannel for i := responseProcessedIndex; i < len(d)-1; i++ { responseProcessedIndex++ - asyncResponse := AsyncResponse{} + var asyncResponse AsyncResponse = AsyncResponse{} asyncResponse.Response = d[i].(Response) // Send the Partial response object to the responseChannel responseChannel <- asyncResponse @@ -138,13 +138,12 @@ func (c *Client) retrieveResponseAsync(id string, responseChannel chan AsyncResp } } // All the Partial response object including the final one has been sent to the responseChannel so closing responseStatusNotifier, responseNotifier, responseChannel and removing all the reponse stored - defer close(responseStatusNotifier.(chan int)) - defer close(responseNotifier.(chan error)) - defer c.responseNotifier.Delete(id) - defer c.responseStatusNotifier.Delete(id) - defer c.deleteResponse(id) - defer close(responseChannel) - return + close(responseStatusNotifier.(chan int)) + close(responseNotifier.(chan error)) + c.responseNotifier.Delete(id) + c.responseStatusNotifier.Delete(id) + c.deleteResponse(id) + close(responseChannel) } // retrieveResponse retrieves the response saved by saveResponse. From a0c6c887142c610f05fd2695712b74fb1726f5be Mon Sep 17 00:00:00 2001 From: sr464e Date: Sat, 14 Sep 2019 21:04:23 -0700 Subject: [PATCH 4/6] * Fixed SCRIPT EVALUATION ERROR - Response Message: Expected an id that is convertible to class java.lang.Long but received class java.lang.String - [1234_EmployerBulkData] --- gremtune_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gremtune_test.go b/gremtune_test.go index dcff26c1..9a8aceb2 100644 --- a/gremtune_test.go +++ b/gremtune_test.go @@ -80,15 +80,15 @@ func seedBulkData(t *testing.T) { log.Println("Seeding bulk data started...") _, err := g.Execute(` - g.addV('EmployerBulkData').property(id, '1234_EmployerBulkData').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree') + g.addV('EmployerBulkData').property(id, '1234567890').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree') `) if err != nil { t.Fatal(err) } //t.Logf("Added EmployerBulkData vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) - for i := 1; i < 641; i++ { - _, err = g.Execute("g.addV('EmployeeBulkData').property(id, '" + strconv.Itoa(i) + "_EmployeeBulkData').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree').as('y').addE('employes').from(V('1234_EmployerBulkData')).to('y')") + for i := 9001; i < 9641; i++ { + _, err = g.Execute("g.addV('EmployeeBulkData').property(id, '" + strconv.Itoa(i) + "').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree').as('y').addE('employes').from(V('1234567890')).to('y')") if err != nil { t.Fatal(err) } From a971a2755d40b836e584d0778f8a210365286381 Mon Sep 17 00:00:00 2001 From: sr464e Date: Wed, 18 Sep 2019 14:30:57 -0700 Subject: [PATCH 5/6] * Fixed Go Mod for internal use till my pull request gets approved and merged. --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 2472c555..0ea36280 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/schwartzmx/gremtune +module github.com/rsrinathr/gremtune require ( github.com/gofrs/uuid v3.2.0+incompatible From a2b232c71094dba872009312fc38f2480e885b17 Mon Sep 17 00:00:00 2001 From: sr464e Date: Thu, 10 Oct 2019 16:41:31 -0700 Subject: [PATCH 6/6] Fixed review comments --- README.md | 2 +- client.go | 2 +- go.mod | 2 +- gremtune_test.go | 15 +-------------- response.go | 2 +- 5 files changed, 5 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 14b4ddd1..dab84fd3 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ func main() { Example for streaming the result ========== -Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it.In ExecuteAsync method it takes a channel to provide the Response as request parmeter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Response are provided the channel is closed. +Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it.In ExecuteAsync method it takes a channel to provide the Response as request parameter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Responses are provided the channel is closed. go test -v -run ExecuteBulkDataAsync is the cmd to run the testcase) ```go package main diff --git a/client.go b/client.go index b48a0773..a975178a 100644 --- a/client.go +++ b/client.go @@ -16,7 +16,7 @@ type Client struct { responses chan []byte results *sync.Map responseNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request - responseStatusNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request with the code + responseStatusNotifier *sync.Map // responseStatusNotifier notifies the requester that a response has arrived for the request with the code sync.RWMutex Errored bool } diff --git a/go.mod b/go.mod index 0ea36280..2472c555 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/rsrinathr/gremtune +module github.com/schwartzmx/gremtune require ( github.com/gofrs/uuid v3.2.0+incompatible diff --git a/gremtune_test.go b/gremtune_test.go index 9a8aceb2..31d7428d 100644 --- a/gremtune_test.go +++ b/gremtune_test.go @@ -23,17 +23,15 @@ type BulkResponse struct { } func truncateData(t *testing.T) { - log.Println("Removing all data from gremlin server strated...") + log.Println("Removing all data from gremlin server started...") _, err := g.Execute(`g.V('1234').drop()`) if err != nil { t.Fatal(err) } - //t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) _, err = g.Execute(`g.V('2145').drop()`) if err != nil { t.Fatal(err) } - //t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) log.Println("Removing all data from gremlin server completed...") } @@ -56,7 +54,6 @@ func seedData(t *testing.T) { if err != nil { t.Fatal(err) } - //t.Logf("Added two vertices and one edge, response: %v+ \n err: %s", string(r[0].Result.Data), err) log.Println("Seeding data completed...") } @@ -66,12 +63,10 @@ func truncateBulkData(t *testing.T) { if err != nil { t.Fatal(err) } - //t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) _, err = g.Execute(`g.V().hasLabel('EmployerBulkData').drop()`) if err != nil { t.Fatal(err) } - //t.Logf("Removed all vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) log.Println("Removing bulk data from gremlin server completed...") } @@ -85,14 +80,12 @@ func seedBulkData(t *testing.T) { if err != nil { t.Fatal(err) } - //t.Logf("Added EmployerBulkData vertices, response: %v+ \n err: %s", string(r[0].Result.Data), err) for i := 9001; i < 9641; i++ { _, err = g.Execute("g.addV('EmployeeBulkData').property(id, '" + strconv.Itoa(i) + "').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree').as('y').addE('employes').from(V('1234567890')).to('y')") if err != nil { t.Fatal(err) } - //t.Logf("Added two vertices and one edge, response: %v+ \n err: %s", string(r[0].Result.Data), err) } log.Println("Seeding bulk data completed...") } @@ -130,7 +123,6 @@ func TestExecuteBulkData(t *testing.T) { if err != nil { t.Errorf("Unexpected error returned from server err: %v", err.Error()) } else { - //t.Logf("Execute get vertex, response: %v \n err: %v", string(r[0].Result.Data), err) nl := new(BulkResponse) datastr := strings.Replace(string(r[0].Result.Data), "@type", "type", -1) datastr = strings.Replace(datastr, "@value", "value", -1) @@ -146,7 +138,6 @@ func TestExecuteBulkData(t *testing.T) { func TestExecuteBulkDataAsync(t *testing.T) { seedBulkData(t) - //defer truncateBulkData(t) start := time.Now() responseChannel := make(chan AsyncResponse, 2) err := g.ExecuteAsync(`g.V().hasLabel('EmployerBulkData').both('employes').hasLabel('EmployeeBulkData').valueMap(true)`, responseChannel) @@ -160,7 +151,6 @@ func TestExecuteBulkDataAsync(t *testing.T) { for asyncResponse = range responseChannel { log.Println(fmt.Sprintf("Time it took to get async response: %s response status: %v (206 means partial and 200 final response)", time.Since(start), asyncResponse.Response.Status.Code)) count++ - //t.Logf("Execute get vertex, response: %v \n err: %v", string(r[0].Result.Data), err) nl := new(BulkResponse) datastr := strings.Replace(string(asyncResponse.Response.Result.Data), "@type", "type", -1) datastr = strings.Replace(datastr, "@value", "value", -1) @@ -169,9 +159,6 @@ func TestExecuteBulkDataAsync(t *testing.T) { t.Errorf("There should only be 64 value, got: %v+", len(nl.Value)) } start = time.Now() - // if count ==1 { - // time.Sleep(2 * time.Second) - // } } if count != 10 { t.Errorf("There should only be 10 value, got: %v+", count) diff --git a/response.go b/response.go index 5f272aad..d51fc5b8 100644 --- a/response.go +++ b/response.go @@ -96,7 +96,7 @@ func (c *Client) saveResponse(resp Response, err error) { } } -// retrieveResponse retrieves the response saved by saveResponse and send the retrieved reponse to the channel . +// retrieveResponseAsync retrieves the response saved by saveResponse and send the retrieved reponse to the channel . func (c *Client) retrieveResponseAsync(id string, responseChannel chan AsyncResponse) { var responseProcessedIndex int responseNotifier, _ := c.responseNotifier.Load(id)