Skip to content

Commit

Permalink
Merge pull request bisq-network#7032 from alvasw/persistence
Browse files Browse the repository at this point in the history
Implement AtomicFileWriter
  • Loading branch information
alejandrogarcia83 authored Feb 11, 2024
2 parents c5a73c0 + 7f06107 commit 2b4fb78
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;

import java.util.concurrent.CompletableFuture;

import lombok.Getter;

public class AsyncFileChannelWriter implements AsyncFileWriter {
@Getter
private final Path filePath;
private final AsynchronousFileChannel fileChannel;

public AsyncFileChannelWriter(AsynchronousFileChannel fileChannel) {
public AsyncFileChannelWriter(Path filePath, AsynchronousFileChannel fileChannel) {
this.filePath = filePath;
this.fileChannel = fileChannel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package bisq.persistence;

import java.nio.file.Path;

import java.util.concurrent.CompletableFuture;

public interface AsyncFileWriter {
CompletableFuture<Integer> write(byte[] data, int offset);

Path getFilePath();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

public class AtomicFileWriteFailedException extends RuntimeException {
public AtomicFileWriteFailedException(String message) {
super(message);
}
}
64 changes: 64 additions & 0 deletions persistence/src/main/java/bisq/persistence/AtomicFileWriter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

import java.nio.file.Path;

import java.io.File;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class AtomicFileWriter {
private final PersistenceFileWriter rollingFileWriter;
private File activeFile;
private File rollingFile;


public AtomicFileWriter(Path destinationPath,
PersistenceFileWriter rollingFileWriter) {
this.rollingFileWriter = rollingFileWriter;
activeFile = destinationPath.toFile();
rollingFile = rollingFileWriter.getFilePath().toFile();
}

public synchronized void write(byte[] data) {
try {
CountDownLatch countDownLatch = rollingFileWriter.write(data);
boolean isSuccess = countDownLatch.await(45, TimeUnit.SECONDS);
if (!isSuccess) {
throw new AtomicFileWriteFailedException("Async atomic file write timeout triggered after 45 seconds.");
}

isSuccess = rollingFile.renameTo(activeFile);
if (!isSuccess) {
throw new AtomicFileWriteFailedException("Couldn't rename rolling file to active file.");
}

File tmpFile = activeFile;
activeFile = rollingFile;
rollingFile = tmpFile;

} catch (InterruptedException e) {
log.error("AtomicFileWriter got interrupted during write.", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package bisq.persistence;

import java.nio.file.Path;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
Expand All @@ -36,6 +38,10 @@ public CountDownLatch write(byte[] data) {
return writeFinished;
}

public Path getFilePath() {
return asyncWriter.getFilePath();
}

private void scheduleAsyncWrite(byte[] data, int offset, int size, CountDownLatch writeFinished) {
asyncWriter.write(data, offset)
.thenAcceptAsync(writeUntilEndAsync(data, offset, size, writeFinished), writeRequestScheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class AsyncFileChannelWriterTests {
void setup(@TempDir Path tempDir) throws IOException {
filePath = tempDir.resolve("file");
fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
asyncFileChannelWriter = new AsyncFileChannelWriter(fileChannel);
asyncFileChannelWriter = new AsyncFileChannelWriter(filePath, fileChannel);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

import java.nio.charset.StandardCharsets;
import java.nio.file.Path;

import java.io.File;

import java.util.concurrent.CountDownLatch;

import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

@ExtendWith(MockitoExtension.class)
public class AtomicFileWriterTests {
private static final byte[] DATA = "Hello World!".getBytes(StandardCharsets.UTF_8);
@Mock
private PersistenceFileWriter persistenceFileWriter;
@Mock
private File rollingFile = mock(File.class);
@Mock
private CountDownLatch countDownLatch;
private AtomicFileWriter atomicFileWriter;

@BeforeEach
void setup(@TempDir Path tempDir, @Mock Path rollingFilePath) {
doReturn(countDownLatch).when(persistenceFileWriter).write(any());
doReturn(rollingFile).when(rollingFilePath).toFile();
doReturn(rollingFilePath).when(persistenceFileWriter).getFilePath();

var file = tempDir.resolve("my_file");
atomicFileWriter = new AtomicFileWriter(file, persistenceFileWriter);
}

@Test
void triggerFileWriteTimeout() throws InterruptedException {
doReturn(false).when(countDownLatch).await(anyLong(), any());
assertThrows(AtomicFileWriteFailedException.class,
() -> atomicFileWriter.write(DATA));
}

@Test
void renameFailure() throws InterruptedException {
doReturn(true).when(countDownLatch).await(anyLong(), any());
doReturn(false).when(rollingFile).renameTo(any());

assertThrows(AtomicFileWriteFailedException.class,
() -> atomicFileWriter.write(DATA));
}

@Test
void write() throws InterruptedException {
doReturn(true).when(countDownLatch).await(anyLong(), any());
doReturn(true).when(rollingFile).renameTo(any());
atomicFileWriter.write(DATA);
}
}

0 comments on commit 2b4fb78

Please sign in to comment.