diff --git a/README.md b/README.md index 0906076e..dab84fd3 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,65 @@ func main() { } ``` +Example for streaming the result +========== +Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it.In ExecuteAsync method it takes a channel to provide the Response as request parameter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Responses are provided the channel is closed. +go test -v -run ExecuteBulkDataAsync is the cmd to run the testcase) +```go +package main + +import ( + "fmt" + "log" + "time" + "strings" + "github.com/schwartzmx/gremtune" +) + +func main() { + errs := make(chan error) + go func(chan error) { + err := <-errs + log.Fatal("Lost connection to the database: " + err.Error()) + }(errs) // Example of connection error handling logic + + dialer := gremtune.NewDialer("ws://127.0.0.1:8182") // Returns a WebSocket dialer to connect to Gremlin Server + g, err := gremtune.Dial(dialer, errs) // Returns a gremtune client to interact with + if err != nil { + fmt.Println(err) + return + } + start := time.Now() + responseChannel := make(chan AsyncResponse, 10) + err := g.ExecuteAsync( // Sends a query to Gremlin Server + "g.V().hasLabel('Employee').valueMap(true)", responseChannel + ) + log.Println(fmt.Sprintf("Time it took to execute query %s", time.Since(start))) + if err != nil { + fmt.Println(err) + return + } + count := 0 + asyncResponse := AsyncResponse{} + start = time.Now() + for asyncResponse = range responseChannel { + log.Println(fmt.Sprintf("Time it took to get async response: %s response status: %v", time.Since(start), asyncResponse.Response.Status.Code)) + count++ + + nl := new(BulkResponse) + datastr := strings.Replace(string(asyncResponse.Response.Result.Data), "@type", "type", -1) + datastr = strings.Replace(datastr, "@value", "value", -1) + err = json.Unmarshal([]byte(datastr), &nl) + if err != nil { + fmt.Println(err) + return nil, err + } + log.Println(fmt.Sprintf("No of rows retrieved: %v", len(nl.Value))) + start = time.Now() + } +} +``` + Authentication ========== The plugin accepts authentication creating a secure dialer where credentials are setted. diff --git a/client.go b/client.go index 3362adcd..a975178a 100644 --- a/client.go +++ b/client.go @@ -11,11 +11,12 @@ import ( // Client is a container for the gremtune client. type Client struct { - conn dialer - requests chan []byte - responses chan []byte - results *sync.Map - responseNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request + conn dialer + requests chan []byte + responses chan []byte + results *sync.Map + responseNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request + responseStatusNotifier *sync.Map // responseStatusNotifier notifies the requester that a response has arrived for the request with the code sync.RWMutex Errored bool } @@ -44,6 +45,7 @@ func newClient() (c Client) { c.responses = make(chan []byte, 3) // c.responses takes raw responses from ReadWorker and delivers it for sorting to handelResponse c.results = &sync.Map{} c.responseNotifier = &sync.Map{} + c.responseStatusNotifier = &sync.Map{} return } @@ -85,6 +87,7 @@ func (c *Client) executeRequest(query string, bindings, rebindings *map[string]s return } c.responseNotifier.Store(id, make(chan error, 1)) + c.responseStatusNotifier.Store(id, make(chan int, 1)) c.dispatchRequest(msg) resp, err = c.retrieveResponse(id) if err != nil { @@ -93,6 +96,30 @@ func (c *Client) executeRequest(query string, bindings, rebindings *map[string]s return } +func (c *Client) executeAsync(query string, bindings, rebindings *map[string]string, responseChannel chan AsyncResponse) (err error) { + var req request + var id string + if bindings != nil && rebindings != nil { + req, id, err = prepareRequestWithBindings(query, *bindings, *rebindings) + } else { + req, id, err = prepareRequest(query) + } + if err != nil { + return + } + + msg, err := packageRequest(req) + if err != nil { + log.Println(err) + return + } + c.responseNotifier.Store(id, make(chan error, 1)) + c.responseStatusNotifier.Store(id, make(chan int, 1)) + c.dispatchRequest(msg) + go c.retrieveResponseAsync(id, responseChannel) + return +} + func (c *Client) authenticate(requestID string) (err error) { auth := c.conn.getAuth() req, err := prepareAuthRequest(requestID, auth.username, auth.password) @@ -128,6 +155,15 @@ func (c *Client) Execute(query string) (resp []Response, err error) { return } +// Execute formats a raw Gremlin query, sends it to Gremlin Server, and the results are streamed to channel provided in method paramater. +func (c *Client) ExecuteAsync(query string, responseChannel chan AsyncResponse) (err error) { + if c.conn.IsDisposed() { + return errors.New("you cannot write on disposed connection") + } + err = c.executeAsync(query, nil, nil, responseChannel) + return +} + // ExecuteFileWithBindings takes a file path to a Gremlin script, sends it to Gremlin Server with bindings, and returns the result. func (c *Client) ExecuteFileWithBindings(path string, bindings, rebindings map[string]string) (resp []Response, err error) { if c.conn.IsDisposed() { diff --git a/gremtune_test.go b/gremtune_test.go index fd66e983..31d7428d 100644 --- a/gremtune_test.go +++ b/gremtune_test.go @@ -2,26 +2,43 @@ package gremtune import ( "encoding/json" + "fmt" + "log" + "strconv" + "strings" "testing" + "time" ) func init() { InitGremlinClients() } +type BulkResponse struct { + Type string `json:"type"` + Value []struct { + Type string `json:"type"` + Value []interface{} `json:"value"` + } `json:"value"` +} + func truncateData(t *testing.T) { - t.Logf("Removing all data from gremlin server") - r, err := g.Execute(`g.V().drop().iterate()`) - t.Logf("Removed all vertices, response: %v+ \n err: %s", r, err) + log.Println("Removing all data from gremlin server started...") + _, err := g.Execute(`g.V('1234').drop()`) + if err != nil { + t.Fatal(err) + } + _, err = g.Execute(`g.V('2145').drop()`) if err != nil { t.Fatal(err) } + log.Println("Removing all data from gremlin server completed...") } func seedData(t *testing.T) { truncateData(t) - t.Logf("Seeding data...") - r, err := g.Execute(` + log.Println("Seeding data started...") + _, err := g.Execute(` g.addV('Phil').property(id, '1234'). property('timestamp', '2018-07-01T13:37:45-05:00'). property('source', 'tree'). @@ -34,10 +51,43 @@ func seedData(t *testing.T) { from('x'). to('y') `) - t.Logf("Added two vertices and one edge, response: %v+ \n err: %s", r, err) if err != nil { t.Fatal(err) } + log.Println("Seeding data completed...") +} + +func truncateBulkData(t *testing.T) { + log.Println("Removing bulk data from gremlin server strated...") + _, err := g.Execute(`g.V().hasLabel('EmployeeBulkData').drop().iterate()`) + if err != nil { + t.Fatal(err) + } + _, err = g.Execute(`g.V().hasLabel('EmployerBulkData').drop()`) + if err != nil { + t.Fatal(err) + } + log.Println("Removing bulk data from gremlin server completed...") +} + +func seedBulkData(t *testing.T) { + truncateBulkData(t) + log.Println("Seeding bulk data started...") + + _, err := g.Execute(` + g.addV('EmployerBulkData').property(id, '1234567890').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree') + `) + if err != nil { + t.Fatal(err) + } + + for i := 9001; i < 9641; i++ { + _, err = g.Execute("g.addV('EmployeeBulkData').property(id, '" + strconv.Itoa(i) + "').property('timestamp', '2018-07-01T13:37:45-05:00').property('source', 'tree').as('y').addE('employes').from(V('1234567890')).to('y')") + if err != nil { + t.Fatal(err) + } + } + log.Println("Seeding bulk data completed...") } type nodeLabels struct { @@ -47,52 +97,116 @@ type nodeLabels struct { func TestExecute(t *testing.T) { seedData(t) r, err := g.Execute(`g.V('1234').label()`) - t.Logf("Execute get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("Execute get vertex, response: %v \n err: %v", string(r[0].Result.Data), err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Phil" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } - expected := "Phil" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) +} + +func TestExecuteBulkData(t *testing.T) { + seedBulkData(t) + defer truncateBulkData(t) + start := time.Now() + r, err := g.Execute(`g.V().hasLabel('EmployerBulkData').both('employes').hasLabel('EmployeeBulkData').valueMap(true)`) + log.Println(fmt.Sprintf("Execution time it took to execute query %s", time.Since(start))) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + nl := new(BulkResponse) + datastr := strings.Replace(string(r[0].Result.Data), "@type", "type", -1) + datastr = strings.Replace(datastr, "@value", "value", -1) + err = json.Unmarshal([]byte(datastr), &nl) + if len(nl.Value) != 64 { + t.Errorf("There should only be 64 value, got: %v+", len(nl.Value)) + } + if len(r) != 10 { + t.Errorf("There should only be 10 value, got: %v+", len(r)) + } + } +} + +func TestExecuteBulkDataAsync(t *testing.T) { + seedBulkData(t) + start := time.Now() + responseChannel := make(chan AsyncResponse, 2) + err := g.ExecuteAsync(`g.V().hasLabel('EmployerBulkData').both('employes').hasLabel('EmployeeBulkData').valueMap(true)`, responseChannel) + log.Println(fmt.Sprintf("Time it took to execute query %s", time.Since(start))) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + count := 0 + asyncResponse := AsyncResponse{} + start = time.Now() + for asyncResponse = range responseChannel { + log.Println(fmt.Sprintf("Time it took to get async response: %s response status: %v (206 means partial and 200 final response)", time.Since(start), asyncResponse.Response.Status.Code)) + count++ + nl := new(BulkResponse) + datastr := strings.Replace(string(asyncResponse.Response.Result.Data), "@type", "type", -1) + datastr = strings.Replace(datastr, "@value", "value", -1) + err = json.Unmarshal([]byte(datastr), &nl) + if len(nl.Value) != 64 { + t.Errorf("There should only be 64 value, got: %v+", len(nl.Value)) + } + start = time.Now() + } + if count != 10 { + t.Errorf("There should only be 10 value, got: %v+", count) + } } } func TestExecuteWithBindings(t *testing.T) { seedData(t) r, err := g.ExecuteWithBindings( - `g.V(x).label()`, + "g.V(x).label()", map[string]string{"x": "1234"}, map[string]string{}, ) - t.Logf("Execute with bindings get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) - } - expected := "Phil" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("Execute with bindings get vertex, response: %s \n err: %s", string(r[0].Result.Data), err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Phil" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } } func TestExecuteFile(t *testing.T) { seedData(t) r, err := g.ExecuteFile("scripts/test.groovy") - t.Logf("ExecuteFile get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) - } - expected := "Vincent" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("ExecuteFile get vertex, response: %s \n err: %s", string(r[0].Result.Data), err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Vincent" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } } @@ -103,31 +217,39 @@ func TestExecuteFileWithBindings(t *testing.T) { map[string]string{"x": "2145"}, map[string]string{}, ) - t.Logf("ExecuteFileWithBindings get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) - } - expected := "Vincent" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("ExecuteFileWithBindings get vertex, response: %s \n err: %s", r[0].Result.Data, err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Vincent" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } } func TestPoolExecute(t *testing.T) { seedData(t) r, err := gp.Execute(`g.V('1234').label()`) - t.Logf("PoolExecute get vertex, response: %s \n err: %s", r[0].Result.Data, err) - nl := new(nodeLabels) - err = json.Unmarshal(r[0].Result.Data, &nl) - if len(nl.Label) != 1 { - t.Errorf("There should only be 1 node label, got: %v+", nl) - } - expected := "Phil" - got := nl.Label[0] - if nl.Label[0] != expected { - t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + if err != nil { + t.Errorf("Unexpected error returned from server err: %v", err.Error()) + } else { + t.Logf("PoolExecute get vertex, response: %s \n err: %s", r[0].Result.Data, err) + nl := new(nodeLabels) + err = json.Unmarshal(r[0].Result.Data, &nl) + if len(nl.Label) != 1 { + t.Errorf("There should only be 1 node label, got: %v+", nl) + } + expected := "Phil" + got := nl.Label[0] + if nl.Label[0] != expected { + t.Errorf("Unexpected label returned, expected: %s got: %s", expected, got) + } } } diff --git a/response.go b/response.go index 3638a398..d51fc5b8 100644 --- a/response.go +++ b/response.go @@ -33,6 +33,12 @@ type Result struct { Meta map[string]interface{} `json:"meta"` } +// AsyncResponse structs holds the entire response from requests to the gremlin server +type AsyncResponse struct { + Response Response `json:"response"` //Partial Response object + ErrorMessage string `json:"errorMessage"` // Error message if there was an error +} + // Response structs holds the entire response from requests to the gremlin server type Response struct { RequestID string `json:"requestId"` @@ -79,15 +85,71 @@ func (c *Client) saveResponse(resp Response, err error) { newdata := append(container, resp) // Create new data container with new data c.results.Store(resp.RequestID, newdata) // Add new data to buffer for future retrieval respNotifier, load := c.responseNotifier.LoadOrStore(resp.RequestID, make(chan error, 1)) + responseStatusNotifier, load := c.responseStatusNotifier.LoadOrStore(resp.RequestID, make(chan int, 1)) _ = load + if cap(responseStatusNotifier.(chan int)) > len(responseStatusNotifier.(chan int)) { + // Channel is not full so adding the response status to the channel else it will cause the method to wait till the response is read by requester + responseStatusNotifier.(chan int) <- resp.Status.Code + } if resp.Status.Code != statusPartialContent { respNotifier.(chan error) <- err } } +// retrieveResponseAsync retrieves the response saved by saveResponse and send the retrieved reponse to the channel . +func (c *Client) retrieveResponseAsync(id string, responseChannel chan AsyncResponse) { + var responseProcessedIndex int + responseNotifier, _ := c.responseNotifier.Load(id) + responseStatusNotifier, _ := c.responseStatusNotifier.Load(id) + for status := range responseStatusNotifier.(chan int) { + _ = status + if dataI, ok := c.results.Load(id); ok { + d := dataI.([]interface{}) + // Only retrieve all but one from the partial responses saved in results Map that are not sent to responseChannel + for i := responseProcessedIndex; i < len(d)-1; i++ { + responseProcessedIndex++ + var asyncResponse AsyncResponse = AsyncResponse{} + asyncResponse.Response = d[i].(Response) + // Send the Partial response object to the responseChannel + responseChannel <- asyncResponse + } + } + //Checks to see If there was an Error or full response has been provided by Neptune + if len(responseNotifier.(chan error)) > 0 { + //Checks to see If there was an Error or will get nil when final reponse has been provided by Neptune + err := <-responseNotifier.(chan error) + if dataI, ok := c.results.Load(id); ok { + d := dataI.([]interface{}) + // Retrieve all the partial responses that are not sent to responseChannel + for i := responseProcessedIndex; i < len(d); i++ { + responseProcessedIndex++ + asyncResponse := AsyncResponse{} + asyncResponse.Response = d[i].(Response) + //when final partial response it sent it also sends the error message if there was an error on the last partial response retrival + if responseProcessedIndex == len(d) && err != nil { + asyncResponse.ErrorMessage = err.Error() + } + // Send the Partial response object to the responseChannel + responseChannel <- asyncResponse + } + } + // All the Partial response object including the final one has been sent to the responseChannel + break + } + } + // All the Partial response object including the final one has been sent to the responseChannel so closing responseStatusNotifier, responseNotifier, responseChannel and removing all the reponse stored + close(responseStatusNotifier.(chan int)) + close(responseNotifier.(chan error)) + c.responseNotifier.Delete(id) + c.responseStatusNotifier.Delete(id) + c.deleteResponse(id) + close(responseChannel) +} + // retrieveResponse retrieves the response saved by saveResponse. func (c *Client) retrieveResponse(id string) (data []Response, err error) { resp, _ := c.responseNotifier.Load(id) + responseStatusNotifier, _ := c.responseStatusNotifier.Load(id) err = <-resp.(chan error) if err == nil { if dataI, ok := c.results.Load(id); ok { @@ -97,7 +159,9 @@ func (c *Client) retrieveResponse(id string) (data []Response, err error) { data[i] = d[i].(Response) } close(resp.(chan error)) + close(responseStatusNotifier.(chan int)) c.responseNotifier.Delete(id) + c.responseStatusNotifier.Delete(id) c.deleteResponse(id) } }