Skip to content

Commit

Permalink
Invalidate http cache with non retryable errors
Browse files Browse the repository at this point in the history
  • Loading branch information
molotkov-and committed Feb 28, 2024
1 parent 5b9ad59 commit 9bfd1dc
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 4 deletions.
23 changes: 19 additions & 4 deletions ydb/library/actors/http/http_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,20 +159,27 @@ class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoing
}
cacheRecord.Waiters.clear();
TString error;
bool isRetryableError = true;
if (event->Get()->Error.empty()) {
if (event->Get()->Response != nullptr && !StatusSuccess(event->Get()->Response->Status)) {
error = event->Get()->Response->Message;
isRetryableError = IsRetryableError(event->Get()->Response->Status);
}
} else {
error = event->Get()->Error;
}
if (!error.empty()) {
LOG_WARN_S(ctx, HttpLog, "Error from " << cacheRecord.GetName() << ": " << error);
}
LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName());
cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now());
RefreshQueue.push({it->first, it->second.RefreshTime});
LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
if (isRetryableError) {
LOG_DEBUG_S(ctx, HttpLog, "OutgoingUpdate " << cacheRecord.GetName());
cacheRecord.UpdateResponse(response, event->Get()->Error, ctx.Now());
RefreshQueue.push({it->first, it->second.RefreshTime});
LOG_DEBUG_S(ctx, HttpLog, "OutgoingSchedule " << cacheRecord.GetName() << " at " << cacheRecord.RefreshTime << " until " << cacheRecord.DeathTime);
} else {
LOG_DEBUG_S(ctx, HttpLog, "Response with non retryable error: OutgoingForget " << cacheRecord.GetName());
Cache.erase(it);
}
}

void Handle(NHttp::TEvHttpProxy::TEvHttpOutgoingRequest::TPtr event, const NActors::TActorContext& ctx) {
Expand Down Expand Up @@ -249,6 +256,14 @@ class THttpOutgoingCacheActor : public NActors::TActorBootstrapped<THttpOutgoing
CFunc(NActors::TEvents::TSystem::Wakeup, HandleRefresh);
}
}

private:
static bool IsRetryableError(const TStringBuf& status) {
if (status == "401") {
return false;
}
return true;
}
};

const TDuration THttpOutgoingCacheActor::RefreshTimeout;
Expand Down
95 changes: 95 additions & 0 deletions ydb/library/actors/http/http_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <util/system/tempfile.h>
#include "http.h"
#include "http_proxy.h"
#include "http_cache.h"



Expand Down Expand Up @@ -548,4 +549,98 @@ CRA/5XcX13GJwHHj6LCoc3sL7mt8qV9HKY2AOZ88mpObzISZxgPpdKCfjsrdm63V
UNIT_ASSERT_EQUAL(response->Response->Status, "400");
UNIT_ASSERT_EQUAL(response->Response->Body, "Invalid http header");
}

