diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java index 0db9bb8047872..ab38b45118b3f 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTask.java @@ -18,10 +18,10 @@ import com.facebook.airlift.stats.CounterStat; import com.facebook.presto.Session; import com.facebook.presto.execution.StateMachine.StateChangeListener; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; import com.facebook.presto.execution.buffer.LazyOutputBuffer; import com.facebook.presto.execution.buffer.OutputBuffer; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.buffer.SpoolingOutputBufferFactory; @@ -495,12 +495,9 @@ public ListenableFuture getTaskResults(OutputBufferId bufferId, lo return outputBuffer.get(bufferId, startingSequenceId, maxSize); } - public Optional getTaskBufferInfo(OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo() { - requireNonNull(bufferId, "bufferId is null"); - return outputBuffer.getInfo().getBuffers().stream() - .filter(bufferInfo -> bufferInfo.getBufferId().equals(bufferId)) - .findFirst(); + return outputBuffer.getInfo(); } public void acknowledgeTaskResults(OutputBufferId bufferId, long sequenceId) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java index 11b9c5940af17..3d127247fd5ee 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java @@ -23,8 +23,8 @@ import com.facebook.presto.common.block.BlockEncodingSerde; import com.facebook.presto.event.SplitMonitor; import com.facebook.presto.execution.StateMachine.StateChangeListener; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.buffer.SpoolingOutputBufferFactory; @@ -454,12 +454,10 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer return tasks.getUnchecked(taskId).getTaskResults(bufferId, startingSequenceId, maxSize); } - @Override - public Optional getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo(TaskId taskId) { requireNonNull(taskId, "taskId is null"); - requireNonNull(bufferId, "bufferId is null"); - return tasks.getUnchecked(taskId).getTaskBufferInfo(bufferId); + return tasks.getUnchecked(taskId).getOutputBufferInfo(); } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java index 71a49e782dfb3..98ff29a00fb25 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TaskManager.java @@ -15,8 +15,8 @@ import com.facebook.presto.Session; import com.facebook.presto.execution.StateMachine.StateChangeListener; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.scheduler.TableWriteInfo; @@ -119,11 +119,9 @@ TaskInfo updateTask( ListenableFuture getTaskResults(TaskId taskId, OutputBufferId bufferId, long startingSequenceId, DataSize maxSize); /** - * Gets the {@link BufferInfo} associated with the specified task and buffer id. - * - * @return absent if the buffer is not present, otherwise the buffer info + * Gets the {@link OutputBufferInfo} associated with the specified task. */ - Optional getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId); + OutputBufferInfo getOutputBufferInfo(TaskId taskId); /** * Acknowledges previously received results. diff --git a/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java b/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java index 93db9be2a6d44..7619c3bb6f5dd 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java +++ b/presto-main/src/main/java/com/facebook/presto/server/AsyncPageTransportServlet.java @@ -203,9 +203,11 @@ public void onTimeout(AsyncEvent event) () -> BufferResult.emptyResults( taskManager.getTaskInstanceId(taskId), token, - taskManager.getTaskBufferInfo(taskId, bufferId) + taskManager.getOutputBufferInfo(taskId).getBuffers().stream() + .filter(bufferInfo -> bufferInfo.getBufferId().equals(bufferId)) .map(BufferInfo::getPageBufferInfo) .map(PageBufferInfo::getBufferedBytes) + .findFirst() .orElse(0L), false), waitTime, diff --git a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java index 851e4d7091716..37450b850aa68 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/TaskResource.java @@ -23,6 +23,7 @@ import com.facebook.presto.execution.TaskManager; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.metadata.HandleResolver; import com.facebook.presto.metadata.MetadataUpdates; @@ -300,11 +301,19 @@ public Response taskResultsHeaders( requireNonNull(taskId, "taskId is null"); requireNonNull(bufferId, "bufferId is null"); - return taskManager.getTaskBufferInfo(taskId, bufferId) - .map(bufferInfo -> Response.ok() - .header(PRESTO_BUFFER_REMAINING_BYTES, bufferInfo.getPageBufferInfo().getBufferedBytes()) - .header(PRESTO_BUFFER_COMPLETE, bufferInfo.isFinished()) - .build()) + OutputBufferInfo outputBufferInfo = taskManager.getOutputBufferInfo(taskId); + return outputBufferInfo.getBuffers().stream() + .filter(bufferInfo -> bufferInfo.getBufferId().equals(bufferId)) + .map(bufferInfo -> { + long bufferedBytes = bufferInfo.getPageBufferInfo().getBufferedBytes(); + return Response.ok() + .header(PRESTO_BUFFER_REMAINING_BYTES, bufferedBytes) + .header(PRESTO_BUFFER_COMPLETE, bufferInfo.isFinished() + // a buffer which has the noMorePages flag set and which is empty is completed + || (!outputBufferInfo.getState().canAddPages() && bufferedBytes == 0)) + .build(); + }) + .findFirst() .orElse(Response.status(Response.Status.NOT_FOUND).build()); } diff --git a/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskService.java b/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskService.java index 2d27faee2d30e..949ba36409f79 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskService.java +++ b/presto-main/src/main/java/com/facebook/presto/server/thrift/ThriftTaskService.java @@ -66,9 +66,11 @@ public ListenableFuture getResults(TaskId taskId, OutputBuff () -> BufferResult.emptyResults( taskManager.getTaskInstanceId(taskId), token, - taskManager.getTaskBufferInfo(taskId, bufferId) + taskManager.getOutputBufferInfo(taskId).getBuffers().stream() + .filter(info -> info.getBufferId().equals(bufferId)) .map(BufferInfo::getPageBufferInfo) .map(PageBufferInfo::getBufferedBytes) + .findFirst() .orElse(0L), false), waitTime, diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java index 89b5177d0d1df..d9fea75b9b0c6 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTask.java @@ -166,7 +166,7 @@ public void testSimpleQuery() // Task results clear out all buffered data once ack is sent long pagesRetanedSizeInBytes = results.getSerializedPages().stream().mapToLong(SerializedPage::getRetainedSizeInBytes).sum(); assertEquals(results.getBufferedBytes(), 0); - Optional taskBufferInfo = sqlTask.getTaskBufferInfo(OUT); + Optional taskBufferInfo = sqlTask.getOutputBufferInfo().getBuffers().stream().filter(buffer -> buffer.getBufferId().equals(OUT)).findFirst(); assertTrue(taskBufferInfo.isPresent()); // Buffer still remains as acknowledgement has not been received assertEquals(taskBufferInfo.get().getPageBufferInfo().getBufferedBytes(), pagesRetanedSizeInBytes); @@ -175,7 +175,7 @@ public void testSimpleQuery() results = sqlTask.getTaskResults(OUT, results.getToken() + results.getSerializedPages().size(), new DataSize(1, MEGABYTE)).get(); pagesRetanedSizeInBytes = results.getSerializedPages().stream().mapToLong(SerializedPage::getRetainedSizeInBytes).sum(); assertEquals(results.getBufferedBytes(), pagesRetanedSizeInBytes); - taskBufferInfo = sqlTask.getTaskBufferInfo(OUT); + taskBufferInfo = sqlTask.getOutputBufferInfo().getBuffers().stream().filter(buffer -> buffer.getBufferId().equals(OUT)).findFirst(); assertTrue(taskBufferInfo.isPresent()); assertEquals(taskBufferInfo.get().getPageBufferInfo().getBufferedBytes(), pagesRetanedSizeInBytes); } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java index 74b3859c6b616..15496a48bef0d 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestSqlTaskManager.java @@ -170,13 +170,17 @@ public void testRemainingBufferMetadata() // Task results clear out all buffered data once ack is sent long retainedPageSize = results.getSerializedPages().get(0).getRetainedSizeInBytes(); assertEquals(results.getBufferedBytes(), 0); - Optional taskBufferInfo = sqlTaskManager.getTaskBufferInfo(TASK_ID, OUT); + Optional taskBufferInfo = sqlTaskManager.getOutputBufferInfo(TASK_ID).getBuffers().stream() + .filter(bufferInfo -> bufferInfo.getBufferId().equals(OUT)) + .findFirst(); assertTrue(taskBufferInfo.isPresent()); // Buffer still remains as acknowledgement has not been received assertEquals(taskBufferInfo.get().getPageBufferInfo().getBufferedBytes(), retainedPageSize); // Once acknowledged, the retained size of the data page is removed from the buffer sqlTaskManager.acknowledgeTaskResults(taskId, OUT, results.getNextToken()); - taskBufferInfo = sqlTaskManager.getTaskBufferInfo(TASK_ID, OUT); + taskBufferInfo = sqlTaskManager.getOutputBufferInfo(TASK_ID).getBuffers().stream() + .filter(bufferInfo -> bufferInfo.getBufferId().equals(OUT)) + .findFirst(); assertTrue(taskBufferInfo.isPresent()); assertEquals(taskBufferInfo.get().getPageBufferInfo().getBufferedBytes(), 0); diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java b/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java index 2281a216c772d..e1e713ebf711c 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestThriftServerInfoIntegration.java @@ -34,8 +34,8 @@ import com.facebook.presto.execution.TaskSource; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.scheduler.TableWriteInfo; @@ -218,7 +218,7 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer } @Override - public Optional getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo(TaskId taskId) { throw new UnsupportedOperationException(); } diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestThriftTaskIntegration.java b/presto-main/src/test/java/com/facebook/presto/server/TestThriftTaskIntegration.java index 8bdd9e47d4c08..81d4ab3b650a2 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestThriftTaskIntegration.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestThriftTaskIntegration.java @@ -33,8 +33,8 @@ import com.facebook.presto.execution.TaskSource; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.buffer.OutputBuffers.OutputBufferId; import com.facebook.presto.execution.buffer.ThriftBufferResult; @@ -247,7 +247,7 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer } @Override - public Optional getTaskBufferInfo(TaskId taskId, OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo(TaskId taskId) { throw new UnsupportedOperationException(); } diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkTaskManager.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkTaskManager.java index d5d7ae159df33..cd4c3a457ccbe 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkTaskManager.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkTaskManager.java @@ -21,8 +21,8 @@ import com.facebook.presto.execution.TaskSource; import com.facebook.presto.execution.TaskState; import com.facebook.presto.execution.TaskStatus; -import com.facebook.presto.execution.buffer.BufferInfo; import com.facebook.presto.execution.buffer.BufferResult; +import com.facebook.presto.execution.buffer.OutputBufferInfo; import com.facebook.presto.execution.buffer.OutputBuffers; import com.facebook.presto.execution.scheduler.TableWriteInfo; import com.facebook.presto.memory.MemoryPoolAssignmentsRequest; @@ -104,7 +104,7 @@ public ListenableFuture getTaskResults(TaskId taskId, OutputBuffer } @Override - public Optional getTaskBufferInfo(TaskId taskId, OutputBuffers.OutputBufferId bufferId) + public OutputBufferInfo getOutputBufferInfo(TaskId taskId) { throw new UnsupportedOperationException(); }