diff --git a/src/engine.rs b/src/engine.rs index b122e942..b0fe7454 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -304,10 +304,29 @@ where memtable .read() .fetch_entries_to(begin, end, max_size, &mut ents_idx)?; - for i in ents_idx.iter() { - vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); + + let mut blocks: Vec = Vec::new(); + let mut total_bytes = 0; + for (t, i) in ents_idx.iter().enumerate() { + if t == 0 || (i.entries.unwrap() != ents_idx[t - 1].entries.unwrap()) { + blocks.push(i.entries.unwrap()); + total_bytes += i.entries.unwrap().len; + } + } + + if blocks.len() > 5 && total_bytes > 1024 * 1024 { + //Async IO + let bytes = self.pipe_log.async_read_bytes(blocks)?; + parse_entries_from_bytes::(bytes, &mut ents_idx, vec)?; + } else { + //Sync IO + for i in ents_idx.iter() { + vec.push(read_entry_from_file::(self.pipe_log.as_ref(), i)?); + } } + ENGINE_READ_ENTRY_COUNT_HISTOGRAM.observe(ents_idx.len() as f64); + return Ok(ents_idx.len()); } Ok(0) @@ -545,6 +564,38 @@ thread_local! { static BLOCK_CACHE: BlockCache = BlockCache::new(); } +pub(crate) fn parse_entries_from_bytes( + bytes: Vec>, + ents_idx: &mut [EntryIndex], + vec: &mut Vec, +) -> Result<()> { + let mut seq = 0; + for idx in ents_idx { + BLOCK_CACHE.with(|cache| { + if cache.key.get() != idx.entries.unwrap() { + cache.insert( + idx.entries.unwrap(), + LogBatch::decode_entries_block( + &bytes[seq], + idx.entries.unwrap(), + idx.compression_type, + ) + .unwrap(), + ); + seq += 1; + } + let e = parse_from_bytes( + &cache.block.borrow() + [idx.entry_offset as usize..(idx.entry_offset + idx.entry_len) as usize], + ) + .unwrap(); + assert_eq!(M::index(&e), idx.index); + vec.push(e); + }); + } + Ok(()) +} + pub(crate) fn read_entry_from_file(pipe_log: &P, idx: &EntryIndex) -> Result where M: MessageExt, @@ -759,6 +810,57 @@ mod tests { } } + #[test] + fn test_multi_read_entry() { + let sync_batch_size = 1024; + let async_batch_size = 1024 * 1024; + for &entry_size in &[sync_batch_size, async_batch_size] { + let dir = tempfile::Builder::new() + .prefix("test_multi_read_entry") + .tempdir() + .unwrap(); + let cfg = Config { + dir: dir.path().to_str().unwrap().to_owned(), + target_file_size: ReadableSize(1), + ..Default::default() + }; + + let engine = RaftLogEngine::open_with_file_system( + cfg.clone(), + Arc::new(ObfuscatedFileSystem::default()), + ) + .unwrap(); + assert_eq!(engine.path(), dir.path().to_str().unwrap()); + let data = vec![b'x'; entry_size]; + + for i in 0..10 { + for rid in 10..20 { + let index = i + rid; + engine.append(rid, index, index + 1, Some(&data)); + } + } + for i in 10..20 { + let rid = i; + let index = i; + engine.scan_entries(rid, index, index + 10, |_, q, d| { + assert_eq!(q, LogQueue::Append); + assert_eq!(d, &data); + }); + } + + // Recover the engine. + let engine = engine.reopen(); + for i in 10..20 { + let rid = i; + let index = i; + engine.scan_entries(rid, index, index + 10, |_, q, d| { + assert_eq!(q, LogQueue::Append); + assert_eq!(d, &data); + }); + } + } + } + #[test] fn test_clean_raft_group() { fn run_steps(steps: &[Option<(u64, u64)>]) { @@ -1994,6 +2096,20 @@ mod tests { type Handle = ::Handle; type Reader = ::Reader; type Writer = ::Writer; + type MultiReadContext = ::MultiReadContext; + + fn multi_read( + &self, + ctx: &mut Self::MultiReadContext, + handle: Arc, + block: &FileBlockHandle, + ) -> std::io::Result<()> { + self.inner.multi_read(ctx, handle, block) + } + + fn async_finish(&self, ctx: Self::MultiReadContext) -> std::io::Result>> { + self.inner.async_finish(ctx) + } fn create>(&self, path: P) -> std::io::Result { let handle = self.inner.create(&path)?; @@ -2056,6 +2172,10 @@ mod tests { fn new_writer(&self, h: Arc) -> std::io::Result { self.inner.new_writer(h) } + + fn new_async_io_context(&self) -> std::io::Result { + self.inner.new_async_io_context() + } } #[test] diff --git a/src/env/default.rs b/src/env/default.rs index 760580ab..d218c606 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -3,18 +3,23 @@ use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::os::unix::io::RawFd; use std::path::Path; +use std::pin::Pin; +use std::slice; use std::sync::Arc; use fail::fail_point; use log::error; use nix::errno::Errno; use nix::fcntl::{self, OFlag}; +use nix::sys::aio::{aio_suspend, Aio, AioRead}; +use nix::sys::signal::SigevNotify; use nix::sys::stat::Mode; use nix::sys::uio::{pread, pwrite}; use nix::unistd::{close, ftruncate, lseek, Whence}; use nix::NixPath; use crate::env::{FileSystem, Handle, WriteExt}; +use crate::pipe_log::FileBlockHandle; fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); @@ -256,6 +261,11 @@ impl WriteExt for LogFile { self.inner.allocate(offset, size) } } +#[derive(Default)] +pub struct AioContext { + aio_vec: Vec>>>, + buf_vec: Vec>, +} pub struct DefaultFileSystem; @@ -263,6 +273,41 @@ impl FileSystem for DefaultFileSystem { type Handle = LogFd; type Reader = LogFile; type Writer = LogFile; + type MultiReadContext = AioContext; + + fn multi_read( + &self, + ctx: &mut Self::MultiReadContext, + handle: Arc, + block: &FileBlockHandle, + ) -> IoResult<()> { + let buf = vec![0_u8; block.len]; + ctx.buf_vec.push(buf); + + let mut aior = Box::pin(AioRead::new( + handle.0, + block.offset as i64, + unsafe { + slice::from_raw_parts_mut(ctx.buf_vec.last_mut().unwrap().as_mut_ptr(), block.len) + }, + 0, + SigevNotify::SigevNone, + )); + aior.as_mut().submit()?; + ctx.aio_vec.push(aior); + + Ok(()) + } + + fn async_finish(&self, mut ctx: Self::MultiReadContext) -> IoResult>> { + for seq in 0..ctx.aio_vec.len() { + let buf_len = ctx.buf_vec[seq].len(); + aio_suspend(&[&*ctx.aio_vec[seq]], None)?; + assert_eq!(ctx.aio_vec[seq].as_mut().aio_return()?, buf_len); + } + let res = ctx.buf_vec.to_owned(); + Ok(res) + } fn create>(&self, path: P) -> IoResult { LogFd::create(path.as_ref()) @@ -288,4 +333,8 @@ impl FileSystem for DefaultFileSystem { fn new_writer(&self, handle: Arc) -> IoResult { Ok(LogFile::new(handle)) } + + fn new_async_io_context(&self) -> IoResult { + Ok(AioContext::default()) + } } diff --git a/src/env/mod.rs b/src/env/mod.rs index 5086a665..1e6b350a 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -10,11 +10,21 @@ mod obfuscated; pub use default::DefaultFileSystem; pub use obfuscated::ObfuscatedFileSystem; +use crate::pipe_log::FileBlockHandle; /// FileSystem pub trait FileSystem: Send + Sync { type Handle: Send + Sync + Handle; type Reader: Seek + Read + Send; type Writer: Seek + Write + Send + WriteExt; + type MultiReadContext; + + fn multi_read( + &self, + ctx: &mut Self::MultiReadContext, + handle: Arc, + block: &FileBlockHandle, + ) -> Result<()>; + fn async_finish(&self, ctx: Self::MultiReadContext) -> Result>>; fn create>(&self, path: P) -> Result; @@ -55,6 +65,8 @@ pub trait FileSystem: Send + Sync { fn new_reader(&self, handle: Arc) -> Result; fn new_writer(&self, handle: Arc) -> Result; + + fn new_async_io_context(&self) -> Result; } pub trait Handle { diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 831f5343..62e244f8 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -7,6 +7,7 @@ use std::sync::Arc; use crate::env::{DefaultFileSystem, FileSystem, WriteExt}; +use crate::pipe_log::FileBlockHandle; pub struct ObfuscatedReader(::Reader); impl Read for ObfuscatedReader { @@ -89,6 +90,28 @@ impl FileSystem for ObfuscatedFileSystem { type Handle = ::Handle; type Reader = ObfuscatedReader; type Writer = ObfuscatedWriter; + type MultiReadContext = ::MultiReadContext; + + fn multi_read( + &self, + ctx: &mut Self::MultiReadContext, + handle: Arc, + block: &FileBlockHandle, + ) -> IoResult<()> { + self.inner.multi_read(ctx, handle, block) + } + + fn async_finish(&self, ctx: Self::MultiReadContext) -> IoResult>> { + let mut base = self.inner.async_finish(ctx).unwrap(); + + for v in base.iter_mut() { + for c in v.iter_mut() { + // do obfuscation. + *c = c.wrapping_sub(1); + } + } + Ok(base) + } fn create>(&self, path: P) -> IoResult { let r = self.inner.create(path); @@ -127,4 +150,8 @@ impl FileSystem for ObfuscatedFileSystem { fn new_writer(&self, handle: Arc) -> IoResult { Ok(ObfuscatedWriter(self.inner.new_writer(handle)?)) } + + fn new_async_io_context(&self) -> IoResult { + self.inner.new_async_io_context() + } } diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index a2a6f53d..638ef2ec 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -172,7 +172,7 @@ impl LogFileReader { } pub fn read(&mut self, handle: FileBlockHandle) -> Result> { - let mut buf = vec![0; handle.len as usize]; + let mut buf = vec![0; handle.len]; let size = self.read_to(handle.offset, &mut buf)?; buf.truncate(size); Ok(buf) diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index e05fa3e4..e23c6d77 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -13,6 +13,7 @@ use parking_lot::{Mutex, MutexGuard, RwLock}; use crate::config::Config; use crate::env::FileSystem; use crate::event_listener::EventListener; +use crate::memtable::EntryIndex; use crate::metrics::*; use crate::pipe_log::{ FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes, @@ -254,6 +255,15 @@ impl SinglePipe { reader.read(handle) } + fn async_read(&self, ctx: &mut F::MultiReadContext, blocks: Vec) { + for block in blocks.iter() { + let fd = self.get_fd(block.id.seq).unwrap(); + self.file_system + .multi_read(ctx, fd, block) + .expect("Async read failed."); + } + } + fn append(&self, bytes: &mut T) -> Result { fail_point!("file_pipe_log::append"); let mut writable_file = self.writable_file.lock(); @@ -444,6 +454,17 @@ impl PipeLog for DualPipes { self.pipes[handle.id.queue as usize].read_bytes(handle) } + #[inline] + fn async_read_bytes(&self, blocks: Vec) -> Result>> { + let fs = &self.pipes[LogQueue::Append as usize].file_system; + let mut ctx = fs.new_async_io_context()?; + + self.pipes[LogQueue::Append as usize].async_read(&mut ctx, blocks); + let res = fs.async_finish(ctx)?; + + Ok(res) + } + #[inline] fn append( &self, diff --git a/src/pipe_log.rs b/src/pipe_log.rs index b5c87c8a..03994b17 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -5,6 +5,7 @@ use std::cmp::Ordering; use std::fmt::{self, Display}; +use crate::memtable::EntryIndex; use fail::fail_point; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::ToPrimitive; @@ -172,6 +173,9 @@ pub trait PipeLog: Sized { /// Reads some bytes from the specified position. fn read_bytes(&self, handle: FileBlockHandle) -> Result>; + /// Reads bytes from multi blocks using 'Async IO'. + fn async_read_bytes(&self, blocks: Vec) -> Result>>; + /// Appends some bytes to the specified log queue. Returns file position of /// the written bytes. fn append(