Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(spanner): remove session from pool upon "not found" refresh failure #9954

Merged
merged 2 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 40 additions & 17 deletions google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "google/cloud/spanner/internal/session_pool.h"
#include "google/cloud/spanner/internal/connection_impl.h"
#include "google/cloud/spanner/internal/session.h"
#include "google/cloud/spanner/internal/status_utils.h"
#include "google/cloud/spanner/options.h"
#include "google/cloud/completion_queue.h"
#include "google/cloud/internal/async_retry_loop.h"
Expand Down Expand Up @@ -84,21 +85,20 @@ void SessionPool::Initialize() {
}

SessionPool::~SessionPool() {
// All references to this object are via `shared_ptr`; since we're in the
// All references to this object are via `shared_ptr`. Since we're in the
// destructor that implies there can be no concurrent accesses to any member
// variables, including `current_timer_`.
//
// Note that it *is* possible the timer lambda in `ScheduleBackgroundWork`
// is executing concurrently. However, since we are in the destructor we know
// that the lambda must not have yet successfully finished a call to `lock()`
// on the `weak_ptr` to `this` it holds. Any subsequent or in-progress calls
// must return `nullptr`, and the lambda will not do any work nor reschedule
// the timer.
// or the response handler in `RefreshExpiringSessions` are executing
// concurrently. However, since we are in the destructor we know that
// they must not have successfully finished a call to `lock()` on the
// `weak_ptr` to `this` they hold. Any in-progress or subsequent `lock()`
// will now return `nullptr`, in which case no work is done.
current_timer_.cancel();
}

void SessionPool::ScheduleBackgroundWork(std::chrono::seconds relative_time) {
// See the comment in the destructor about the thread safety of this method.
std::weak_ptr<SessionPool> pool = shared_from_this();
current_timer_ =
cq_.MakeRelativeTimer(relative_time)
Expand Down Expand Up @@ -152,20 +152,43 @@ void SessionPool::RefreshExpiringSessions() {
}
}
}
std::weak_ptr<SessionPool> pool = shared_from_this();
for (auto& refresh : sessions_to_refresh) {
AsyncRefreshSession(cq_, refresh.first, std::move(refresh.second))
.then([](future<StatusOr<google::spanner::v1::ResultSet>> result) {
// We simply discard the response as handling IsSessionNotFound()
// by removing the session from the pool is problematic (and would
// not eliminate the possibility of IsSessionNotFound() elsewhere).
// The last-use time has already been updated to throttle attempts.
// TODO(#4026): Re-evaluate these decisions.
(void)result.get();
});
auto handler =
[pool, session_name = refresh.second](
future<StatusOr<google::spanner::v1::ResultSet>> result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: use auto result

auto response = result.get();
if (!response && IsSessionNotFound(response.status())) {
if (auto shared_pool = pool.lock()) {
// The pool still exists, but the bad session may no
// longer be in the pool because someone else has already
// tried to use it, discovered that it is bad, and so did
// not return it (or they are in process of doing all that).
// But if it is still in the pool, we remove it now.
shared_pool->Erase(session_name);
}
}
};
AsyncRefreshSession(cq_, std::move(refresh.first),
std::move(refresh.second))
.then(std::move(handler));
}
}

void SessionPool::Erase(std::string const& session_name) {
std::unique_ptr<Session> target;
std::unique_lock<std::mutex> lk(mu_);
for (auto& session : sessions_) {
if (session->session_name() == session_name) {
target = std::move(session); // deferred deletion
session = std::move(sessions_.back());
sessions_.pop_back();
break;
}
}
}

