Skip to content

Commit

Permalink
Add buffered bytes metadata to task results
Browse files Browse the repository at this point in the history
Add remaining buffered bytes to task results endpoints.  Add a new
HEAD request to task results to allow for a lightweight way to
retrieve header-only information.
  • Loading branch information
tdcmeehan committed Mar 5, 2024
1 parent d5bb1d9 commit ece7741
Show file tree
Hide file tree
Showing 21 changed files with 262 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public final class PrestoHeaders
public static final String PRESTO_CURRENT_STATE = "X-Presto-Current-State";
public static final String PRESTO_MAX_WAIT = "X-Presto-Max-Wait";
public static final String PRESTO_MAX_SIZE = "X-Presto-Max-Size";
public static final String PRESTO_BUFFER_REMAINING_BYTES = "X-Presto-Buffer-Remaining-Bytes";
public static final String PRESTO_TASK_INSTANCE_ID = "X-Presto-Task-Instance-Id";
public static final String PRESTO_PAGE_TOKEN = "X-Presto-Page-Sequence-Id";
public static final String PRESTO_PAGE_NEXT_TOKEN = "X-Presto-Page-End-Sequence-Id";
Expand Down
8 changes: 8 additions & 0 deletions presto-docs/src/main/sphinx/develop/worker-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ upstream worker.
the receipt of the results and allows the worker to delete them.
* A ``DELETE`` on ``{taskId}/results/{bufferId}`` deletes all results from the
specified output buffer in case of an error.
* Optionally, a ``HEAD`` request can be made to ``{taskId}/results/{bufferId}``
to retrieve the response headers that are not related to the data page
sequence. Use this to check whether the buffer is finished, or to see how much
data is buffered, without fetching the data.

Coordinator and workers fetch results in chunks. They specify the maximum size
in bytes for the chunk using ``X-Presto-Max-Size`` HTTP header. Each chunk is
Expand All @@ -73,6 +77,10 @@ response includes:
request the next chunk as ``X-Presto-Page-End-Sequence-Id`` HTTP header,
* An indication that there are no more results as ``X-Presto-Buffer-Complete``
HTTP header with the value of "true".
* The remaining buffered bytes in the output buffer as ``X-Presto-Buffer-Remaining-Bytes``
HTTP header. This should return a comma-separated list of the sizes, in bytes, of
the pages that can be returned in the next request. This can be used as a hint
to upstream tasks to optimize data exchange.

The body of the response contains a list of pages in :doc:`SerializedPage wire format <serialized-page>`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.Session;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.buffer.BufferInfo;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.LazyOutputBuffer;
import com.facebook.presto.execution.buffer.OutputBuffer;
Expand Down Expand Up @@ -494,6 +495,14 @@ public ListenableFuture<BufferResult> getTaskResults(OutputBufferId bufferId, lo
return outputBuffer.get(bufferId, startingSequenceId, maxSize);
}

/**
 * Finds the {@link BufferInfo} whose buffer id equals the given id.
 *
 * <p>The candidates are the buffers reported by {@code outputBuffer.getInfo()}; they are
 * scanned in the order returned and the first match wins.
 *
 * @param bufferId id of the output buffer to look up; must not be null
 * @return the first matching buffer info, or {@link Optional#empty()} if no reported
 *         buffer has this id
 * @throws NullPointerException if {@code bufferId} is null
 */
public Optional<BufferInfo> getTaskBufferInfo(OutputBufferId bufferId)
{
    requireNonNull(bufferId, "bufferId is null");
    for (BufferInfo candidate : outputBuffer.getInfo().getBuffers()) {
        if (candidate.getBufferId().equals(bufferId)) {
            return Optional.of(candidate);
        }
    }
    return Optional.empty();
}

public void acknowledgeTaskResults(OutputBufferId bufferId, long sequenceId)
{
requireNonNull(bufferId, "bufferId is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.event.SplitMonitor;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.buffer.BufferInfo;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
Expand Down Expand Up @@ -452,6 +453,14 @@ public ListenableFuture<BufferResult> getTaskResults(TaskId taskId, OutputBuffer
return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize);
}

/**
* Returns the {@link BufferInfo} of one output buffer of the given task.
*
* The task is obtained from {@code tasks} via {@code getUnchecked(taskId)} and the
* lookup itself is delegated to that task's {@code getTaskBufferInfo(bufferId)}.
*
* @param taskId id of the task that owns the buffer; must not be null
* @param bufferId id of the output buffer to look up; must not be null
* @return whatever the task's own lookup returns for {@code bufferId}
*/
@Override
public Optional<BufferInfo> getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId)
{
// fail fast with a descriptive message before touching the task cache
requireNonNull(taskId, "taskId is null");
requireNonNull(bufferId, "bufferId is null");
return tasks.getUnchecked(taskId).getTaskBufferInfo(bufferId);
}

