Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

write page headers on the first xlog segment #1341

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 91 additions & 7 deletions walkeeper/src/wal_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,86 @@ impl PhysicalStorage {
Ok(())
}

/// Write an empty XLOG page
///
/// Note: offset is a segment offset of the first log record to be written on the page
fn write_canned_page(&self, file: &mut File, segno: XLogSegNo, offset: usize, wal_seg_size: usize) -> Result<usize> {
let page_off = offset - offset % XLOG_BLCKSZ;

if page_off != 0 {
file.seek(SeekFrom::Start(page_off as u64))?;
}

// xlp_magic - 0xd10d - 2 bytes
file.write_all(&0xd10du16.to_le_bytes())?;

if page_off == 0 {
// in order to skip to the log recrod we will pretend that the page has cont record
// until real data begins
// xlp_info - 0x0002 XLP_LONG_HEADER - 2 bytes
file.write_all(&0x01u16.to_le_bytes())?;

} else {
if offset % XLOG_BLCKSZ == 24 {
// xlp_info - 0x00 - 2 bytes
file.write_all(&0x00u16.to_le_bytes())?;
} else {
// xlp_info - 0x01 XLP_FIRST_IS_CONTRECORD - 2 bytes
file.write_all(&0x02u16.to_le_bytes())?;
}
}

// xlp_tli - 0x01 - 4 bytes
file.write_all(&0x01u32.to_le_bytes())?;

// xlp_pageaddr - .... - 8 bytes
let x = segno * (wal_seg_size as u64) + page_off as u64;
file.write_all(&x.to_le_bytes())?;

if page_off > 0 {
let hdr_bytes = 24;
let y = (offset - page_off - hdr_bytes) as u32;
// xlp_rem_len - 0x00 4 bytes
file.write_all(&y.to_le_bytes())?;
// padding 4 bytes
file.write_all(&0x00u32.to_le_bytes())?;
// write 0 for the rest of the page
file.write_all(&ZERO_BLOCK[0..XLOG_BLCKSZ-hdr_bytes])?;
} else {
// xlp_rem_len - 0x00 4 bytes
file.write_all(&0x00u32.to_le_bytes())?;
// padding 4 bytes
file.write_all(&0x00u32.to_le_bytes())?;

// first page requires a long header
//
// xlp_sysid - 0x00 8 bytes
file.write_all(&0x00u64.to_le_bytes())?;
// xlp_seg_size = 0x1000000 .... 4 bytes
file.write_all(&0x1000000u32.to_le_bytes())?;
// xlp_xlog_blcksz 0x2000 4 bytes
file.write_all(&0x2000u32.to_le_bytes())?;
file.write_all(&ZERO_BLOCK[0..XLOG_BLCKSZ-40])?;
}


Ok(XLOG_BLCKSZ)
}

/// Open or create WAL segment file. Caller must call seek to the wanted position.
/// Returns `file` and `is_partial`.
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool)> {
fn open_or_create(&self, segno: XLogSegNo, wal_seg_size: usize) -> Result<(File, bool, bool)> {
let (wal_file_path, wal_file_partial_path) =
wal_file_paths(&self.timeline_dir, segno, wal_seg_size)?;

// Try to open already completed segment
if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) {
Ok((file, false))
warn!("aaa: open already completed segment {}", segno);
Ok((file, false, false))
} else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) {
// Try to open existing partial file
Ok((file, true))
warn!("aaa: open existing partial file {}", segno);
Ok((file, true, false))
} else {
// Create and fill new partial file
let mut file = OpenOptions::new()
Expand All @@ -209,9 +277,18 @@ impl PhysicalStorage {
.open(&wal_file_partial_path)
.with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;

write_zeroes(&mut file, wal_seg_size)?;
warn!("aaa: Create and fill new partial file {}", segno);

// Write segment header only on the first segment to help pg_waldump
if segno == 1 {
let b = self.write_canned_page(&mut file, segno, 0, wal_seg_size)?;
write_zeroes(&mut file, wal_seg_size - b)?;
} else {
write_zeroes(&mut file, wal_seg_size)?;
}

self.fsync_file(&mut file)?;
Ok((file, true))
Ok((file, true, true))
}
}

Expand All @@ -226,7 +303,8 @@ impl PhysicalStorage {
let mut file = if let Some(file) = self.file.take() {
file
} else {
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
let (mut file, is_partial, _is_created) = self.open_or_create(segno, wal_seg_size)?;

assert!(is_partial, "unexpected write into non-partial segment file");
file.seek(SeekFrom::Start(xlogoff as u64))?;
file
Expand Down Expand Up @@ -424,7 +502,13 @@ impl Storage for PhysicalStorage {

let xlogoff = end_pos.segment_offset(wal_seg_size) as usize;
let segno = end_pos.segment_number(wal_seg_size);
let (mut file, is_partial) = self.open_or_create(segno, wal_seg_size)?;
let (mut file, is_partial, is_created) = self.open_or_create(segno, wal_seg_size)?;

if is_created && segno == 1 {
warn!("ddd: primeed a page for log {} in segment {}", xlogoff, segno);
self.write_canned_page(&mut file, segno, xlogoff, wal_seg_size)?;
}


// Fill end with zeroes
file.seek(SeekFrom::Start(xlogoff as u64))?;
Expand Down