From b8e59450210dd79b1e258c0e45c876481910c7d5 Mon Sep 17 00:00:00 2001
From: shreyas-s-rao
Date: Fri, 1 Mar 2019 14:39:43 +0530
Subject: [PATCH 1/2] wal: add Verify function to perform corruption check on wal contents

Signed-off-by: Shreyas Rao
---
 wal/wal.go | 149 +++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 123 insertions(+), 26 deletions(-)

diff --git a/wal/wal.go b/wal/wal.go
index 96d01a23af6..ef63b52ccbc 100644
--- a/wal/wal.go
+++ b/wal/wal.go
@@ -223,17 +223,55 @@ func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
 }
 
 func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
-	names, err := readWalNames(dirpath)
+	names, nameIndex, err := selectWALFiles(dirpath, snap)
 	if err != nil {
 		return nil, err
 	}
 
+	rs, ls, closer, err := openWALFiles(dirpath, names, nameIndex, write)
+	if err != nil {
+		return nil, err
+	}
+
+	// create a WAL ready for reading
+	w := &WAL{
+		dir:       dirpath,
+		start:     snap,
+		decoder:   newDecoder(rs...),
+		readClose: closer,
+		locks:     ls,
+	}
+
+	if write {
+		// write reuses the file descriptors from read; don't close so
+		// WAL can append without dropping the file lock
+		w.readClose = nil
+		if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
+			closer()
+			return nil, err
+		}
+		w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
+	}
+
+	return w, nil
+}
+
+// selectWALFiles returns the WAL file names in dirpath and the index searchIndex selects for snap.Index.
+func selectWALFiles(dirpath string, snap walpb.Snapshot) ([]string, int, error) {
+	names, err := readWalNames(dirpath)
+	if err != nil {
+		return nil, -1, err
+	}
+
 	nameIndex, ok := searchIndex(names, snap.Index)
 	if !ok || !isValidSeq(names[nameIndex:]) {
-		return nil, ErrFileNotFound
+		return nil, -1, ErrFileNotFound
 	}
 
-	// open the wal files
+	return names, nameIndex, nil
+}
+
+func openWALFiles(dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
 	rcs := make([]io.ReadCloser, 0)
 	rs := make([]io.Reader, 0)
 	ls := make([]*fileutil.LockedFile, 0)
@@ -243,7 +281,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
 			l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
 			if err != nil {
 				closeAll(rcs...)
-				return nil, err
+				return nil, nil, nil, err
 			}
 			ls = append(ls, l)
 			rcs = append(rcs, l)
@@ -251,7 +289,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
 			rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
 			if err != nil {
 				closeAll(rcs...)
-				return nil, err
+				return nil, nil, nil, err
 			}
 			ls = append(ls, nil)
 			rcs = append(rcs, rf)
@@ -261,27 +299,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
 
 	closer := func() error { return closeAll(rcs...) }
 
-	// create a WAL ready for reading
-	w := &WAL{
-		dir:       dirpath,
-		start:     snap,
-		decoder:   newDecoder(rs...),
-		readClose: closer,
-		locks:     ls,
-	}
-
-	if write {
-		// write reuses the file descriptors from read; don't close so
-		// WAL can append without dropping the file lock
-		w.readClose = nil
-		if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
-			closer()
-			return nil, err
-		}
-		w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
-	}
-
-	return w, nil
+	return rs, ls, closer, nil
 }
 
 // ReadAll reads out records of the current WAL.
@@ -398,6 +416,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 	return metadata, state, ents, err
 }
 
