Skip to content

Commit

Permalink
api: write a connection schema getter
Browse files Browse the repository at this point in the history
Write a helper function to load the actual schema for the user.

Previously we stored actual schema in a private `schemaResolver`
field and `Schema` field was used only to get a current schema.
But now because of the new function, we don't need to store the
`Schema` as a different field. So `Schema` was also removed.

To update the schema, used needs to use `GetSchema` + `SetSchema`
in pair. `SetSchema(Schema)` replacing the `OverrideSchema(*Schema)`.

`Spaces` and `SpacesById` fields of the `Schema` struct store spaces by value.
`Fields` and `FieldsById` fields of the `Space` struct store fields by value.
`Index` and `IndexById` fields of the `Space` struct store indexes by value.
`Fields` field of the `Index` struct store `IndexField` by value.

Closes #7
  • Loading branch information
DerekBum committed Nov 21, 2023
1 parent 6225ec4 commit 663b54b
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 67 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Support `IPROTO_FEATURE_SPACE_AND_INDEX_NAMES` for Tarantool
version >= 3.0.0-alpha1 (#338). It allows to use space and index names
in requests instead of their IDs.
- `GetSchema` function to get the actual schema (#7)

### Changed

Expand All @@ -51,6 +52,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
instead of `crud.OptUint` (#342)
- Change all `Upsert` and `Update` requests to accept `*tarantool.Operations`
as `ops` parameters instead of `interface{}` (#348)
- Change `OverrideSchema(*Schema)` to `SetSchema(Schema)` (#7)
- Change values, stored by pointers in the `Schema`, `Space`, `Index` structs,
to be stored by their values (#7)

### Deprecated

Expand All @@ -70,6 +74,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- UUID_extId (#158)
- IPROTO constants (#158)
- Code() method from the Request interface (#158)
- `Schema` field from the `Connection` struct (#7)

### Fixed

Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ now does not attempt to reconnect and tries to establish a connection only once.
Function might be canceled via context. Context accepted as first argument,
and user may cancel it in process.

#### Connection schema

* Removed `Schema` field from the `Connection` struct. Instead, new
`GetSchema(Connector)` function was added to get the actual connection
schema on demand.
* `OverrideSchema(*Schema)` method replaced with the `SetSchema(Schema)`.

#### Protocol changes

* `iproto.Feature` type used instead of `ProtocolFeature`.
Expand All @@ -260,6 +267,10 @@ and user may cancel it in process.
interface to get information if the usage of space and index names in requests
is supported.
* `Schema` structure no longer implements `SchemaResolver` interface.
* `Spaces` and `SpacesById` fields of the `Schema` struct store spaces by value.
* `Fields` and `FieldsById` fields of the `Space` struct store fields by value.
`Index` and `IndexById` fields of the `Space` struct store indexes by value.
* `Fields` field of the `Index` struct store `IndexField` by value.

## Contributing

Expand Down
28 changes: 17 additions & 11 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ type Connection struct {
c Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// schemaResolver contains a SchemaResolver implementation.
schemaResolver SchemaResolver
// requestId contains the last request ID for requests with nil context.
Expand Down Expand Up @@ -436,12 +434,14 @@ func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err

// TODO: reload schema after reconnect.
if !conn.opts.SkipSchema {
if err = conn.loadSchema(); err != nil {
schema, err := GetSchema(conn)
if err != nil {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.closeConnection(err, true)
return nil, err
}
conn.SetSchema(schema)
}

return conn, err
Expand Down Expand Up @@ -1302,15 +1302,21 @@ func (conn *Connection) ConfiguredTimeout() time.Duration {
return conn.opts.Timeout
}

// OverrideSchema sets Schema for the connection.
func (conn *Connection) OverrideSchema(s *Schema) {
if s != nil {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.lockShards()
defer conn.unlockShards()
// SetSchema sets Schema for the connection.
func (conn *Connection) SetSchema(s Schema) {
sCopy := s.copy()
spaceAndIndexNamesSupported :=
isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
conn.serverProtocolInfo.Features)

conn.Schema = s
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.lockShards()
defer conn.unlockShards()

conn.schemaResolver = &loadedSchemaResolver{
Schema: sCopy,
SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
}
}

Expand Down
26 changes: 23 additions & 3 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,10 @@ func ExampleSchema() {
conn := exampleConnect(opts)
defer conn.Close()

schema := conn.Schema
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
if schema.SpacesById == nil {
fmt.Println("schema.SpacesById is nil")
}
Expand All @@ -1080,13 +1083,30 @@ func ExampleSchema() {
// Space 2 ID 616 schematest
}

// Example demonstrates how to update the connection schema.
func ExampleConnection_SetSchema() {
conn := exampleConnect(opts)
defer conn.Close()

// Get the actual schema.
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
// Update the current schema to match the actual one.
conn.SetSchema(schema)
}

// Example demonstrates how to retrieve information with space schema.
func ExampleSpace() {
conn := exampleConnect(opts)
defer conn.Close()

// Save Schema to a local variable to avoid races
schema := conn.Schema
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
if schema.SpacesById == nil {
fmt.Println("schema.SpacesById is nil")
}
Expand Down Expand Up @@ -1120,7 +1140,7 @@ func ExampleSpace() {
// Space 1 ID 617 test memtx
// Space 1 ID 0 false
// Index 0 primary
// &{0 unsigned} &{2 string}
// {0 unsigned} {2 string}
// SpaceField 1 name0 unsigned
// SpaceField 2 name3 unsigned
}
Expand Down
107 changes: 68 additions & 39 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"

"github.com/tarantool/go-iproto"
"github.com/vmihailenco/msgpack/v5"
"github.com/vmihailenco/msgpack/v5/msgpcode"
)
Expand Down Expand Up @@ -58,9 +57,22 @@ type SchemaResolver interface {
type Schema struct {
Version uint
// Spaces is map from space names to spaces.
Spaces map[string]*Space
Spaces map[string]Space
// SpacesById is map from space numbers to spaces.
SpacesById map[uint32]*Space
SpacesById map[uint32]Space
}

func (schema *Schema) copy() Schema {
schemaCopy := *schema
schemaCopy.Spaces = make(map[string]Space, len(schema.Spaces))
for name, space := range schema.Spaces {
schemaCopy.Spaces[name] = space.copy()
}
schemaCopy.SpacesById = make(map[uint32]Space, len(schema.SpacesById))
for id, space := range schema.SpacesById {
schemaCopy.SpacesById[id] = space.copy()
}
return schemaCopy
}

// Space contains information about Tarantool's space.
Expand All @@ -72,12 +84,33 @@ type Space struct {
Temporary bool // Is this space temporary?
// Field configuration is not mandatory and not checked by Tarantool.
FieldsCount uint32
Fields map[string]*Field
FieldsById map[uint32]*Field
Fields map[string]Field
FieldsById map[uint32]Field
// Indexes is map from index names to indexes.
Indexes map[string]*Index
Indexes map[string]Index
// IndexesById is map from index numbers to indexes.
IndexesById map[uint32]*Index
IndexesById map[uint32]Index
}

func (space *Space) copy() Space {
spaceCopy := *space
spaceCopy.Fields = make(map[string]Field, len(space.Fields))
for name, field := range space.Fields {
spaceCopy.Fields[name] = field
}
spaceCopy.FieldsById = make(map[uint32]Field, len(space.FieldsById))
for id, field := range space.FieldsById {
spaceCopy.FieldsById[id] = field
}
spaceCopy.Indexes = make(map[string]Index, len(space.Indexes))
for name, index := range space.Indexes {
spaceCopy.Indexes[name] = index.copy()
}
spaceCopy.IndexesById = make(map[uint32]Index, len(space.IndexesById))
for id, index := range space.IndexesById {
spaceCopy.IndexesById[id] = index.copy()
}
return spaceCopy
}

func (space *Space) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -135,17 +168,17 @@ func (space *Space) DecodeMsgpack(d *msgpack.Decoder) error {
return errors.New("unexpected schema format (space flags)")
}
}
space.FieldsById = make(map[uint32]*Field)
space.Fields = make(map[string]*Field)
space.IndexesById = make(map[uint32]*Index)
space.Indexes = make(map[string]*Index)
space.FieldsById = make(map[uint32]Field)
space.Fields = make(map[string]Field)
space.IndexesById = make(map[uint32]Index)
space.Indexes = make(map[string]Index)
if arrayLen >= vspaceSpFormatFieldNum {
fieldCount, err := d.DecodeArrayLen()
if err != nil {
return err
}
for i := 0; i < fieldCount; i++ {
field := &Field{}
field := Field{}
if err := field.DecodeMsgpack(d); err != nil {
return err
}
Expand Down Expand Up @@ -206,7 +239,14 @@ type Index struct {
Name string
Type string
Unique bool
Fields []*IndexField
Fields []IndexField
}

func (index *Index) copy() Index {
indexCopy := *index
indexCopy.Fields = make([]IndexField, len(index.Fields))
copy(indexCopy.Fields, index.Fields)
return indexCopy
}

func (index *Index) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -261,9 +301,9 @@ func (index *Index) DecodeMsgpack(d *msgpack.Decoder) error {
if err != nil {
return err
}
index.Fields = make([]*IndexField, fieldCount)
index.Fields = make([]IndexField, fieldCount)
for i := 0; i < int(fieldCount); i++ {
index.Fields[i] = new(IndexField)
index.Fields[i] = IndexField{}
if index.Fields[i].Id, err = d.DecodeUint32(); err != nil {
return err
}
Expand Down Expand Up @@ -340,51 +380,40 @@ func (indexField *IndexField) DecodeMsgpack(d *msgpack.Decoder) error {
return errors.New("unexpected schema format (index fields)")
}

func (conn *Connection) loadSchema() (err error) {
schema := new(Schema)
schema.SpacesById = make(map[uint32]*Space)
schema.Spaces = make(map[string]*Space)
// GetSchema returns the actual schema for the connection.
func GetSchema(conn Connector) (Schema, error) {
schema := Schema{}
schema.SpacesById = make(map[uint32]Space)
schema.Spaces = make(map[string]Space)

// Reload spaces.
var spaces []*Space
err = conn.SelectTyped(vspaceSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &spaces)
var spaces []Space
err := conn.SelectTyped(vspaceSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &spaces)
if err != nil {
return err
return Schema{}, err
}
for _, space := range spaces {
schema.SpacesById[space.Id] = space
schema.Spaces[space.Name] = space
}

// Reload indexes.
var indexes []*Index
var indexes []Index
err = conn.SelectTyped(vindexSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &indexes)
if err != nil {
return err
return Schema{}, err
}
for _, index := range indexes {
spaceId := index.SpaceId
if _, ok := schema.SpacesById[spaceId]; ok {
schema.SpacesById[spaceId].IndexesById[index.Id] = index
schema.SpacesById[spaceId].Indexes[index.Name] = index
} else {
return errors.New("concurrent schema update")
return Schema{}, errors.New("concurrent schema update")
}
}

spaceAndIndexNamesSupported :=
isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
conn.serverProtocolInfo.Features)

conn.lockShards()
conn.Schema = schema
conn.schemaResolver = &loadedSchemaResolver{
Schema: schema,
SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
}
conn.unlockShards()

return nil
return schema, nil
}

// resolveSpaceNumber tries to resolve a space number.
Expand Down Expand Up @@ -462,7 +491,7 @@ func resolveIndexNumber(i interface{}) (uint32, error) {
}

type loadedSchemaResolver struct {
Schema *Schema
Schema Schema
// SpaceAndIndexNamesSupported shows if a current Tarantool version supports
// iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES.
SpaceAndIndexNamesSupported bool
Expand Down
Loading

0 comments on commit 663b54b

Please sign in to comment.