diff --git a/append_entries.go b/append_entries.go index fef37cd6166..096ea8d3cab 100644 --- a/append_entries.go +++ b/append_entries.go @@ -30,7 +30,7 @@ type AppendEntriesResponse struct { //------------------------------------------------------------------------------ // Creates a new AppendEntries request. -func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest { +func newAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64, prevLogTerm uint64, entries []*LogEntry, commitIndex uint64) *AppendEntriesRequest { return &AppendEntriesRequest{ Term: term, LeaderName: leaderName, @@ -42,7 +42,7 @@ func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64 } // Creates a new AppendEntries response. -func NewAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse { +func newAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse { return &AppendEntriesResponse{ Term: term, Success: success, diff --git a/command.go b/command.go index 66bb18241ed..ceb7172c768 100644 --- a/command.go +++ b/command.go @@ -40,7 +40,7 @@ type Command interface { //-------------------------------------- // Creates a new instance of a command by name. -func NewCommand(name string) (Command, error) { +func newCommand(name string) (Command, error) { // Find the registered command. command := commandTypes[name] if command == nil { diff --git a/log.go b/log.go index 639156307fc..3c387f04f52 100644 --- a/log.go +++ b/log.go @@ -35,7 +35,7 @@ type Log struct { //------------------------------------------------------------------------------ // Creates a new log. -func NewLog() *Log { +func newLog() *Log { return &Log{ entries: make([]*LogEntry, 0), } @@ -47,24 +47,12 @@ func NewLog() *Log { // //------------------------------------------------------------------------------ -func (l *Log) SetStartIndex(i uint64) { - l.startIndex = i -} - -func (l *Log) StartIndex() uint64 { - return l.startIndex -} - -func (l *Log) SetStartTerm(t uint64) { - l.startTerm = t -} - //-------------------------------------- // Log Indices //-------------------------------------- // The current index in the log. -func (l *Log) CurrentIndex() uint64 { +func (l *Log) currentIndex() uint64 { l.mutex.Lock() defer l.mutex.Unlock() @@ -83,24 +71,19 @@ func (l *Log) internalCurrentIndex() uint64 { } // The next index in the log. -func (l *Log) NextIndex() uint64 { - return l.CurrentIndex() + 1 -} - -// The last committed index in the log. -func (l *Log) CommitIndex() uint64 { - return l.commitIndex +func (l *Log) nextIndex() uint64 { + return l.currentIndex() + 1 } // Determines if the log contains zero entries. -func (l *Log) IsEmpty() bool { +func (l *Log) isEmpty() bool { l.mutex.Lock() defer l.mutex.Unlock() return (len(l.entries) == 0) && (l.startIndex == 0) } // The name of the last command in the log. -func (l *Log) LastCommandName() string { +func (l *Log) lastCommandName() string { l.mutex.Lock() defer l.mutex.Unlock() if len(l.entries) > 0 { @@ -116,7 +99,7 @@ func (l *Log) LastCommandName() string { //-------------------------------------- // The current term in the log. -func (l *Log) CurrentTerm() uint64 { +func (l *Log) currentTerm() uint64 { l.mutex.Lock() defer l.mutex.Unlock() @@ -138,7 +121,7 @@ func (l *Log) CurrentTerm() uint64 { // Opens the log file and reads existing entries. The log can remain open and // continue to append entries to the end of the log. -func (l *Log) Open(path string) error { +func (l *Log) open(path string) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -160,8 +143,8 @@ func (l *Log) Open(path string) error { } // Instantiate log entry and decode into it. - entry := NewLogEntry(l, 0, 0, nil) - n, err := entry.Decode(reader) + entry := newLogEntry(l, 0, 0, nil) + n, err := entry.decode(reader) if err != nil { file.Close() if err = os.Truncate(path, int64(lastIndex)); err != nil { @@ -196,7 +179,7 @@ func (l *Log) Open(path string) error { } // Closes the log file. -func (l *Log) Close() { +func (l *Log) close() { l.mutex.Lock() defer l.mutex.Unlock() @@ -213,12 +196,12 @@ func (l *Log) Close() { //-------------------------------------- // Creates a log entry associated with this log. -func (l *Log) CreateEntry(term uint64, command Command) *LogEntry { - return NewLogEntry(l, l.NextIndex(), term, command) +func (l *Log) createEntry(term uint64, command Command) *LogEntry { + return newLogEntry(l, l.nextIndex(), term, command) } // Checks if the log contains a given index/term combination. -func (l *Log) ContainsEntry(index uint64, term uint64) bool { +func (l *Log) containsEntry(index uint64, term uint64) bool { if index <= l.startIndex || index > (l.startIndex+uint64(len(l.entries))) { return false } @@ -228,7 +211,7 @@ func (l *Log) ContainsEntry(index uint64, term uint64) bool { // Retrieves a list of entries after a given index as well as the term of the // index provided. A nil list of entries is returned if the index no longer // exists because a snapshot was made. -func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { +func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) { l.mutex.Lock() defer l.mutex.Unlock() @@ -259,7 +242,7 @@ func (l *Log) GetEntriesAfter(index uint64) ([]*LogEntry, uint64) { // Retrieves the error returned from an entry. The error can only exist after // the entry has been committed. -func (l *Log) GetEntryError(entry *LogEntry) error { +func (l *Log) getEntryError(entry *LogEntry) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -278,7 +261,7 @@ func (l *Log) GetEntryError(entry *LogEntry) error { //-------------------------------------- // Retrieves the last index and term that has been committed to the log. -func (l *Log) CommitInfo() (index uint64, term uint64) { +func (l *Log) commitInfo() (index uint64, term uint64) { l.mutex.Lock() defer l.mutex.Unlock() @@ -298,7 +281,7 @@ func (l *Log) CommitInfo() (index uint64, term uint64) { } // Retrieves the last index and term that has been committed to the log. -func (l *Log) LastInfo() (index uint64, term uint64) { +func (l *Log) lastInfo() (index uint64, term uint64) { l.mutex.Lock() defer l.mutex.Unlock() @@ -313,14 +296,14 @@ func (l *Log) LastInfo() (index uint64, term uint64) { } // Updates the commit index -func (l *Log) UpdateCommitIndex(index uint64) { +func (l *Log) updateCommitIndex(index uint64) { l.mutex.Lock() defer l.mutex.Unlock() l.commitIndex = index } // Updates the commit index and writes entries after that index to the stable storage. -func (l *Log) SetCommitIndex(index uint64) error { +func (l *Log) setCommitIndex(index uint64) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -353,7 +336,7 @@ func (l *Log) SetCommitIndex(index uint64) error { entry := l.entries[entryIndex] // Write to storage. - if err := entry.Encode(l.file); err != nil { + if err := entry.encode(l.file); err != nil { return err } @@ -373,14 +356,14 @@ func (l *Log) SetCommitIndex(index uint64) error { // Truncates the log to the given index and term. This only works if the log // at the index has not been committed. -func (l *Log) Truncate(index uint64, term uint64) error { +func (l *Log) truncate(index uint64, term uint64) error { l.mutex.Lock() defer l.mutex.Unlock() debugln("[Truncate] truncate to ", index) // Do not allow committed entries to be truncated. - if index < l.CommitIndex() { + if index < l.commitIndex { debugln("[Truncate] error 1") - return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.CommitIndex(), index, term) + return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.commitIndex, index, term) } // Do not truncate past end of entries. @@ -415,8 +398,8 @@ func (l *Log) Truncate(index uint64, term uint64) error { //-------------------------------------- // Appends a series of entries to the log. These entries are not written to -// disk until SetCommitIndex() is called. -func (l *Log) AppendEntries(entries []*LogEntry) error { +// disk until setCommitIndex() is called. +func (l *Log) appendEntries(entries []*LogEntry) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -430,13 +413,6 @@ func (l *Log) AppendEntries(entries []*LogEntry) error { return nil } -// Appends a single entry to the log. -func (l *Log) AppendEntry(entry *LogEntry) error { - l.mutex.Lock() - defer l.mutex.Unlock() - return l.appendEntry(entry) -} - // Writes a single log entry to the end of the log. This function does not // obtain a lock and should only be used internally. Use AppendEntries() and // AppendEntry() to use it externally. @@ -467,7 +443,7 @@ func (l *Log) appendEntry(entry *LogEntry) error { //-------------------------------------- // compaction the log before index -func (l *Log) Compact(index uint64, term uint64) error { +func (l *Log) compact(index uint64, term uint64) error { var entries []*LogEntry l.mutex.Lock() @@ -490,7 +466,7 @@ func (l *Log) Compact(index uint64, term uint64) error { return err } for _, entry := range entries { - err = entry.Encode(file) + err = entry.encode(file) if err != nil { return err } diff --git a/log_entry.go b/log_entry.go index 50f0ed5e60f..d9a15e94b4a 100644 --- a/log_entry.go +++ b/log_entry.go @@ -41,7 +41,7 @@ type logEntryRawMessage struct { //------------------------------------------------------------------------------ // Creates a new log entry associated with a log. -func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry { +func newLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry { return &LogEntry{ log: log, Index: index, @@ -62,7 +62,7 @@ func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry //-------------------------------------- // Encodes the log entry to a buffer. -func (e *LogEntry) Encode(w io.Writer) error { +func (e *LogEntry) encode(w io.Writer) error { if w == nil { return errors.New("raft.LogEntry: Writer required to encode") } @@ -87,7 +87,7 @@ func (e *LogEntry) Encode(w io.Writer) error { } // Decodes the log entry from a buffer. Returns the number of bytes read. -func (e *LogEntry) Decode(r io.Reader) (pos int, err error) { +func (e *LogEntry) decode(r io.Reader) (pos int, err error) { pos = 0 if r == nil { @@ -137,7 +137,7 @@ func (e *LogEntry) Decode(r io.Reader) (pos int, err error) { } // Instantiate command by name. - command, err := NewCommand(commandName) + command, err := newCommand(commandName) if err != nil { err = fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err) return @@ -187,7 +187,7 @@ func (e *LogEntry) UnmarshalJSON(data []byte) error { // Create a command based on the name. var err error - if e.Command, err = NewCommand(obj.Name); err != nil { + if e.Command, err = newCommand(obj.Name); err != nil { return err } json.Unmarshal(obj.Command, e.Command) diff --git a/log_entry_test.go b/log_entry_test.go index b7a6032750f..19765dbd323 100644 --- a/log_entry_test.go +++ b/log_entry_test.go @@ -18,7 +18,7 @@ import ( // Ensure that we can encode a log entry to JSON. func TestLogEntryMarshal(t *testing.T) { - e := NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}) + e := newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"}) if b, err := json.Marshal(e); !(string(b) == `{"command":{"name":"localhost:1000"},"index":1,"name":"test:join","term":2}` && err == nil) { t.Fatalf("Unexpected log entry marshalling: %v (%v)", string(b), err) } @@ -32,6 +32,6 @@ func TestLogEntryUnmarshal(t *testing.T) { t.Fatalf("Log entry unmarshalling error: %v", err) } if !(e.Index == 1 && e.Term == 2 && reflect.DeepEqual(e.Command, &joinCommand{Name: "localhost:1000"})) { - t.Fatalf("Log entry unmarshaled incorrectly: %v | %v", e, NewLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})) + t.Fatalf("Log entry unmarshaled incorrectly: %v | %v", e, newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})) } } diff --git a/log_test.go b/log_test.go index b26544c5154..dc107ec03e4 100644 --- a/log_test.go +++ b/log_test.go @@ -20,28 +20,28 @@ import ( // Ensure that we can append to a new log. func TestLogNewLog(t *testing.T) { path := getLogPath() - log := NewLog() + log := newLog() log.ApplyFunc = func(c Command) (interface{}, error) { return nil, nil } - if err := log.Open(path); err != nil { + if err := log.open(path); err != nil { t.Fatalf("Unable to open log: %v", err) } - defer log.Close() + defer log.close() defer os.Remove(path) - if err := log.AppendEntry(NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})); err != nil { + if err := log.appendEntry(newLogEntry(log, 1, 1, &testCommand1{"foo", 20})); err != nil { t.Fatalf("Unable to append: %v", err) } - if err := log.AppendEntry(NewLogEntry(log, 2, 1, &TestCommand2{100})); err != nil { + if err := log.appendEntry(newLogEntry(log, 2, 1, &testCommand2{100})); err != nil { t.Fatalf("Unable to append: %v", err) } - if err := log.AppendEntry(NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})); err != nil { + if err := log.appendEntry(newLogEntry(log, 3, 2, &testCommand1{"bar", 0})); err != nil { t.Fatalf("Unable to append: %v", err) } // Partial commit. - if err := log.SetCommitIndex(2); err != nil { + if err := log.setCommitIndex(2); err != nil { t.Fatalf("Unable to partially commit: %v", err) } expected := `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + @@ -50,12 +50,12 @@ func TestLogNewLog(t *testing.T) { if string(actual) != expected { t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) } - if index, term := log.CommitInfo(); index != 2 || term != 1 { + if index, term := log.commitInfo(); index != 2 || term != 1 { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } // Full commit. - if err := log.SetCommitIndex(3); err != nil { + if err := log.setCommitIndex(3); err != nil { t.Fatalf("Unable to commit: %v", err) } expected = `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + @@ -65,7 +65,7 @@ func TestLogNewLog(t *testing.T) { if string(actual) != expected { t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) } - if index, term := log.CommitInfo(); index != 3 || term != 2 { + if index, term := log.commitInfo(); index != 3 || term != 2 { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } } @@ -75,20 +75,20 @@ func TestLogExistingLog(t *testing.T) { log, path := setupLog(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n") - defer log.Close() + defer log.close() defer os.Remove(path) // Validate existing log entries. if len(log.entries) != 3 { t.Fatalf("Expected 3 entries, got %d", len(log.entries)) } - if log.entries[0].Index != 1 || log.entries[0].Term != 1 || !reflect.DeepEqual(log.entries[0].Command, &TestCommand1{"foo", 20}) { + if log.entries[0].Index != 1 || log.entries[0].Term != 1 || !reflect.DeepEqual(log.entries[0].Command, &testCommand1{"foo", 20}) { t.Fatalf("Unexpected entry[0]: %v", log.entries[0]) } - if log.entries[1].Index != 2 || log.entries[1].Term != 1 || !reflect.DeepEqual(log.entries[1].Command, &TestCommand2{100}) { + if log.entries[1].Index != 2 || log.entries[1].Term != 1 || !reflect.DeepEqual(log.entries[1].Command, &testCommand2{100}) { t.Fatalf("Unexpected entry[1]: %v", log.entries[1]) } - if log.entries[2].Index != 3 || log.entries[2].Term != 2 || !reflect.DeepEqual(log.entries[2].Command, &TestCommand1{"bar", 0}) { + if log.entries[2].Index != 3 || log.entries[2].Term != 2 || !reflect.DeepEqual(log.entries[2].Command, &testCommand1{"bar", 0}) { t.Fatalf("Unexpected entry[2]: %v", log.entries[2]) } } @@ -98,22 +98,22 @@ func TestLogContainsEntries(t *testing.T) { log, path := setupLog(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}` + "\n") - defer log.Close() + defer log.close() defer os.Remove(path) - if log.ContainsEntry(0, 0) { + if log.containsEntry(0, 0) { t.Fatalf("Zero-index entry should not exist in log.") } - if log.ContainsEntry(1, 0) { + if log.containsEntry(1, 0) { t.Fatalf("Entry with mismatched term should not exist") } - if log.ContainsEntry(4, 0) { + if log.containsEntry(4, 0) { t.Fatalf("Out-of-range entry should not exist") } - if !log.ContainsEntry(2, 1) { + if !log.containsEntry(2, 1) { t.Fatalf("Entry 2/1 should exist") } - if !log.ContainsEntry(3, 2) { + if !log.containsEntry(3, 2) { t.Fatalf("Entry 2/1 should exist") } } @@ -123,17 +123,17 @@ func TestLogRecovery(t *testing.T) { path := setupLogFile(`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}` + "\n" + `6ac5807c 0000000000000003 00000000000`) - log := NewLog() + log := newLog() log.ApplyFunc = func(c Command) (interface{}, error) { return nil, nil } - if err := log.Open(path); err != nil { + if err := log.open(path); err != nil { t.Fatalf("Unable to open log: %v", err) } - defer log.Close() + defer log.close() defer os.Remove(path) - if err := log.AppendEntry(NewLogEntry(log, 3, 2, &TestCommand1{"bat", -5})); err != nil { + if err := log.appendEntry(newLogEntry(log, 3, 2, &testCommand1{"bat", -5})); err != nil { t.Fatalf("Unable to append: %v", err) } @@ -141,13 +141,13 @@ func TestLogRecovery(t *testing.T) { if len(log.entries) != 3 { t.Fatalf("Expected 2 entries, got %d", len(log.entries)) } - if log.entries[0].Index != 1 || log.entries[0].Term != 1 || !reflect.DeepEqual(log.entries[0].Command, &TestCommand1{"foo", 20}) { + if log.entries[0].Index != 1 || log.entries[0].Term != 1 || !reflect.DeepEqual(log.entries[0].Command, &testCommand1{"foo", 20}) { t.Fatalf("Unexpected entry[0]: %v", log.entries[0]) } - if log.entries[1].Index != 2 || log.entries[1].Term != 1 || !reflect.DeepEqual(log.entries[1].Command, &TestCommand2{100}) { + if log.entries[1].Index != 2 || log.entries[1].Term != 1 || !reflect.DeepEqual(log.entries[1].Command, &testCommand2{100}) { t.Fatalf("Unexpected entry[1]: %v", log.entries[1]) } - if log.entries[2].Index != 3 || log.entries[2].Term != 2 || !reflect.DeepEqual(log.entries[2].Command, &TestCommand1{"bat", -5}) { + if log.entries[2].Index != 3 || log.entries[2].Term != 2 || !reflect.DeepEqual(log.entries[2].Command, &testCommand1{"bat", -5}) { t.Fatalf("Unexpected entry[2]: %v", log.entries[2]) } @@ -160,7 +160,7 @@ func TestLogRecovery(t *testing.T) { } // Validate committed log contents. - if err := log.SetCommitIndex(3); err != nil { + if err := log.setCommitIndex(3); err != nil { t.Fatalf("Unable to partially commit: %v", err) } expected = `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}` + "\n" + @@ -179,46 +179,46 @@ func TestLogRecovery(t *testing.T) { // Ensure that we can truncate uncommitted entries in the log. func TestLogTruncate(t *testing.T) { log, path := setupLog("") - if err := log.Open(path); err != nil { + if err := log.open(path); err != nil { t.Fatalf("Unable to open log: %v", err) } - defer log.Close() + defer log.close() defer os.Remove(path) - entry1 := NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20}) - if err := log.AppendEntry(entry1); err != nil { + entry1 := newLogEntry(log, 1, 1, &testCommand1{"foo", 20}) + if err := log.appendEntry(entry1); err != nil { t.Fatalf("Unable to append: %v", err) } - entry2 := NewLogEntry(log, 2, 1, &TestCommand2{100}) - if err := log.AppendEntry(entry2); err != nil { + entry2 := newLogEntry(log, 2, 1, &testCommand2{100}) + if err := log.appendEntry(entry2); err != nil { t.Fatalf("Unable to append: %v", err) } - entry3 := NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0}) - if err := log.AppendEntry(entry3); err != nil { + entry3 := newLogEntry(log, 3, 2, &testCommand1{"bar", 0}) + if err := log.appendEntry(entry3); err != nil { t.Fatalf("Unable to append: %v", err) } - if err := log.SetCommitIndex(2); err != nil { + if err := log.setCommitIndex(2); err != nil { t.Fatalf("Unable to partially commit: %v", err) } // Truncate committed entry. - if err := log.Truncate(1, 1); err == nil || err.Error() != "raft.Log: Index is already committed (2): (IDX=1, TERM=1)" { + if err := log.truncate(1, 1); err == nil || err.Error() != "raft.Log: Index is already committed (2): (IDX=1, TERM=1)" { t.Fatalf("Truncating committed entries shouldn't work: %v", err) } // Truncate past end of log. - if err := log.Truncate(4, 2); err == nil || err.Error() != "raft.Log: Entry index does not exist (MAX=3): (IDX=4, TERM=2)" { + if err := log.truncate(4, 2); err == nil || err.Error() != "raft.Log: Entry index does not exist (MAX=3): (IDX=4, TERM=2)" { t.Fatalf("Truncating past end-of-log shouldn't work: %v", err) } // Truncate entry with mismatched term. - if err := log.Truncate(2, 2); err == nil || err.Error() != "raft.Log: Entry at index does not have matching term (1): (IDX=2, TERM=2)" { + if err := log.truncate(2, 2); err == nil || err.Error() != "raft.Log: Entry at index does not have matching term (1): (IDX=2, TERM=2)" { t.Fatalf("Truncating mismatched entries shouldn't work: %v", err) } // Truncate end of log. - if err := log.Truncate(3, 2); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2, entry3})) { + if err := log.truncate(3, 2); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2, entry3})) { t.Fatalf("Truncating end of log should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2, entry3}) } // Truncate at last commit. - if err := log.Truncate(2, 1); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2})) { + if err := log.truncate(2, 1); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1, entry2})) { t.Fatalf("Truncating at last commit should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2}) } diff --git a/peer.go b/peer.go index ba2833013b4..cbb597dfc36 100644 --- a/peer.go +++ b/peer.go @@ -19,10 +19,10 @@ type Peer struct { name string prevLogIndex uint64 mutex sync.Mutex - heartbeatTimer *Timer + heartbeatTimer *timer } -type FlushResponse struct { +type flushResponse struct { term uint64 success bool err error @@ -36,11 +36,11 @@ type FlushResponse struct { //------------------------------------------------------------------------------ // Creates a new peer. -func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer { +func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer { p := &Peer{ server: server, name: name, - heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout), + heartbeatTimer: newTimer(heartbeatTimeout, heartbeatTimeout), } return p @@ -57,17 +57,12 @@ func (p *Peer) Name() string { return p.name } -// Retrieves the heartbeat timeout. -func (p *Peer) HeartbeatTimeout() time.Duration { - return p.heartbeatTimer.MinDuration() -} - // Sets the heartbeat timeout. -func (p *Peer) SetHeartbeatTimeout(duration time.Duration) { - p.heartbeatTimer.SetDuration(duration) +func (p *Peer) setHeartbeatTimeout(duration time.Duration) { + p.heartbeatTimer.setDuration(duration) } -func (p *Peer) StartHeartbeat() { +func (p *Peer) startHeartbeat() { go p.heartbeat() } @@ -85,7 +80,7 @@ func (p *Peer) StartHeartbeat() { func (p *Peer) stop() { p.mutex.Lock() defer p.mutex.Unlock() - p.heartbeatTimer.Stop() + p.heartbeatTimer.stop() } //-------------------------------------- @@ -240,9 +235,9 @@ func (p *Peer) heartbeat() { // (1) timeout/fire happens, flush the peer // (2) stopped, return - if p.heartbeatTimer.Start() { + if p.heartbeatTimer.start() { - var f FlushResponse + var f flushResponse f.peer = p @@ -251,7 +246,7 @@ func (p *Peer) heartbeat() { // if the peer successfully appended the log entry // we will tell the commit center if f.success { - if p.prevLogIndex > p.server.log.CommitIndex() { + if p.prevLogIndex > p.server.log.commitIndex { debugln("[Heartbeat] Peer", p.Name(), "send to commit center") p.server.response <- f debugln("[Heartbeat] Peer", p.Name(), "back from commit center") diff --git a/request_vote.go b/request_vote.go index 2fd1f791e99..e198ffb4ae5 100644 --- a/request_vote.go +++ b/request_vote.go @@ -29,7 +29,7 @@ type RequestVoteResponse struct { //------------------------------------------------------------------------------ // Creates a new RequestVote request. -func NewRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64) *RequestVoteRequest { +func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint64, lastLogTerm uint64) *RequestVoteRequest { return &RequestVoteRequest{ Term: term, CandidateName: candidateName, @@ -39,7 +39,7 @@ func NewRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint6 } // Creates a new RequestVote response. -func NewRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse { +func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse { return &RequestVoteResponse{ Term: term, VoteGranted: voteGranted, diff --git a/server.go b/server.go index f5f6b7200d6..17edbc496e7 100644 --- a/server.go +++ b/server.go @@ -63,9 +63,9 @@ type Server struct { mutex sync.Mutex stateMutex sync.Mutex - electionTimer *Timer + electionTimer *timer heartbeatTimeout time.Duration - response chan FlushResponse + response chan flushResponse stepDown chan uint64 stop chan bool @@ -97,10 +97,10 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S context: context, state: Stopped, peers: make(map[string]*Peer), - log: NewLog(), + log: newLog(), stepDown: make(chan uint64, 1), stop: make(chan bool), - electionTimer: NewTimer(DefaultElectionTimeout, DefaultElectionTimeout*2), + electionTimer: newTimer(DefaultElectionTimeout, DefaultElectionTimeout*2), heartbeatTimeout: DefaultHeartbeatTimeout, } @@ -182,9 +182,7 @@ func (s *Server) Term() uint64 { // Retrieves the current committed index of the server. func (s *Server) CommittedIndex() uint64 { - - return s.log.CommitIndex() - + return s.log.commitIndex } // Retrieves the name of the candidate this server voted for in this term. @@ -196,7 +194,7 @@ func (s *Server) VotedFor() string { func (s *Server) IsLogEmpty() bool { s.mutex.Lock() defer s.mutex.Unlock() - return s.log.IsEmpty() + return s.log.isEmpty() } // A list of all the log entries. This should only be used for debugging purposes. @@ -210,11 +208,11 @@ func (s *Server) LogEntries() []*LogEntry { } // A reference to the command name of the last entry. -func (s *Server) LastCommandName() string { +func (s *Server) lastCommandName() string { s.mutex.Lock() defer s.mutex.Unlock() if s.log != nil { - return s.log.LastCommandName() + return s.log.lastCommandName() } return "" } @@ -246,19 +244,19 @@ func (s *Server) QuorumSize() int { // Retrieves the election timeout. func (s *Server) ElectionTimeout() time.Duration { - return s.electionTimer.MinDuration() + return s.electionTimer.minDuration } // Sets the election timeout. func (s *Server) SetElectionTimeout(duration time.Duration) { - s.electionTimer.SetMinDuration(duration) - s.electionTimer.SetMaxDuration(duration * 2) + s.electionTimer.minDuration = duration + s.electionTimer.maxDuration = duration * 2 } // Start heartbeat when the server promote to leader -func (s *Server) StartHeartbeatTimeout() { +func (s *Server) startHeartbeatTimeout() { for _, peer := range s.peers { - peer.StartHeartbeat() + peer.startHeartbeat() } } @@ -272,13 +270,13 @@ func (s *Server) HeartbeatTimeout() time.Duration { } // Sets the heartbeat timeout. -func (s *Server) SetHeartbeatTimeout(duration time.Duration) { +func (s *Server) setHeartbeatTimeout(duration time.Duration) { s.mutex.Lock() defer s.mutex.Unlock() s.heartbeatTimeout = duration for _, peer := range s.peers { - peer.SetHeartbeatTimeout(duration) + peer.setHeartbeatTimeout(duration) } } @@ -303,20 +301,20 @@ func (s *Server) Initialize() error { } // Initialize response channel - s.response = make(chan FlushResponse, 128) + s.response = make(chan flushResponse, 128) // Create snapshot directory if not exist os.Mkdir(s.path+"/snapshot", 0700) // Initialize the log and load it up. - if err := s.log.Open(s.LogPath()); err != nil { + if err := s.log.open(s.LogPath()); err != nil { debugln("log error") s.unload() return fmt.Errorf("raft.Server: %v", err) } // Update the term to the last term in the log. - s.currentTerm = s.log.CurrentTerm() + s.currentTerm = s.log.currentTerm() return nil } @@ -400,7 +398,7 @@ func (s *Server) StartLeader() { func (s *Server) Stop() { s.mutex.Lock() if s.state == Follower { - s.electionTimer.Stop() + s.electionTimer.stop() } else { s.mutex.Unlock() s.stop <- true @@ -420,7 +418,7 @@ func (s *Server) unload() { if s.log != nil { // still some concurrency issue with stop // need lock - s.log.Close() + s.log.close() } } @@ -445,11 +443,11 @@ func (s *Server) startFollowerLoop() (stop bool) { return true } - if s.electionTimer.Start() { + if s.electionTimer.start() { return false } else { - s.electionTimer.Ready() + s.electionTimer.ready() continue } } @@ -476,7 +474,7 @@ func (s *Server) startCandidateLoop() (stop bool, leader bool) { s.leader = "" s.votedFor = s.Name() - lastLogIndex, lastLogTerm := s.log.LastInfo() + lastLogIndex, lastLogTerm := s.log.lastInfo() for { @@ -485,7 +483,7 @@ func (s *Server) startCandidateLoop() (stop bool, leader bool) { // Request votes from each of our peers. c := make(chan *RequestVoteResponse, len(s.peers)) - req := NewRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm) + req := newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm) for _, peer := range s.peers { go peer.sendVoteRequest(req, c) @@ -558,7 +556,7 @@ func (s *Server) startLeaderLoop() bool { s.stateMutex.Unlock() - logIndex, _ := s.log.LastInfo() + logIndex, _ := s.log.lastInfo() // after here we let the leader stepdown in the startLeaderSelect loop @@ -569,8 +567,8 @@ func (s *Server) startLeaderLoop() bool { debugln("[Leader] Set ", peer.Name(), "Prev to", logIndex) peer.prevLogIndex = logIndex - peer.heartbeatTimer.Ready() - peer.StartHeartbeat() + peer.heartbeatTimer.ready() + peer.startHeartbeat() } // Begin to collect response from followers @@ -656,7 +654,7 @@ func (s *Server) startLeaderSelect() bool { count := 1 for { - var response FlushResponse + var response flushResponse select { case response = <-s.response: @@ -684,21 +682,21 @@ func (s *Server) startLeaderSelect() bool { if count >= s.QuorumSize() { // Determine the committed index that a majority has. var indices []uint64 - indices = append(indices, s.log.CurrentIndex()) + indices = append(indices, s.log.currentIndex()) for _, peer := range s.peers { indices = append(indices, peer.prevLogIndex) } - sort.Sort(Uint64Slice(indices)) + sort.Sort(uint64Slice(indices)) // We can commit upto the index which the mojarity // of the members have appended. commitIndex := indices[s.QuorumSize()-1] - committedIndex := s.log.CommitIndex() + committedIndex := s.log.commitIndex if commitIndex > committedIndex { debugln(indices) debugln(s.GetState(), "[CommitCenter] Going to Commit ", commitIndex) - s.log.SetCommitIndex(commitIndex) + s.log.setCommitIndex(commitIndex) debugln("[CommitCenter] Commit ", commitIndex) for i := committedIndex; i < commitIndex; i++ { @@ -750,13 +748,13 @@ func (s *Server) Do(command Command) (interface{}, error) { s.stateMutex.Unlock() - entry := s.log.CreateEntry(term, command) + entry := s.log.createEntry(term, command) - if err := s.log.AppendEntry(entry); err != nil { + if err := s.log.appendEntry(entry); err != nil { return nil, err } - s.response <- FlushResponse{term, true, nil, nil} + s.response <- flushResponse{term, true, nil, nil} // to speed up the response time // TODO: think about this carefully @@ -764,7 +762,7 @@ func (s *Server) Do(command Command) (interface{}, error) { // but will reduce through output // for _, peer := range s.peers { - // peer.heartbeatTimer.Fire() + // peer.heartbeatTimer.fire() // } debugln("[Do] join!") @@ -788,13 +786,13 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons defer s.mutex.Unlock() // If the server is stopped then reject it. if !s.Running() { - return NewAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped") + return newAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped") } // If the request is coming from an old term then reject it. if req.Term < s.currentTerm { - return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term") + return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), fmt.Errorf("raft.Server: Stale request term") } debugln("Peer ", s.Name(), "received heartbeat from ", req.LeaderName, @@ -807,33 +805,33 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons // Reset election timeout. if s.electionTimer != nil { - s.electionTimer.Stop() + s.electionTimer.stop() } // Reject if log doesn't contain a matching previous entry. - if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { - return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err + if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { + return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err } debugln("Peer ", s.Name(), "after truncate ") // Append entries to the log. - if err := s.log.AppendEntries(req.Entries); err != nil { - return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err + if err := s.log.appendEntries(req.Entries); err != nil { + return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err } debugln("Peer ", s.Name(), "commit index ", req.CommitIndex, " from ", req.LeaderName) // Commit up to the commit index. - if err := s.log.SetCommitIndex(req.CommitIndex); err != nil { - return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err + if err := s.log.setCommitIndex(req.CommitIndex); err != nil { + return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err } debugln("Peer ", s.Name(), "after commit ") debugln("Peer ", s.Name(), "reply heartbeat from ", req.LeaderName, " ", req.Term, " ", s.currentTerm, " ", time.Now()) - return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil + return newAppendEntriesResponse(s.currentTerm, true, s.log.commitIndex), nil } // Creates an AppendEntries request. Can return a nil request object if the @@ -842,9 +840,9 @@ func (s *Server) createAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesR if s.log == nil { return nil } - entries, prevLogTerm := s.log.GetEntriesAfter(prevLogIndex) + entries, prevLogTerm := s.log.getEntriesAfter(prevLogIndex) if entries != nil { - return NewAppendEntriesRequest(s.currentTerm, s.name, prevLogIndex, prevLogTerm, entries, s.log.CommitIndex()) + return newAppendEntriesRequest(s.currentTerm, s.name, prevLogIndex, prevLogTerm, entries, s.log.commitIndex) } else { return nil } @@ -865,12 +863,12 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err //debugln("[RequestVote] got the lock") // Fail if the server is not running. if !s.Running() { - return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server is stopped") + return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server is stopped") } // If the request is coming from an old term then reject it. if req.Term < s.currentTerm { - return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm) + return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm) } s.setCurrentTerm(req.Term) @@ -878,15 +876,15 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err // If we've already voted for a different candidate then don't vote for this candidate. if s.votedFor != "" && s.votedFor != req.CandidateName { debugln("already vote for ", s.votedFor, " false to ", req.CandidateName) - return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor) + return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor) } // If the candidate's log is not at least as up-to-date as // our last log then don't vote. - lastIndex, lastTerm := s.log.LastInfo() + lastIndex, lastTerm := s.log.lastInfo() if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm { debugln("my log is more up-to-date") - return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastIndex, lastTerm, req.LastLogIndex, req.LastLogTerm) + return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastIndex, lastTerm, req.LastLogIndex, req.LastLogTerm) } // If we made it this far then cast a vote and reset our election time out. @@ -895,10 +893,10 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err debugln(s.Name(), "Vote for ", req.CandidateName, "at term", req.Term) if s.electionTimer != nil { - s.electionTimer.Stop() + s.electionTimer.stop() } - return NewRequestVoteResponse(s.currentTerm, true), nil + return newRequestVoteResponse(s.currentTerm, true), nil } // Updates the current term on the server if the term is greater than the @@ -948,9 +946,9 @@ func (s *Server) AddPeer(name string) error { // Only add the peer if it doesn't have the same name. if s.name != name { //debugln("Add peer ", name) - peer := NewPeer(s, name, s.heartbeatTimeout) + peer := newPeer(s, name, s.heartbeatTimeout) if s.state == Leader { - peer.StartHeartbeat() + peer.startHeartbeat() } s.peers[peer.name] = peer @@ -994,7 +992,7 @@ func (s *Server) RemovePeer(name string) error { func (s *Server) createSnapshotRequest() *SnapshotRequest { s.mutex.Lock() defer s.mutex.Unlock() - return NewSnapshotRequest(s.name, s.lastSnapshot) + return newSnapshotRequest(s.name, s.lastSnapshot) } // The background snapshot function @@ -1014,7 +1012,7 @@ func (s *Server) takeSnapshot() error { return errors.New("handling snapshot") } - lastIndex, lastTerm := s.log.CommitInfo() + lastIndex, lastTerm := s.log.commitInfo() if lastIndex == 0 || lastTerm == 0 { return errors.New("No logs") @@ -1047,7 +1045,7 @@ func (s *Server) takeSnapshot() error { s.saveSnapshot() - s.log.Compact(lastIndex, lastTerm) + s.log.compact(lastIndex, lastTerm) return nil } @@ -1059,7 +1057,7 @@ func (s *Server) saveSnapshot() error { return errors.New("no snapshot to save") } - err := s.currentSnapshot.Save() + err := s.currentSnapshot.save() if err != nil { return err @@ -1070,7 +1068,7 @@ func (s *Server) saveSnapshot() error { // delete the previous snapshot if there is any change if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) { - tmp.Remove() + tmp.remove() } s.currentSnapshot = nil return nil @@ -1096,7 +1094,7 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro //update term and index s.currentTerm = req.LastTerm - s.log.UpdateCommitIndex(req.LastIndex) + s.log.updateCommitIndex(req.LastIndex) snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm) @@ -1104,9 +1102,9 @@ func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, erro s.saveSnapshot() - s.log.Compact(req.LastIndex, req.LastTerm) + s.log.compact(req.LastIndex, req.LastTerm) - return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil + return newSnapshotResponse(req.LastTerm, true, req.LastIndex), nil } @@ -1185,9 +1183,9 @@ func (s *Server) LoadSnapshot() error { s.AddPeer(peerName) } - s.log.SetStartTerm(s.lastSnapshot.LastTerm) - s.log.SetStartIndex(s.lastSnapshot.LastIndex) - s.log.UpdateCommitIndex(s.lastSnapshot.LastIndex) + s.log.startTerm = s.lastSnapshot.LastTerm + s.log.startIndex = s.lastSnapshot.LastIndex + s.log.updateCommitIndex(s.lastSnapshot.LastIndex) return err } diff --git a/server_test.go b/server_test.go index bfd6d4d0768..f621fe34a29 100644 --- a/server_test.go +++ b/server_test.go @@ -25,7 +25,7 @@ func TestServerRequestVote(t *testing.T) { server.Initialize() server.StartLeader() defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0)) + resp, err := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0)) if !(resp.Term == 1 && resp.VoteGranted && err == nil) { t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err) } @@ -38,7 +38,7 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) { server.StartLeader() server.currentTerm = 2 defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0)) + resp, err := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0)) if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Stale term: 1 < 2") { t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err) } @@ -54,11 +54,11 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) { server.StartLeader() server.currentTerm = 2 defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0)) + resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0)) if !(resp.Term == 2 && resp.VoteGranted && err == nil) { t.Fatalf("First vote should not have been denied (%v)", err) } - resp, err = server.RequestVote(NewRequestVoteRequest(2, "bar", 0, 0)) + resp, err = server.RequestVote(newRequestVoteRequest(2, "bar", 0, 0)) if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Already voted for foo") { t.Fatalf("Second vote should have been denied (%v)", err) } @@ -71,11 +71,11 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { server.StartLeader() server.currentTerm = 2 defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0)) + resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0)) if !(resp.Term == 2 && resp.VoteGranted && server.VotedFor() == "foo" && err == nil) { t.Fatalf("First vote should not have been denied (%v)", err) } - resp, err = server.RequestVote(NewRequestVoteRequest(3, "bar", 0, 0)) + resp, err = server.RequestVote(newRequestVoteRequest(3, "bar", 0, 0)) // now stepdown is done by channel, need time time.Sleep(5 * time.Millisecond) @@ -97,19 +97,19 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { defer server.Stop() - resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 2, 2)) + resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 2)) if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [2/2]") { t.Fatalf("Stale index vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err) } - resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 3, 1)) + resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 1)) if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [3/1]") { t.Fatalf("Stale term vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err) } - resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 3, 2)) + resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2)) if !(resp.Term == 2 && resp.VoteGranted && err == nil) { t.Fatalf("Matching log vote should have been granted (%v)", err) } - resp, err = server.RequestVote(NewRequestVoteRequest(2, "foo", 4, 3)) + resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 4, 3)) if !(resp.Term == 2 && resp.VoteGranted && err == nil) { t.Fatalf("Ahead-of-log vote should have been granted (%v)", err) } @@ -213,32 +213,31 @@ func TestServerAppendEntries(t *testing.T) { defer server.Stop() // Append single entry. - entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})} - resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) + entries := []*LogEntry{newLogEntry(nil, 1, 1, &testCommand1{"foo", 10})} + resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) if !(resp.Term == 1 && resp.Success && err == nil) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } - if index, term := server.log.CommitInfo(); !(index == 0 && term == 0) { + if index, term := server.log.commitInfo(); !(index == 0 && term == 0) { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } // Append multiple entries + commit the last one. - entries = []*LogEntry{NewLogEntry(nil, 2, 1, &TestCommand1{"bar", 20}), NewLogEntry(nil, 3, 1, &TestCommand1{"baz", 30})} - resp, err = server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 1, 1, entries, 1)) + entries = []*LogEntry{newLogEntry(nil, 2, 1, &testCommand1{"bar", 20}), newLogEntry(nil, 3, 1, &testCommand1{"baz", 30})} + resp, err = server.AppendEntries(newAppendEntriesRequest(1, "ldr", 1, 1, entries, 1)) if !(resp.Term == 1 && resp.Success && err == nil) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } - if index, term := server.log.CommitInfo(); !(index == 1 && term == 1) { + if index, term := server.log.commitInfo(); !(index == 1 && term == 1) { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } // Send zero entries and commit everything. - resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3)) - + resp, err = server.AppendEntries(newAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3)) if !(resp.Term == 2 && resp.Success && err == nil) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } - if index, term := server.log.CommitInfo(); !(index == 3 && term == 1) { + if index, term := server.log.commitInfo(); !(index == 3 && term == 1) { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } } @@ -253,12 +252,12 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { server.currentTerm = 2 // Append single entry. - entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})} - resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) + entries := []*LogEntry{newLogEntry(nil, 1, 1, &testCommand1{"foo", 10})} + resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 0)) if !(resp.Term == 2 && !resp.Success && err != nil && err.Error() == "raft.Server: Stale request term") { t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err) } - if index, term := server.log.CommitInfo(); !(index == 0 && term == 0) { + if index, term := server.log.commitInfo(); !(index == 0 && term == 0) { t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term) } } @@ -273,17 +272,17 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { // Append single entry + commit. entries := []*LogEntry{ - NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10}), - NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15}), + newLogEntry(nil, 1, 1, &testCommand1{"foo", 10}), + newLogEntry(nil, 2, 1, &testCommand1{"foo", 15}), } - resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 2)) + resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 2)) if !(resp.Term == 1 && resp.Success && err == nil) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } // Append entry again (post-commit). - entries = []*LogEntry{NewLogEntry(nil, 2, 1, &TestCommand1{"bar", 20})} - resp, err = server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 2, 1, entries, 1)) + entries = []*LogEntry{newLogEntry(nil, 2, 1, &testCommand1{"bar", 20})} + resp, err = server.AppendEntries(newAppendEntriesRequest(1, "ldr", 2, 1, entries, 1)) if !(resp.Term == 1 && !resp.Success && err != nil && err.Error() == "raft.Log: Cannot append entry with earlier index in the same term (1:2 <= 1:2)") { t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err) } @@ -296,21 +295,21 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { server.StartLeader() defer server.Stop() - entry1 := NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10}) - entry2 := NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15}) - entry3 := NewLogEntry(nil, 2, 2, &TestCommand1{"bar", 20}) + entry1 := newLogEntry(nil, 1, 1, &testCommand1{"foo", 10}) + entry2 := newLogEntry(nil, 2, 1, &testCommand1{"foo", 15}) + entry3 := newLogEntry(nil, 2, 2, &testCommand1{"bar", 20}) // Append single entry + commit. entries := []*LogEntry{entry1, entry2} - resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 1)) - if !(resp.Term == 1 && resp.Success && err == nil && server.log.CommitIndex() == 1 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2})) { + resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 1)) + if !(resp.Term == 1 && resp.Success && err == nil && server.log.commitIndex == 1 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2})) { t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err) } // Append entry that overwrites the second (uncommitted) entry. entries = []*LogEntry{entry3} - resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 1, 1, entries, 2)) - if !(resp.Term == 2 && resp.Success && err == nil && server.log.CommitIndex() == 2 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3})) { + resp, err = server.AppendEntries(newAppendEntriesRequest(2, "ldr", 1, 1, entries, 2)) + if !(resp.Term == 2 && resp.Success && err == nil && server.log.commitIndex == 2 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3})) { t.Fatalf("AppendEntries should have succeeded: %v/%v : %v", resp.Term, resp.Success, err) } } @@ -326,7 +325,7 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { server.StartFollower() defer server.Stop() var err error - if _, err = server.Do(&TestCommand1{"foo", 10}); err != NotLeaderError { + if _, err = server.Do(&testCommand1{"foo", 10}); err != NotLeaderError { t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err) } } @@ -414,12 +413,12 @@ func TestServerMultiNode(t *testing.T) { if name == "1" { leader = server - server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.setHeartbeatTimeout(testHeartbeatTimeout) server.StartLeader() time.Sleep(100 * time.Millisecond) } else { server.SetElectionTimeout(testElectionTimeout) - server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.setHeartbeatTimeout(testHeartbeatTimeout) server.StartFollower() time.Sleep(10 * time.Millisecond) } @@ -468,7 +467,7 @@ func TestServerMultiNode(t *testing.T) { debugln("Found leader") for i := 0; i < 10; i++ { debugln("[Test] do ", value.Name()) - if _, err := value.Do(&TestCommand2{X: 1}); err != nil { + if _, err := value.Do(&testCommand2{X: 1}); err != nil { break } debugln("[Test] Done") diff --git a/snapshot.go b/snapshot.go index e5e52e74704..d35474f8a43 100644 --- a/snapshot.go +++ b/snapshot.go @@ -27,7 +27,7 @@ type Snapshot struct { } // Save the snapshot to a file -func (ss *Snapshot) Save() error { +func (ss *Snapshot) save() error { // Write machine state to temporary buffer. // open file @@ -59,7 +59,7 @@ func (ss *Snapshot) Save() error { } // remove the file of the snapshot -func (ss *Snapshot) Remove() error { +func (ss *Snapshot) remove() error { err := os.Remove(ss.Path) return err } diff --git a/snapshot_request.go b/snapshot_request.go index 18d39ccc4a2..8ee197d6eeb 100644 --- a/snapshot_request.go +++ b/snapshot_request.go @@ -23,7 +23,7 @@ type SnapshotResponse struct { //------------------------------------------------------------------------------ // Creates a new Snapshot request. -func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest { +func newSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest { return &SnapshotRequest{ LeaderName: leaderName, LastIndex: snapshot.LastIndex, @@ -34,7 +34,7 @@ func NewSnapshotRequest(leaderName string, snapshot *Snapshot) *SnapshotRequest } // Creates a new Snapshot response. -func NewSnapshotResponse(term uint64, success bool, commitIndex uint64) *SnapshotResponse { +func newSnapshotResponse(term uint64, success bool, commitIndex uint64) *SnapshotResponse { return &SnapshotResponse{ Term: term, Success: success, diff --git a/sort.go b/sort.go index 85deb59defc..bf4c303af35 100644 --- a/sort.go +++ b/sort.go @@ -6,7 +6,7 @@ package raft // //------------------------------------------------------------------------------ -type Uint64Slice []uint64 +type uint64Slice []uint64 //------------------------------------------------------------------------------ // @@ -18,6 +18,6 @@ type Uint64Slice []uint64 // uint64 //-------------------------------------- -func (p Uint64Slice) Len() int { return len(p) } -func (p Uint64Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p Uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } +func (p uint64Slice) Len() int { return len(p) } +func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/test.go b/test.go index 28154ed0913..b1b29752e97 100644 --- a/test.go +++ b/test.go @@ -14,8 +14,8 @@ const ( func init() { RegisterCommand(&joinCommand{}) - RegisterCommand(&TestCommand1{}) - RegisterCommand(&TestCommand2{}) + RegisterCommand(&testCommand1{}) + RegisterCommand(&testCommand2{}) } //------------------------------------------------------------------------------ @@ -44,11 +44,11 @@ func setupLogFile(content string) string { func setupLog(content string) (*Log, string) { path := setupLogFile(content) - log := NewLog() + log := newLog() log.ApplyFunc = func(c Command) (interface{}, error) { return nil, nil } - if err := log.Open(path); err != nil { + if err := log.open(path); err != nil { panic("Unable to open log") } return log, path @@ -82,7 +82,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]* lookup[name] = server } for _, server := range servers { - server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.setHeartbeatTimeout(testHeartbeatTimeout) for _, peer := range servers { server.AddPeer(peer.Name()) } @@ -147,16 +147,16 @@ func (c *joinCommand) Apply(server *Server) (interface{}, error) { // Command1 //-------------------------------------- -type TestCommand1 struct { +type testCommand1 struct { Val string `json:"val"` I int `json:"i"` } -func (c *TestCommand1) CommandName() string { +func (c *testCommand1) CommandName() string { return "cmd_1" } -func (c *TestCommand1) Apply(server *Server) (interface{}, error) { +func (c *testCommand1) Apply(server *Server) (interface{}, error) { return nil, nil } @@ -164,14 +164,14 @@ func (c *TestCommand1) Apply(server *Server) (interface{}, error) { // Command2 //-------------------------------------- -type TestCommand2 struct { +type testCommand2 struct { X int `json:"x"` } -func (c *TestCommand2) CommandName() string { +func (c *testCommand2) CommandName() string { return "cmd_2" } -func (c *TestCommand2) Apply(server *Server) (interface{}, error) { +func (c *testCommand2) Apply(server *Server) (interface{}, error) { return nil, nil } diff --git a/timer.go b/timer.go index b8c30721c02..c7eb734118e 100644 --- a/timer.go +++ b/timer.go @@ -12,9 +12,9 @@ import ( // //------------------------------------------------------------------------------ -type Timer struct { - fire chan time.Time - stop chan bool +type timer struct { + fireChan chan time.Time + stopChan chan bool state int rand *rand.Rand @@ -38,7 +38,7 @@ const ( //------------------------------------------------------------------------------ // Creates a new timer. Panics if a non-positive duration is used. -func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer { +func newTimer(minDuration time.Duration, maxDuration time.Duration) *timer { if minDuration <= 0 { panic("raft: Non-positive minimum duration not allowed") } else if maxDuration <= 0 { @@ -47,13 +47,13 @@ func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer { panic("raft: Minimum duration cannot be greater than maximum duration") } - return &Timer{ + return &timer{ minDuration: minDuration, maxDuration: maxDuration, state: READY, rand: rand.New(rand.NewSource(time.Now().UnixNano())), - stop: make(chan bool, 1), - fire: make(chan time.Time), + stopChan: make(chan bool, 1), + fireChan: make(chan time.Time), } } @@ -63,28 +63,8 @@ func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer { // //------------------------------------------------------------------------------ -// Retrieves the minimum duration of the timer. -func (t *Timer) MinDuration() time.Duration { - return t.minDuration -} - -// Sets the minimum duration of the timer. -func (t *Timer) SetMinDuration(duration time.Duration) { - t.minDuration = duration -} - -// Retrieves the maximum duration of the timer. -func (t *Timer) MaxDuration() time.Duration { - return t.maxDuration -} - -// Sets the maximum duration of the timer. -func (t *Timer) SetMaxDuration(duration time.Duration) { - t.maxDuration = duration -} - // Sets the minimum and maximum duration of the timer. -func (t *Timer) SetDuration(duration time.Duration) { +func (t *timer) setDuration(duration time.Duration) { t.minDuration = duration t.maxDuration = duration } @@ -96,12 +76,12 @@ func (t *Timer) SetDuration(duration time.Duration) { //------------------------------------------------------------------------------ // Checks if the timer is currently running. -func (t *Timer) Running() bool { +func (t *timer) running() bool { return t.state == RUNNING } // Stops the timer and closes the channel. -func (t *Timer) Stop() { +func (t *timer) stop() { t.mutex.Lock() defer t.mutex.Unlock() @@ -113,12 +93,12 @@ func (t *Timer) Stop() { t.state = STOPPED // non-blocking buffer - t.stop <- true + t.stopChan <- true } } // Change the state of timer to ready -func (t *Timer) Ready() { +func (t *timer) ready() { t.mutex.Lock() defer t.mutex.Unlock() @@ -126,14 +106,14 @@ func (t *Timer) Ready() { panic("Timer is already running") } t.state = READY - t.stop = make(chan bool, 1) - t.fire = make(chan time.Time) + t.stopChan = make(chan bool, 1) + t.fireChan = make(chan time.Time) } // Fire at the timer -func (t *Timer) Fire() { +func (t *timer) fire() { select { - case t.fire <- time.Now(): + case t.fireChan <- time.Now(): return default: return @@ -146,7 +126,7 @@ func (t *Timer) Fire() { // (3) fired // Return false if stopped. // Make sure the start func will not restart the stopped timer. -func (t *Timer) Start() bool { +func (t *timer) start() bool { t.mutex.Lock() if t.state != READY { @@ -170,8 +150,8 @@ func (t *Timer) Start() bool { stopped := false select { case <-internalTimer.C: - case <-t.fire: - case <-t.stop: + case <-t.fireChan: + case <-t.stopChan: stopped = true } diff --git a/timer_test.go b/timer_test.go index caff0f51d52..60cd746b255 100644 --- a/timer_test.go +++ b/timer_test.go @@ -13,12 +13,12 @@ import ( // Ensure that we can start an election timer and it will go off in the specified duration. func TestTimer(t *testing.T) { - timer := NewTimer(5*time.Millisecond, 10*time.Millisecond) + timer := newTimer(5*time.Millisecond, 10*time.Millisecond) // test timer start for i := 0; i < 10; i++ { start := time.Now() - timer.Start() + timer.start() duration := time.Now().Sub(start) if duration > 12*time.Millisecond || duration < 5*time.Millisecond { @@ -30,7 +30,7 @@ func TestTimer(t *testing.T) { for i := 0; i < 100; i++ { start := time.Now() go stop(timer) - timer.Start() + timer.start() duration := time.Now().Sub(start) if duration > 3*time.Millisecond { @@ -38,14 +38,14 @@ func TestTimer(t *testing.T) { } // ready the timer after stop it - timer.Ready() + timer.ready() } // test timer fire for i := 0; i < 100; i++ { start := time.Now() go fire(timer) - timer.Start() + timer.start() duration := time.Now().Sub(start) if duration > 3*time.Millisecond { @@ -65,22 +65,22 @@ func TestTimer(t *testing.T) { if ret != false { t.Fatal("cannot stop timer!") } - timer.Ready() + timer.ready() } } -func stop(t *Timer) { +func stop(t *timer) { time.Sleep(time.Millisecond) - t.Stop() + t.stop() } -func start(t *Timer, resp chan bool) { +func start(t *timer, resp chan bool) { time.Sleep(time.Millisecond) - resp <- t.Start() + resp <- t.start() } -func fire(t *Timer) { +func fire(t *timer) { time.Sleep(time.Millisecond) - t.Fire() + t.fire() }