Skip to content

Commit

Permalink
totally rewritten
Browse files Browse the repository at this point in the history
  • Loading branch information
spikeekips committed Dec 17, 2018
1 parent 3df70d2 commit 9906075
Show file tree
Hide file tree
Showing 36 changed files with 954 additions and 249 deletions.
20 changes: 10 additions & 10 deletions cmd/sebak/cmd/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
)

func TestParseFlagValidators(t *testing.T) {
vs, err := parseFlagValidators("https://localhost:12346?address=GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2")
vs, err := parseFlagValidators("GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2")
require.NoError(t, err)
require.Equal(t, 1, len(vs))
}

func TestParseFlagsNode(t *testing.T) {
flagNetworkID = "sebak-test-network"
flagValidators = "https://localhost:12346?address=GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2"
flagValidators = "GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2"
flagKPSecretSeed = "SCN4NSV5SVHIZWUDJFT4Z5FFVHO3TFRTOIBQLHMNPAZJ37K5A2YFSCBM"
flagBindURL = "http://0.0.0.0:12345"

Expand Down Expand Up @@ -70,24 +70,24 @@ func TestParseFlagsNode(t *testing.T) {

func TestParseFlagSelfValidators(t *testing.T) {
flagNetworkID = "sebak-test-network"
flagValidators = "https://localhost:12346?address=GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2"
flagValidators = "GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2"
flagKPSecretSeed = "SCN4NSV5SVHIZWUDJFT4Z5FFVHO3TFRTOIBQLHMNPAZJ37K5A2YFSCBM"
flagBindURL = "http://0.0.0.0:12345"

parseFlagsNode()
require.Equal(t, 2, len(localNode.GetValidators()))

parsedValidator, _ := node.NewValidatorFromURI(flagValidators)
parsedValidator, _ := node.NewValidator(flagValidators, nil, "")
fmt.Println(localNode.GetValidators())
validator := localNode.GetValidators()[parsedValidator.Address()]

require.Equal(t, validator.Address(), parsedValidator.Address())
require.Equal(t, validator.Endpoint().Host, parsedValidator.Endpoint().Host)
require.Equal(t, validator.Endpoint().Port(), parsedValidator.Endpoint().Port())
require.Nil(t, validator.Endpoint())
}

func TestAddingSelfValidatorsWithoutSelf(t *testing.T) {
flagNetworkID = "sebak-test-network"
flagValidators = "https://localhost:12346?address=GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2"
flagValidators = "GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2"
flagKPSecretSeed = "SCN4NSV5SVHIZWUDJFT4Z5FFVHO3TFRTOIBQLHMNPAZJ37K5A2YFSCBM"
flagBindURL = "http://0.0.0.0:12345"

Expand All @@ -98,7 +98,7 @@ func TestAddingSelfValidatorsWithoutSelf(t *testing.T) {

{ // check validator added
var found bool
v, _ := node.NewValidatorFromURI(flagValidators)
v, _ := node.NewValidator(flagValidators, nil, "")
for _, validator := range localNode.GetValidators() {
if v.Address() == validator.Address() {
found = true
Expand All @@ -119,7 +119,7 @@ func TestAddingSelfValidatorsWithoutSelf(t *testing.T) {
}

func TestAddingSelfValidatorsWithSelf(t *testing.T) {
targetValidators := "https://localhost:12346?address=GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2"
targetValidators := "GDPQ2LBYP3RL3O675H2N5IEYM6PRJNUA5QFMKXIHGTKEB5KS5T3KHFA2"

flagNetworkID = "sebak-test-network"
flagValidators = "self " + targetValidators
Expand All @@ -133,7 +133,7 @@ func TestAddingSelfValidatorsWithSelf(t *testing.T) {

{ // check validator added
var found bool
v, _ := node.NewValidatorFromURI(targetValidators)
v, _ := node.NewValidator(targetValidators, nil, "")
for _, validator := range localNode.GetValidators() {
if v.Address() == validator.Address() {
found = true
Expand Down
171 changes: 150 additions & 21 deletions cmd/sebak/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ var (

flagWatcherMode bool = common.GetENVValue("SEBAK_WATCHER_MODE", "0") == "1"
flagWatchInterval string = common.GetENVValue("SEBAK_WATCH_INTERVAL", "5s")

flagDiscovery cmdcommon.ListFlags // "SEBAK_DISCOVERY"
)

var (
Expand Down Expand Up @@ -116,6 +118,7 @@ var (
syncCheckPrevBlock time.Duration
jsonrpcbindEndpoint *common.Endpoint
watchInterval time.Duration
discoveryEndpoints []*common.Endpoint

logLevel logging.Lvl
log logging.Logger = logging.New("module", "main")
Expand Down Expand Up @@ -226,6 +229,7 @@ func init() {
nodeCmd.Flags().StringVar(&flagCongressAddress, "set-congress-address", flagCongressAddress, "set congress address")
nodeCmd.Flags().BoolVar(&flagWatcherMode, "watcher-mode", flagWatcherMode, "watcher mode")
nodeCmd.Flags().StringVar(&flagWatchInterval, "watch-interval", flagWatchInterval, "watch interval")
nodeCmd.Flags().Var(&flagDiscovery, "discovery", "initial endpoint for discovery")

rootCmd.AddCommand(nodeCmd)
}
Expand Down Expand Up @@ -281,8 +285,8 @@ func parseFlagRateLimit(l cmdcommon.ListFlags, defaultRate limiter.Rate) (rule c
return
}

func parseFlagValidators(v string) (vs []*node.Validator, err error) {
splitted := strings.Fields(strings.TrimSpace(v))
func parseFlagValidators(s string) (vs []*node.Validator, err error) {
splitted := strings.Fields(strings.TrimSpace(s))
if len(splitted) < 1 {
err = fmt.Errorf("must be given")
return
Expand All @@ -293,8 +297,12 @@ func parseFlagValidators(v string) (vs []*node.Validator, err error) {
continue
}

if _, err = keypair.Parse(v); err != nil {
return
}

var validator *node.Validator
if validator, err = node.NewValidatorFromURI(v); err != nil {
if validator, err = node.NewValidator(v, nil, ""); err != nil {
return
}
vs = append(vs, validator)
Expand All @@ -303,6 +311,34 @@ func parseFlagValidators(v string) (vs []*node.Validator, err error) {
return
}

func parseFlagDiscovery(l cmdcommon.ListFlags) (endpoints []*common.Endpoint, err error) {
if len(l) < 1 {
return
}

var endpoint *common.Endpoint
for _, s := range l {
if endpoint, err = common.NewEndpointFromString(s); err != nil {
return
}

var found bool
for _, e := range endpoints {
if endpoint.Equal(e) {
found = true
break
}
}
if found {
continue
}

endpoints = append(endpoints, endpoint)
}

return
}

func parseFlagsNode() {
var err error

Expand Down Expand Up @@ -524,6 +560,33 @@ func parseFlagsNode() {
network.SetHTTPLogging(logging.LvlDebug, httpLogHandler) // httpLog only use `Debug`
}

// checking `--discovery`
l := strings.Fields(common.GetENVValue("SEBAK_DISCOVERY", ""))
for _, i := range l {
flagDiscovery.Set(i)
}

if len(flagDiscovery) < 1 {
log.Warn("--discovery is not given; node will wait to be discovered")
} else {
var endpoints []*common.Endpoint
if endpoints, err = parseFlagDiscovery(flagDiscovery); err != nil {
cmdcommon.PrintFlagsError(nodeCmd, "--discovery", err)
}
for _, endpoint := range endpoints {
if endpoint.Equal(publishEndpoint) {
log.Warn(
"--discovery is same with --publish",
"--discovery", endpoint.String(),
"--publish", publishEndpoint.String(),
)
continue
}

discoveryEndpoints = append(discoveryEndpoints, endpoint)
}
}

if len(flagRateLimitAPI) < 1 {
re := strings.Fields(common.GetENVValue("SEBAK_RATE_LIMIT_API", ""))
for _, r := range re {
Expand Down Expand Up @@ -576,10 +639,8 @@ func parseFlagsNode() {
parsedFlags = append(parsedFlags, "\n\trate-limit-node", rateLimitRuleNode)
parsedFlags = append(parsedFlags, "\n\thttp-cache-adapter", httpCacheAdapter)
parsedFlags = append(parsedFlags, "\n\thttp-cache-pool-size", httpCachePoolSize)

if flagWatcherMode {
parsedFlags = append(parsedFlags, "\n\twatcher-mode", flagWatcherMode)
}
parsedFlags = append(parsedFlags, "\n\tdiscovery", discoveryEndpoints)
parsedFlags = append(parsedFlags, "\n\twatcher-mode", flagWatcherMode)

// create current Node
localNode, err = node.NewLocalNode(kp, bindEndpoint, "")
Expand Down Expand Up @@ -653,12 +714,6 @@ func runNode() error {
return err
}

connectionManager := network.NewValidatorConnectionManager(
localNode,
nt,
policy,
)

st, err := storage.NewStorage(storageConfig)
if err != nil {
log.Crit("failed to initialize storage", "error", err)
Expand Down Expand Up @@ -695,7 +750,10 @@ func runNode() error {
TxPoolNodeLimit: int(txPoolNodeLimit),
JSONRPCEndpoint: jsonrpcbindEndpoint,
WatcherMode: flagWatcherMode,
DiscoveryEndpoints: discoveryEndpoints,
}
connectionManager := network.NewValidatorConnectionManager(localNode, nt, policy, conf)

tp := transaction.NewPool(conf)

c := sync.NewConfig(localNode, st, nt, connectionManager, tp, conf)
Expand Down Expand Up @@ -742,15 +800,86 @@ func runNode() error {
syncer.Stop()
})
}
{
if flagWatcherMode == true {
watcher := c.NewWatcher(syncer)
g.Add(func() error {
return watcher.Start()
}, func(error) {
watcher.Stop()
})

if flagWatcherMode {
// In WatcherMode, get the node information from `--discovery` nodes
localNode.ClearValidators()

for _, endpoint := range conf.DiscoveryEndpoints {
client := nt.GetClient(endpoint)
if client == nil {
err = fmt.Errorf("failed to create network client for discovery")
log.Crit(err.Error(), "endpoint", endpoint)
return err
}
var b []byte
if b, err = client.GetNodeInfo(); err != nil {
log.Crit("failed to get node info from discovery", "endpoint", endpoint, "error", err)
return err
}

var nodeInfo node.NodeInfo
if nodeInfo, err = node.NewNodeInfoFromJSON(b); err != nil {
log.Crit(
"failed to parse node info from discovery",
"endpoint", endpoint,
"error", err,
"received", string(b),
)
return err
}

// Check whether basic policies are matched with remote node, like
// `network-id`. TODO `genesis account`, `common account`, etc.
if nodeInfo.Policy.NetworkID != string(conf.NetworkID) {
log.Crit(
errors.DiscoveryPolicyDoesNotMatch.Error(),
"endpoint", endpoint,
"remote-NetworkID", nodeInfo.Policy.NetworkID,
"local-NetworkID", string(conf.NetworkID),
)
return errors.DiscoveryPolicyDoesNotMatch
}

if nodeInfo.Policy.InitialBalance != conf.InitialBalance {
log.Crit(
errors.DiscoveryPolicyDoesNotMatch.Error(),
"endpoint", endpoint,
"remote-InitialBalance", nodeInfo.Policy.InitialBalance,
"local-InitialBalance", conf.InitialBalance,
)
return errors.DiscoveryPolicyDoesNotMatch
}

var validator *node.Validator
validator, err = node.NewValidator(
nodeInfo.Node.Address,
nodeInfo.Node.Endpoint,
nodeInfo.Node.Alias,
)
if err != nil {
log.Crit(
"failed to create validator from discovery",
"endpoint", endpoint,
"error", err,
"node-info", nodeInfo,
)
return err
}
localNode.AddValidators(validator)
}
if len(localNode.GetValidators()) < 1 {
err = fmt.Errorf("remote nodes not found")
log.Crit(err.Error())
return err
}

watcher := c.NewWatcher(syncer)
g.Add(func() error {
return watcher.Start()
}, func(error) {
watcher.Stop()
})
}
{
cancel := make(chan struct{})
Expand Down
3 changes: 2 additions & 1 deletion docker/node1.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ SEBAK_BIND=https://127.0.0.1:2821
# Endpoint sebak jsonrpc binds to
SEBAK_JSONRPC_BIND=https://127.0.0.1:3821
# This is expanded by the `entrypoint.sh` script based on spaces
SEBAK_VALIDATORS=self https://127.0.0.1:2822?address=GAYGELM74WJMKSLDN5YP2VAMP64WC4IXIGICUNK2SCVIT7KPTLY7M3MW&alias=node2 https://127.0.0.1:2823/?address=GDTEPFWEITKFHSUO44NQABY2XHRBBH2UBVGJ2ZJPDREIOL2F6RAEBJE4&alias=node3 https://127.0.0.1:2824/?address=GCDCXYUTLFOZSRQ4K6DTZ3R7ZJMD6CHVL3ZM7KGOG52NOTNHWBKYPCIO&alias=node4
SEBAK_VALIDATORS=self GAYGELM74WJMKSLDN5YP2VAMP64WC4IXIGICUNK2SCVIT7KPTLY7M3MW GDTEPFWEITKFHSUO44NQABY2XHRBBH2UBVGJ2ZJPDREIOL2F6RAEBJE4 GCDCXYUTLFOZSRQ4K6DTZ3R7ZJMD6CHVL3ZM7KGOG52NOTNHWBKYPCIO
# This node's public key
SEBAK_GENESIS_BLOCK=GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ
# common account
Expand All @@ -17,3 +17,4 @@ SEBAK_COMMON_ACCOUNT=GCTVCU764UPXKRJW5DS5AWF5ETCCIYPZTWXN5U5CLUNAAJH6D5NGEIIH
# SDKQUVQWNQ3O7YBUBXHGZY6PE3WXIFEIW3Q67HLQRU2I4C7KEX5S62L2
# congress account
SEBAK_CONGRESS_ADDR=GAIJAH4FCEB3AWS2NVKRNDGVJ5QPD7VNOQNGZEPL2FAI6GPZQNKEXRWI
SEBAK_DISCOVERY=https://127.0.0.1:2822 https://127.0.0.1:2823 https://127.0.0.1:2824
3 changes: 2 additions & 1 deletion docker/node2.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ SEBAK_BIND=https://127.0.0.1:2822
# Endpoint sebak jsonrpc binds to
SEBAK_JSONRPC_BIND=https://127.0.0.1:3822
# This is expanded by the `entrypoint.sh` script based on spaces
SEBAK_VALIDATORS=self https://127.0.0.1:2821?address=GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ&alias=node1 https://127.0.0.1:2823?address=GDTEPFWEITKFHSUO44NQABY2XHRBBH2UBVGJ2ZJPDREIOL2F6RAEBJE4&alias=node3 https://127.0.0.1:2824/?address=GCDCXYUTLFOZSRQ4K6DTZ3R7ZJMD6CHVL3ZM7KGOG52NOTNHWBKYPCIO&alias=node4
SEBAK_VALIDATORS=self GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ GDTEPFWEITKFHSUO44NQABY2XHRBBH2UBVGJ2ZJPDREIOL2F6RAEBJE4 GCDCXYUTLFOZSRQ4K6DTZ3R7ZJMD6CHVL3ZM7KGOG52NOTNHWBKYPCIO
# node1's public key
SEBAK_GENESIS_BLOCK=GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ
# common account
SEBAK_COMMON_ACCOUNT=GCTVCU764UPXKRJW5DS5AWF5ETCCIYPZTWXN5U5CLUNAAJH6D5NGEIIH
# congress account
SEBAK_CONGRESS_ADDR=GAIJAH4FCEB3AWS2NVKRNDGVJ5QPD7VNOQNGZEPL2FAI6GPZQNKEXRWI
SEBAK_DISCOVERY=https://127.0.0.1:2821 https://127.0.0.1:2823 https://127.0.0.1:2824
3 changes: 2 additions & 1 deletion docker/node3.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ SEBAK_BIND=https://127.0.0.1:2823
# Endpoint sebak jsonrpc binds to
SEBAK_JSONRPC_BIND=https://127.0.0.1:3823
# This is expanded by the `entrypoint.sh` script based on spaces
SEBAK_VALIDATORS=self https://127.0.0.1:2821?address=GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ&alias=node1 https://127.0.0.1:2822?address=GAYGELM74WJMKSLDN5YP2VAMP64WC4IXIGICUNK2SCVIT7KPTLY7M3MW&alias=node2 https://127.0.0.1:2824/?address=GCDCXYUTLFOZSRQ4K6DTZ3R7ZJMD6CHVL3ZM7KGOG52NOTNHWBKYPCIO&alias=node4
SEBAK_VALIDATORS=self GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ GAYGELM74WJMKSLDN5YP2VAMP64WC4IXIGICUNK2SCVIT7KPTLY7M3MW GCDCXYUTLFOZSRQ4K6DTZ3R7ZJMD6CHVL3ZM7KGOG52NOTNHWBKYPCIO
# node1's public key
SEBAK_GENESIS_BLOCK=GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ
# common account
SEBAK_COMMON_ACCOUNT=GCTVCU764UPXKRJW5DS5AWF5ETCCIYPZTWXN5U5CLUNAAJH6D5NGEIIH
# congress account
SEBAK_CONGRESS_ADDR=GAIJAH4FCEB3AWS2NVKRNDGVJ5QPD7VNOQNGZEPL2FAI6GPZQNKEXRWI
SEBAK_DISCOVERY=https://127.0.0.1:2821 https://127.0.0.1:2822 https://127.0.0.1:2824
3 changes: 2 additions & 1 deletion docker/node4.env
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ SEBAK_BIND=https://127.0.0.1:2824
# Endpoint sebak jsonrpc binds to
SEBAK_JSONRPC_BIND=https://127.0.0.1:3824
# This is expanded by the `entrypoint.sh` script based on spaces
SEBAK_VALIDATORS=self https://127.0.0.1:2821?address=GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ&alias=node1 https://127.0.0.1:2822?address=GAYGELM74WJMKSLDN5YP2VAMP64WC4IXIGICUNK2SCVIT7KPTLY7M3MW&alias=node2 https://127.0.0.1:2823/?address=GDTEPFWEITKFHSUO44NQABY2XHRBBH2UBVGJ2ZJPDREIOL2F6RAEBJE4&alias=node3
SEBAK_VALIDATORS=self GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ GAYGELM74WJMKSLDN5YP2VAMP64WC4IXIGICUNK2SCVIT7KPTLY7M3MW GDTEPFWEITKFHSUO44NQABY2XHRBBH2UBVGJ2ZJPDREIOL2F6RAEBJE4
# node1's public key
SEBAK_GENESIS_BLOCK=GDIRF4UWPACXPPI4GW7CMTACTCNDIKJEHZK44RITZB4TD3YUM6CCVNGJ
# common account
SEBAK_COMMON_ACCOUNT=GCTVCU764UPXKRJW5DS5AWF5ETCCIYPZTWXN5U5CLUNAAJH6D5NGEIIH
# congress account
SEBAK_CONGRESS_ADDR=GAIJAH4FCEB3AWS2NVKRNDGVJ5QPD7VNOQNGZEPL2FAI6GPZQNKEXRWI
SEBAK_DISCOVERY=https://127.0.0.1:2821 https://127.0.0.1:2822 https://127.0.0.1:2823
9 changes: 4 additions & 5 deletions lib/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ import (
"time"
)

//
// Config has timeout features and transaction limit.
// The Config is included in ISAACStateManager and
// these timeout features are used in ISAAC consensus.
//
// Config has timeout features and transaction limit. The Config is included in
// ISAACStateManager and these timeout features are used in ISAAC consensus.
type Config struct {
TimeoutINIT time.Duration
TimeoutSIGN time.Duration
Expand Down Expand Up @@ -40,4 +37,6 @@ type Config struct {
JSONRPCEndpoint *Endpoint

WatcherMode bool

DiscoveryEndpoints []*Endpoint
}
Loading

0 comments on commit 9906075

Please sign in to comment.