Skip to content

Commit

Permalink
Add sqlprojection.CreateSchema() and DropSchema().
Browse files Browse the repository at this point in the history
  • Loading branch information
jmalloc committed Dec 11, 2020
1 parent 5b6e03a commit dba0c0c
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 113 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ The format is based on [Keep a Changelog], and this project adheres to
[Keep a Changelog]: https://keepachangelog.com/en/1.0.0/
[Semantic Versioning]: https://semver.org/spec/v2.0.0.html

## [0.6.1] - 2020-12-11

### Added

- Add `sqlprojection.CreateSchema()` and `DropSchema()`

## [0.6.0] - 2020-12-11

As of this release the various SQL projection drivers no longer depend on
Expand Down Expand Up @@ -118,6 +124,7 @@ placeholder format (`?` for MySQL, `$1` for everything else).
[0.5.0]: https://github.com/dogmatiq/projectionkit/releases/tag/v0.5.0
[0.5.1]: https://github.com/dogmatiq/projectionkit/releases/tag/v0.5.1
[0.6.0]: https://github.com/dogmatiq/projectionkit/releases/tag/v0.6.0
[0.6.1]: https://github.com/dogmatiq/projectionkit/releases/tag/v0.6.1

<!-- version template
## [0.0.1] - YYYY-MM-DD
Expand Down
49 changes: 3 additions & 46 deletions sqlprojection/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package sqlprojection
import (
"context"
"database/sql"
"sync/atomic"

"github.com/dogmatiq/cosyne"
"github.com/dogmatiq/dogma"
"github.com/dogmatiq/projectionkit/internal/identity"
"github.com/dogmatiq/projectionkit/internal/unboundhandler"
Expand All @@ -18,11 +16,7 @@ type adaptor struct {

db *sql.DB
key string

m cosyne.Mutex
resolved int32 // atomic bool, fast path
candidates []Driver
selected Driver
cs candidateSet
}

// New returns a new Dogma projection message handler by binding an SQL-specific
Expand All @@ -48,15 +42,7 @@ func New(
key: identity.Key(h),
}

if len(options) == 0 {
options = []Option{
WithCandidateDrivers(BuiltInDrivers()...),
}
}

for _, opt := range options {
opt.applyAdaptorOption(a)
}
a.cs.init(db, options)

return a
}
Expand Down Expand Up @@ -139,7 +125,7 @@ func (a *adaptor) withDriver(
ctx context.Context,
fn func(Driver) error,
) error {
d, err := a.driver(ctx)
d, err := a.cs.resolve(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -178,32 +164,3 @@ func (a *adaptor) withTx(

return ok && err == nil, err
}

// driver returns the driver that should be used by the adaptor.
func (a *adaptor) driver(ctx context.Context) (Driver, error) {
if atomic.LoadInt32(&a.resolved) == 0 {
// If the resolved flag is 0 then a.selected has not been populated yet.
// We acquire the mutex to ensure we're the only goroutine attempting
// selection.
if err := a.m.Lock(ctx); err != nil {
return nil, err
}
defer a.m.Unlock()

// Ensure that no another goroutine selected the driver while we were
// waiting to acquire the mutex.
if atomic.LoadInt32(&a.resolved) == 0 {
// If not, it's our turn to try selection.
d, err := SelectDriver(ctx, a.db, a.candidates)
if err != nil {
return nil, err
}

a.candidates = nil
a.selected = d
atomic.StoreInt32(&a.resolved, 1)
}
}

return a.selected, nil
}
8 changes: 2 additions & 6 deletions sqlprojection/adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var _ = Describe("type adaptor", func() {
ctx context.Context
cancel context.CancelFunc
database *sqltest.Database
driver Driver
db *sql.DB
adaptor dogma.ProjectionMessageHandler
)
Expand All @@ -53,17 +52,14 @@ var _ = Describe("type adaptor", func() {
db, err = database.Open()
Expect(err).ShouldNot(HaveOccurred())

driver, err = SelectDriver(ctx, db, BuiltInDrivers())
Expect(err).ShouldNot(HaveOccurred())

err = driver.CreateSchema(ctx, db)
err = CreateSchema(ctx, db)
Expect(err).ShouldNot(HaveOccurred())

adaptor = New(db, handler)
})

AfterEach(func() {
err := driver.DropSchema(ctx, db)
err := DropSchema(ctx, db)
Expect(err).ShouldNot(HaveOccurred())

err = database.Close()
Expand Down
28 changes: 0 additions & 28 deletions sqlprojection/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package sqlprojection
import (
"context"
"database/sql"
"fmt"

"go.uber.org/multierr"
)

// Driver is an interface for database-specific projection drivers.
Expand Down Expand Up @@ -64,28 +61,3 @@ func BuiltInDrivers() []Driver {
SQLiteDriver,
}
}

// SelectDriver returns the appropriate driver implementation to use with the
// given database from a list of candidate drivers.
func SelectDriver(ctx context.Context, db *sql.DB, candidates []Driver) (Driver, error) {
var err error

for _, d := range candidates {
e := d.IsCompatibleWith(ctx, db)
if e == nil {
return d, nil
}

err = multierr.Append(err, fmt.Errorf(
"%T is not compatible with %T: %w",
d,
db.Driver(),
e,
))
}

return nil, multierr.Append(err, fmt.Errorf(
"none of the candidate drivers are compatible with %T",
db.Driver(),
))
}
33 changes: 0 additions & 33 deletions sqlprojection/option.go

This file was deleted.

40 changes: 40 additions & 0 deletions sqlprojection/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package sqlprojection

import (
"context"
"database/sql"
)

// CreateSchema creates the schema elements necessary to store projections on
// the given database.
//
// If no candidate drivers are provided all built-in drivers are considered as
// candidates.
func CreateSchema(ctx context.Context, db *sql.DB, options ...Option) error {
var cs candidateSet
cs.init(db, options)

d, err := cs.resolve(ctx)
if err != nil {
return err
}

return d.CreateSchema(ctx, db)
}

// DropSchema drops the schema elements necessary to store projections on the
// given database.
//
// If no candidate drivers are provided all built-in drivers are considered as
// candidates.
func DropSchema(ctx context.Context, db *sql.DB, options ...Option) error {
var cs candidateSet
cs.init(db, options)

d, err := cs.resolve(ctx)
if err != nil {
return err
}

return d.DropSchema(ctx, db)
}
122 changes: 122 additions & 0 deletions sqlprojection/select.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package sqlprojection

import (
"context"
"database/sql"
"fmt"
"sync/atomic"

"github.com/dogmatiq/cosyne"
"go.uber.org/multierr"
)

// SelectDriver returns the appropriate driver implementation to use with the
// given database from a list of candidate drivers.
func SelectDriver(ctx context.Context, db *sql.DB, candidates []Driver) (Driver, error) {
var err error

for _, d := range candidates {
e := d.IsCompatibleWith(ctx, db)
if e == nil {
return d, nil
}

err = multierr.Append(err, fmt.Errorf(
"%T is not compatible with %T: %w",
d,
db.Driver(),
e,
))
}

return nil, multierr.Append(err, fmt.Errorf(
"none of the candidate drivers are compatible with %T",
db.Driver(),
))
}

// An Option configures the optional behavior of an SQL projection.
type Option interface {
applyCandidateSetOption(*candidateSet)
}

type adaptorOptionFunc func(*candidateSet)

func (f adaptorOptionFunc) applyCandidateSetOption(s *candidateSet) {
f(s)
}

// WithDriver returns an Option that forces use of a specific Driver.
//
// It takes precedence over any WithCandidateDriver() option.
func WithDriver(d Driver) Option {
return adaptorOptionFunc(func(s *candidateSet) {
s.resolved = 1
s.candidates = []Driver{d}
})
}

// WithCandidateDrivers returns an Option that adds candidate drivers for
// selection as the driver to use.
func WithCandidateDrivers(drivers ...Driver) Option {
return adaptorOptionFunc(func(s *candidateSet) {
if s.resolved == 0 {
s.candidates = append(s.candidates, drivers...)
}
})
}

// candidateSet is a set of drivers that are candidates for use with a
// particular database.
type candidateSet struct {
m cosyne.Mutex
resolved uint32
db *sql.DB
candidates []Driver
}

// init sets up the candidate set.
//
// If options is empty the default options are applied.
func (s *candidateSet) init(db *sql.DB, options []Option) {
s.db = db

if len(options) == 0 {
options = []Option{
WithCandidateDrivers(BuiltInDrivers()...),
}
}

for _, opt := range options {
opt.applyCandidateSetOption(s)
}
}

// resolve selects the appropriate driver from the candidates.
func (s *candidateSet) resolve(ctx context.Context) (Driver, error) {
if atomic.LoadUint32(&s.resolved) == 0 {
// If the resolved flag is 0 then a.selected has not been populated yet.
// We acquire the mutex to ensure we're the only goroutine attempting
// selection.
if err := s.m.Lock(ctx); err != nil {
return nil, err
}
defer s.m.Unlock()

// Ensure that no another goroutine selected the driver while we were
// waiting to acquire the mutex.
if atomic.LoadUint32(&s.resolved) == 0 {
// If not, it's our turn to try selection.
d, err := SelectDriver(ctx, s.db, s.candidates)
if err != nil {
return nil, err
}

s.db = nil
s.candidates = []Driver{d}
atomic.StoreUint32(&s.resolved, 1)
}
}

return s.candidates[0], nil
}

0 comments on commit dba0c0c

Please sign in to comment.