Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

*: implement LOCK and UNLOCK of tables #448

Merged
merged 3 commits into from
Oct 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,42 @@ func TestUse(t *testing.T) {
require.Equal("foo", e.Catalog.CurrentDatabase())
}

func TestLocks(t *testing.T) {
require := require.New(t)

t1 := newLockableTable(mem.NewTable("t1", nil))
t2 := newLockableTable(mem.NewTable("t2", nil))
t3 := mem.NewTable("t3", nil)
catalog := sql.NewCatalog()
db := mem.NewDatabase("db")
db.AddTable("t1", t1)
db.AddTable("t2", t2)
db.AddTable("t3", t3)
catalog.AddDatabase(db)

analyzer := analyzer.NewDefault(catalog)
engine := sqle.New(catalog, analyzer, new(sqle.Config))

_, iter, err := engine.Query(newCtx(), "LOCK TABLES t1 READ, t2 WRITE, t3 READ")
require.NoError(err)

_, err = sql.RowIterToRows(iter)
require.NoError(err)

_, iter, err = engine.Query(newCtx(), "UNLOCK TABLES")
require.NoError(err)

_, err = sql.RowIterToRows(iter)
require.NoError(err)

require.Equal(1, t1.readLocks)
require.Equal(0, t1.writeLocks)
require.Equal(1, t1.unlocks)
require.Equal(0, t2.readLocks)
require.Equal(1, t2.writeLocks)
require.Equal(1, t2.unlocks)
}

func insertRows(t *testing.T, table sql.Inserter, rows ...sql.Row) {
t.Helper()

Expand All @@ -1379,3 +1415,30 @@ func newCtx() *sql.Context {
sql.WithSession(session),
)
}

type lockableTable struct {
sql.Table
readLocks int
writeLocks int
unlocks int
}

func newLockableTable(t sql.Table) *lockableTable {
return &lockableTable{Table: t}
}

var _ sql.Lockable = (*lockableTable)(nil)

func (l *lockableTable) Lock(ctx *sql.Context, write bool) error {
if write {
l.writeLocks++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we count locks?
Isn't it 1/0 (SET/UNSET) for the same session?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...or is it just counter for number of read/writes for unlock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is just a counter for the test

} else {
l.readLocks++
}
return nil
}

func (l *lockableTable) Unlock(ctx *sql.Context, id uint32) error {
l.unlocks++
return nil
}
4 changes: 4 additions & 0 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (h *Handler) ConnectionClosed(c *mysql.Conn) {
delete(h.c, c.ConnectionID)
h.mu.Unlock()

if err := h.e.Catalog.UnlockTables(nil, c.ConnectionID); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case the tables would keep locked forever?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it fails, you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then you can retry UNLOCK TABLES. If it fails it's because the tables Unlock failed, so nothing we can do there, as the implementation of that method depends on the tables.

logrus.Errorf("unable to unlock tables on session close: %s", err)
}

logrus.Infof("ConnectionClosed: client %v", c.ConnectionID)
}