+// Verify reads through the given WAL and verifies that it is not corrupted.
+// It creates a new decoder to read through the records of the given WAL.
+// It does not conflict with any open WAL, but it is recommended not to
+// call this function after opening the WAL for writing.
+// It returns ErrSnapshotNotFound if the expected snap cannot be read out,
+// ErrSnapshotMismatch if the loaded snap's term differs from the expected one,
+// and ErrCRCMismatch or ErrMetadataConflict when record contents disagree.
+func Verify(walDir string, snap walpb.Snapshot) error {
+	var metadata []byte
+	var err error
+	var match bool
+
+	rec := &walpb.Record{}
+
+	names, nameIndex, err := selectWALFiles(walDir, snap)
+	if err != nil {
+		return err
+	}
+
+	// open wal files in read mode, so that there is no conflict
+	// when the same WAL is opened elsewhere in write mode
+	rs, _, closer, err := openWALFiles(walDir, names, nameIndex, false)
+	if err != nil {
+		return err
+	}
+
+	// Close the files on every return path, including the early returns in
+	// the loop below; the close error is ignored as the files were only read.
+	defer closer()
+
+	// create a new decoder from the readers on the WAL files
+	decoder := newDecoder(rs...)
+
+	for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
+		switch rec.Type {
+		case metadataType:
+			if metadata != nil && !bytes.Equal(metadata, rec.Data) {
+				return ErrMetadataConflict
+			}
+			metadata = rec.Data
+		case crcType:
+			crc := decoder.crc.Sum32()
+			// Current crc of decoder must match the crc of the record.
+			// We need not match 0 crc, since the decoder is a new one at this point.
+			if crc != 0 && rec.Validate(crc) != nil {
+				return ErrCRCMismatch
+			}
+			decoder.updateCRC(rec.Crc)
+		case snapshotType:
+			var loadedSnap walpb.Snapshot
+			pbutil.MustUnmarshal(&loadedSnap, rec.Data)
+			if loadedSnap.Index == snap.Index {
+				if loadedSnap.Term != snap.Term {
+					return ErrSnapshotMismatch
+				}
+				match = true
+			}
+		// We ignore all entry and state type records as these
+		// are not necessary for validating the WAL contents
+		case entryType:
+		case stateType:
+		default:
+			return fmt.Errorf("unexpected block type %d", rec.Type)
+		}
+	}
+
+	// Reaching io.EOF or io.ErrUnexpectedEOF ends the scan without error;
+	// any other decode error is reported to the caller.
+	if err != io.EOF && err != io.ErrUnexpectedEOF {
+		return err
+	}
+
+	if !match {
+		return ErrSnapshotNotFound
+	}
+
+	return nil
+}
+
 // cut closes current file written and creates a new one ready to append.
 // cut first creates a temp wal file and writes necessary headers into it.
 // Then cut atomically rename temp wal file to a wal file.
From 6f8aa335747bb54364d9278020a6b768f41e74a6 Mon Sep 17 00:00:00 2001
From: Shreyas Rao
Date: Tue, 12 Mar 2019 17:07:00 +0530
Subject: [PATCH 2/2] wal: Add test for Verify

Signed-off-by: Shreyas Rao
---
 wal/wal_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/wal/wal_test.go b/wal/wal_test.go
index 71fd7c177c9..b060da19128 100644
--- a/wal/wal_test.go
+++ b/wal/wal_test.go
@@ -19,6 +19,7 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
+	"path"
 	"path/filepath"
 	"reflect"
 	"testing"
@@ -150,6 +151,57 @@ func TestOpenAtIndex(t *testing.T) {
 	}
 }
 
+// TestVerify tests that Verify returns a non-nil error when the WAL is corrupted.
+// The test creates a WAL directory and cuts out multiple WAL files. Then
+// it corrupts one of the files by completely truncating it.
+func TestVerify(t *testing.T) {
+	walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(walDir)
+
+	// create WAL
+	w, err := Create(walDir, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer w.Close()
+
+	// make 5 separate files, each holding one entry "waldata1".."waldata5"
+	for i := 0; i < 5; i++ {
+		es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(rune('1'+i)))}}
+		if err = w.Save(raftpb.HardState{}, es); err != nil {
+			t.Fatal(err)
+		}
+		if err = w.cut(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// to verify the WAL is not corrupted at this point
+	err = Verify(walDir, walpb.Snapshot{})
+	if err != nil {
+		t.Errorf("expected a nil error, got %v", err)
+	}
+
+	walFiles, err := ioutil.ReadDir(walDir)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// corrupt the WAL by truncating one of the WAL files completely
+	err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = Verify(walDir, walpb.Snapshot{})
+	if err == nil {
+		t.Error("expected a non-nil error, got nil")
+	}
+}
+
 // TODO: split it into smaller tests for better readability
 func TestCut(t *testing.T) {
 	p, err := ioutil.TempDir(os.TempDir(), "waltest")