Redesign Exchange protocol to reduce query latency #21926
Comments
CC: @agrawaldevesh
@mbasmanova thank you for the clear write up. I had one initial question.
Do we need special casing for this first request in order to receive most of the performance improvement? Would it not be sufficient to simply receive the header on subsequent shuffles and adapt according to the values we receive?
@tdcmeehan Tim, thank you for considering this proposal.
A simpler improvement on the existing protocol is to simply add X-Presto-Buffer-Remaining-Bytes header that producer uses to specify how many more bytes it has buffered and ready to go. However, this is not enough. Consider what happens when request for data comes back with X-Presto-Buffer-Remaining-Bytes indicating there is no more data yet. The consumer needs to continue polling the producer, but it no longer knows how much data it may get in response and when. So, we are back to original problem where we don't know whom to ask for data and end up spending lots of wall time asking the "wrong" producers before finally getting to the "right" one. |
@mbasmanova so is the idea like this:
If this is accurate, does this mean that what we're trying to avoid is being biased by one particular task result reporting before others have finished reporting? I also don't understand your example. In other words, I believe there may be some fairness mechanism in this algorithm; otherwise we could be misled by the initially reported bytes, which might not reflect current reality. And if there is, wouldn't this fairness mechanism be enough to ensure that even if we chose producers at random, given enough time it would prioritize the producers that end up producing the most data?
The main goal is to ensure that as long as there is data available, the consumer is working hard at fetching that data, and to avoid the consumer sitting idle when there is data to fetch. With the existing design, we see cases when consumers are not fetching data although it is available. Consider a single consumer with 100 producers. If the consumer initiates a fetch from all 100 producers in parallel, it is possible that all 100 respond back with 1MB of data each. This would require the consumer to allocate 100MB of memory and go over budget. To solve this problem, consumers were modified to fetch from a limited number of producers at once. Furthermore, to avoid too much back-and-forth when waiting for data to be produced, each fetch blocks for 2 seconds if data is not available. Let's say a consumer has a budget of 10MB, so it fetches data from 10 producers in parallel. Let's also say that producer 95 has data, but no other producer does. The consumer will spend 2s waiting for producers 1-10 to report they have nothing, another 2s for producers 11-20 to report they have nothing, etc., until it gets to producer 95 after spending 18 seconds waiting. Then, let's say producer 95 returned all the data it had and will take a bit of time to produce more data. The consumer will repeat the cycle above and again sit idle for another 18 seconds before getting the next batch of data.
@mbasmanova thank you for the example, that clears things up a lot. Just to clear up my understanding: in this example, would it be fair to say that if we just added the header, and not the initial special-case call to retrieve the initial buffers, it would take us 18s to discover that producer 95 has the majority of the data? And if so, that means going forward, producer 95 would be prioritized above the rest? Meaning, we would have just an 18s penalty, instead of a penalty that is amortized over the life of the query?
@tdcmeehan Yes, that is true, but only if the throughput distribution of producers does not change over time. With the new protocol, the consumer can adapt to any change and react quickly, so no large latency can occur no matter what.
Oh, I see. Is this because we periodically take a "snapshot" of all producers' pending output buffers? I had presumed that the initial request with
@tdcmeehan It's done periodically whenever the source is in wait mode:
Not quite. We run two independent loops: wait-for-data and fetch-data. The wait-for-data loop is waiting for sources to report having data. This loop processes all sources in parallel. The fetch-data loop is fetching data from sources that reported having some data. This loop processes a subset of sources at a time to stay within the memory budget. Any given source is assigned to one loop or the other, but not both. Initially all sources go into the wait-for-data loop. As soon as a source responds as having data, it is moved to the data-fetching loop. The source stays in the data-fetching loop until a response indicates there is no more data available while the source is not yet at-end. At this point the source is moved into the wait-for-data loop. Sources that reported being at-end are removed from the loops and do not participate in further processing.
Makes sense. One idea on the API with respect to backwards compatibility. It seems like
To do that, we need the Coordinator to add support for this new HEAD request. This is needed to support queries that use SYSTEM tables and have their leaf scans run on the Coordinator. In this case, Prestissimo workers need to fetch data from the Coordinator.
@mbasmanova this is true, but from the description, it seems that we would just need to make sure that this call doesn't fail (perhaps just returning 200 if the buffer exists at all), which would be a trivial change. It doesn't seem like it would require support for the
In the current design, this call must return remaining bytes, otherwise the consumer will never switch from waiting to fetching-data for that producer. Currently, we say that the get-data-size request is allowed to return data, but if it doesn't, and if it doesn't specify remaining bytes, we'll continue issuing that same request until we get "no-more-data" or "remaining-bytes".
@mbasmanova got it, I missed that part of the design. Would something along these lines work in the coordinator?

```java
@HEAD
@Path("{taskId}/results/{bufferId}")
public Response head(
        @PathParam("taskId") TaskId taskId,
        @PathParam("bufferId") OutputBufferId bufferId,
        @HeaderParam(PRESTO_MAX_SIZE) DataSize maxSize)
{
    requireNonNull(taskId, "taskId is null");
    requireNonNull(bufferId, "bufferId is null");

    OutputBufferInfo outputBuffers = taskManager.getTaskInfo(taskId).getOutputBuffers();
    Optional<BufferInfo> bufferInfo = outputBuffers.getBuffers().stream()
            .filter(buffer -> buffer.getBufferId().equals(bufferId))
            .findFirst();

    // Return 200 whether or not the buffer exists; attach the remaining-bytes
    // header only when we have buffer info to report.
    Response.ResponseBuilder responseBuilder = Response.ok();
    return bufferInfo
            .map(info -> responseBuilder.header(
                    "X-Presto-Buffer-Remaining-Bytes",
                    info.getPageBufferInfo().getBufferedBytes()))
            .orElse(responseBuilder)
            .build();
}
```
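For what it's worth, here is a minimal sketch of how a consumer could call such an endpoint with Java's built-in HTTP client. The host, port, task ID, and buffer ID are made-up placeholders; the URL shape follows the `@Path` above under the assumption that the resource is mounted at the usual `/v1/task` prefix:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HeadRequestExample
{
    public static void main(String[] args) throws Exception
    {
        HttpClient client = HttpClient.newHttpClient();

        // Hypothetical worker endpoint: task "20240101_0001.1.0.0", buffer 0.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://worker-1:8080/v1/task/20240101_0001.1.0.0/results/0"))
                .method("HEAD", HttpRequest.BodyPublishers.noBody())
                .build();

        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());

        // The producer reports how many buffered bytes remain; "0" means no data yet.
        String remainingBytes = response.headers()
                .firstValue("X-Presto-Buffer-Remaining-Bytes")
                .orElse("unknown");
        System.out.println("Remaining bytes: " + remainingBytes);
    }
}
```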
@tdcmeehan I'm not familiar with this code, but it looks like this would work. Do you know if this works for all kinds of buffers, i.e. broadcast/partitioned/arbitrary?
There are
@tdcmeehan Tim, any chance you could help add support for HTTP HEAD metadata requests and returning remaining bytes from existing HTTP GET requests to the coordinator? @Yuhta Jimmy, what do you think about this proposal? It looks cleaner, but unfortunately requires changes to the coordinator, so we won't be able to modify Prestissimo independently.
I think this is good. On the Prestissimo side we will handle both HEAD and
@mbasmanova @Yuhta I'll work on the coordinator part. |
@tdcmeehan Thank you, Tim.
Summary: This is the first diff to upgrade the exchange protocol. This change only exposes the remaining bytes to buffer manager; it does not change the existing protocol yet, and is compatible with the current Prestissimo code. Also added a few statistics to spot skewed exchange. See prestodb/presto#21926 for details about the design. Differential Revision: D53793123
Summary: This is the first diff to upgrade the exchange protocol. This change only exposes the remaining bytes to buffer manager; it does not change the existing protocol yet, and is compatible with the current Prestissimo code. See prestodb/presto#21926 for details about the design. Reviewed By: mbasmanova Differential Revision: D53793123
Summary: Upgrade the exchange protocol. We will poll the remaining data sizes using `ExchangeSource::getDataSizes` from all the producers and schedule the actual data fetch according to the memory budget. This reduces the waiting-for-data time significantly in some cases: a query that was timing out after 1 hour on a 600-node cluster now finishes in 4.72 minutes on a 400-node cluster (Java takes 36.08 minutes on a 1000-node cluster). See prestodb/presto#21926 Differential Revision: D54027466
This is the change needed on the presto_cpp side to implement the new exchange protocol. Implement `ExchangeSource::getDataSizes` and use it when the header `X-Presto-Get-Data-Size` exists (will be removed in the future once the coordinator supports the `HEAD` calls), or the endpoint is called with the HTTP `HEAD` method. All changes here are backward-compatible. The new protocol will not be used until we start calling `getDataSizes` on the Velox side in `ExchangeClient`. See prestodb#21926 for details about the design of the new protocol. Also added new runtime stats `numTopOutputBuffers`.
Have we made the changes to use the new protocol that relies on HEAD to get output buffer metadata in C++?
@tdcmeehan Not yet. What is the coordinator version that has the HEAD support in it? We need to check that all our internal coordinators have been upgraded to a version newer than that.
@Yuhta this has been present for any version cut after March 3rd. I believe, given your internal release cadence, it would be safe to assume it's present. Unrelated, but I also believe you're already consuming the
Yes, we just need to switch to
@tdcmeehan I tried to switch the implementation in #22697, but it seems to hit some bug on the coordinator side. The coordinator keeps sending back 0 remaining bytes without
@Yuhta I looked into this, and found that it's because there's a difference in behavior between buffer info when it's reported (e.g. in TaskInfo) versus when it's reported back as part of the exchange data. Basically the coordinator is expecting at least one GET to finish off the buffer and transition it to finished, while C++ is expecting this to be reported from the header. I opened #22711 which I think will fix it.
The new exchange protocol in #21926 expects the HEAD request to indicate the buffer is completed when the noMorePages signal has been set and the buffer is empty. This commit fixes the current behavior, which is that the coordinator expects at least one GET to transition the output buffer to completed state before returning that the buffer has been completed.
@tdcmeehan Seems there is still some issue: the coordinator keeps reporting there is some data to fetch, so the query never ends: https://app.circleci.com/pipelines/github/prestodb/presto/19551/workflows/2f010ea7-2634-4505-a5a1-aa3571f9a7be/jobs/78435/artifacts
An investigation of a slow exchange-heavy query prompted a rethink of the Exchange protocol used in Presto and Prestissimo.
The query under investigation has 10+ count(distinct) aggregations, which are planned as 10+ stages with MarkDistinct and shuffle. This query shuffles the same data 10+ times, and some stages are skewed so that a handful of workers process 80% of the data. A simplified version of the query with fewer count(distinct) aggregations was taking 8 minutes, of which 2 minutes went into fetching data over the network and 6 minutes went into waiting.
The consumer task has a memory limit (32GB) and therefore cannot pull from all the producers at once. The consumer also doesn't know which producers have data and which don't. When pulling from producers that do not have data, the consumer waits for 2 seconds before moving on to the next producer. These delays add up. We'd like to redesign the Exchange protocol to eliminate these delays. We tried a prototype of the modified protocol described below on a query that used to time out after 1h. That query finished in under 4 minutes.
Modified Protocol
Presto uses many-to-many streaming shuffle. M upstream workers partition data N ways. N downstream workers pull data from M upstream workers each.
Upstream workers use PartitionedOutput operators to partition the data and store it in OutputBuffer. There is a single OutputBuffer for each task with N partitions, one per downstream worker.
Downstream workers use ExchangeOperators and a single ExchangeClient to pull data from output buffers. ExchangeClient creates multiple ExchangeSources, one per upstream worker, to pull the data.
ExchangeSource uses HTTP protocol to pull data: https://prestodb.io/docs/current/develop/worker-protocol.html#data-plane
The existing protocol allows the consumer to ask for data and specify a limit on response size (maxBytes) and maximum time to wait for data to become available before sending an empty response (maxWait).
The limit on response size is advisory. The producer may not respect it. This would happen if the limit is very small (few KB) or if there is a single row that’s bigger than 1MB.
The consumer needs to maintain a cap on memory usage and therefore needs to be careful not to fetch from too many sources at once.
It is hard to come up with an optimal scheduling algorithm without knowing which sources have data and how much. Safe choices sacrifice latency.
Therefore we propose to modify the protocol to allow consumers to ask producers about how much data they have available first, then use this information to schedule data fetch.
There are cases when Prestissimo workers need to continue using the existing protocol to exchange data with the Coordinator. This happens when the Coordinator pulls query results from the C++ worker. This also happens when C++ workers pull data from the leaf fragment that runs on the Coordinator (TableScan for system tables). Therefore, we are going to extend the existing protocol in a backwards-compatible way.
We introduce the HTTP header X-Presto-Get-Data-Size and use it with a GET {taskId}/results/{bufferId}/{token} request.
Prestissimo workers, in the presence of the X-Presto-Get-Data-Size header, will return the amount of data available: a list of page sizes. The consumer will be able to request as many pages from the start of the list as would fit in memory by specifying the total bytes in these pages. For example, if the producer reports pages of 2MB, 3MB, and 4MB and the consumer has 6MB of budget left, it would specify 5MB to receive the first two pages.
If data is not available yet, the producer would wait for up to maxWait seconds before responding.
We also introduce an HTTP header X-Presto-Buffer-Remaining-Bytes that is set by the producer in response to a GET {taskId}/results/{bufferId}/{token} to indicate how many bytes are still available.
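To illustrate, an exchange under the extended protocol might look like the trace below. The path follows the existing results endpoint; the header value for X-Presto-Get-Data-Size and the page sizes are made-up examples (the design only requires the header's presence):

```
GET /v1/task/{taskId}/results/{bufferId}/{token} HTTP/1.1
X-Presto-Get-Data-Size: true

HTTP/1.1 200 OK
X-Presto-Buffer-Remaining-Bytes: 1048576,524288,262144
Content-Length: 0
```

Here the producer reports three pages of 1MB, 512KB, and 256KB ready to fetch.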
A Prestissimo consumer would first query all (at the same time) producers for how much data they have.
The producers would respond with available data specified in the X-Presto-Buffer-Remaining-Bytes header.
The consumer then comes up with a plan to fetch available data and control the response sizes using the existing X-Presto-Max-Size header.
The consumer would prioritize fetching data from the sources that have most data. This will unblock these producers sooner and allow them to produce more data while the consumer fetches data from the remaining sources.
When fetching data, the consumer would receive the data and the size of the remaining data. This will allow the consumer to continue fetching the data without extra metadata requests.
Rinse and repeat until all sources report no-more-data.
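A rough sketch of the scheduling step in Java, for illustration only (the class and method names are invented; the actual implementation lives in Velox's ExchangeClient): given the remaining-bytes reports from the steps above, pick the largest producers first and cap each fetch so the total stays within the memory budget.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class FetchScheduler
{
    // Hypothetical plan entry: fetch up to maxBytes from the given producer.
    public record Fetch(String producerUri, long maxBytes) {}

    // Pick producers with the most reported data first, capping each fetch
    // so the total stays within the consumer's memory budget.
    public static List<Fetch> plan(Map<String, Long> remainingBytesByProducer, long memoryBudget)
    {
        List<Map.Entry<String, Long>> producers = remainingBytesByProducer.entrySet().stream()
                .filter(entry -> entry.getValue() > 0)
                .sorted(Comparator.comparingLong(Map.Entry<String, Long>::getValue).reversed())
                .toList();

        List<Fetch> fetches = new ArrayList<>();
        long budget = memoryBudget;
        for (Map.Entry<String, Long> producer : producers) {
            if (budget <= 0) {
                break; // out of budget; remaining producers wait for the next round
            }
            long maxBytes = Math.min(producer.getValue(), budget);
            fetches.add(new Fetch(producer.getKey(), maxBytes));
            budget -= maxBytes;
        }
        return fetches;
    }
}
```

Prioritizing the largest producers unblocks them sooner, so they can produce more data while the consumer drains the smaller sources.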
The value of X-Presto-Buffer-Remaining-Bytes is a list of page sizes in the order in which they will be produced. This is a comma-separated list of positive integers, where each value is the number of bytes in a page. For example, a value of 1024,2048,512 describes three pages of 1024, 2048, and 512 bytes.
Prestissimo worker includes X-Presto-Buffer-Remaining-Bytes header in all responses to GET {taskId}/results/{bufferId}/{token} regardless of whether X-Presto-Get-Data-Size header is present or not. When no data is available the header value is set to 0.
Backwards compatibility with the Coordinator
There are two use cases that involve data transfer between the Coordinator that doesn’t understand the new protocol and a Prestissimo worker.
Use case 1: Coordinator fetches query results from the worker.
Prestissimo workers would include the new X-Presto-Buffer-Remaining-Bytes header in responses to the existing GET {taskId}/results/{bufferId}/{token} request (without X-Presto-Get-Data-Size). The Coordinator would ignore this header and continue fetching data without the benefit of knowing how much data is available.
Use case 2: Prestissimo worker fetches table scan results for system table from the Coordinator.
Prestissimo workers include the new X-Presto-Get-Data-Size header in the existing GET {taskId}/results/{bufferId}/{token} request. The Coordinator will ignore this header and return the data. Prestissimo workers should accept that response (even though they only asked for the data size). This should work because in this case there is only one source and it doesn't produce a lot of data. Prestissimo workers would also set the X-Presto-Max-Size header to avoid accidentally getting too much data (just in case).
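A sketch of this defensive request, again using java.net.http for illustration (the coordinator URL, task ID, and the 1MB cap are made-up values):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SystemTableFetch
{
    public static void main(String[] args) throws Exception
    {
        HttpClient client = HttpClient.newHttpClient();

        // GET with both headers: the Coordinator ignores X-Presto-Get-Data-Size
        // and returns data, while X-Presto-Max-Size caps the response size.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://coordinator:8080/v1/task/20240101_0001.0.0.0/results/0/0"))
                .header("X-Presto-Get-Data-Size", "true")
                .header("X-Presto-Max-Size", "1MB")
                .GET()
                .build();

        HttpResponse<byte[]> response = client.send(request, HttpResponse.BodyHandlers.ofByteArray());

        // The consumer accepts data even though it only asked for sizes.
        System.out.println("Received " + response.body().length + " bytes");
    }
}
```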
Implementation
ExchangeClient in Velox would implement two independent loops: wait-for-data, fetch-data.
Wait-for-data loop is waiting for sources to report having data. This loop processes all sources in parallel.
Fetch-data loop is fetching data from sources that reported having some data. This loop processes a subset of sources at a time to stay within the memory budget.
Any given source is assigned to one loop or the other, but not both. Initially all sources go into the wait-for-data loop. As soon as a source responds as having data, it is moved to the data-fetching loop. The source stays in the data-fetching loop until a response indicates there is no more data available while the source is not yet at-end. At this point the source is moved into the wait-for-data loop. Sources that reported being at-end are removed from the loops and do not participate in further processing.
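A condensed sketch of these state transitions, in Java purely for illustration (the real implementation is the C++ ExchangeClient in Velox; all names here are invented):

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

public class ExchangeLoops
{
    interface Source
    {
        long remainingBytes();   // last value reported by the producer
        boolean atEnd();         // producer reported no-more-data
    }

    // All sources start in the wait-for-data loop.
    private final Queue<Source> waitLoop = new ArrayDeque<>();
    private final Queue<Source> fetchLoop = new ArrayDeque<>();

    public ExchangeLoops(List<Source> sources)
    {
        waitLoop.addAll(sources);
    }

    // Called when a source in the wait loop reports having data.
    void onDataReported(Source source)
    {
        waitLoop.remove(source);
        fetchLoop.add(source);
    }

    // Called after each fetch response from a source in the fetch loop.
    void onFetchResponse(Source source)
    {
        if (source.atEnd()) {
            fetchLoop.remove(source); // done; drop from both loops
        }
        else if (source.remainingBytes() == 0) {
            fetchLoop.remove(source); // no data right now; go back to waiting
            waitLoop.add(source);
        }
        // otherwise the source stays in the fetch loop
    }
}
```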
CC: @Yuhta @spershin @arhimondr @tdcmeehan @aditi-pandit @majetideepak @pranjalssh