/**
/*
* Grow the session pool by creating up to `sessions_to_create` sessions and
* adding them to the pool. Note that `lk` may be released and reacquired in
* this method.
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/spanner/internal/session_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
void MaintainPoolSize();
void RefreshExpiringSessions();

// Remove the named session from the pool (if it is present).
void Erase(std::string const& session_name);

spanner::Database const db_;
google::cloud::CompletionQueue cq_;
Options const opts_;
Expand Down
66 changes: 66 additions & 0 deletions google/cloud/spanner/internal/session_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "google/cloud/spanner/options.h"
#include "google/cloud/spanner/testing/fake_clock.h"
#include "google/cloud/spanner/testing/mock_spanner_stub.h"
#include "google/cloud/spanner/testing/status_utils.h"
#include "google/cloud/spanner/timestamp.h"
#include "google/cloud/internal/background_threads_impl.h"
#include "google/cloud/status.h"
Expand Down Expand Up @@ -488,6 +489,71 @@ TEST(SessionPool, SessionRefresh) {
// a call to RefreshExpiringSessions(). This should refresh "s2" and
// satisfy the AsyncExecuteSql() expectation.
impl->SimulateCompletion(true);

// We should still be able to allocate sessions "s1" and "s2".
auto s1 = pool->Allocate();
ASSERT_STATUS_OK(s1);
EXPECT_EQ("s1", (*s1)->session_name());
auto s2 = pool->Allocate();
ASSERT_STATUS_OK(s2);
EXPECT_EQ("s2", (*s2)->session_name());
}

TEST(SessionPool, SessionRefreshNotFound) {
auto mock = std::make_shared<StrictMock<spanner_testing::MockSpannerStub>>();
EXPECT_CALL(*mock, BatchCreateSessions)
.WillOnce(Return(ByMove(MakeSessionsResponse({"s1"}))))
.WillOnce(Return(ByMove(MakeSessionsResponse({"s2"}))))
.WillOnce(Return(ByMove(MakeSessionsResponse({"s3"}))));

EXPECT_CALL(*mock, AsyncExecuteSql)
.WillOnce([](CompletionQueue&, std::unique_ptr<grpc::ClientContext>,
google::spanner::v1::ExecuteSqlRequest const& request) {
EXPECT_EQ("s2", request.session());
// The "SELECT 1" refresh returns "Session not found".
return make_ready_future(StatusOr<google::spanner::v1::ResultSet>(
spanner_testing::SessionNotFoundError(request.session())));
});

auto db = spanner::Database("project", "instance", "database");
auto impl = std::make_shared<FakeCompletionQueueImpl>();
auto keep_alive_interval = std::chrono::seconds(1);
auto clock = std::make_shared<FakeSteadyClock>();
auto pool = MakeTestSessionPool(
db, {mock}, CompletionQueue(impl),
Options{}
.set<spanner::SessionPoolKeepAliveIntervalOption>(keep_alive_interval)
.set<SessionPoolClockOption>(clock));

// Allocate and release two session, "s1" and "s2". This will satisfy the
// the first two BatchCreateSessions() expectations.
{
auto s1 = pool->Allocate();
ASSERT_STATUS_OK(s1);
EXPECT_EQ("s1", (*s1)->session_name());
{
auto s2 = pool->Allocate();
ASSERT_STATUS_OK(s2);
EXPECT_EQ("s2", (*s2)->session_name());
}
// Wait for "s2" to need refreshing before releasing "s1".
clock->AdvanceTime(keep_alive_interval * 2);
}

// Simulate completion of pending operations, which will result in
// a call to RefreshExpiringSessions(). This should refresh "s2" and
// satisfy the AsyncExecuteSql() expectation, which fails the call.
impl->SimulateCompletion(true);

// We should still be able to allocate session "s1".
auto s1 = pool->Allocate();
ASSERT_STATUS_OK(s1);
EXPECT_EQ("s1", (*s1)->session_name());
// However "s2" will be gone now, so a new allocation will produce
// "s3", satisfying the final BatchCreateSessions() expectation.
auto s3 = pool->Allocate();
ASSERT_STATUS_OK(s3);
EXPECT_EQ("s3", (*s3)->session_name());
}

} // namespace
Expand Down