Skip to content

Commit

Permalink
Enable dumping corrupt WAL segments (#145)
Browse files Browse the repository at this point in the history
* Enable dumping corrupt WAL segments

 Add ability to dump WAL segment with corrupt page headers and recrods
 skips over missing/broken page headers
 skips over misformatted log recrods
 allows dumping log record from a particular file starting from an
optional offset
 (without a need of carefully crafted input)
  • Loading branch information
antons-antons authored and tristan957 committed Aug 9, 2023
1 parent 281c226 commit ee31c09
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 40 deletions.
117 changes: 88 additions & 29 deletions src/backend/access/transam/xlogreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
void
XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
{
Assert(!XLogRecPtrIsInvalid(RecPtr));
Assert(!XLogRecPtrIsInvalid(RecPtr) || state->skip_lsn_checks);

ResetDecoder(state);

Expand Down Expand Up @@ -279,6 +279,14 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
bool gotheader;
int readOff;

#define SKIP_INVALID_RECORD(rec_ptr) do { \
rec_ptr = MAXALIGN(rec_ptr + 1); \
if (rec_ptr % XLOG_BLCKSZ <= MAXALIGN(1)) \
goto restart; \
else \
goto skip_invalid; \
} while (0);

/*
* randAccess indicates whether to verify the previous-record pointer of
* the record we're reading. We only do this if we're reading
Expand Down Expand Up @@ -315,7 +323,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
* In this case, EndRecPtr should already be pointing to a valid
* record starting position.
*/
Assert(XRecOffIsValid(RecPtr));
Assert(XRecOffIsValid(RecPtr) || state->skip_lsn_checks);
randAccess = true;
}

Expand Down Expand Up @@ -351,17 +359,23 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
}
else if (targetRecOff < pageHeaderSize)
{
report_invalid_record(state, "invalid record offset at %X/%X",
if(!state->skip_page_validation)
{
report_invalid_record(state, "invalid record offset at %X/%X",
LSN_FORMAT_ARGS(RecPtr));
goto err;
goto err;
}
}

if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
targetRecOff == pageHeaderSize)
{
report_invalid_record(state, "contrecord is requested by %X/%X",
if(!state->skip_page_validation)
{
report_invalid_record(state, "contrecord is requested by %X/%X",
LSN_FORMAT_ARGS(RecPtr));
goto err;
goto err;
}
}

/* ReadPageInternal has verified the page header */
Expand All @@ -376,6 +390,7 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
* cannot access any other fields until we've verified that we got the
* whole header.
*/
skip_invalid:
record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
total_len = record->xl_tot_len;

Expand All @@ -391,20 +406,33 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
{
if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
randAccess))
goto err;
{
if(!state->skip_invalid_records)
goto err;

SKIP_INVALID_RECORD(RecPtr);
}

gotheader = true;
}
else
{
/* XXX: more validation should be done here */
if (total_len < SizeOfXLogRecord)
{
report_invalid_record(state,
"invalid record length at %X/%X: wanted %u, got %u",
LSN_FORMAT_ARGS(RecPtr),
(uint32) SizeOfXLogRecord, total_len);
goto err;
if(!state->skip_invalid_records)
{
report_invalid_record(state,
"invalid record length at %X/%X: wanted %u, got %u",
LSN_FORMAT_ARGS(RecPtr),
(uint32) SizeOfXLogRecord, total_len);

goto err;
}

SKIP_INVALID_RECORD(RecPtr);
}

gotheader = false;
}

Expand All @@ -425,10 +453,16 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
if (total_len > state->readRecordBufSize &&
!allocate_recordbuf(state, total_len))
{
/* We treat this as a "bogus data" condition */
report_invalid_record(state, "record length %u at %X/%X too long",
total_len, LSN_FORMAT_ARGS(RecPtr));
goto err;

if(!state->skip_invalid_records)
{
/* We treat this as a "bogus data" condition */
report_invalid_record(state, "record length %u at %X/%X too long",
total_len, LSN_FORMAT_ARGS(RecPtr));
goto err;
}

SKIP_INVALID_RECORD(RecPtr);
}

