fix: LongList read from file #16994

Merged
Commits (33)
38bb963
fix: LongListHeap read from file
thenswan Dec 9, 2024
b246ad7
refactor: init the list from the given file channel
thenswan Dec 10, 2024
7089bb1
Merge branch 'develop' into 16652-longlistheap-snapshots-are-broken-i…
thenswan Dec 11, 2024
089bafd
chore: improve LongList tests and additional refactoring
thenswan Dec 11, 2024
76780e2
chore: address comments
thenswan Dec 12, 2024
1ee9228
chore: improve javadoc
thenswan Dec 12, 2024
41524c8
chore: address comments
thenswan Dec 18, 2024
25e9558
WIP tests refactoring
thenswan Dec 20, 2024
a96465b
fix bug (reading from empty ll) and consolidate tests in AbstractLong…
thenswan Jan 8, 2025
997b6c2
tests: wip
thenswan Jan 10, 2025
ef90652
tests: improve comments
thenswan Jan 10, 2025
3f0a01f
tests: improve names
thenswan Jan 10, 2025
dc47520
refactor: update factories var names
thenswan Jan 13, 2025
4d8f004
refactor: update test names
thenswan Jan 13, 2025
f6eebbf
test: directMemoryUsedAtStart validation update
thenswan Jan 13, 2025
d454e59
test: pick up testReallocateThreadLocalBufferWhenMemoryChunkSizeChang…
thenswan Jan 13, 2025
75dd0b7
Merge remote-tracking branch 'origin/main' into 16652-longlistheap-sn…
thenswan Jan 13, 2025
e0c9709
spotless
thenswan Jan 13, 2025
8dac428
test: update AbstractLongListTest#writeReadRangeElement()
thenswan Jan 13, 2025
e048540
Merge branch 'main' into 16652-longlistheap-snapshots-are-broken-if-i…
thenswan Jan 13, 2025
69d81e3
chore: address some of PR comments
thenswan Jan 14, 2025
18426e3
chores
thenswan Jan 15, 2025
e3a2346
address PR comments
thenswan Jan 15, 2025
4245f49
Merge branch 'main' into 16652-longlistheap-snapshots-are-broken-if-i…
thenswan Jan 15, 2025
52e07c0
test: update test method names
thenswan Jan 16, 2025
6ff3250
refactor: AbstractLongList#setChunk method
thenswan Jan 16, 2025
e7bf46b
fix: read empty long list with set valid range
thenswan Jan 17, 2025
a6f676b
Merge branch 'main' into 16652-longlistheap-snapshots-are-broken-if-i…
thenswan Jan 17, 2025
0e99cf6
spotless
thenswan Jan 17, 2025
397b05d
test: optimized and merged some tests
thenswan Jan 17, 2025
3c269ac
address pr comment
thenswan Jan 17, 2025
5e66c44
chore: LongListHeap small optimization
thenswan Jan 23, 2025
49de0c1
Merge branch 'main' into 16652-longlistheap-snapshots-are-broken-if-i…
thenswan Jan 23, 2025
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2021-2024 Hedera Hashgraph, LLC
* Copyright (C) 2021-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -223,8 +223,13 @@
maxLongs = headerBuffer.getLong();
if (formatVersion >= MIN_VALID_INDEX_SUPPORT_VERSION) {
minValidIndex.set(headerBuffer.getLong());
// "inflating" the size by number of indices that are to the left of the min valid index
size.set(minValidIndex.get() + (fileChannel.size() - currentFileHeaderSize) / Long.BYTES);
if (minValidIndex.get() < 0) {
// Empty list, nothing to read
size.set(0);
} else {
// "inflating" the size by number of indices that are to the left of the min valid index
size.set(minValidIndex.get() + (fileChannel.size() - currentFileHeaderSize) / Long.BYTES);
}
} else {
minValidIndex.set(0);
size.set((fileChannel.size() - FILE_HEADER_SIZE_V1) / Long.BYTES);
@@ -239,13 +244,96 @@
/**
* Initializes the list from the given file channel. At the moment of the call all the class metadata
* is already initialized from the file header.
*
* @param sourceFileName the name of the file from which the list is initialized
* @param fileChannel the file channel to read the list body from
* @throws IOException if there was a problem reading the file
*/
protected abstract void readBodyFromFileChannelOnInit(String sourceFileName, FileChannel fileChannel)
protected void readBodyFromFileChannelOnInit(String sourceFileName, FileChannel fileChannel) throws IOException {
if (minValidIndex.get() < 0) {
// Empty list, nothing to read
return;
}

final int firstChunkIndex = toIntExact(minValidIndex.get() / numLongsPerChunk);
final int lastChunkIndex = toIntExact(maxValidIndex.get() / numLongsPerChunk);
final int minValidIndexInChunk = toIntExact(minValidIndex.get() % numLongsPerChunk);
final int maxValidIndexInChunk = toIntExact(maxValidIndex.get() % numLongsPerChunk);

for (int chunkIndex = firstChunkIndex; chunkIndex <= lastChunkIndex; chunkIndex++) {
final int startIndexInChunk = (chunkIndex == firstChunkIndex) ? minValidIndexInChunk : 0;
final int endIndexInChunk = (chunkIndex == lastChunkIndex) ? (maxValidIndexInChunk + 1) : numLongsPerChunk;

C chunk = readChunkData(fileChannel, chunkIndex, startIndexInChunk, endIndexInChunk);
setChunk(chunkIndex, chunk, startIndexInChunk, endIndexInChunk);
}
}
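The loop above splits the valid range `[minValidIndex, maxValidIndex]` across chunks: only the first chunk may start mid-chunk and only the last may end mid-chunk. A standalone sketch of that boundary arithmetic, using hypothetical values for `numLongsPerChunk` and the index range (not taken from the PR):

```java
// Sketch of the chunk-boundary arithmetic in readBodyFromFileChannelOnInit.
// All concrete values here are hypothetical, chosen to exercise the boundaries.
public class ChunkRangeSketch {
    public static void main(String[] args) {
        final int numLongsPerChunk = 8;
        final long minValidIndex = 10; // first valid element
        final long maxValidIndex = 21; // last valid element

        final int firstChunkIndex = Math.toIntExact(minValidIndex / numLongsPerChunk);      // 1
        final int lastChunkIndex = Math.toIntExact(maxValidIndex / numLongsPerChunk);       // 2
        final int minValidIndexInChunk = Math.toIntExact(minValidIndex % numLongsPerChunk); // 2
        final int maxValidIndexInChunk = Math.toIntExact(maxValidIndex % numLongsPerChunk); // 5

        for (int chunkIndex = firstChunkIndex; chunkIndex <= lastChunkIndex; chunkIndex++) {
            // Interior chunks cover the full [0, numLongsPerChunk) range.
            final int start = (chunkIndex == firstChunkIndex) ? minValidIndexInChunk : 0;
            final int end = (chunkIndex == lastChunkIndex) ? (maxValidIndexInChunk + 1) : numLongsPerChunk;
            System.out.println("chunk " + chunkIndex + ": [" + start + ", " + end + ")");
        }
        // prints:
        // chunk 1: [2, 8)
        // chunk 2: [0, 6)
    }
}
```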

/**
* Reads data from the specified {@code fileChannel} and stores it into a chunk.
* The data is read from the specified range within the chunk.
* Subclasses must implement this method to read data from the provided {@code fileChannel}.
*
* @param fileChannel the file channel to read from
* @param chunkIndex the index of the chunk to store the read data
* @param startIndex the starting index (inclusive) within the chunk
* @param endIndex the ending index (exclusive) within the chunk
* @return a chunk (byte buffer, array or long that represents an offset of the chunk)
* @throws IOException if there is an error reading the file
*/
protected abstract C readChunkData(FileChannel fileChannel, int chunkIndex, int startIndex, int endIndex)
throws IOException;

/**
* Stores the specified chunk at the given {@code chunkIndex}. Subclasses may override
* this method to perform additional actions (e.g., persisting the chunk to a file).
*
* @param chunkIndex the index where the chunk is to be stored
* @param chunk the chunk to store
* @param startIndex the starting index (inclusive) within the chunk to write data
* @param endIndex the ending index (exclusive) within the chunk
* @throws IOException if an I/O error occurs, when overridden by a subclass
*/
protected void setChunk(int chunkIndex, C chunk, int startIndex, int endIndex) throws IOException {
chunkList.set(chunkIndex, chunk);
}

/**
* Reads a specified range of elements from a file channel into the given buffer.
* <p>
* This method computes the appropriate byte offsets within the buffer and the number of bytes
* to read based on the provided {@code startIndex} and {@code endIndex}. It then performs a
* complete read of that data from the file channel into the buffer.
*
* @param fileChannel the file channel to read data from
* @param chunkIndex the index of the chunk being read
* @param startIndex the starting index (inclusive) within the chunk of the first element to read
* @param endIndex the ending index (exclusive) within the chunk of the last element to read
* @param buffer the buffer into which data will be read
* @throws IOException if an error occurs while reading from the file,
* or if the number of bytes read does not match the expected size
*/
protected static void readDataIntoBuffer(
final FileChannel fileChannel,
final int chunkIndex,
final int startIndex,
final int endIndex,
final ByteBuffer buffer)
throws IOException {
final int startOffset = startIndex * Long.BYTES;
final int endOffset = endIndex * Long.BYTES;

buffer.position(startOffset);
buffer.limit(endOffset);

final int bytesToRead = endOffset - startOffset;
final long bytesRead = MerkleDbFileUtils.completelyRead(fileChannel, buffer);
if (bytesRead != bytesToRead) {
throw new IOException("Failed to read chunks, chunkIndex=" + chunkIndex + " expected=" + bytesToRead
+ " actual=" + bytesRead);
}
}
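The key idea in `readDataIntoBuffer` is that the buffer's position/limit window is set to the same byte range the elements occupy inside the chunk, so the data lands at its natural offsets. A self-contained sketch of that windowed read (a plain read loop stands in for `MerkleDbFileUtils.completelyRead`, and the chunk size and values are hypothetical):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RangedReadSketch {
    public static void main(String[] args) throws IOException {
        final Path file = Files.createTempFile("longlist", ".bin");
        final int numLongsPerChunk = 4; // hypothetical chunk size

        // Write one chunk's worth of longs to the file: 100, 101, 102, 103.
        final ByteBuffer out = ByteBuffer.allocate(numLongsPerChunk * Long.BYTES);
        for (long v = 100; v < 104; v++) {
            out.putLong(v);
        }
        out.flip();
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.write(out);
        }

        // Read only chunk indices [1, 3) into the matching region of the buffer,
        // mirroring the position/limit arithmetic in readDataIntoBuffer.
        final int startIndex = 1;
        final int endIndex = 3;
        final ByteBuffer buffer = ByteBuffer.allocate(numLongsPerChunk * Long.BYTES);
        buffer.position(startIndex * Long.BYTES);
        buffer.limit(endIndex * Long.BYTES);
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ch.position(startIndex * Long.BYTES); // file offset of the range in this chunk
            while (buffer.hasRemaining() && ch.read(buffer) >= 0) {
                // keep reading until the whole range is in the buffer
            }
        }

        // The elements sit at their natural offsets inside the buffer.
        System.out.println(buffer.getLong(1 * Long.BYTES)); // 101
        System.out.println(buffer.getLong(2 * Long.BYTES)); // 102
        Files.deleteIfExists(file);
    }
}
```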

/**
* Called when the list is initialized from an empty or absent source file.
* @param path the path to the source file
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2021-2024 Hedera Hashgraph, LLC
* Copyright (C) 2021-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,7 +25,6 @@
import com.swirlds.merkledb.utilities.MerkleDbFileUtils;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -55,7 +54,7 @@

/** This file channel is to work with the temporary file.
*/
private final FileChannel currentFileChannel;
private FileChannel currentFileChannel;

/**
* Path to the temporary file used to store the data.
@@ -139,8 +138,6 @@
if (tempFile == null) {
throw new IllegalStateException("The temp file is not initialized");
}
currentFileChannel = FileChannel.open(
tempFile, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
}

/**
@@ -157,57 +154,39 @@
protected void readBodyFromFileChannelOnInit(final String sourceFileName, final FileChannel fileChannel)
throws IOException {
tempFile = createTempFile(sourceFileName, configuration);
if (minValidIndex.get() < 0) {
// Nothing to read
return;
}
// create temporary file for writing
try (final RandomAccessFile rf = new RandomAccessFile(tempFile.toFile(), "rw")) {
// ensure that the amount of disk space is enough
// two additional chunks are required to accommodate "compressed" first and last chunks in the original file
rf.setLength(fileChannel.size() + 2L * memoryChunkSize);
final FileChannel tempFileCHannel = rf.getChannel();

final int totalNumberOfChunks = calculateNumberOfChunks(size());
final int firstChunkWithDataIndex = toIntExact(minValidIndex.get() / numLongsPerChunk);
final int minValidIndexInChunk = toIntExact(minValidIndex.get() % numLongsPerChunk);
final int lastChunkWithDataIndex = totalNumberOfChunks - firstChunkWithDataIndex - 1;

// copy the first chunk
final ByteBuffer transferBuffer = initOrGetTransferBuffer();
// we need to make sure that the chunk is written in full.
// If a value is absent, the list element will have IMPERMISSIBLE_VALUE
fillBufferWithZeroes(transferBuffer);
transferBuffer.position(minValidIndexInChunk * Long.BYTES);
MerkleDbFileUtils.completelyRead(fileChannel, transferBuffer);
transferBuffer.flip();
// writing the full chunk, all values before minValidIndexInChunk are zeroes
MerkleDbFileUtils.completelyWrite(tempFileCHannel, transferBuffer, 0);
chunkList.set(firstChunkWithDataIndex, 0L);

// copy everything except for the first chunk and the last chunk
final int numberOfFullChunks = totalNumberOfChunks - firstChunkWithDataIndex - 2;
if (numberOfFullChunks > 0) {
final long bytesToTransfer = (long) numberOfFullChunks * memoryChunkSize;
final long bytesTransferred = MerkleDbFileUtils.completelyTransferFrom(
tempFileCHannel, fileChannel, memoryChunkSize, bytesToTransfer);
if (bytesTransferred != bytesToTransfer) {
throw new IOException("Failed to read long list chunks, expected=" + bytesToTransfer + " actual="
+ bytesTransferred);
}
}

// copy the last chunk
transferBuffer.clear();
MerkleDbFileUtils.completelyRead(fileChannel, transferBuffer);
transferBuffer.flip();
MerkleDbFileUtils.completelyWrite(
tempFileCHannel, transferBuffer, (long) lastChunkWithDataIndex * memoryChunkSize);
currentFileChannel = FileChannel.open(
tempFile, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);

for (int i = firstChunkWithDataIndex + 1; i < totalNumberOfChunks; i++) {
chunkList.set(i, (long) (i - firstChunkWithDataIndex) * memoryChunkSize);
}
super.readBodyFromFileChannelOnInit(sourceFileName, fileChannel);
}

/** {@inheritDoc} */
@Override
protected Long readChunkData(FileChannel fileChannel, int chunkIndex, int startIndex, int endIndex)
throws IOException {
final ByteBuffer transferBuffer = initOrGetTransferBuffer();
fillBufferWithZeroes(transferBuffer);

readDataIntoBuffer(fileChannel, chunkIndex, startIndex, endIndex, transferBuffer);

final int firstChunkIndex = toIntExact(minValidIndex.get() / numLongsPerChunk);
final long chunk = ((long) (chunkIndex - firstChunkIndex) * memoryChunkSize);

int startOffset = startIndex * Long.BYTES;
int endOffset = endIndex * Long.BYTES;

transferBuffer.position(startOffset);
transferBuffer.limit(endOffset);

int bytesToWrite = endOffset - startOffset;
long bytesWritten = MerkleDbFileUtils.completelyWrite(currentFileChannel, transferBuffer, chunk + startOffset);
if (bytesWritten != bytesToWrite) {
throw new IOException("Failed to write long list (disk) chunks, chunkIndex=" + chunkIndex + " expected="
+ bytesToWrite + " actual=" + bytesWritten);
}

return chunk;
}

private void fillBufferWithZeroes(ByteBuffer transferBuffer) {
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2021-2024 Hedera Hashgraph, LLC
* Copyright (C) 2021-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
import static java.nio.ByteBuffer.allocateDirect;

import com.swirlds.config.api.Configuration;
import com.swirlds.merkledb.utilities.MemoryUtils;
import com.swirlds.merkledb.utilities.MerkleDbFileUtils;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
@@ -51,6 +52,9 @@
@SuppressWarnings("unused")
public final class LongListHeap extends AbstractLongList<AtomicLongArray> {

/** A buffer for reading chunk data from the file only during the initialization. */
private ByteBuffer initReadBuffer;

/** Construct a new LongListHeap with the default number of longs per chunk. */
public LongListHeap() {
this(DEFAULT_NUM_LONGS_PER_CHUNK, DEFAULT_MAX_LONGS_TO_STORE, 0);
@@ -87,23 +91,34 @@ public LongListHeap(final Path file, final Configuration configuration) throws I

/** {@inheritDoc} */
@Override
protected void readBodyFromFileChannelOnInit(String sourceFileName, FileChannel fileChannel) throws IOException {
// read data
final int numOfArrays = calculateNumberOfChunks(size());
final ByteBuffer buffer = allocateDirect(memoryChunkSize);
buffer.order(ByteOrder.nativeOrder());
for (int i = 0; i < numOfArrays; i++) {
final AtomicLongArray atomicLongArray = new AtomicLongArray(numLongsPerChunk);
buffer.clear();
MerkleDbFileUtils.completelyRead(fileChannel, buffer);
buffer.flip();
int index = 0;
while (buffer.remaining() > 0) {
atomicLongArray.set(index, buffer.getLong());
index++;
}
chunkList.set(i, atomicLongArray);
protected void readBodyFromFileChannelOnInit(final String sourceFileName, final FileChannel fileChannel)
throws IOException {
initReadBuffer = ByteBuffer.allocateDirect(memoryChunkSize).order(ByteOrder.nativeOrder());

super.readBodyFromFileChannelOnInit(sourceFileName, fileChannel);

try {
MemoryUtils.closeDirectByteBuffer(initReadBuffer);
} finally {
initReadBuffer = null;
}
}

/** {@inheritDoc} */
@Override
protected AtomicLongArray readChunkData(FileChannel fileChannel, int chunkIndex, int startIndex, int endIndex)
throws IOException {
AtomicLongArray chunk = createChunk();

readDataIntoBuffer(fileChannel, chunkIndex, startIndex, endIndex, initReadBuffer);
initReadBuffer.flip();

int index = 0;
while (initReadBuffer.hasRemaining()) {
chunk.set(index++, initReadBuffer.getLong());
}

return chunk;
}
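The drain loop above converts the raw bytes read from the file into an `AtomicLongArray` chunk. A minimal standalone sketch of that conversion, with a pre-filled buffer standing in for the result of `readDataIntoBuffer` (chunk size and values are hypothetical):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicLongArray;

public class HeapChunkSketch {
    public static void main(String[] args) {
        final int numLongsPerChunk = 4; // hypothetical

        // Simulate a freshly filled read buffer holding one chunk: 7, 8, 9, 10.
        final ByteBuffer initReadBuffer =
                ByteBuffer.allocateDirect(numLongsPerChunk * Long.BYTES).order(ByteOrder.nativeOrder());
        for (long v = 7; v < 11; v++) {
            initReadBuffer.putLong(v);
        }
        initReadBuffer.flip(); // switch from writing to reading, as readChunkData does

        // Drain the buffer into an AtomicLongArray chunk, one long at a time.
        final AtomicLongArray chunk = new AtomicLongArray(numLongsPerChunk);
        int index = 0;
        while (initReadBuffer.hasRemaining()) {
            chunk.set(index++, initReadBuffer.getLong());
        }

        System.out.println(chunk.get(0)); // 7
        System.out.println(chunk.get(3)); // 10
    }
}
```

Direct buffers are not backed by an accessible array, so the element-by-element `getLong()` loop is the straightforward way to populate the atomic array.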

/** {@inheritDoc} */
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2021-2024 Hedera Hashgraph, LLC
* Copyright (C) 2021-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -87,26 +87,11 @@ public LongListOffHeap(final Path file, final Configuration configuration) throw

/** {@inheritDoc} */
@Override
protected void readBodyFromFileChannelOnInit(String sourceFileName, FileChannel fileChannel) throws IOException {
if (minValidIndex.get() < 0) {
// Empty list, nothing to read
return;
}
final int totalNumberOfChunks = calculateNumberOfChunks(size());
final int firstChunkWithDataIndex = toIntExact(minValidIndex.get() / numLongsPerChunk);
final int minValidIndexInChunk = toIntExact(minValidIndex.get() % numLongsPerChunk);
// read the first chunk
final ByteBuffer firstBuffer = createChunk();
firstBuffer.position(minValidIndexInChunk * Long.BYTES).limit(firstBuffer.capacity());
MerkleDbFileUtils.completelyRead(fileChannel, firstBuffer);
chunkList.set(firstChunkWithDataIndex, firstBuffer);
// read the rest of the data
for (int i = firstChunkWithDataIndex + 1; i < totalNumberOfChunks; i++) {
final ByteBuffer directBuffer = createChunk();
MerkleDbFileUtils.completelyRead(fileChannel, directBuffer);
directBuffer.position(0);
chunkList.set(i, directBuffer);
}
protected ByteBuffer readChunkData(FileChannel fileChannel, int chunkIndex, int startIndex, int endIndex)
throws IOException {
ByteBuffer chunk = createChunk();
readDataIntoBuffer(fileChannel, chunkIndex, startIndex, endIndex, chunk);
return chunk;
}

/** {@inheritDoc} */