@Override
public void acknowledgeTaskResults(TaskId taskId, OutputBufferId bufferId, long sequenceId)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.Session;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.execution.buffer.BufferInfo;
import com.facebook.presto.execution.buffer.BufferResult;
import com.facebook.presto.execution.buffer.OutputBuffers;
import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId;
Expand Down Expand Up @@ -117,6 +118,13 @@ TaskInfo updateTask(
*/
ListenableFuture<BufferResult> getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, DataSize maxSize);

/**
* Gets the {@link BufferInfo} associated with the specified task and buffer id.
*
* @param taskId id of the task whose output buffer is being queried
* @param bufferId id of the output buffer within that task
* @return absent if the buffer is not present, otherwise the buffer info
*/
Optional<BufferInfo> getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId);

/**
* Acknowledges previously received results.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,27 @@ public class BufferResult
{
/**
* Creates a result that carries no pages and does not specify a buffered-bytes value.
*
* @param taskInstanceId id of the task instance producing the result
* @param token sequence token of the result; also used as the next token
* @param bufferComplete whether the buffer has no more results
* @return a {@link BufferResult} with an empty page list
*/
public static BufferResult emptyResults(String taskInstanceId, long token, boolean bufferComplete)
{
// NOTE(review): this listing is a rendered commit diff without +/- markers; the two
// statements below appear to be the removed line followed by its replacement.
// The replacement delegates to the four-argument overload with 0 buffered bytes.
return new BufferResult(taskInstanceId, token, token, bufferComplete, ImmutableList.of());
return emptyResults(taskInstanceId, token, 0, bufferComplete);
}

/**
 * Creates a result that carries no pages but still reports a buffered-bytes value.
 *
 * <p>Because no pages are returned, {@code token} is passed as both the token and the
 * next token of the result.
 *
 * @param taskInstanceId id of the task instance producing the result
 * @param token sequence token of the result; also used as the next token
 * @param bufferedBytes buffered-bytes value to report with the result
 * @param bufferComplete whether the buffer has no more results
 * @return a {@link BufferResult} with an empty page list
 */
public static BufferResult emptyResults(String taskInstanceId, long token, long bufferedBytes, boolean bufferComplete)
{
    List<SerializedPage> noPages = ImmutableList.of();
    return new BufferResult(taskInstanceId, token, token, bufferComplete, bufferedBytes, noPages);
}

private final String taskInstanceId;
private final long token;
private final long nextToken;
private final boolean bufferComplete;
private final long bufferedBytes;
private final List<SerializedPage> serializedPages;

public BufferResult(
String taskInstanceId,
long token,
long nextToken,
boolean bufferComplete,
long bufferedBytes,
List<SerializedPage> serializedPages)
{
checkArgument(!isNullOrEmpty(taskInstanceId), "taskInstanceId is null");
Expand All @@ -50,6 +57,7 @@ public BufferResult(
this.token = token;
this.nextToken = nextToken;
this.bufferComplete = bufferComplete;
this.bufferedBytes = bufferedBytes;
this.serializedPages = ImmutableList.copyOf(requireNonNull(serializedPages, "serializedPages is null"));
}

Expand All @@ -73,6 +81,11 @@ public boolean isBufferComplete()
return bufferComplete;
}

/**
 * Returns the buffered-bytes value this result was constructed with.
 *
 * @return the {@code bufferedBytes} constructor argument, unchanged
 */
public long getBufferedBytes()
{
    return this.bufferedBytes;
}