/* Copy the first fragment of the record from the first page. */
Expand Down Expand Up @@ -473,10 +507,15 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
/* Check that the continuation on next page looks valid */
if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
report_invalid_record(state,
if(!state->skip_invalid_records)
{
report_invalid_record(state,
"there is no contrecord flag at %X/%X",
LSN_FORMAT_ARGS(RecPtr));
goto err;
goto err;
}

SKIP_INVALID_RECORD(RecPtr);
}

/*
Expand All @@ -486,12 +525,17 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
if (pageHeader->xlp_rem_len == 0 ||
total_len != (pageHeader->xlp_rem_len + gotlen))
{
report_invalid_record(state,
if(!state->skip_invalid_records)
{
report_invalid_record(state,
"invalid contrecord length %u (expected %lld) at %X/%X",
pageHeader->xlp_rem_len,
((long long) total_len) - gotlen,
LSN_FORMAT_ARGS(RecPtr));
goto err;
goto err;
}

SKIP_INVALID_RECORD(RecPtr);
}

/* Append the continuation from this page to the buffer */
Expand Down Expand Up @@ -522,7 +566,13 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)
record = (XLogRecord *) state->readRecordBuf;
if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
record, randAccess))
goto err;
{
if(!state->skip_invalid_records)
goto err;

SKIP_INVALID_RECORD(RecPtr);
}

gotheader = true;
}
} while (gotlen < total_len);
Expand All @@ -531,7 +581,12 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)

record = (XLogRecord *) state->readRecordBuf;
if (!ValidXLogRecord(state, record, RecPtr))
goto err;
{
if(!state->skip_invalid_records)
goto err;

SKIP_INVALID_RECORD(RecPtr);
}

pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
state->ReadRecPtr = RecPtr;
Expand All @@ -548,7 +603,12 @@ XLogReadRecord(XLogReaderState *state, char **errormsg)

/* Record does not cross a page boundary */
if (!ValidXLogRecord(state, record, RecPtr))
goto err;
{
if(!state->skip_invalid_records)
goto err;

SKIP_INVALID_RECORD(RecPtr);
}

state->EndRecPtr = RecPtr + MAXALIGN(total_len);

Expand Down Expand Up @@ -652,8 +712,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* we can be sure to have enough WAL available, we scrolled back */
Assert(readLen == XLOG_BLCKSZ);

if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
state->readBuf))
if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, state->readBuf) && !state->skip_page_validation)
goto err;
}

Expand Down Expand Up @@ -690,7 +749,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/*
* Now that we know we have the full header, validate it.
*/
if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr) && !state->skip_page_validation)
goto err;

/* update read state information */
Expand Down Expand Up @@ -748,7 +807,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
* We can't exactly verify the prev-link, but surely it should be less
* than the record's own address.
*/
if (!(record->xl_prev < RecPtr))
if (!(record->xl_prev < RecPtr) && !state->skip_lsn_checks)
{
report_invalid_record(state,
"record with incorrect prev-link %X/%X at %X/%X",
Expand All @@ -764,7 +823,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
* check guards against torn WAL pages where a stale but valid-looking
* WAL record starts on a sector boundary.
*/
if (record->xl_prev != PrevRecPtr)
if (record->xl_prev != PrevRecPtr && !state->skip_lsn_checks)
{
report_invalid_record(state,
"record with incorrect prev-link %X/%X at %X/%X",
Expand Down Expand Up @@ -907,7 +966,7 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
* check typically fails when an old WAL segment is recycled, and hasn't
* yet been overwritten with new data yet.
*/
if (hdr->xlp_pageaddr != recaddr)
if (hdr->xlp_pageaddr != recaddr && !state->skip_lsn_checks)
{
char fname[MAXFNAMELEN];

Expand Down
Loading

0 comments on commit ee31c09

Please sign in to comment.