Skip to content

Commit

Permalink
database/sql: allow using a single connection from the database
Browse files Browse the repository at this point in the history
Databases have the following concepts: Statement, Batch, and Session.

A statement is often a single line like:
SELECT Amount from Account where ID = 50;

A batch is one or more statements submitted together for the query
to process. It may be a DELETE, INSERT, two UPDATES and a SELECT in
a single query text.

A session is usually represented by a single database connection.
This often is an issue when dealing with scopes in databases.
Temporary tables and variables can have batch, session, or global
scope depending on the syntax, database, and use.

Furthermore, some databases (sybase and derivatives in perticular)
that prevent certain statements from being in the same batch
and may necessitate being in the same session.

By allowing users to extract a Conn from the database they can manage
session on their own without hacking around it by making connection
pools of single connections (a real workaround presented in issue).
It is tempting to just use a transaction, but this isn't always
desirable or an option if running an interactive session or
alter script set that itself starts transactions.

Fixes #18081

Change-Id: I9bdf0796632c48d4bcaef3624c629641984ffaf2
Reviewed-on: https://go-review.googlesource.com/40694
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
  • Loading branch information
kardianos committed Apr 24, 2017
1 parent 42c5f39 commit d234f9a
Show file tree
Hide file tree
Showing 2 changed files with 283 additions and 5 deletions.
200 changes: 195 additions & 5 deletions src/database/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,17 @@ func Open(driverName, dataSourceName string) (*DB, error) {
return db, nil
}

func (db *DB) pingDC(ctx context.Context, dc *driverConn, release func(error)) error {
var err error
if pinger, ok := dc.ci.(driver.Pinger); ok {
withLock(dc, func() {
err = pinger.Ping(ctx)
})
}
release(err)
return err
}

// PingContext verifies a connection to the database is still alive,
// establishing a connection if necessary.
func (db *DB) PingContext(ctx context.Context) error {
Expand All @@ -602,11 +613,7 @@ func (db *DB) PingContext(ctx context.Context) error {
return err
}

if pinger, ok := dc.ci.(driver.Pinger); ok {
err = pinger.Ping(ctx)
}
dc.releaseConn(err)
return err
return db.pingDC(ctx, dc, dc.releaseConn)
}

// Ping verifies a connection to the database is still alive,
Expand Down Expand Up @@ -1404,6 +1411,189 @@ func (db *DB) Driver() driver.Driver {
return db.driver
}

// ErrConnDone is returned by any operation that is performed on a connection
// that has already been committed or rolled back.
var ErrConnDone = errors.New("database/sql: connection is already closed")

// Conn returns a single connection by either opening a new connection
// or returning an existing connection from the connection pool. Conn will
// block until either a connection is returned or ctx is canceled.
// Queries run on the same Conn will be run in the same database session.
//
// Every Conn must be returned to the database pool after use by
// calling Conn.Close.
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
var dc *driverConn
var err error
for i := 0; i < maxBadConnRetries; i++ {
dc, err = db.conn(ctx, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
dc, err = db.conn(ctx, cachedOrNewConn)
}
if err != nil {
return nil, err
}

conn := &Conn{
db: db,
dc: dc,
}
return conn, nil
}

// Conn represents a single database session rather a pool of database
// sessions. Prefer running queries from DB unless there is a specific
// need for a continuous single database session.
//
// A Conn must call Close to return the connection to the database pool
// and may do so concurrently with a running query.
//
// After a call to Close, all operations on the
// connection fail with ErrConnDone.
type Conn struct {
db *DB

// closemu prevents the connection from closing while there
// is an active query. It is held for read during queries
// and exclusively during close.
closemu sync.RWMutex

// dc is owned until close, at which point
// it's returned to the connection pool.
dc *driverConn

// done transitions from 0 to 1 exactly once, on close.
// Once done, all operations fail with ErrConnDone.
// Use atomic operations on value when checking value.
done int32
}

func (c *Conn) grabConn() (*driverConn, error) {
if atomic.LoadInt32(&c.done) != 0 {
return nil, ErrConnDone
}
return c.dc, nil
}

// PingContext verifies the connection to the database is still alive.
func (c *Conn) PingContext(ctx context.Context) error {
dc, err := c.grabConn()
if err != nil {
return err
}

c.closemu.RLock()
return c.db.pingDC(ctx, dc, c.closemuRUnlockCondReleaseConn)
}

// ExecContext executes a query without returning any rows.
// The args are for any placeholder parameters in the query.
func (c *Conn) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
dc, err := c.grabConn()
if err != nil {
return nil, err
}

c.closemu.RLock()
return c.db.execDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query, args)
}

// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
func (c *Conn) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
dc, err := c.grabConn()
if err != nil {
return nil, err
}

c.closemu.RLock()
return c.db.queryDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query, args)
}

// QueryRowContext executes a query that is expected to return at most one row.
// QueryRowContext always returns a non-nil value. Errors are deferred until
// Row's Scan method is called.
func (c *Conn) QueryRowContext(ctx context.Context, query string, args ...interface{}) *Row {
rows, err := c.QueryContext(ctx, query, args...)
return &Row{rows: rows, err: err}
}

