From 18f7b4d4615b0a033e4db0943513813ad441a253 Mon Sep 17 00:00:00 2001 From: Tim Meehan Date: Thu, 9 May 2024 19:43:26 -0400 Subject: [PATCH] Treat buffer as completed if no more pages The new exchange protocol in #21926 expects the HEAD request to indicate the buffer is completed when the noMorePages signal has been set and the buffer is empty. This commit fixes the current behavior, which is that the coordinator expects at least one GET to transition the output buffer to completed state before returning that the buffer has been completed. --- .../facebook/presto/execution/SqlTask.java | 9 +++------ .../presto/execution/SqlTaskManager.java | 8 +++----- .../presto/execution/TaskManager.java | 8 +++----- .../server/AsyncPageTransportServlet.java | 4 +++- .../facebook/presto/server/TaskResource.java | 19 ++++++++++++++----- .../server/thrift/ThriftTaskService.java | 4 +++- .../presto/execution/TestSqlTask.java | 4 ++-- .../presto/execution/TestSqlTaskManager.java | 8 ++++++-- .../TestThriftServerInfoIntegration.java | 4 ++-- .../server/TestThriftTaskIntegration.java | 4 ++-- .../spark/node/PrestoSparkTaskManager.java | 4 ++-- 11 files changed, 43 insertions(+), 33 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java index 0db9bb8047872..ab38b45118b3f 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java @@ -18,10 +18,10 @@ import com.facebook.airlift.stats.CounterStat; import com.facebook.presto.Session; import com.facebook.presto.execution.StateMachine.StateChangeListener; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; import com.facebook.presto.execution.buffer.LazyOutputBuffer; import com.facebook.presto.execution.buffer.OutputBuffer; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.buffer.SpoolingOutputBufferFactory; @@ -495,12 +495,9 @@ public ListenableFuture getTaskResults(OutputBufferId bufferId, lo return outputBuffer.get(bufferId, startingSequenceId, maxSize); } - public Optional getTaskBufferInfo(OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo() { - requireNonNull(bufferId, "bufferId is null"); - return outputBuffer.getInfo().getBuffers().stream() - .filter(bufferInfo -> bufferInfo.getBufferId().equals(bufferId)) - .findFirst(); + return outputBuffer.getInfo(); } public void acknowledgeTaskResults(OutputBufferId bufferId, long sequenceId) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java index 11b9c5940af17..3d127247fd5ee 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java @@ -23,8 +23,8 @@ import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.event.SplitMonitor; import com.facebook.presto.execution.StateMachine.StateChangeListener; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.buffer.SpoolingOutputBufferFactory; @@ -454,12 +454,10 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize); } - @Override - public Optional getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo(TaskId taskId) { requireNonNull(taskId, "taskId is null"); - requireNonNull(bufferId, "bufferId is null"); - return tasks.getUnchecked(taskId).getTaskBufferInfo(bufferId); + return tasks.getUnchecked(taskId).getOutputBufferInfo(); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java index 71a49e782dfb3..98ff29a00fb25 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java @@ -15,8 +15,8 @@ import com.facebook.presto.Session; import com.facebook.presto.execution.StateMachine.StateChangeListener; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.scheduler.TableWriteInfo; @@ -119,11 +119,9 @@ TaskInfo updateTask( ListenableFuture getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, DataSize maxSize); /** - * Gets the {@link BufferInfo} associated with the specified task and buffer id. - * - * @return absent if the buffer is not present, otherwise the buffer info + * Gets the {@link OutputBufferInfo} associated with the specified task. */ - Optional getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId); + OutputBufferInfo getOutputBufferInfo(TaskId taskId); /** * Acknowledges previously received results. diff --git a/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java b/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java index 93db9be2a6d44..7619c3bb6f5dd 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java +++ b/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java @@ -203,9 +203,11 @@ public void onTimeout(AsyncEvent event) () -> BufferResult.emptyResults( taskManager.getTaskInstanceId(taskId), token, - taskManager.getTaskBufferInfo(taskId, bufferId) + taskManager.getOutputBufferInfo(taskId).getBuffers().stream() + .filter(bufferInfo -> bufferInfo.getBufferId().equals(bufferId)) .map(BufferInfo::getPageBufferInfo) .map(PageBufferInfo::getBufferedBytes) + .findFirst() .orElse(0L), false), waitTime, diff --git a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java index 851e4d7091716..37450b850aa68 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java @@ -23,6 +23,7 @@ import com.facebook.presto.execution.TaskManager; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.metadata.HandleResolver; import com.facebook.presto.metadata.MetadataUpdates; @@ -300,11 +301,19 @@ public Response taskResultsHeaders( requireNonNull(taskId, "taskId is null"); requireNonNull(bufferId, "bufferId is null"); - return taskManager.getTaskBufferInfo(taskId, bufferId) - .map(bufferInfo -> Response.ok() - .header(PRESTO_BUFFER_REMAINING_BYTES, bufferInfo.getPageBufferInfo().getBufferedBytes()) - .header(PRESTO_BUFFER_COMPLETE, bufferInfo.isFinished()) - .build()) + OutputBufferInfo outputBufferInfo = taskManager.getOutputBufferInfo(taskId); + return outputBufferInfo.getBuffers().stream() + .filter(bufferInfo -> bufferInfo.getBufferId().equals(bufferId)) + .map(bufferInfo -> { + long bufferedBytes = bufferInfo.getPageBufferInfo().getBufferedBytes(); + return Response.ok() + .header(PRESTO_BUFFER_REMAINING_BYTES, bufferedBytes) + .header(PRESTO_BUFFER_COMPLETE, bufferInfo.isFinished() + // a buffer which has the noMorePages flag set and which is empty is completed + || (!outputBufferInfo.getState().canAddPages() && bufferedBytes == 0)) + .build(); + }) + .findFirst() .orElse(Response.status(Response.Status.NOT_FOUND).build()); } diff --git a/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskService.java b/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskService.java index 2d27faee2d30e..949ba36409f79 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskService.java +++ b/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskService.java @@ -66,9 +66,11 @@ public ListenableFuture getResults(TaskId taskId, OutputBuff () -> BufferResult.emptyResults( taskManager.getTaskInstanceId(taskId), token, - taskManager.getTaskBufferInfo(taskId, bufferId) + taskManager.getOutputBufferInfo(taskId).getBuffers().stream() + .filter(info -> info.getBufferId().equals(bufferId)) .map(BufferInfo::getPageBufferInfo) .map(PageBufferInfo::getBufferedBytes) + .findFirst() .orElse(0L), false), waitTime, diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java index 89b5177d0d1df..d9fea75b9b0c6 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java @@ -166,7 +166,7 @@ public void testSimpleQuery() // Task results clear out all buffered data once ack is sent long pagesRetanedSizeInBytes = results.getSerializedPages().stream().mapToLong(SerializedPage::getRetainedSizeInBytes).sum(); assertEquals(results.getBufferedBytes(), 0); - Optional taskBufferInfo = sqlTask.getTaskBufferInfo(OUT); + Optional taskBufferInfo = sqlTask.getOutputBufferInfo().getBuffers().stream().filter(buffer -> buffer.getBufferId().equals(OUT)).findFirst(); assertTrue(taskBufferInfo.isPresent()); // Buffer still remains as acknowledgement has not been received assertEquals(taskBufferInfo.get().getPageBufferInfo().getBufferedBytes(), pagesRetanedSizeInBytes); @@ -175,7 +175,7 @@ public void testSimpleQuery() results = sqlTask.getTaskResults(OUT, results.getToken() + results.getSerializedPages().size(), new DataSize(1, MEGABYTE)).get(); pagesRetanedSizeInBytes = results.getSerializedPages().stream().mapToLong(SerializedPage::getRetainedSizeInBytes).sum(); assertEquals(results.getBufferedBytes(), pagesRetanedSizeInBytes); - taskBufferInfo = sqlTask.getTaskBufferInfo(OUT); + taskBufferInfo = sqlTask.getOutputBufferInfo().getBuffers().stream().filter(buffer -> buffer.getBufferId().equals(OUT)).findFirst(); assertTrue(taskBufferInfo.isPresent()); assertEquals(taskBufferInfo.get().getPageBufferInfo().getBufferedBytes(), pagesRetanedSizeInBytes); } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java index 74b3859c6b616..15496a48bef0d 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java @@ -170,13 +170,17 @@ public void testRemainingBufferMetadata() // Task results clear out all buffered data once ack is sent long retainedPageSize = results.getSerializedPages().get(0).getRetainedSizeInBytes(); assertEquals(results.getBufferedBytes(), 0); - Optional taskBufferInfo = sqlTaskManager.getTaskBufferInfo(TASK_ID, OUT); + Optional taskBufferInfo = sqlTaskManager.getOutputBufferInfo(TASK_ID).getBuffers().stream() + .filter(bufferInfo -> bufferInfo.getBufferId().equals(OUT)) + .findFirst(); assertTrue(taskBufferInfo.isPresent()); // Buffer still remains as acknowledgement has not been received assertEquals(taskBufferInfo.get().getPageBufferInfo().getBufferedBytes(), retainedPageSize); // Once acknowledged, the retained size of the data page is removed from the buffer sqlTaskManager.acknowledgeTaskResults(taskId, OUT, results.getNextToken()); - taskBufferInfo = sqlTaskManager.getTaskBufferInfo(TASK_ID, OUT); + taskBufferInfo = sqlTaskManager.getOutputBufferInfo(TASK_ID).getBuffers().stream() + .filter(bufferInfo -> bufferInfo.getBufferId().equals(OUT)) + .findFirst(); assertTrue(taskBufferInfo.isPresent()); assertEquals(taskBufferInfo.get().getPageBufferInfo().getBufferedBytes(), 0); diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java b/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java index 2281a216c772d..e1e713ebf711c 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java @@ -34,8 +34,8 @@ import com.facebook.presto.execution.TaskSource; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.scheduler.TableWriteInfo; @@ -218,7 +218,7 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer } @Override - public Optional getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo(TaskId taskId) { throw new UnsupportedOperationException(); } diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestThriftTaskIntegration.java b/presto-main/src/test/java/com/facebook/presto/server/TestThriftTaskIntegration.java index 8bdd9e47d4c08..81d4ab3b650a2 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestThriftTaskIntegration.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestThriftTaskIntegration.java @@ -33,8 +33,8 @@ import com.facebook.presto.execution.TaskSource; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.buffer.ThriftBufferResult; @@ -247,7 +247,7 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer } @Override - public Optional getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo(TaskId taskId) { throw new UnsupportedOperationException(); } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkTaskManager.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkTaskManager.java index d5d7ae159df33..cd4c3a457ccbe 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkTaskManager.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkTaskManager.java @@ -21,8 +21,8 @@ import com.facebook.presto.execution.TaskSource; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.scheduler.TableWriteInfo; import com.facebook.presto.memory.MemoryPoolAssignmentsRequest; @@ -104,7 +104,7 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer } @Override - public Optional getTaskBufferInfo(TaskId taskId, OutputBuffers.OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo(TaskId taskId) { throw new UnsupportedOperationException(); }