Add backoff retry capability in rest client #170

Merged (16 commits) on Nov 22, 2023

@dai-chen dai-chen commented Nov 21, 2023

Description

Added automatic backoff retry capability by intercepting requests between the OpenSearch RestClient and the underlying Apache CloseableHttpAsyncClient. The retry policy is configurable via Flint Spark configuration.

Documentation

Please see user manual for more details: https://github.com/opensearch-project/opensearch-spark/blob/main/docs/index.md#configurations

Problem Statement

  • Apache CloseableHttpAsyncClient
    • It is an async HTTP client that returns its result via a Future object and does not retry any failure.
    • If an error occurs before the connection is established, an exception such as ConnectException or SocketTimeoutException is thrown (wrapped in an ExecutionException by the Future object).
    • Otherwise, an HttpResponse object is returned with the status code and the original error info from the server side.
  • OpenSearch RestClient
    • It retries 502 Bad Gateway, 503 Service Unavailable and 504 Gateway Timeout on the next OS node.
    • However, because we always pass a single OS endpoint to the Flint client, there is no other node to retry on.

Proposed Solution

With these changes, the Apache CloseableHttpAsyncClient and every Future object it creates are wrapped, so that each Future.get() call is intercepted and automatically retried by the Failsafe library.

  1. By default, a failed request is retried up to 3 more times, after 1s, 2s and 4s, with 0.1s jitter.
  2. If max retries is set to 0, the entire retry mechanism is disabled.
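
The default schedule above is plain arithmetic and can be reproduced in a few lines. The following is a minimal sketch, assuming the delay doubles from a 1s base and the 0.1s jitter is a uniform random offset of up to ±100ms. The class and method names (BackoffSchedule, baseDelayMillis, jitteredDelayMillis) are hypothetical and not taken from the Flint code base; the actual jitter behaviour is whatever Failsafe implements.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of Flint: reproduces the 1s, 2s, 4s default schedule.
class BackoffSchedule {
    static final long BASE_DELAY_MS = 1000L; // first retry waits about 1s
    static final long JITTER_MS = 100L;      // 0.1s jitter, assumed to be a +/- uniform offset

    // Delay before retry number `retry` (1-based), without jitter: 1000, 2000, 4000, ...
    static long baseDelayMillis(int retry) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        return BASE_DELAY_MS << (retry - 1);
    }

    // Same delay with a random offset in [-JITTER_MS, +JITTER_MS].
    static long jitteredDelayMillis(int retry) {
        long offset = ThreadLocalRandom.current().nextLong(-JITTER_MS, JITTER_MS + 1);
        return baseDelayMillis(retry) + offset;
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 3; retry++) {
            System.out.println("retry #" + retry + " after ~" + baseDelayMillis(retry) + " ms");
        }
    }
}
```

These delays match the timestamps in the status code test log further down, where the retries are logged roughly 1s, 2s and 4s apart (21:31:06, 21:31:07, 21:31:09, 21:31:13).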

Pseudocode for core logic:

# Before the changes
org.opensearch.client.RestClient.performRequest()
  |-> org.apache.http.impl.nio.client.CloseableHttpAsyncClient.execute(...).get()

# After the changes
org.opensearch.client.RestClient.performRequest()
  |-> org.opensearch.flint.core.http.RetryableHttpAsyncClient.execute(...).get()
      |-> org.apache.http.impl.nio.client.CloseableHttpAsyncClient.execute(...).get()
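
To make the interception idea concrete, here is a minimal hand-rolled sketch using only the JDK. It is not the PR's implementation, which delegates retrying to Failsafe inside RetryableHttpAsyncClient; the names RetryingGet and getWithRetry are hypothetical. It handles only result-based retries (for example on a status code) and omits jitter and exception-based retries.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in for the retry wrapper; the PR itself relies on Failsafe.
class RetryingGet {
    // Executes the request and calls get() until the result is acceptable or
    // maxRetries is exhausted. maxRetries = 0 means a single attempt, no retry.
    static <T> T getWithRetry(Supplier<Future<T>> request, Predicate<T> shouldRetry,
                              int maxRetries, long baseDelayMs) {
        try {
            T result = request.get().get(); // initial attempt
            for (int retry = 1; retry <= maxRetries && shouldRetry.test(result); retry++) {
                Thread.sleep(baseDelayMs << (retry - 1)); // exponential backoff, no jitter
                result = request.get().get(); // a Future is one-shot, so re-execute
            }
            return result; // last result, even if it is still a failure
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for response", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Request failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated endpoint: answers 502 twice, then 200.
        Supplier<Future<Integer>> request =
            () -> CompletableFuture.completedFuture(++calls[0] < 3 ? 502 : 200);
        int status = getWithRetry(request, code -> code == 502, 3, 1L);
        System.out.println("status=" + status + " attempts=" + calls[0]);
    }
}
```

Because a completed Future cannot be run again, a retry has to re-issue the request rather than call get() a second time. That is a plausible reason for wrapping the client as well as the Future, although the PR description does not state it explicitly.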

Retryable Error Cases

Because the retryable status codes and exceptions are configurable, the default retryable scope is kept minimal to avoid unexpected impact. By default, only the following status codes are considered retryable, and no exception class is retried.

  • 429 Too Many Requests
  • 502 Bad Gateway

Please see the complete exception list below:

[Screenshot: complete exception list, 2023-11-22]
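
The status code check described above can be sketched as a small predicate over a comma-separated list. This is an illustration only: the class name RetryableStatusCodes is hypothetical, and the parsing rules (trimming, ignoring blank entries) are assumptions rather than the behaviour of Flint's HttpStatusCodeResultPredicate, which appears in the test logs.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of a status-code retry predicate; not Flint's actual class.
class RetryableStatusCodes {
    private final Set<Integer> codes;

    // Parses a comma-separated list such as "429,502"; blank input means "retry nothing".
    RetryableStatusCodes(String commaSeparated) {
        this.codes = Arrays.stream(commaSeparated.split(","))
            .map(String::trim)
            .filter(s -> !s.isEmpty())
            .map(Integer::valueOf)
            .collect(Collectors.toSet());
    }

    boolean isRetryable(int statusCode) {
        return codes.contains(statusCode);
    }

    public static void main(String[] args) {
        RetryableStatusCodes defaults = new RetryableStatusCodes("429,502");
        System.out.println("502 retryable: " + defaults.isRetryable(502));
        System.out.println("403 retryable: " + defaults.isRetryable(403));
    }
}
```

In the test logs in this description, setting spark.datasource.flint.retry.http_status_codes to "403" produces retryableStatusCodes=403, so a configured list appears to replace the defaults rather than extend them.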

Testing

Success Case

Verified that the retry logic does not affect normal index creation with SigV4 authentication:

$ spark-shell ... --conf spark.datasource.flint.scheme=https \
  --conf spark.datasource.flint.auth=sigv4 \
  --conf spark.datasource.flint.region=us-east-1 ...

scala> spark.sparkContext.setLogLevel("INFO")

scala> spark.sql("""
     | CREATE SKIPPING INDEX ON ds_tables.http_logs
     | (status VALUE_SET)
     | WITH (
     |   auto_refresh=true
     | );
     | """)

23/11/22 00:16:20 INFO FlintOpenSearchMetadataLog: Updating log entry 
FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX2RzX3RhYmxlc19odHRwX2xvZ3Nfc2tpcHBpbmdfaW5kZXg=,
3,1,1700612175488,refreshing,myglue,)
23/11/22 00:16:20 INFO RetryableHttpAsyncClient: Building retryable http async client with options:
FlintRetryOptions{maxRetries=3, retryableStatusCodes=429,502, retryableExceptionClassNames=}

Retry by HTTP Status Code Test

# By default only 429 and 502 will be retried
23/11/22 21:27:40 INFO FlintOpenSearchMetadataLog: Creating log entry FlintMetadataLogEntry(,-2,0,0,empty,myglue,)
23/11/22 21:27:40 INFO RetryableHttpAsyncClient: Building retryable http async client with options: 
FlintRetryOptions{maxRetries=3, retryableStatusCodes=429,502, retryableExceptionClassNames=Optional.empty}
23/11/22 21:27:40 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 403
23/11/22 21:27:40 INFO HttpStatusCodeResultPredicate: Status code 403 check result: false
23/11/22 21:27:40 ERROR FlintSpark: Failed to create Flint index
java.lang.IllegalStateException: Failed to write log entry 
FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX2RzX3RhYmxlc19odHRwX2xvZ3Nfc2tpcHBpbmdfaW5kZXg=,-2,0,0,empty,myglue,)
  ...
  Caused by: org.opensearch.OpenSearchStatusException: OpenSearch exception [type=cluster_block_exception, reason=blocked by: [FORBIDDEN/6/cluster read-only (api)];]
	at org.opensearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:207) ~[flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
  ...

# Add 403 HTTP status code to retryable list
scala> spark.conf.set("spark.datasource.flint.retry.http_status_codes", "403")
scala> spark.sql("CREATE SKIPPING INDEX ON ds_tables.http_logs (status VALUE_SET)");

23/11/22 21:31:06 INFO FlintOpenSearchMetadataLog: Creating log entry FlintMetadataLogEntry(,-2,0,0,empty,myglue,)
23/11/22 21:31:06 INFO RetryableHttpAsyncClient: Building retryable http async client with options: FlintRetryOptions{maxRetries=3, retryableStatusCodes=403, retryableExceptionClassNames=Optional.empty}
23/11/22 21:31:06 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 403
23/11/22 21:31:06 INFO HttpStatusCodeResultPredicate: Status code 403 check result: true
23/11/22 21:31:06 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=HTTP/1.1 403 Forbidden [Date: Wed, 22 Nov 2023 21:31:06 GMT, Content-Type: application/json; charset=UTF-8, Content-Length: 231, Connection: keep-alive, access-control-allow-origin: *] [Content-Length: 231,Chunked: false], exception=null]

23/11/22 21:31:07 WARN FlintRetryOptions: Retrying failed request at #1
23/11/22 21:31:07 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 403
23/11/22 21:31:07 INFO HttpStatusCodeResultPredicate: Status code 403 check result: true
23/11/22 21:31:07 ERROR FlintRetryOptions: Attempt to execute request failed: 
ExecutionAttemptedEvent[result=HTTP/1.1 403 Forbidden [Date: Wed, 22 Nov 2023 21:31:06 GMT, 
Content-Type: application/json; charset=UTF-8, Content-Length: 231, Connection: keep-alive, 
access-control-allow-origin: *] [Content-Length: 231,Chunked: false], exception=null]

23/11/22 21:31:09 WARN FlintRetryOptions: Retrying failed request at #2
23/11/22 21:31:09 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 403
23/11/22 21:31:09 INFO HttpStatusCodeResultPredicate: Status code 403 check result: true
23/11/22 21:31:09 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=HTTP/1.1 403 Forbidden [Date: Wed, 22 Nov 2023 21:31:06 GMT, 
Content-Type: application/json; charset=UTF-8, Content-Length: 231, Connection: keep-alive, 
access-control-allow-origin: *] [Content-Length: 231,Chunked: false], exception=null]

23/11/22 21:31:13 WARN FlintRetryOptions: Retrying failed request at #3
23/11/22 21:31:13 INFO HttpStatusCodeResultPredicate: Checking if status code is retryable: 403
23/11/22 21:31:13 INFO HttpStatusCodeResultPredicate: Status code 403 check result: true
23/11/22 21:31:13 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=HTTP/1.1 403 Forbidden [Date: Wed, 22 Nov 2023 21:31:06 GMT, 
Content-Type: application/json; charset=UTF-8, Content-Length: 231, Connection: keep-alive, 
access-control-allow-origin: *] [Content-Length: 231,Chunked: false], exception=null]

23/11/22 21:27:40 ERROR FlintSpark: Failed to create Flint index
java.lang.IllegalStateException: Failed to write log entry FlintMetadataLogEntry(ZmxpbnRfbXlnbHVlX2RzX3RhYmxlc19odHRwX2xvZ3Nfc2tpcHBpbmdfaW5kZXg=,-2,0,0,empty,myglue,)
  ...
Caused by: org.opensearch.OpenSearchStatusException: OpenSearch exception [type=cluster_block_exception, reason=blocked by: [FORBIDDEN/6/cluster read-only (api)];]
	at org.opensearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:207) ~[flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar:0.1.0-SNAPSHOT]
  ...
  Suppressed: org.opensearch.client.ResponseException: method [PUT], host [https://search-flint-test-os-paj3qjpewauxh5egty5grfvdk4.us-east-1.es.amazonaws.com], URI [/.query_execution_request_myglue/_doc/ZmxpbnRfbXlnbHVlX2RzX3RhYmxlc19odHRwX2xvZ3Nfc2tpcHBpbmdfaW5kZXg=?timeout=1m], status line [HTTP/1.1 403 Forbidden]
{"error":{"root_cause":[{"type":"cluster_block_exception","reason":"blocked by: [FORBIDDEN/6/cluster read-only (api)];"}],"type":"cluster_block_exception","reason":"blocked by: [FORBIDDEN/6/cluster read-only (api)];"},"status":403}

Retry by Exception Class Name Test

$ spark-shell ... \
  --conf spark.datasource.flint.host=search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com

# By default, no retry on any exception
spark.sql("CREATE SKIPPING INDEX ON ds_tables.http_logs (status VALUE_SET)");
23/11/22 21:44:44 INFO FlintSpark: Creating Flint index FlintSparkSkippingIndex(myglue.ds_tables.http_logs,List(ValueSetSkippingStrategy(VALUE_SET,status,int)),FlintSparkIndexOptions(Map())) with ignoreIfExists false
23/11/22 21:44:44 INFO FlintOpenSearchClient: Checking if Flint index exists flint_myglue_ds_tables_http_logs_skipping_index
23/11/22 21:44:44 INFO RetryableHttpAsyncClient: Building retryable http async client with options: 
FlintRetryOptions{maxRetries=3, retryableStatusCodes=403, retryableExceptionClassNames=Optional.empty}

23/11/22 21:44:44 ERROR RetryableHttpAsyncClient: Request failed permanently. Re-throwing original exception.
java.lang.IllegalStateException: Failed to check if Flint index exists flint_myglue_ds_tables_http_logs_skipping_index
  ...
  Caused by: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known
  ...

# Configure exception class name to retry
scala> spark.conf.set("spark.datasource.flint.retry.exception_class_names", "java.net.UnknownHostException")
scala> spark.sql("show index on stream.lineitem_tiny")

23/11/22 21:45:09 INFO FlintSpark: Creating Flint index FlintSparkSkippingIndex(myglue.ds_tables.http_logs,List(ValueSetSkippingStrategy(VALUE_SET,status,int)),FlintSparkIndexOptions(Map())) with ignoreIfExists false
23/11/22 21:45:09 INFO FlintOpenSearchClient: Checking if Flint index exists flint_myglue_ds_tables_http_logs_skipping_index
23/11/22 21:45:09 INFO RetryableHttpAsyncClient: Building retryable http async client with options: 
FlintRetryOptions{maxRetries=3, retryableStatusCodes=403, retryableExceptionClassNames=Optional[java.net.UnknownHostException]}
23/11/22 21:45:09 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known
23/11/22 21:45:09 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known
23/11/22 21:45:09 INFO ErrorStacktraceFailurePredicate: Exception is retryable: 
java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known
23/11/22 21:45:09 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=null, exception=java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known]

23/11/22 21:45:10 WARN FlintRetryOptions: Retrying failed request at #1
23/11/22 21:45:10 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:10 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:10 INFO ErrorStacktraceFailurePredicate: Exception is retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:10 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=null, exception=java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com]

23/11/22 21:45:12 WARN FlintRetryOptions: Retrying failed request at #2
23/11/22 21:45:12 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:12 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:12 INFO ErrorStacktraceFailurePredicate: Exception is retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:12 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=null, exception=java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com]

23/11/22 21:45:16 WARN FlintRetryOptions: Retrying failed request at #3
23/11/22 21:45:16 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:16 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:16 INFO ErrorStacktraceFailurePredicate: Exception is retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:45:16 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=null, exception=java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com]

23/11/22 21:45:16 ERROR RetryableHttpAsyncClient: Request failed permanently. Re-throwing original exception.
java.lang.IllegalStateException: Failed to check if Flint index exists flint_myglue_ds_tables_http_logs_skipping_index
  ...
  Caused by: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
  ...
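
The log above shows the predicate first inspecting the ExecutionException and then its cause, the UnknownHostException, before deciding the failure is retryable. A minimal sketch of that cause-chain walk follows. The class name RetryableExceptionCheck is hypothetical, and matching on the exact class name is an assumption; the real ErrorStacktraceFailurePredicate may match differently (for example, by also accepting subclasses).

```java
import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of an exception-based retry check; not Flint's actual class.
class RetryableExceptionCheck {
    // Returns true if the error, or any exception in its cause chain, has a
    // class name listed in retryableClassNames (exact name match is assumed).
    static boolean isRetryable(Throwable error, Set<String> retryableClassNames) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (retryableClassNames.contains(t.getClass().getName())) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Future.get() wraps the real failure in an ExecutionException.
        Throwable wrapped = new ExecutionException(new UnknownHostException("wrong-host"));
        Set<String> configured = Set.of("java.net.UnknownHostException");
        System.out.println("retryable: " + isRetryable(wrapped, configured));
        System.out.println("retryable with empty list: " + isRetryable(wrapped, Set.of()));
    }
}
```

Walking the chain matters because the configured class is never the outermost exception here: the failure always arrives wrapped in the ExecutionException thrown by Future.get().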

Maximum Retry Test

scala> spark.conf.set("spark.datasource.flint.retry.max_retries", "1")
scala> spark.sql("CREATE SKIPPING INDEX ON ds_tables.http_logs (status VALUE_SET)");

23/11/22 21:52:02 INFO FlintSpark: Creating Flint index FlintSparkSkippingIndex(myglue.ds_tables.http_logs,List(ValueSetSkippingStrategy(VALUE_SET,status,int)),FlintSparkIndexOptions(Map())) with ignoreIfExists false
23/11/22 21:52:02 INFO FlintOpenSearchClient: Checking if Flint index exists flint_myglue_ds_tables_http_logs_skipping_index
23/11/22 21:52:02 INFO RetryableHttpAsyncClient: Building retryable http async client with options: FlintRetryOptions{maxRetries=1, retryableStatusCodes=403, retryableExceptionClassNames=Optional[java.net.UnknownHostException]}
23/11/22 21:52:02 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known
23/11/22 21:52:02 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known
23/11/22 21:52:02 INFO ErrorStacktraceFailurePredicate: Exception is retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known
23/11/22 21:52:02 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=null, exception=java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com: Name or service not known]

23/11/22 21:52:03 WARN FlintRetryOptions: Retrying failed request at #1
23/11/22 21:52:03 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:52:03 INFO ErrorStacktraceFailurePredicate: Checking if exception retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:52:03 INFO ErrorStacktraceFailurePredicate: Exception is retryable: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com
23/11/22 21:52:03 ERROR FlintRetryOptions: Attempt to execute request failed: ExecutionAttemptedEvent[result=null, exception=java.util.concurrent.ExecutionException: java.net.UnknownHostException: search-flint-test-os-wrong-host.us-east-1.es.amazonaws.com]

23/11/22 21:52:03 ERROR RetryableHttpAsyncClient: Request failed permanently. Re-throwing original exception.
...

Disabling Retry Test

scala> spark.conf.set("spark.datasource.flint.retry.max_retries", "0")
scala> spark.sql("CREATE SKIPPING INDEX ON ds_tables.http_logs (status VALUE_SET)");

# Original Apache HTTP client created without retry wrapper
# (No "Building retryable http async ... " logging)
23/11/22 21:57:03 INFO FlintSpark: Creating Flint index FlintSparkSkippingIndex(myglue.ds_tables.http_logs,List(ValueSetSkippingStrategy(VALUE_SET,status,int)),FlintSparkIndexOptions(Map())) with ignoreIfExists false
23/11/22 21:57:03 INFO FlintOpenSearchClient: Checking if Flint index exists flint_myglue_ds_tables_http_logs_skipping_index
23/11/22 21:57:03 WARN CredentialsLegacyConfigLocationProvider: Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
java.lang.IllegalStateException: Failed to check if Flint index exists flint_myglue_ds_tables_http_logs_skipping_index
...

Issues Resolved

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Chen Dai <daichen@amazon.com>
@dai-chen dai-chen added the enhancement New feature or request label Nov 21, 2023
@dai-chen dai-chen self-assigned this Nov 21, 2023
@dai-chen dai-chen marked this pull request as ready for review November 22, 2023 22:41
@dai-chen dai-chen merged commit 78193f5 into opensearch-project:main Nov 22, 2023
4 checks passed
@dai-chen dai-chen deleted the add-auto-retry-http-client branch November 22, 2023 23:59