Y_UNIT_TEST(CheckOutgoingCacheProxy) {
NActors::TTestActorRuntimeBase actorSystem(1, true);
actorSystem.SetUseRealInterconnect();
TPortManager portManager;
TIpPort port = portManager.GetTcpPort();
TAutoPtr<NActors::IEventHandle> handle;
actorSystem.Initialize();
actorSystem.SetLogPriority(NActorsServices::HTTP, NActors::NLog::PRI_DEBUG);

NActors::IActor* proxy = NHttp::CreateHttpProxy();
NActors::TActorId proxyId = actorSystem.Register(proxy);
actorSystem.Send(new NActors::IEventHandle(proxyId, actorSystem.AllocateEdgeActor(), new NHttp::TEvHttpProxy::TEvAddListeningPort(port)), 0, true);
actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvConfirmListen>(handle);

NActors::TActorId serverId = actorSystem.AllocateEdgeActor();
actorSystem.Send(new NActors::IEventHandle(proxyId, serverId, new NHttp::TEvHttpProxy::TEvRegisterHandler("/test", serverId)), 0, true);

auto getOutgoingCachePolicy = [] (const NHttp::THttpRequest* request) -> NHttp::TCachePolicy {
NHttp::TCachePolicy policy;
if (request->Method != "GET") {
return policy;
}
TStringBuf url(request->URL);
if (url.EndsWith("/test")) {
policy.TimeToExpire = TDuration::Seconds(30);
policy.TimeToRefresh = TDuration::Seconds(5);
policy.KeepOnError = true;
}
return NHttp::GetDefaultCachePolicy(request, policy);
};

NActors::TActorId outgoingCacheProxyId = actorSystem.Register(NHttp::CreateOutgoingHttpCache(proxyId, getOutgoingCachePolicy));

// Initiate new request to server
NActors::TActorId clientId = actorSystem.AllocateEdgeActor();
NHttp::THttpOutgoingRequestPtr httpRequest = NHttp::THttpOutgoingRequest::CreateRequestGet("http://[::1]:" + ToString(port) + "/test");
actorSystem.Send(new NActors::IEventHandle(outgoingCacheProxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingRequest* request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
UNIT_ASSERT_EQUAL(request->Request->URL, "/test");

NHttp::THttpOutgoingResponsePtr httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
NHttp::TEvHttpProxy::TEvHttpIncomingResponse* response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
UNIT_ASSERT_EQUAL(response->Response->Status, "200");
UNIT_ASSERT_EQUAL(response->Response->Body, "passed");

// New request. Get response from cache
actorSystem.Send(new NActors::IEventHandle(outgoingCacheProxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
UNIT_ASSERT_EQUAL(response->Response->Status, "200");
UNIT_ASSERT_EQUAL(response->Response->Body, "passed");

// Generate non retryable error when refresh cache
request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
httpResponse = request->Request->CreateResponseString("HTTP/1.1 401 Unauthorized\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n13\r\nToken expired\r\n0\r\n\r\n");
actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);

// Waiting for refresh http cache
Sleep(TDuration::Seconds(7));

// Cache miss for this request. Initiate new request to server
actorSystem.Send(new NActors::IEventHandle(outgoingCacheProxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
UNIT_ASSERT_EQUAL(request->Request->URL, "/test");

httpResponse = request->Request->CreateResponseString("HTTP/1.1 200 Found\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n6\r\npassed\r\n0\r\n\r\n");
actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);
response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
UNIT_ASSERT_EQUAL(response->Response->Status, "200");
UNIT_ASSERT_EQUAL(response->Response->Body, "passed");

// New request. Get response from cache
actorSystem.Send(new NActors::IEventHandle(outgoingCacheProxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
UNIT_ASSERT_EQUAL(response->Response->Status, "200");
UNIT_ASSERT_EQUAL(response->Response->Body, "passed");

// Generate retryable error when refresh cache
request = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingRequest>(handle);
UNIT_ASSERT_EQUAL(request->Request->URL, "/test");
httpResponse = request->Request->CreateResponseString("HTTP/1.1 504 Gateway Timeout\r\nConnection: Close\r\nTransfer-Encoding: chunked\r\n\r\n7\r\nTimeout\r\n0\r\n\r\n");
actorSystem.Send(new NActors::IEventHandle(handle->Sender, serverId, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(httpResponse)), 0, true);

// Waiting for refresh http cache
Sleep(TDuration::Seconds(7));

// Get response with error from cache
actorSystem.Send(new NActors::IEventHandle(outgoingCacheProxyId, clientId, new NHttp::TEvHttpProxy::TEvHttpOutgoingRequest(httpRequest)), 0, true);
response = actorSystem.GrabEdgeEvent<NHttp::TEvHttpProxy::TEvHttpIncomingResponse>(handle);
UNIT_ASSERT_EQUAL(response->Response->Status, "504");
UNIT_ASSERT_EQUAL(response->Response->Body, "Timeout");
}
}

0 comments on commit 9bfd1dc

Please sign in to comment.