// PrepareContext creates a prepared statement for later queries or executions.
// Multiple queries or executions may be run concurrently from the
// returned statement.
// The caller must call the statement's Close method
// when the statement is no longer needed.
//
// The provided context is used for the preparation of the statement, not for the
// execution of the statement.
func (c *Conn) PrepareContext(ctx context.Context, query string) (*Stmt, error) {
dc, err := c.grabConn()
if err != nil {
return nil, err
}

c.closemu.RLock()
return c.db.prepareDC(ctx, dc, c.closemuRUnlockCondReleaseConn, query)
}

// BeginTx starts a transaction.
//
// The provided context is used until the transaction is committed or rolled back.
// If the context is canceled, the sql package will roll back
// the transaction. Tx.Commit will return an error if the context provided to
// BeginTx is canceled.
//
// The provided TxOptions is optional and may be nil if defaults should be used.
// If a non-default isolation level is used that the driver doesn't support,
// an error will be returned.
func (c *Conn) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
dc, err := c.grabConn()
if err != nil {
return nil, err
}

c.closemu.RLock()
return c.db.beginDC(ctx, dc, c.closemuRUnlockCondReleaseConn, opts)
}

// closemuRUnlockCondReleaseConn read unlocks closemu
// as the sql operation is done with the dc.
func (c *Conn) closemuRUnlockCondReleaseConn(err error) {
c.closemu.RUnlock()
if err == driver.ErrBadConn {
c.close(err)
}
}

func (c *Conn) close(err error) error {
if !atomic.CompareAndSwapInt32(&c.done, 0, 1) {
return ErrConnDone
}

// Lock around releasing the driver connection
// to ensure all queries have been stopped before doing so.
c.closemu.Lock()
defer c.closemu.Unlock()

c.dc.releaseConn(err)
c.dc = nil
c.db = nil
return err
}

// Close returns the connection to the connection pool.
// All operations after a Close will return with ErrConnDone.
// Close is safe to call concurrently with other operations and will
// block until all other operations finish. It may be useful to first
// cancel any used context and then call close directly after.
func (c *Conn) Close() error {
return c.close(nil)
}

// Tx is an in-progress database transaction.
//
// A transaction must end with a call to Commit or Rollback.
Expand Down
88 changes: 88 additions & 0 deletions src/database/sql/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func closeDB(t testing.TB, db *DB) {
t.Errorf("Error closing fakeConn: %v", err)
}
})
db.mu.Lock()
for i, dc := range db.freeConn {
if n := len(dc.openStmt); n > 0 {
// Just a sanity check. This is legal in
Expand All @@ -149,6 +150,8 @@ func closeDB(t testing.TB, db *DB) {
t.Errorf("while closing db, freeConn %d/%d had %d open stmts; want 0", i, len(db.freeConn), n)
}
}
db.mu.Unlock()

err := db.Close()
if err != nil {
t.Fatalf("error closing DB: %v", err)
Expand Down Expand Up @@ -1298,6 +1301,69 @@ func TestTxErrBadConn(t *testing.T) {
}
}

func TestConnQuery(t *testing.T) {
db := newTestDB(t, "people")
defer closeDB(t, db)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := db.Conn(ctx)
if err != nil {
t.Fatal(err)
}
defer conn.Close()

var name string
err = conn.QueryRowContext(ctx, "SELECT|people|name|age=?", 3).Scan(&name)
if err != nil {
t.Fatal(err)
}
if name != "Chris" {
t.Fatalf("unexpected result, got %q want Chris", name)
}

err = conn.PingContext(ctx)
if err != nil {
t.Fatal(err)
}
}

func TestConnTx(t *testing.T) {
db := newTestDB(t, "people")
defer closeDB(t, db)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := db.Conn(ctx)
if err != nil {
t.Fatal(err)
}
defer conn.Close()

tx, err := conn.BeginTx(ctx, nil)
if err != nil {
t.Fatal(err)
}
insertName, insertAge := "Nancy", 33
_, err = tx.ExecContext(ctx, "INSERT|people|name=?,age=?,photo=APHOTO", insertName, insertAge)
if err != nil {
t.Fatal(err)
}
err = tx.Commit()
if err != nil {
t.Fatal(err)
}

var selectName string
err = conn.QueryRowContext(ctx, "SELECT|people|name|age=?", insertAge).Scan(&selectName)
if err != nil {
t.Fatal(err)
}
if selectName != insertName {
t.Fatalf("got %q want %q", selectName, insertName)
}
}

// Tests fix for issue 2542, that we release a lock when querying on
// a closed connection.
func TestIssue2542Deadlock(t *testing.T) {
Expand Down Expand Up @@ -2338,6 +2404,28 @@ func TestManyErrBadConn(t *testing.T) {
if err = stmt.Close(); err != nil {
t.Fatal(err)
}

// Conn
db = manyErrBadConnSetup()
defer closeDB(t, db)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := db.Conn(ctx)
if err != nil {
t.Fatal(err)
}
err = conn.Close()
if err != nil {
t.Fatal(err)
}

// Ping
db = manyErrBadConnSetup()
defer closeDB(t, db)
err = db.PingContext(ctx)
if err != nil {
t.Fatal(err)
}
}

// golang.org/issue/5718
Expand Down

0 comments on commit d234f9a

Please sign in to comment.