Skip to content

Commit

Permalink
Merge pull request #3 from rsrinathr/master
Browse files Browse the repository at this point in the history
Stream response as and when Neptune provides it
  • Loading branch information
schwartzmx authored Oct 11, 2019
2 parents d15d64a + a2b232c commit a6aabc9
Show file tree
Hide file tree
Showing 4 changed files with 342 additions and 61 deletions.
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,65 @@ func main() {
}
```

Example for streaming the result
==========
Neptune provides 64 values per Response that is why Execute at present provides a [] of Response since it waits for all the responses to be retrieved and then provides it.In ExecuteAsync method it takes a channel to provide the Response as request parameter and provides the Response as and when it is provided by Neptune. The Response are streamed to the caller and once all the Responses are provided the channel is closed.
go test -v -run ExecuteBulkDataAsync is the cmd to run the testcase)
```go
package main

import (
"fmt"
"log"
"time"
"strings"
"github.com/schwartzmx/gremtune"
)

func main() {
errs := make(chan error)
go func(chan error) {
err := <-errs
log.Fatal("Lost connection to the database: " + err.Error())
}(errs) // Example of connection error handling logic

dialer := gremtune.NewDialer("ws://127.0.0.1:8182") // Returns a WebSocket dialer to connect to Gremlin Server
g, err := gremtune.Dial(dialer, errs) // Returns a gremtune client to interact with
if err != nil {
fmt.Println(err)
return
}
start := time.Now()
responseChannel := make(chan AsyncResponse, 10)
err := g.ExecuteAsync( // Sends a query to Gremlin Server
"g.V().hasLabel('Employee').valueMap(true)", responseChannel
)
log.Println(fmt.Sprintf("Time it took to execute query %s", time.Since(start)))
if err != nil {
fmt.Println(err)
return
}
count := 0
asyncResponse := AsyncResponse{}
start = time.Now()
for asyncResponse = range responseChannel {
log.Println(fmt.Sprintf("Time it took to get async response: %s response status: %v", time.Since(start), asyncResponse.Response.Status.Code))
count++

nl := new(BulkResponse)
datastr := strings.Replace(string(asyncResponse.Response.Result.Data), "@type", "type", -1)
datastr = strings.Replace(datastr, "@value", "value", -1)
err = json.Unmarshal([]byte(datastr), &nl)
if err != nil {
fmt.Println(err)
return nil, err
}
log.Println(fmt.Sprintf("No of rows retrieved: %v", len(nl.Value)))
start = time.Now()
}
}
```

Authentication
==========
The plugin accepts authentication creating a secure dialer where credentials are setted.
Expand Down
46 changes: 41 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (

// Client is a container for the gremtune client.
type Client struct {
conn dialer
requests chan []byte
responses chan []byte
results *sync.Map
responseNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request
conn dialer
requests chan []byte
responses chan []byte
results *sync.Map
responseNotifier *sync.Map // responseNotifier notifies the requester that a response has arrived for the request
responseStatusNotifier *sync.Map // responseStatusNotifier notifies the requester that a response has arrived for the request with the code
sync.RWMutex
Errored bool
}
Expand Down Expand Up @@ -44,6 +45,7 @@ func newClient() (c Client) {
c.responses = make(chan []byte, 3) // c.responses takes raw responses from ReadWorker and delivers it for sorting to handelResponse
c.results = &sync.Map{}
c.responseNotifier = &sync.Map{}
c.responseStatusNotifier = &sync.Map{}
return
}

Expand Down Expand Up @@ -85,6 +87,7 @@ func (c *Client) executeRequest(query string, bindings, rebindings *map[string]s
return
}
c.responseNotifier.Store(id, make(chan error, 1))
c.responseStatusNotifier.Store(id, make(chan int, 1))
c.dispatchRequest(msg)
resp, err = c.retrieveResponse(id)
if err != nil {
Expand All @@ -93,6 +96,30 @@ func (c *Client) executeRequest(query string, bindings, rebindings *map[string]s
return
}

func (c *Client) executeAsync(query string, bindings, rebindings *map[string]string, responseChannel chan AsyncResponse) (err error) {
var req request
var id string
if bindings != nil && rebindings != nil {
req, id, err = prepareRequestWithBindings(query, *bindings, *rebindings)
} else {
req, id, err = prepareRequest(query)
}
if err != nil {
return
}

msg, err := packageRequest(req)
if err != nil {
log.Println(err)
return
}
c.responseNotifier.Store(id, make(chan error, 1))
c.responseStatusNotifier.Store(id, make(chan int, 1))
c.dispatchRequest(msg)
go c.retrieveResponseAsync(id, responseChannel)
return
}

func (c *Client) authenticate(requestID string) (err error) {
auth := c.conn.getAuth()
req, err := prepareAuthRequest(requestID, auth.username, auth.password)
Expand Down Expand Up @@ -128,6 +155,15 @@ func (c *Client) Execute(query string) (resp []Response, err error) {
return
}

// Execute formats a raw Gremlin query, sends it to Gremlin Server, and the results are streamed to channel provided in method paramater.
func (c *Client) ExecuteAsync(query string, responseChannel chan AsyncResponse) (err error) {
if c.conn.IsDisposed() {
return errors.New("you cannot write on disposed connection")
}
err = c.executeAsync(query, nil, nil, responseChannel)
return
}

// ExecuteFileWithBindings takes a file path to a Gremlin script, sends it to Gremlin Server with bindings, and returns the result.
func (c *Client) ExecuteFileWithBindings(path string, bindings, rebindings map[string]string) (resp []Response, err error) {
if c.conn.IsDisposed() {
Expand Down
Loading

0 comments on commit a6aabc9

Please sign in to comment.