Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add InlinePcesWriter #16000

Merged
merged 7 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,6 @@
*/
private final double spanOverlapFactor;

/**
* The highest event sequence number that has been written to the stream (but possibly not yet flushed).
*/
private long lastWrittenEvent = -1;

/**
* The highest event sequence number that has been durably flushed to disk.
*/
private long lastFlushedEvent = -1;

/**
* If true then all added events are new and need to be written to the stream. If false then all added events are
* already durable and do not need to be written to the stream.
Expand Down Expand Up @@ -168,7 +158,7 @@
averageSpanUtilization = new LongRunningAverage(pcesConfig.spanUtilizationRunningAverageLength());

fileType = eventConfig.useBirthRoundAncientThreshold()
? AncientMode.BIRTH_ROUND_THRESHOLD

Check warning on line 161 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L161

Added line #L161 was not covered by tests
: AncientMode.GENERATION_THRESHOLD;
}

Expand All @@ -188,19 +178,22 @@
* Inform the preconsensus event writer that a discontinuity has occurred in the preconsensus event stream.
*
* @param newOriginRound the round of the state that the new stream will be starting from
* @return {@code true} if this method call resulted in the current file being closed
*/
public void registerDiscontinuity(@NonNull final Long newOriginRound) {
public boolean registerDiscontinuity(@NonNull final Long newOriginRound) {
if (!streamingNewEvents) {
logger.error(EXCEPTION.getMarker(), "registerDiscontinuity() called while replaying events");

Check warning on line 185 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L185

Added line #L185 was not covered by tests
}

try {
if (currentMutableFile != null) {
closeFile();
return true;
}
} finally {
fileManager.registerDiscontinuity(newOriginRound);
}
return false;

Check warning on line 196 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L196

Added line #L196 was not covered by tests
}

/**
Expand All @@ -211,7 +204,7 @@
*/
public void updateNonAncientEventBoundary(@NonNull final EventWindow nonAncientBoundary) {
if (nonAncientBoundary.getAncientThreshold() < this.nonAncientBoundary) {
throw new IllegalArgumentException("Non-ancient boundary cannot be decreased. Current = "

Check warning on line 207 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L207

Added line #L207 was not covered by tests
+ this.nonAncientBoundary + ", requested = " + nonAncientBoundary);
}

Expand All @@ -237,10 +230,9 @@
try {
previousSpan = currentMutableFile.getUtilizedSpan();
if (!bootstrapMode) {
averageSpanUtilization.add(previousSpan);

Check warning on line 233 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L233

Added line #L233 was not covered by tests
}
currentMutableFile.close();
lastFlushedEvent = lastWrittenEvent;

fileManager.finishedWritingFile(currentMutableFile);
currentMutableFile = null;
Expand All @@ -248,8 +240,8 @@
// Not strictly required here, but not a bad place to ensure we delete
// files incrementally (as opposed to deleting a bunch of files all at once).
pruneOldFiles();
} catch (final IOException e) {
throw new UncheckedIOException("unable to prune files", e);

Check warning on line 244 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L243-L244

Added lines #L243 - L244 were not covered by tests
}
}

Expand All @@ -261,13 +253,13 @@
// Don't attempt to prune files until we are done replaying the event stream (at start up).
// Files are being iterated on a different thread, and it isn't thread safe to prune files
// while they are being iterated.
return;

Check warning on line 256 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L256

Added line #L256 was not covered by tests
}

try {
fileManager.pruneOldFiles(minimumAncientIdentifierToStore);
} catch (final IOException e) {
throw new UncheckedIOException("unable to prune old files", e);

Check warning on line 262 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L261-L262

Added lines #L261 - L262 were not covered by tests
}
}

