Skip to content

Commit

Permalink
fixup: testcase
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Nov 3, 2023
1 parent bfeb3f7 commit cef3ccf
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 23 deletions.
7 changes: 2 additions & 5 deletions src/core/ext/xds/xds_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,12 @@ class XdsTransportFactory : public InternallyRefCounted<XdsTransportFactory> {
// Represents a bidi streaming RPC call.
class StreamingCall : public InternallyRefCounted<StreamingCall> {
public:
class ReadDelayHandle : public InternallyRefCounted<ReadDelayHandle> {
class ReadDelayHandle : public RefCounted<ReadDelayHandle> {
public:
explicit ReadDelayHandle(RefCountedPtr<StreamingCall> call)
: call_(std::move(call)) {}

void Orphan() override {
gpr_log(GPR_ERROR, "Boop!");
call_->Read();
}
~ReadDelayHandle() override { call_->Read(); }

static RefCountedPtr<ReadDelayHandle> NoWait() { return nullptr; }

Expand Down
10 changes: 6 additions & 4 deletions test/core/xds/xds_client_ads_stream_wait_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ TEST_F(XdsClientNotifyWatchersDone, Basic) {
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResource();
ASSERT_NE(resource, nullptr);
EXPECT_EQ(resource->name, "foo1");
EXPECT_EQ(resource->value, 6);
auto resource = watcher->WaitForNextResourceAndHandle();
ASSERT_NE(resource, absl::nullopt);
EXPECT_EQ(resource->first->name, "foo1");
EXPECT_EQ(resource->first->value, 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
EXPECT_EQ(stream->read_count(), 0);
Expand All @@ -74,6 +74,8 @@ TEST_F(XdsClientNotifyWatchersDone, Basic) {
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
resource->second.reset();
EXPECT_EQ(stream->read_count(), 1);
}

} // namespace
Expand Down
43 changes: 29 additions & 14 deletions test/core/xds/xds_client_test_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ class XdsClientTestBase : public ::testing::Test {
XdsTestResourceType<ResourceStruct, all_resources_required_in_sotw>,
ResourceStruct> {
public:
using ResourceAndReadDelayHandle = std::pair<
std::shared_ptr<const ResourceStruct>,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>>;

// A watcher implementation that queues delivered watches.
class Watcher : public XdsResourceTypeImpl<
XdsTestResourceType<ResourceStruct,
Expand All @@ -253,28 +258,37 @@ class XdsClientTestBase : public ::testing::Test {
return !queue_.empty();
}

std::shared_ptr<const ResourceStruct> WaitForNextResource(
absl::optional<ResourceAndReadDelayHandle> WaitForNextResourceAndHandle(
absl::Duration timeout = absl::Seconds(1),
SourceLocation location = SourceLocation()) {
MutexLock lock(&mu_);
if (!WaitForEventLocked(timeout)) return nullptr;
if (!WaitForEventLocked(timeout)) return absl::nullopt;
Event& event = queue_.front();
if (!absl::holds_alternative<std::shared_ptr<const ResourceStruct>>(
event)) {
if (!absl::holds_alternative<ResourceAndReadDelayHandle>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<absl::Status>(event)
? "error"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
return nullptr;
return absl::nullopt;
}
auto foo =
std::move(absl::get<std::shared_ptr<const ResourceStruct>>(event));
auto foo = std::move(absl::get<ResourceAndReadDelayHandle>(event));
queue_.pop_front();
return foo;
}

std::shared_ptr<const ResourceStruct> WaitForNextResource(
absl::Duration timeout = absl::Seconds(1),
SourceLocation location = SourceLocation()) {
auto resource_and_handle =
WaitForNextResourceAndHandle(timeout, location);
if (!resource_and_handle.has_value()) {
return nullptr;
}
return std::move(resource_and_handle->first);
}

absl::optional<absl::Status> WaitForNextError(
absl::Duration timeout = absl::Seconds(1),
SourceLocation location = SourceLocation()) {
Expand All @@ -284,8 +298,7 @@ class XdsClientTestBase : public ::testing::Test {
if (!absl::holds_alternative<absl::Status>(event)) {
EXPECT_TRUE(false)
<< "got unexpected event "
<< (absl::holds_alternative<
std::shared_ptr<const ResourceStruct>>(event)
<< (absl::holds_alternative<ResourceAndReadDelayHandle>(event)
? "resource"
: "does-not-exist")
<< " at " << location.file() << ":" << location.line();
Expand Down Expand Up @@ -315,15 +328,17 @@ class XdsClientTestBase : public ::testing::Test {

private:
struct DoesNotExist {};
using Event = absl::variant<std::shared_ptr<const ResourceStruct>,
absl::Status, DoesNotExist>;
using Event =
absl::variant<ResourceAndReadDelayHandle, absl::Status, DoesNotExist>;

void OnResourceChanged(
std::shared_ptr<const ResourceStruct> foo,
RefCountedPtr<XdsTransportFactory::XdsTransport::StreamingCall::
ReadDelayHandle> /* read_delay_handle */) override {
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle) override {
MutexLock lock(&mu_);
queue_.push_back(std::move(foo));
queue_.emplace_back(
std::make_pair(std::move(foo), std::move(read_delay_handle)));
cv_.Signal();
}
void OnError(absl::Status status) override {
Expand Down

0 comments on commit cef3ccf

Please sign in to comment.