Expand Down
8 changes: 8 additions & 0 deletions sql/analyzer/assign_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ func assignCatalog(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error)
nc := *node
nc.Catalog = a.Catalog
return &nc, nil
case *plan.LockTables:
nc := *node
nc.Catalog = a.Catalog
return &nc, nil
case *plan.UnlockTables:
nc := *node
nc.Catalog = a.Catalog
return &nc, nil
default:
return n, nil
}
Expand Down
12 changes: 12 additions & 0 deletions sql/analyzer/assign_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,16 @@ func TestAssignCatalog(t *testing.T) {
sd, ok := node.(*plan.ShowDatabases)
require.True(ok)
require.Equal(c, sd.Catalog)

node, err = f.Apply(sql.NewEmptyContext(), a, plan.NewLockTables(nil))
require.NoError(err)
lt, ok := node.(*plan.LockTables)
require.True(ok)
require.Equal(c, lt.Catalog)

node, err = f.Apply(sql.NewEmptyContext(), a, plan.NewUnlockTables())
require.NoError(err)
ut, ok := node.(*plan.UnlockTables)
require.True(ok)
require.Equal(c, ut.Catalog)
}
2 changes: 1 addition & 1 deletion sql/analyzer/readonly.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var ErrQueryNotAllowed = errors.NewKind("query of type %q not allowed in read-on
// error if it's not the case.
func EnsureReadOnly(ctx *sql.Context, a *Analyzer, node sql.Node) (sql.Node, error) {
switch node.(type) {
case *plan.InsertInto, *plan.DropIndex, *plan.CreateIndex:
case *plan.InsertInto, *plan.DropIndex, *plan.CreateIndex, *plan.UnlockTables, *plan.LockTables:
typ := strings.Split(reflect.TypeOf(node).String(), ".")[1]
return nil, ErrQueryNotAllowed.New(typ)
default:
Expand Down
56 changes: 56 additions & 0 deletions sql/catalog.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sql

import (
"fmt"
"strings"
"sync"

Expand All @@ -19,14 +20,22 @@ type Catalog struct {
mu sync.RWMutex
currentDatabase string
dbs Databases
locks sessionLocks
}

type (
sessionLocks map[uint32]dbLocks
dbLocks map[string]tableLocks
tableLocks map[string]struct{}
)

// NewCatalog returns a new empty Catalog.
func NewCatalog() *Catalog {
return &Catalog{
FunctionRegistry: NewFunctionRegistry(),
IndexRegistry: NewIndexRegistry(),
ProcessList: NewProcessList(),
locks: make(sessionLocks),
}
}

Expand Down Expand Up @@ -124,3 +133,50 @@ func (d Databases) Table(dbName string, tableName string) (Table, error) {

return table, nil
}

// LockTable adds a lock for the given table and session client. It is assumed
// the database is the current database in use.
func (c *Catalog) LockTable(id uint32, table string) {
db := c.CurrentDatabase()
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.locks[id]; !ok {
c.locks[id] = make(dbLocks)
}

if _, ok := c.locks[id][db]; !ok {
c.locks[id][db] = make(tableLocks)
}

c.locks[id][db][table] = struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we plan (in the future) lock all DB or Session (not only tables, rows), but if yes then maybe flat structure, e.g. c.locks["id.db.table"] may work better, because you don't need to create nested structs. Moreover when you unlock you have to propagate this info to the higher levels, instead of delete one key.
But frankly it's up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is, we use the elements of each separate step. c.locks[id] to get all the locks for a session and then c.locks[id][db] to get the tables. We can't do that with a single key

}

// UnlockTables unlocks all tables for which the given session client has a
// lock.
func (c *Catalog) UnlockTables(ctx *Context, id uint32) error {
c.mu.Lock()
defer c.mu.Unlock()

var errors []string
for db, tables := range c.locks[id] {
for t := range tables {
table, err := c.dbs.Table(db, t)
if err == nil {
if lockable, ok := table.(Lockable); ok {
if err := lockable.Unlock(ctx, id); err != nil {
errors = append(errors, err.Error())
}
}
} else {
errors = append(errors, err.Error())
}
}
}

delete(c.locks, id)
if len(errors) > 0 {
return fmt.Errorf("error unlocking tables for %d: %s", id, strings.Join(errors, ", "))
}

return nil
}
37 changes: 37 additions & 0 deletions sql/catalog_locks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package sql

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCatalogLockTable(t *testing.T) {
require := require.New(t)
c := NewCatalog()
c.SetCurrentDatabase("db1")
c.LockTable(1, "foo")
c.LockTable(2, "bar")
c.LockTable(1, "baz")
c.SetCurrentDatabase("db2")
c.LockTable(1, "qux")

expected := sessionLocks{
1: dbLocks{
"db1": tableLocks{
"foo": struct{}{},
"baz": struct{}{},
},
"db2": tableLocks{
"qux": struct{}{},
},
},
2: dbLocks{
"db1": tableLocks{
"bar": struct{}{},
},
},
}

require.Equal(expected, c.locks)
}
41 changes: 41 additions & 0 deletions sql/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,44 @@ func TestCatalogTable(t *testing.T) {
require.NoError(err)
require.Equal(mytable, table)
}

func TestCatalogUnlockTables(t *testing.T) {
require := require.New(t)

db := mem.NewDatabase("db")
t1 := newLockableTable(mem.NewTable("t1", nil))
t2 := newLockableTable(mem.NewTable("t2", nil))
db.AddTable("t1", t1)
db.AddTable("t2", t2)

c := sql.NewCatalog()
c.AddDatabase(db)

c.LockTable(1, "t1")
c.LockTable(1, "t2")

require.NoError(c.UnlockTables(nil, 1))

require.Equal(1, t1.unlocks)
require.Equal(1, t2.unlocks)
}

type lockableTable struct {
sql.Table
unlocks int
}

func newLockableTable(t sql.Table) *lockableTable {
return &lockableTable{Table: t}
}

var _ sql.Lockable = (*lockableTable)(nil)

func (l *lockableTable) Lock(ctx *sql.Context, write bool) error {
return nil
}

func (l *lockableTable) Unlock(ctx *sql.Context, id uint32) error {
l.unlocks++
return nil
}
16 changes: 16 additions & 0 deletions sql/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,19 @@ type Database interface {
type Alterable interface {
Create(name string, schema Schema) error
}

// Lockable should be implemented by tables that can be locked and unlocked.
type Lockable interface {
Nameable
// Lock locks the table either for reads or writes. Any session clients can
// read while the table is locked for read, but not write.
// When the table is locked for write, nobody can write except for the
// session client that requested the lock.
Lock(ctx *Context, write bool) error
// Unlock releases the lock for the current session client. It blocks until
// all reads or writes started during the lock are finished.
// Context may be nil if the unlock it's because the connection was closed.
// The id will always be provided, since in some cases context is not
// available.
Unlock(ctx *Context, id uint32) error
}
10 changes: 4 additions & 6 deletions sql/parse/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func parseDescribeQuery(ctx *sql.Context, s string) (sql.Node, error) {
r := bufio.NewReader(strings.NewReader(s))

var format, query string
steps := []parseFunc{
err := parseFuncs{
oneOf("describe", "desc", "explain"),
skipSpaces,
expect("format"),
Expand All @@ -28,12 +28,10 @@ func parseDescribeQuery(ctx *sql.Context, s string) (sql.Node, error) {
readIdent(&format),
skipSpaces,
readRemaining(&query),
}
}.exec(r)

for _, step := range steps {
if err := step(r); err != nil {
return nil, err
}
if err != nil {
return nil, err
}

if format != "tree" {
Expand Down
Loading