Expand All @@ -280,18 +272,18 @@
public boolean prepareOutputStream(@NonNull final PlatformEvent eventToWrite) throws IOException {
boolean fileClosed = false;
if (currentMutableFile != null) {
final boolean fileCanContainEvent =
currentMutableFile.canContain(eventToWrite.getAncientIndicator(fileType));
final boolean fileIsFull =

Check warning on line 277 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L275-L277

Added lines #L275 - L277 were not covered by tests
UNIT_BYTES.convertTo(currentMutableFile.fileSize(), UNIT_MEGABYTES) >= preferredFileSizeMegabytes;

if (!fileCanContainEvent || fileIsFull) {
closeFile();
fileClosed = true;

Check warning on line 282 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L281-L282

Added lines #L281 - L282 were not covered by tests
}

if (fileIsFull) {
bootstrapMode = false;

Check warning on line 286 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L286

Added line #L286 was not covered by tests
}
}

Expand Down Expand Up @@ -344,7 +336,7 @@
* @return the type of the PCES file
*/
public AncientMode getFileType() {
return fileType;

Check warning on line 339 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L339

Added line #L339 was not covered by tests
}

/**
Expand All @@ -362,45 +354,9 @@
* @return the current mutable file
*/
public PcesMutableFile getCurrentMutableFile() {
return currentMutableFile;

Check warning on line 357 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L357

Added line #L357 was not covered by tests
}

/**
* Get the last written event.
*
* @return the last written event
*/
public long getLastWrittenEvent() {
return lastWrittenEvent;
}

/**
* Get the last flushed event.
*
* @return the last flushed event
*/
public long getLastFlushedEvent() {
return lastFlushedEvent;
}

/**
* Set the last written event.
*
* @param lastWrittenEvent the last written event
*/
public void setLastWrittenEvent(final long lastWrittenEvent) {
this.lastWrittenEvent = lastWrittenEvent;
}

/**
* Set the last flushed event.
*
* @param lastFlushedEvent the last flushed event
*/
public void setLastFlushedEvent(final long lastFlushedEvent) {
this.lastFlushedEvent = lastFlushedEvent;
}

/**
* Close the current mutable file.
*/
Expand All @@ -408,8 +364,8 @@
if (currentMutableFile != null) {
try {
currentMutableFile.close();
} catch (final IOException e) {
throw new UncheckedIOException(e);

Check warning on line 368 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/CommonPcesWriter.java#L367-L368

Added lines #L367 - L368 were not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.logging.log4j.Logger;

public class DefaultInlinePcesWriter implements InlinePcesWriter {
private static final Logger logger = LogManager.getLogger(DefaultInlinePcesWriter.class);

Check warning on line 30 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L30

Added line #L30 was not covered by tests

private final CommonPcesWriter commonPcesWriter;
/**
Expand All @@ -37,16 +37,16 @@
* @param fileManager manages all preconsensus event stream files currently on disk
*/
public DefaultInlinePcesWriter(
@NonNull final PlatformContext platformContext, @NonNull final PcesFileManager fileManager) {
Objects.requireNonNull(platformContext, "platformContext is required");
Objects.requireNonNull(fileManager, "fileManager is required");
commonPcesWriter = new CommonPcesWriter(platformContext, fileManager, true);
}

Check warning on line 44 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L40-L44

Added lines #L40 - L44 were not covered by tests

@Override
public void beginStreamingNewEvents() {
commonPcesWriter.beginStreamingNewEvents();
}

Check warning on line 49 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L48-L49

Added lines #L48 - L49 were not covered by tests

/**
* {@inheritDoc}
Expand All @@ -54,33 +54,25 @@
@NonNull
@Override
public PlatformEvent writeEvent(@NonNull PlatformEvent event) {
if (event.getStreamSequenceNumber() == PlatformEvent.NO_STREAM_SEQUENCE_NUMBER) {
throw new IllegalStateException("Event must have a valid stream sequence number");
}

// if we aren't streaming new events yet, assume that the given event is already durable
if (!commonPcesWriter.isStreamingNewEvents()) {
commonPcesWriter.setLastWrittenEvent(event.getStreamSequenceNumber());
commonPcesWriter.setLastFlushedEvent(event.getStreamSequenceNumber());
return event;

Check warning on line 59 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L59

Added line #L59 was not covered by tests
}

// don't do anything with ancient events
if (event.getAncientIndicator(commonPcesWriter.getFileType()) < commonPcesWriter.getNonAncientBoundary()) {
throw new IllegalStateException("Ancient events should not be written to the PCES");
// don't do anything with ancient events
return event;

Check warning on line 64 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L64

Added line #L64 was not covered by tests
}

try {
commonPcesWriter.prepareOutputStream(event);
commonPcesWriter.getCurrentMutableFile().writeEvent(event);

Check warning on line 69 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L68-L69

Added lines #L68 - L69 were not covered by tests
commonPcesWriter.setLastWrittenEvent(event.getStreamSequenceNumber());

commonPcesWriter.getCurrentMutableFile().flush();

Check warning on line 71 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L71

Added line #L71 was not covered by tests
lpetrovic05 marked this conversation as resolved.
Show resolved Hide resolved
commonPcesWriter.setLastFlushedEvent(commonPcesWriter.getLastWrittenEvent());

return event;
} catch (final IOException e) {
throw new UncheckedIOException(e);

Check warning on line 75 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L73-L75

Added lines #L73 - L75 were not covered by tests
}
}

Expand All @@ -89,19 +81,19 @@
*/
@Override
public void registerDiscontinuity(@NonNull Long newOriginRound) {
commonPcesWriter.registerDiscontinuity(newOriginRound);
}

Check warning on line 85 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L84-L85

Added lines #L84 - L85 were not covered by tests

/**
* {@inheritDoc}
*/
@Override
public void updateNonAncientEventBoundary(@NonNull EventWindow nonAncientBoundary) {
commonPcesWriter.updateNonAncientEventBoundary(nonAncientBoundary);
}

Check warning on line 93 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L92-L93

Added lines #L92 - L93 were not covered by tests

@Override
public void setMinimumAncientIdentifierToStore(@NonNull final Long minimumAncientIdentifierToStore) {
commonPcesWriter.setMinimumAncientIdentifierToStore(minimumAncientIdentifierToStore);
}

Check warning on line 98 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultInlinePcesWriter.java#L97-L98

Added lines #L97 - L98 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@

private static final Logger logger = LogManager.getLogger(DefaultPcesWriter.class);

/**
* The highest event sequence number that has been written to the stream (but possibly not yet flushed).
*/
private long lastWrittenEvent = -1;

/**
* The highest event sequence number that has been durably flushed to disk.
*/
private long lastFlushedEvent = -1;

private final CommonPcesWriter commonPcesWriter;

/**
Expand All @@ -55,10 +65,10 @@
*/
public DefaultPcesWriter(
@NonNull final PlatformContext platformContext, @NonNull final PcesFileManager fileManager) {
Objects.requireNonNull(platformContext, "platformContext is required");
Objects.requireNonNull(fileManager, "fileManager is required");

Check warning on line 69 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L68-L69

Added lines #L68 - L69 were not covered by tests

commonPcesWriter = new CommonPcesWriter(platformContext, fileManager, false);

Check warning on line 71 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L71

Added line #L71 was not covered by tests
}

/**
Expand All @@ -66,7 +76,7 @@
*/
@Override
public void beginStreamingNewEvents() {
commonPcesWriter.beginStreamingNewEvents();

Check warning on line 79 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L79

Added line #L79 was not covered by tests
}

/**
Expand All @@ -81,8 +91,8 @@

// if we aren't streaming new events yet, assume that the given event is already durable
if (!commonPcesWriter.isStreamingNewEvents()) {
commonPcesWriter.setLastWrittenEvent(event.getStreamSequenceNumber());
commonPcesWriter.setLastFlushedEvent(event.getStreamSequenceNumber());
lastWrittenEvent = event.getStreamSequenceNumber();
lastFlushedEvent = event.getStreamSequenceNumber();
return event.getStreamSequenceNumber();
}

Expand All @@ -92,13 +102,16 @@
}

try {
final boolean fileClosed = commonPcesWriter.prepareOutputStream(event);

Check warning on line 105 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L105

Added line #L105 was not covered by tests
if (fileClosed) {
lastFlushedEvent = lastWrittenEvent;

Check warning on line 107 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L107

Added line #L107 was not covered by tests
}
commonPcesWriter.getCurrentMutableFile().writeEvent(event);
commonPcesWriter.setLastWrittenEvent(event.getStreamSequenceNumber());
lastFlushedEvent = event.getStreamSequenceNumber();

Check warning on line 110 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L109-L110

Added lines #L109 - L110 were not covered by tests

final boolean flushPerformed = processFlushRequests();

return fileClosed || flushPerformed ? commonPcesWriter.getLastFlushedEvent() : null;
return fileClosed || flushPerformed ? lastFlushedEvent : null;
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
Expand All @@ -110,8 +123,11 @@
@Override
@Nullable
public Long registerDiscontinuity(@NonNull final Long newOriginRound) {
commonPcesWriter.registerDiscontinuity(newOriginRound);
return commonPcesWriter.getLastFlushedEvent();
final boolean fileClosed = commonPcesWriter.registerDiscontinuity(newOriginRound);

Check warning on line 126 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L126

Added line #L126 was not covered by tests
if (fileClosed) {
lastFlushedEvent = lastWrittenEvent;

Check warning on line 128 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L128

Added line #L128 was not covered by tests
}
return lastFlushedEvent;

Check warning on line 130 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L130

Added line #L130 was not covered by tests
}

/**
Expand All @@ -122,7 +138,7 @@
public Long submitFlushRequest(@NonNull final Long sequenceNumber) {
flushRequests.add(sequenceNumber);

return processFlushRequests() ? commonPcesWriter.getLastFlushedEvent() : null;
return processFlushRequests() ? lastFlushedEvent : null;
}

/**
Expand All @@ -131,30 +147,30 @@
* @return true if a flush was performed, otherwise false
*/
private boolean processFlushRequests() {
boolean flushRequired = false;

Check warning on line 150 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L150

Added line #L150 was not covered by tests
while (!flushRequests.isEmpty() && flushRequests.peekFirst() <= commonPcesWriter.getLastWrittenEvent()) {
while (!flushRequests.isEmpty() && flushRequests.peekFirst() <= lastWrittenEvent) {
final long flushRequest = flushRequests.removeFirst();

Check warning on line 152 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L152

Added line #L152 was not covered by tests

if (flushRequest > commonPcesWriter.getLastFlushedEvent()) {
if (flushRequest > lastFlushedEvent) {
flushRequired = true;

Check warning on line 155 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L155

Added line #L155 was not covered by tests
}
}

if (flushRequired) {
if (commonPcesWriter.getCurrentMutableFile() == null) {
logger.error(EXCEPTION.getMarker(), "Flush required, but no file is open. This should never happen");

Check warning on line 161 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L161

Added line #L161 was not covered by tests
}

try {
commonPcesWriter.getCurrentMutableFile().flush();
} catch (final IOException e) {
throw new UncheckedIOException(e);
}

Check warning on line 168 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L165-L168

Added lines #L165 - L168 were not covered by tests

commonPcesWriter.setLastFlushedEvent(commonPcesWriter.getLastWrittenEvent());
lastFlushedEvent = lastWrittenEvent;

Check warning on line 170 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L170

Added line #L170 was not covered by tests
}

return flushRequired;

Check warning on line 173 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L173

Added line #L173 was not covered by tests
}

/**
Expand All @@ -162,7 +178,7 @@
*/
@Override
public void updateNonAncientEventBoundary(@NonNull final EventWindow nonAncientBoundary) {
commonPcesWriter.updateNonAncientEventBoundary(nonAncientBoundary);

Check warning on line 181 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L181

Added line #L181 was not covered by tests
}

/**
Expand All @@ -170,13 +186,13 @@
*/
@Override
public void setMinimumAncientIdentifierToStore(@NonNull final Long minimumAncientIdentifierToStore) {
commonPcesWriter.setMinimumAncientIdentifierToStore(minimumAncientIdentifierToStore);

Check warning on line 189 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L189

Added line #L189 was not covered by tests
}

/**
* Close the current mutable file.
*/
public void closeCurrentMutableFile() {
commonPcesWriter.closeCurrentMutableFile();

Check warning on line 196 in platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java

View check run for this annotation

Codecov / codecov/patch

platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/event/preconsensus/DefaultPcesWriter.java#L196

Added line #L196 was not covered by tests
}
}
Loading