-
Notifications
You must be signed in to change notification settings - Fork 506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement sqlexp Messages for Query/QueryContext #690
Implement sqlexp Messages for Query/QueryContext #690
Conversation
token.go
Outdated
if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 { | ||
sess.log.Printf("(%d row(s) affected)\n", done.RowCount) | ||
|
||
if outs.msgq != nil { | ||
sqlexp.ReturnMessageEnqueue(ctx, outs.msgq, sqlexp.MsgRowsAffected{Count: int64(done.RowCount)}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What should we do if ReturnMessageEnqueue fails? Simply returning the error doesn't seem helpful as it won't be any kind of error the caller can reasonably handle, is it? We definitely wouldn't want to have data/sql retry it.
Should we just panic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at why it would return an error and it seems like overkill for it return ctx.Err() because the app is going to get that error back from Message() anyway.
// Message is called by clients after Query to dequeue messages.
func (m *ReturnMessage) Message(ctx context.Context) RawMessage {
select {
case <-ctx.Done():
return MsgNextResultSet{}
case raw := <-m.queue:
return raw
}
}
// ReturnMessageEnqueue is called by the driver to enqueue the driver.
// Drivers should not call this until after it returns from Query.
func ReturnMessageEnqueue(ctx context.Context, m *ReturnMessage, raw RawMessage) error {
select {
case <-ctx.Done():
return ctx.Err()
case m.queue <- raw:
return nil
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me write a test where the context times out and doesn't return any results. I am not sure what will happen :-) I should probably write the error to the channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the basis of a test I think:
ctx := context.WithTimeout(context.Background(), 5000*time.Millisecond)
retmsg := &sqlexp.ReturnMessage{}
r, err := conn.QueryContext(ctx, `waitfor delay '00:00:15'; select 100`, retmsg)
With my current implementation the client won't ever see a sqlexp.MsgNext
because it waits until it gets columns to send it. I think I can just post MsgNext as the first thing inside processSingleResponse
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then again, it seems the core data/sql code handles this already. This test passes if I don't do anything to handle errors from ReturnMessageEnqueue. The only message in the log is MsgNextResultSet
func TestTimeoutWithNoResults(t *testing.T) {
conn := open(t)
defer conn.Close()
latency, _ := getLatency(t)
ctx, _ := context.WithTimeout(context.Background(), latency+5000*time.Millisecond)
retmsg := &sqlexp.ReturnMessage{}
r, err := conn.QueryContext(ctx, `waitfor delay '00:00:15'; select 100`, retmsg)
if err != nil {
t.Fatal(err.Error())
}
defer r.Close()
active := true
for active {
msg := retmsg.Message(ctx)
t.Logf("Got a message: %s", reflect.TypeOf(msg))
switch m := msg.(type) {
case sqlexp.MsgNextResultSet:
active = r.NextResultSet()
if active {
t.Fatal("NextResultSet returned true")
}
case sqlexp.MsgNext:
if r.Next() {
t.Fatal("Got a successful Next even though the query should have timed out")
}
case sqlexp.MsgRowsAffected:
t.Fatalf("Got a MsgRowsAffected %d", m.Count)
}
}
if r.Err() != context.DeadlineExceeded {
t.Fatalf("Unexpected error: %v", r.Err())
}
}
@@ -850,6 +897,10 @@ func (t tokenProcessor) nextToken() (tokenStruct, error) { | |||
return nil, nil | |||
} | |||
case <-t.ctx.Done(): | |||
// It seems the Message function on t.outs.msgq doesn't get the Done if it comes here instead |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this case seems to be somewhat problematic. Occasionally the new test or benchmark will fail because it only reads 3 result sets instead of 4. The common behavior in each of the failures is that the log has an extra DoneStruct with status = 32 indicating we went down this code path without reading the row tokens from the channel first.
Do you have any clues on what might causes ctx.Done() to fire here before we empty the channel? Should processSingleResponse not return right away when it gets a doneStruct with an even numbered status? I am going to add some logging locally to try to track this down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to look into removing the go routines happy reads (as a guess). denisenkom has a branch with that prototyped, though it needs cancellation and brought up to speed. But it might be something else, we should look into it.
Codecov Report
@@ Coverage Diff @@
## master #690 +/- ##
==========================================
+ Coverage 70.96% 71.16% +0.19%
==========================================
Files 24 24
Lines 5208 5386 +178
==========================================
+ Hits 3696 3833 +137
- Misses 1285 1305 +20
- Partials 227 248 +21
Continue to review full report at Codecov.
|
messages_example_test.go
Outdated
) | ||
|
||
const ( | ||
msgQuery = `select name from sys.tables |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to select something that won't vary from system to system (and potentially even run to run). That would make the test more repeatable.
I have a consumer application for this feature. See https://github.com/microsoft/go-sqlcmd/blob/c3cd43b19dd77c0fc6761b552c03e599b7e7b338/pkg/sqlcmd/sqlcmd.go#L452 |
I just merged azure. If you could update? |
…nto shueybubbles/messageq
@@ -850,6 +897,10 @@ func (t tokenProcessor) nextToken() (tokenStruct, error) { | |||
return nil, nil | |||
} | |||
case <-t.ctx.Done(): | |||
// It seems the Message function on t.outs.msgq doesn't get the Done if it comes here instead |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably need to look into removing the go routines happy reads (as a guess). denisenkom has a branch with that prototyped, though it needs cancellation and brought up to speed. But it might be something else, we should look into it.
This is a minimally invasive addition to address #221.
For existing callers, the only change is the log will include messages for language and database changes.
Apps that want to take advantage of https://github.com/golang-sql/sqlexp can now write a message loop to retrieve data sets, errors, and messages.
A sample loop might look like this: