Skip to content

Commit

Permalink
It work!
Browse files Browse the repository at this point in the history
  • Loading branch information
yuta17 committed May 4, 2020
1 parent 46a21e9 commit ea11fc2
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 50 deletions.
27 changes: 8 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ gobulk is a tool to DB replication.

This tool can be used to create database for data analysis, staging environment.

When you run it, all schema information and data will be synchronized.
When you run it, all schema information and data will be synced.

If you want to add options, write options in yaml files.

Expand All @@ -14,24 +14,13 @@ If you want to add options, write options in yaml files.
go get github.com/yuta17/gobulk
```

## Usage
## Examples

```go
inputUrl := `YOUR INPUT DB URL`
outputUrl := `YOUR OUTPUT DB URL`
columnOptions := ioutil.ReadFile("./tables/table1.yml")

client := gobulk.NewClient(inputUrl, outputUrl)
client.SetColumnOptions(columnOptions)
client.Sync()
```
[example](https://github.com/yuta17/gobulk/blob/master/example/main.go)

## Todo

```yml
# ./tables/table1.yml
table:
name: table1
masking_columns:
- column1
- column2
```
- [x] Sync data from input database to output database.
- [ ] Masking data in specified column.
- [ ] Automatically follow column changes of input database.
- [ ] Support multiple DBMS client.
211 changes: 185 additions & 26 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package gobulk

import (
"database/sql"
"fmt"
"log"
"strconv"
"strings"

_ "github.com/go-sql-driver/mysql"
)

// Client .
type Client struct {
DbmsName string
InputUrl string
OutputUrl string
InputDB *sql.DB
OutputDB *sql.DB
}

// Column .
type Column struct {
Field string
Type string
Expand All @@ -22,55 +26,210 @@ type Column struct {
Extra string
}

var tableName string
var columnName string

func NewClient(DbmsName string, InputUrl string, OutputUrl string) *Client {
return &Client{DbmsName: DbmsName, InputUrl: InputUrl, OutputUrl: OutputUrl}
// NewClient .
func NewClient(DbmsName, InputURL, OutputURL string) (*Client, error) {
inputDb, err := inputDB(DbmsName, InputURL)
if err != nil {
return nil, err
}
outputDb, err := outputDB(DbmsName, OutputURL)
if err != nil {
return nil, err
}
return &Client{InputDB: inputDb, OutputDB: outputDb}, nil
}

func (c *Client) Sync() error {
// input
inputDb, err := sql.Open(c.DbmsName, c.InputUrl)
func inputDB(dbmsName, inputURL string) (*sql.DB, error) {
inputDb, err := sql.Open(dbmsName, inputURL)
if err != nil {
return err
return nil, err
}
err = inputDb.Ping()
if err != nil {
return err
return nil, err
}
return inputDb, nil
}

// output
outputDb, err := sql.Open(c.DbmsName, c.OutputUrl)
func outputDB(dbmsName, outputURL string) (*sql.DB, error) {
outputDb, err := sql.Open(dbmsName, outputURL)
if err != nil {
return err
return nil, err
}
err = outputDb.Ping()
if err != nil {
return err
return nil, err
}
defer inputDb.Close()
defer outputDb.Close()
return outputDb, nil
}

// Sync .
func (c *Client) Sync() error {
defer c.InputDB.Close()
defer c.OutputDB.Close()

tables, err := inputDb.Query("show tables")
tables, err := c.InputDB.Query("show tables")
if err != nil {
return err
}

var tableName string
for tables.Next() {
err := tables.Scan(&tableName)
if err != nil {
return err
}
log.Println("=== table:" + tableName + " ===")
columns, err := inputDb.Query("show columns from " + tableName)
for columns.Next() {
var column Column
err := columns.Scan(&column.Field, &column.Type, &column.Null, &column.Key, &column.Default, &column.Extra)

log.Println("=== table: " + tableName + " ===")

err = c.createTableIfNotExisted(tableName)
if err != nil {
return err
}

rows, err := c.InputDB.Query("select * from " + tableName)
if err != nil {
return err
}
if !rows.Next() {
continue
}
columnNames, err := rows.Columns()
if err != nil {
return err
}
values, err := c.getValues(rows, columnNames)
if err != nil {
return err
}

if len(values) != 0 {
err := c.execUpsertQuery(tableName, columnNames, values)
if err != nil {
return err
}
log.Println("column: " + column.Field)
}
rows.Close()
}
tables.Close()

return nil
}

func (c *Client) createTableIfNotExisted(tableName string) error {
_, notExistedErr := c.OutputDB.Query("select * from " + tableName + " limit 1")
columns, err := c.InputDB.Query("show columns from " + tableName)
if err != nil {
return err
}

createColumns := []string{}
for columns.Next() {
var column Column
err := columns.Scan(&column.Field, &column.Type, &column.Null, &column.Key, &column.Default, &column.Extra)
if err != nil {
return err
}

if notExistedErr != nil {
var notNullStr string
var priStr string
if column.Key == "YES" {
notNullStr = "not null"
} else {
notNullStr = ""
}

if column.Key == "PRI" {
priStr = "primary key"
} else {
priStr = ""
}

createColumns = append(createColumns, fmt.Sprintf("`%s` %s %s %s", column.Field, column.Type, notNullStr, priStr))
}
}

if notExistedErr != nil {
createTableQuery := fmt.Sprintf("create table %s (%s)", tableName, strings.Join(createColumns, ","))
_, err := c.OutputDB.Exec(createTableQuery)
if err != nil {
return err
}
log.Println(fmt.Sprintf("%s table has been created.", tableName))
}
columns.Close()
return nil
}

func (c *Client) getValues(rows *sql.Rows, columnNames []string) ([]string, error) {
count := len(columnNames)
values := make([]interface{}, count)
valuePointers := make([]interface{}, count)

rowvals := []string{}

for rows.Next() {
for i := range columnNames {
valuePointers[i] = &values[i]
}

err := rows.Scan(valuePointers...)
if err != nil {
return nil, err
}

r := []string{}
for i := range columnNames {
val := values[i]
b, ok := val.([]byte)
var v string
if ok {
v = string(b)
v = "'" + v + "'"
}
if b == nil {
v = "NULL"
}
if i == 0 {
v = "(" + v
}
if i == (len(columnNames) - 1) {
v = v + ")"
}
r = append(r, v)
}

insertValues := strings.Join(r, ",")
rowvals = append(rowvals, insertValues)
}
return rowvals, nil
}

func (c *Client) execUpsertQuery(tableName string, columnNames, values []string) error {
updateValues := []string{}
columnNamesWithBackQuote := []string{}

for _, columnName := range columnNames {
columnNamesWithBackQuote = append(columnNamesWithBackQuote, "`"+columnName+"`")
v := fmt.Sprintf("`%s` = values(`%s`)",
columnName,
columnName)
updateValues = append(updateValues, v)
}

upsertQuery := fmt.Sprintf(`insert into %s (%s) values %s on duplicate key update %s`,
tableName,
strings.Join(columnNamesWithBackQuote, ","),
strings.Join(values, ","),
strings.Join(updateValues, ","))

_, err := c.OutputDB.Exec(upsertQuery)
if err != nil {
return err
}

recordCount := len(values)
log.Println(fmt.Sprintf("Done. Upserted records count: %s", strconv.Itoa(recordCount)))
return nil
}
13 changes: 13 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package gobulk

import (
"testing"
)

func TestNewClient(t *testing.T) {
t.SkipNow()
}

func TestSync(t *testing.T) {
t.SkipNow()
}
11 changes: 7 additions & 4 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@ import (
)

func main() {
inputUrl := "root@tcp(127.0.0.1:3306)/campfire_development"
outputUrl := "root@tcp(127.0.0.1:3306)/campfire_development_copy"
client := gobulk.NewClient("mysql", inputUrl, outputUrl)
err := client.Sync()
inputURL := "root@tcp(127.0.0.1:3306)/YOUR_INPUT_DATABASE_NAME"
outputURL := "root@tcp(127.0.0.1:3306)/YOUR_OUTPUT_DATABASE_NAME"
client, err := gobulk.NewClient("mysql", inputURL, outputURL)
if err != nil {
log.Errorln(err)
}
err = client.Sync()
if err != nil {
log.Errorln(err)
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ go 1.14

require (
github.com/go-sql-driver/mysql v1.5.0
github.com/prometheus/common v0.9.1 // indirect
github.com/prometheus/common v0.9.1
)

0 comments on commit ea11fc2

Please sign in to comment.