From 0715d9fa955374c333c1213f1f82bb6b8540da4f Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Sat, 23 Apr 2022 08:52:15 +0300 Subject: [PATCH] Avoid redundand memory allocation and sycnhronization in walredo (#144) * Avoid redundand memory allocation and sycnhronization in walredo * Address review comments * Reduce number of temp buffers and size of inmem file storage for wal redo postgres * Misc cleanup Add comments on 'inmem_smgr.c', remove superfluous copy-pasted comments, pgindent. Co-authored-by: Heikki Linnakangas --- contrib/zenith/inmem_smgr.c | 188 +++++++++++----------------- src/backend/storage/buffer/bufmgr.c | 7 +- src/backend/tcop/zenith_wal_redo.c | 78 +++++------- src/include/miscadmin.h | 3 + 4 files changed, 112 insertions(+), 164 deletions(-) diff --git a/contrib/zenith/inmem_smgr.c b/contrib/zenith/inmem_smgr.c index 6ad1e65b04a..bdd58731f3c 100644 --- a/contrib/zenith/inmem_smgr.c +++ b/contrib/zenith/inmem_smgr.c @@ -2,36 +2,52 @@ * * inmem_smgr.c * + * This is an implementation of the SMGR interface, used in the WAL redo + * process (see src/backend/tcop/zenith_wal_redo.c). It has no persistent + * storage, the pages that are written out are kept in a small number of + * in-memory buffers. + * + * Normally, replaying a WAL record only needs to access a handful of + * buffers, which fit in the normal buffer cache, so this is just for + * "overflow" storage when the buffer cache is not large enough. + * + * * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * contrib/zenith/inmem_smgr.c - * - * TODO cleanup obsolete copy-pasted comments *------------------------------------------------------------------------- */ #include "postgres.h" + +#include "pagestore_client.h" #include "storage/block.h" +#include "storage/buf_internals.h" #include "storage/relfilenode.h" -#include "pagestore_client.h" -#include "utils/hsearch.h" -#include "access/xlog.h" +#include "storage/smgr.h" -typedef struct -{ - RelFileNode node; - ForkNumber forknum; - BlockNumber blkno; -} WrNodeKey; +#define MAX_PAGES 128 -typedef struct -{ - WrNodeKey tag; - char data[BLCKSZ]; -} WrNode; +static BufferTag page_tag[MAX_PAGES]; +static char page_body[MAX_PAGES][BLCKSZ]; +static int used_pages; -HTAB *inmem_files; +static int +locate_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno) +{ + /* We only hold a small number of pages, so linear search */ + for (int i = 0; i < used_pages; i++) + { + if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode) + && forknum == page_tag[i].forkNum + && blkno == page_tag[i].blockNum) + { + return i; + } + } + return -1; +} /* * inmem_init() -- Initialize private state @@ -39,18 +55,7 @@ HTAB *inmem_files; void inmem_init(void) { - HASHCTL hashCtl; - - hashCtl.keysize = sizeof(WrNodeKey); - hashCtl.entrysize = sizeof(WrNode); - - if (inmem_files) - hash_destroy(inmem_files); - - inmem_files = hash_create("wal-redo files map", - 1024, - &hashCtl, - HASH_ELEM | HASH_BLOBS); + used_pages = 0; } /* @@ -59,15 +64,15 @@ inmem_init(void) bool inmem_exists(SMgrRelation reln, ForkNumber forknum) { - WrNodeKey key; - - key.node = reln->smgr_rnode.node; - key.forknum = forknum; - key.blkno = 0; - return hash_search(inmem_files, - &key, - HASH_FIND, - NULL) != NULL; + for (int i = 0; i < used_pages; i++) + { + if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode) + && forknum == page_tag[i].forkNum) + { + return true; + } + } + return false; } /* @@ -82,21 +87,6 @@ inmem_create(SMgrRelation reln, ForkNumber forknum, bool isRedo) /* * inmem_unlink() -- Unlink a relation. - * - * Note that we're passed a RelFileNodeBackend --- by the time this is called, - * there won't be an SMgrRelation hashtable entry anymore. - * - * forknum can be a fork number to delete a specific fork, or InvalidForkNumber - * to delete all forks. - * - * - * If isRedo is true, it's unsurprising for the relation to be already gone. - * Also, we should remove the file immediately instead of queuing a request - * for later, since during redo there's no possibility of creating a - * conflicting relation. - * - * Note: any failure should be reported as WARNING not ERROR, because - * we are usually not in a transaction anymore when this is called. */ void inmem_unlink(RelFileNodeBackend rnode, ForkNumber forknum, bool isRedo) @@ -116,17 +106,8 @@ void inmem_extend(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, char *buffer, bool skipFsync) { - WrNodeKey key; - WrNode *node; - - key.node = reln->smgr_rnode.node; - key.forknum = forknum; - key.blkno = blkno; - node = hash_search(inmem_files, - &key, - HASH_ENTER, - NULL); - memcpy(node->data, buffer, BLCKSZ); + /* same as smgwrite() for us */ + inmem_write(reln, forknum, blkno, buffer, skipFsync); } /* @@ -156,9 +137,6 @@ inmem_prefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) /* * inmem_writeback() -- Tell the kernel to write pages back to storage. - * - * This accepts a range of blocks because flushing several pages at once is - * considerably more efficient than doing so individually. */ void inmem_writeback(SMgrRelation reln, ForkNumber forknum, @@ -173,20 +151,13 @@ void inmem_read(SMgrRelation reln, ForkNumber forknum, BlockNumber blkno, char *buffer) { - WrNodeKey key; - WrNode *node; + int pg; - key.node = reln->smgr_rnode.node; - key.forknum = forknum; - key.blkno = blkno; - node = hash_search(inmem_files, - &key, - HASH_FIND, - NULL); - if (node != NULL) - memcpy(buffer, node->data, BLCKSZ); - else + pg = locate_page(reln, forknum, blkno); + if (pg < 0) memset(buffer, 0, BLCKSZ); + else + memcpy(buffer, page_body[pg], BLCKSZ); } /* @@ -200,17 +171,19 @@ void inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync) { - WrNodeKey key; - WrNode *node; + int pg; + + pg = locate_page(reln, forknum, blocknum); + if (pg < 0) + { + if (used_pages == MAX_PAGES) + elog(ERROR, "Inmem storage overflow"); - key.node = reln->smgr_rnode.node; - key.forknum = forknum; - key.blkno = blocknum; - node = hash_search(inmem_files, - &key, - HASH_ENTER, - NULL); - memcpy(node->data, buffer, BLCKSZ); + pg = used_pages; + used_pages++; + INIT_BUFFERTAG(page_tag[pg], reln->smgr_rnode.node, forknum, blocknum); + } + memcpy(page_body[pg], buffer, BLCKSZ); } /* @@ -219,23 +192,18 @@ inmem_write(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber inmem_nblocks(SMgrRelation reln, ForkNumber forknum) { - WrNodeKey key; - WrNode *node; - - key.node = reln->smgr_rnode.node; - key.forknum = forknum; - key.blkno = 0; + int nblocks = 0; - while (true) + for (int i = 0; i < used_pages; i++) { - node = hash_search(inmem_files, - &key, - HASH_FIND, - NULL); - if (node == NULL) - return key.blkno; - key.blkno += 1; + if (RelFileNodeEquals(reln->smgr_rnode.node, page_tag[i].rnode) + && forknum == page_tag[i].forkNum) + { + if (page_tag[i].blockNum >= nblocks) + nblocks = page_tag[i].blockNum + 1; + } } + return nblocks; } /* @@ -248,19 +216,12 @@ inmem_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks) /* * inmem_immedsync() -- Immediately sync a relation to stable storage. - * - * Note that only writes already issued are synced; this routine knows - * nothing of dirty buffers that may exist inside the buffer manager. We - * sync active and inactive segments; smgrDoPendingSyncs() relies on this. - * Consider a relation skipping WAL. Suppose a checkpoint syncs blocks of - * some segment, then mdtruncate() renders that segment inactive. If we - * crash before the next checkpoint syncs the newly-inactive segment, that - * segment may survive recovery, reintroducing unwanted data into the table. */ void inmem_immedsync(SMgrRelation reln, ForkNumber forknum) { } + static const struct f_smgr inmem_smgr = { .smgr_init = inmem_init, @@ -283,12 +244,11 @@ static const struct f_smgr inmem_smgr = const f_smgr * smgr_inmem(BackendId backend, RelFileNode rnode) { - if (backend != InvalidBackendId && !InRecovery) + Assert(InRecovery); + if (backend != InvalidBackendId) return smgr_standard(backend, rnode); else - { return &inmem_smgr; - } } void diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index b96e033e53c..27eb4f28ca5 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -804,7 +804,6 @@ ReadBufferWithoutRelcache(RelFileNode rnode, ForkNumber forkNum, mode, strategy, &hit); } - /* * ReadBuffer_common -- common logic for all ReadBuffer variants * @@ -819,7 +818,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, Block bufBlock; bool found; bool isExtend; - bool isLocalBuf = SmgrIsTemp(smgr); + /* + * wal_redo postgres is working in single user mode, we do not need to synchronize access to shared buffer, + * so let's use local buffers instead + */ + bool isLocalBuf = SmgrIsTemp(smgr) || am_wal_redo_postgres; *hit = false; diff --git a/src/backend/tcop/zenith_wal_redo.c b/src/backend/tcop/zenith_wal_redo.c index 0ddd2ddec24..16298ea7f4f 100644 --- a/src/backend/tcop/zenith_wal_redo.c +++ b/src/backend/tcop/zenith_wal_redo.c @@ -99,6 +99,10 @@ static ssize_t buffered_read(void *buf, size_t count); static BufferTag target_redo_tag; +bool am_wal_redo_postgres; + +static XLogReaderState *reader_state; + #define TRACE DEBUG5 #ifdef HAVE_LIBSECCOMP @@ -166,12 +170,20 @@ WalRedoMain(int argc, char *argv[], InitStandaloneProcess(argv[0]); SetProcessingMode(InitProcessing); + am_wal_redo_postgres = true; /* * Set default values for command-line options. */ InitializeGUCOptions(); + /* + * WAL redo does not need a large number of buffers. And speed of + * DropRelFileNodeAllLocalBuffers() is proportional to the number of + * buffers. So let's keep it small (default value is 1024) + */ + num_temp_buffers = 4; + /* * Parse command-line options. * TODO @@ -293,6 +305,7 @@ WalRedoMain(int argc, char *argv[], if (RmgrTable[rmid].rm_startup != NULL) RmgrTable[rmid].rm_startup(); } + reader_state = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(), NULL); #ifdef HAVE_LIBSECCOMP /* We prefer opt-out to opt-in for greater security */ @@ -313,16 +326,13 @@ WalRedoMain(int argc, char *argv[], /* * Main processing loop */ + MemoryContextSwitchTo(MessageContext); + initStringInfo(&input_message); + for (;;) { - /* - * Release storage left over from prior query cycle, and create a new - * query input buffer in the cleared MessageContext. - */ - MemoryContextSwitchTo(MessageContext); - MemoryContextResetAndDeleteChildren(MessageContext); - - initStringInfo(&input_message); + /* Release memory left over from prior query cycle. */ + resetStringInfo(&input_message); set_ps_display("idle"); @@ -330,7 +340,6 @@ WalRedoMain(int argc, char *argv[], * (3) read a command (loop blocks here) */ firstchar = ReadRedoCommand(&input_message); - switch (firstchar) { case 'B': /* BeginRedoForBlock */ @@ -406,23 +415,6 @@ pprint_buffer(char *data, int len) return s.data; } -static char * -pprint_tag(BufferTag *tag) -{ - StringInfoData s; - - initStringInfo(&s); - - appendStringInfo(&s, "%u/%u/%u.%d blk %u", - tag->rnode.spcNode, - tag->rnode.dbNode, - tag->rnode.relNode, - tag->forkNum, - tag->blockNum - ); - - return s.data; -} /* ---------------------------------------------------------------- * routines to obtain user input * ---------------------------------------------------------------- @@ -492,7 +484,6 @@ ReadRedoCommand(StringInfo inBuf) return qtype; } - /* * Prepare for WAL replay on given block */ @@ -502,7 +493,6 @@ BeginRedoForBlock(StringInfo input_message) RelFileNode rnode; ForkNumber forknum; BlockNumber blknum; - MemoryContext oldcxt; SMgrRelation reln; /* @@ -520,16 +510,14 @@ BeginRedoForBlock(StringInfo input_message) rnode.relNode = pq_getmsgint(input_message, 4); blknum = pq_getmsgint(input_message, 4); - oldcxt = MemoryContextSwitchTo(TopMemoryContext); INIT_BUFFERTAG(target_redo_tag, rnode, forknum, blknum); - { - char* buf = pprint_tag(&target_redo_tag); - elog(TRACE, "BeginRedoForBlock %s", buf); - pfree(buf); - } - - MemoryContextSwitchTo(oldcxt); + elog(TRACE, "BeginRedoForBlock %u/%u/%u.%d blk %u", + target_redo_tag.rnode.spcNode, + target_redo_tag.rnode.dbNode, + target_redo_tag.rnode.relNode, + target_redo_tag.forkNum, + target_redo_tag.blockNum); reln = smgropen(rnode, InvalidBackendId, RELPERSISTENCE_PERMANENT); if (reln->smgr_cached_nblocks[forknum] == InvalidBlockNumber || @@ -589,7 +577,6 @@ ApplyRecord(StringInfo input_message) XLogRecPtr lsn; XLogRecord *record; int nleft; - XLogReaderState reader_state; /* * message format: @@ -607,20 +594,15 @@ ApplyRecord(StringInfo input_message) elog(ERROR, "mismatch between record (%d) and message size (%d)", record->xl_tot_len, (int) sizeof(XLogRecord) + nleft); - /* FIXME: use XLogReaderAllocate() */ - memset(&reader_state, 0, sizeof(XLogReaderState)); - reader_state.ReadRecPtr = 0; /* no 'prev' record */ - reader_state.EndRecPtr = lsn; /* this record */ - reader_state.decoded_record = record; - reader_state.errormsg_buf = palloc(1000 + 1); /* MAX_ERRORMSG_LEN */ - - if (!DecodeXLogRecord(&reader_state, record, &errormsg)) + XLogBeginRead(reader_state, lsn); + reader_state->decoded_record = record; + if (!DecodeXLogRecord(reader_state, record, &errormsg)) elog(ERROR, "failed to decode WAL record: %s", errormsg); /* Ignore any other blocks than the ones the caller is interested in */ redo_read_buffer_filter = redo_block_filter; - RmgrTable[record->xl_rmid].rm_redo(&reader_state); + RmgrTable[record->xl_rmid].rm_redo(reader_state); redo_read_buffer_filter = NULL; @@ -701,8 +683,8 @@ GetPage(StringInfo input_message) } while (tot_written < BLCKSZ); ReleaseBuffer(buf); - DropDatabaseBuffers(rnode.dbNode); - smgrinit(); //reset inmem smgr state + DropRelFileNodeAllLocalBuffers(rnode); + smgrinit(); /* reset inmem smgr state */ elog(TRACE, "Page sent back for block %u", blknum); } diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 3f155ce4f84..72bd0a7ebd4 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -489,4 +489,7 @@ extern void CancelBackup(void); extern size_t get_hash_memory_limit(void); extern int get_hash_mem(void); +/* in src/backend/tcop/zenith_wal_redo.c */ +extern bool am_wal_redo_postgres; + #endif /* MISCADMIN_H */