public List<SerializedPage> getSerializedPages()
{
return serializedPages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,26 +135,27 @@ public void destroy()
/**
* Adds the given pages to this buffer and, if a read was waiting, completes it.
*
* Pages are dropped silently once {@code noMorePages} is set. The pending read is
* detached while holding the lock and processed after the lock is released.
*
* NOTE(review): this listing is a rendered commit diff without +/- markers; where two
* near-identical statements appear back to back, the first appears to be the removed
* line and the second its replacement.
*
* @param pages page references to append to the buffer
*/
public void enqueuePages(Collection<SerializedPageReference> pages)
{
PendingRead pendingRead;
long bufferedBytes;
synchronized (this) {
// ignore pages after no more pages is set
// this can happen with limit queries
if (noMorePages) {
return;
}

// the replacement line keeps the value returned by addPages as the buffered-bytes snapshot
addPages(pages);
bufferedBytes = addPages(pages);

// take ownership of the pending read so it is completed at most once from here
pendingRead = this.pendingRead;
this.pendingRead = null;
}

// we just added a page, so process the pending read
// NOTE(review): bufferedBytes was captured under the lock and is used after releasing it,
// so it may be outdated by now — presumably acceptable if it is only a hint; confirm
if (pendingRead != null) {
processRead(pendingRead);
processRead(pendingRead, bufferedBytes);
}
}

private synchronized void addPages(Collection<SerializedPageReference> pages)
private synchronized long addPages(Collection<SerializedPageReference> pages)
{
long rowCount = 0;
long bytesAdded = 0;
Expand All @@ -169,7 +170,7 @@ private synchronized void addPages(Collection<SerializedPageReference> pages)
this.pages.addAll(pages);
rowsAdded.addAndGet(rowCount);
pagesAdded.addAndGet(pageCount);
bufferedBytes.addAndGet(bytesAdded);
return bufferedBytes.addAndGet(bytesAdded);
}

public ListenableFuture<BufferResult> getPages(long sequenceId, DataSize maxSize)
Expand All @@ -195,7 +196,7 @@ public ListenableFuture<BufferResult> getPages(long sequenceId, DataSize maxSize
// Return results immediately if we have data, there will be no more data, or this is
// an out of order request
if (!pages.isEmpty() || noMorePages || sequenceId != currentSequenceId.get()) {
return immediateFuture(processRead(sequenceId, maxSize));
return immediateFuture(processRead(sequenceId, maxSize, bufferedBytes.get()));
}

// otherwise, wait for more data to arrive
Expand All @@ -215,6 +216,7 @@ public ListenableFuture<BufferResult> getPages(long sequenceId, DataSize maxSize
public void setNoMorePages()
{
PendingRead pendingRead;
long bufferedBytes;
synchronized (this) {
// ignore duplicate calls
if (noMorePages) {
Expand All @@ -225,11 +227,12 @@ public void setNoMorePages()

pendingRead = this.pendingRead;
this.pendingRead = null;
bufferedBytes = this.bufferedBytes.get();
}

// there will be no more pages, so process the pending read
if (pendingRead != null) {
processRead(pendingRead);
processRead(pendingRead, bufferedBytes);
}
}

Expand All @@ -241,11 +244,13 @@ public void loadPagesIfNecessary(PagesSupplier pagesSupplier)
// same pending read instance by the time pages are loaded but this is
// safe since the size is rechecked before returning pages.
DataSize maxSize;
long bufferedBytes;
synchronized (this) {
if (pendingRead == null) {
return;
}
maxSize = pendingRead.getMaxSize();
bufferedBytes = this.bufferedBytes.get();
}

boolean dataAddedOrNoMorePages = loadPagesIfNecessary(pagesSupplier, maxSize);
Expand All @@ -256,7 +261,7 @@ public void loadPagesIfNecessary(PagesSupplier pagesSupplier)
pendingRead = this.pendingRead;
}
if (pendingRead != null) {
processRead(pendingRead);
processRead(pendingRead, bufferedBytes);
}
}
}
Expand Down Expand Up @@ -300,22 +305,22 @@ private boolean loadPagesIfNecessary(PagesSupplier pagesSupplier, DataSize maxSi
return dataAddedOrNoMorePages;
}

/**
* Completes a pending read by computing its result and setting it on the read's future.
*
* Must be called without holding the lock on this object; the check below enforces it.
* Does nothing if the read's future is already done.
*
* NOTE(review): this listing is a rendered commit diff without +/- markers; the two
* signature lines and the two {@code bufferResult} lines appear to be removed/replacement
* pairs, with the second of each pair being the post-change code.
*
* @param pendingRead the read to complete
* @param bufferedBytes buffered-bytes value forwarded to the result computation
*/
private void processRead(PendingRead pendingRead)
private void processRead(PendingRead pendingRead, long bufferedBytes)
{
checkState(!Thread.holdsLock(this), "Can not process pending read while holding a lock on this");

// the future may already have been completed elsewhere; nothing left to do then
if (pendingRead.getResultFuture().isDone()) {
return;
}

BufferResult bufferResult = processRead(pendingRead.getSequenceId(), pendingRead.getMaxSize());
BufferResult bufferResult = processRead(pendingRead.getSequenceId(), pendingRead.getMaxSize(), bufferedBytes);
pendingRead.getResultFuture().set(bufferResult);
}

/**
* @return a result with at least one page if we have pages in buffer, empty result otherwise
*/
private synchronized BufferResult processRead(long sequenceId, DataSize maxSize)
private synchronized BufferResult processRead(long sequenceId, DataSize maxSize, long bufferedBytes)
{
// When pages are added to the partition buffer they are effectively
// assigned an id starting from zero. When a read is processed, the
Expand All @@ -342,13 +347,13 @@ private synchronized BufferResult processRead(long sequenceId, DataSize maxSize)

// if request is for pages before the current position, just return an empty result
if (sequenceId < currentSequenceId.get()) {
return emptyResults(taskInstanceId, sequenceId, false);
return emptyResults(taskInstanceId, sequenceId, bufferedBytes, false);
}

// if this buffer is finished, notify the client of this, so the client
// will destroy this buffer
if (pages.isEmpty() && noMorePages) {
return emptyResults(taskInstanceId, currentSequenceId.get(), true);
return emptyResults(taskInstanceId, currentSequenceId.get(), bufferedBytes, true);
}

// if request is for pages after the current position, there is a bug somewhere
Expand All @@ -361,17 +366,17 @@ private synchronized BufferResult processRead(long sequenceId, DataSize maxSize)
// read the new pages
long maxBytes = maxSize.toBytes();
List<SerializedPage> result = new ArrayList<>();
long bytes = 0;
long bytesReturned = 0;

for (SerializedPageReference page : pages) {
bytes += page.getRetainedSizeInBytes();
bytesReturned += page.getRetainedSizeInBytes();
// break (and don't add) if this page would exceed the limit
if (!result.isEmpty() && bytes > maxBytes) {
if (!result.isEmpty() && bytesReturned > maxBytes) {
break;
}
result.add(page.getSerializedPage());
}
return new BufferResult(taskInstanceId, sequenceId, sequenceId + result.size(), false, result);
return new BufferResult(taskInstanceId, sequenceId, sequenceId + result.size(), false, Math.max(bufferedBytes - bytesReturned, 0), result);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ private synchronized ListenableFuture<BufferResult> processRead(long startSequen

ListenableFuture<BufferResult> resultFuture = transform(memoryPages, input -> {
long newSequenceId = startSequenceId + input.size();
return new BufferResult(taskInstanceId, startSequenceId, newSequenceId, false, input);
return new BufferResult(taskInstanceId, startSequenceId, newSequenceId, false, 0, input);
}, executor);

return catchingAsync(resultFuture, Exception.class, e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ThriftBufferResult
private final long token;
private final long nextToken;
private final boolean bufferComplete;
private final long bufferedBytes;
private final List<ThriftSerializedPage> thriftSerializedPages;

public static ThriftBufferResult fromBufferResult(BufferResult bufferResult)
Expand All @@ -50,6 +51,7 @@ public static ThriftBufferResult fromBufferResult(BufferResult bufferResult)
bufferResult.getToken(),
bufferResult.getNextToken(),
bufferResult.isBufferComplete(),
bufferResult.getBufferedBytes(),
thriftSerializedPages);
}

Expand All @@ -62,6 +64,7 @@ public ThriftBufferResult(
long token,
long nextToken,
boolean bufferComplete,
long bufferedBytes,
List<ThriftSerializedPage> thriftSerializedPages)
{
checkArgument(!isNullOrEmpty(taskInstanceId), "taskInstanceId is null");
Expand All @@ -70,6 +73,7 @@ public ThriftBufferResult(
this.token = token;
this.nextToken = nextToken;
this.bufferComplete = bufferComplete;
this.bufferedBytes = bufferedBytes;
this.thriftSerializedPages = ImmutableList.copyOf(requireNonNull(thriftSerializedPages, "thriftSerializedPages is null"));
}

Expand Down Expand Up @@ -103,6 +107,12 @@ public List<ThriftSerializedPage> getThriftSerializedPages()
return thriftSerializedPages;
}

/**
 * Returns the buffered-bytes value carried by this result; exposed as Thrift field 6.
 *
 * @return the {@code bufferedBytes} value supplied at construction
 */
@ThriftField(6)
public long getBufferedBytes()
{
    return this.bufferedBytes;
}

public List<SerializedPage> getSerializedPages()
{
return thriftSerializedPages.stream()
Expand Down Expand Up @@ -141,13 +151,14 @@ public String toString()
.add("nextToken", nextToken)
.add("taskInstanceId", taskInstanceId)
.add("bufferComplete", bufferComplete)
.add("bufferedBytes", bufferedBytes)
.add("thriftSerializedPages", thriftSerializedPages)
.toString();
}

/**
* Converts this Thrift result into a {@link BufferResult} built from the same field values,
* with the pages obtained from {@code getSerializedPages()}.
*
* @return a new {@link BufferResult} mirroring this object
*/
@VisibleForTesting
public BufferResult toBufferResult()
{
// NOTE(review): this listing is a rendered commit diff without +/- markers; the two
// statements below appear to be the removed line followed by its replacement, which
// additionally passes bufferedBytes.
return new BufferResult(taskInstanceId, token, nextToken, bufferComplete, getSerializedPages());
return new BufferResult(taskInstanceId, token, nextToken, bufferComplete, bufferedBytes, getSerializedPages());
}
}
Loading

0 comments on commit ece7741

Please sign in to comment.