Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

golang: allow flushing buffer when processing the data asynchronously #37958

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ updates:
interval: daily
time: "06:00"

- package-ecosystem: "gomod"
directory: "/contrib/golang/filters/http/test/test_data/buffer_flush"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@spacewander there is a current conversation about consolidating/rationalizing these deps to one

currently the go test deps all contain the same deps and whenever they change (ie protobuf) we get an avalanche of dependency updates (and ci)

any chance we could address this first? cc @doujiang24

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, just created a PR: #37962, PTAL cc @phlax

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#37962 has landed - so iiuc this bit should no longer be necessary

groups:
contrib-golang:
patterns:
- "*"
schedule:
interval: daily
time: "06:00"

- package-ecosystem: "gomod"
directory: "/contrib/golang/filters/http/test/test_data/dummy"
groups:
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/common/go/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ CAPIStatus envoyGoFilterHttpGetBuffer(void* s, uint64_t buffer, void* value);
CAPIStatus envoyGoFilterHttpDrainBuffer(void* s, uint64_t buffer, uint64_t length);
CAPIStatus envoyGoFilterHttpSetBufferHelper(void* s, uint64_t buffer, void* data, int length,
bufferAction action);
CAPIStatus envoyGoFilterHttpFlushBuffer(void* s, uint64_t buffer, bool wait);

CAPIStatus envoyGoFilterHttpCopyTrailers(void* s, void* strs, void* buf);
CAPIStatus envoyGoFilterHttpSetTrailer(void* s, void* key_data, int key_len, void* value,
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/common/go/api/capi.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type HttpCAPI interface {
HttpDrainBuffer(s unsafe.Pointer, bufferPtr uint64, length uint64)
HttpSetBufferHelper(s unsafe.Pointer, bufferPtr uint64, value string, action BufferAction)
HttpSetBytesBufferHelper(s unsafe.Pointer, bufferPtr uint64, value []byte, action BufferAction)
HttpFlushBuffer(s unsafe.Pointer, bufferPtr uint64, wait bool)

HttpCopyTrailers(s unsafe.Pointer, num uint64, bytes uint64) map[string][]string
HttpSetTrailer(s unsafe.Pointer, key string, value string, add bool)
Expand Down
3 changes: 3 additions & 0 deletions contrib/golang/common/go/api/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ type BufferInstance interface {

// Append append the contents of the string data to the buffer.
AppendString(s string) error

// TODO: Currently we only support wait=false
Flush(wait bool)
}

//*************** BufferInstance end **************//
Expand Down
8 changes: 8 additions & 0 deletions contrib/golang/filters/http/source/cgo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ CAPIStatus envoyGoFilterHttpDrainBuffer(void* s, uint64_t buffer_ptr, uint64_t l
});
}

CAPIStatus envoyGoFilterHttpFlushBuffer(void* s, uint64_t buffer_ptr, bool wait) {
return envoyGoFilterProcessStateHandlerWrapper(
s, [buffer_ptr, wait](std::shared_ptr<Filter>& filter, ProcessorState& state) -> CAPIStatus {
auto buffer = reinterpret_cast<Buffer::Instance*>(buffer_ptr);
return filter->flushBuffer(state, buffer, wait);
});
}

CAPIStatus envoyGoFilterHttpSetBufferHelper(void* s, uint64_t buffer_ptr, void* data, int length,
bufferAction action) {
return envoyGoFilterProcessStateHandlerWrapper(
Expand Down
6 changes: 6 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ func (c *httpCApiImpl) HttpDrainBuffer(s unsafe.Pointer, bufferPtr uint64, lengt
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpFlushBuffer(s unsafe.Pointer, bufferPtr uint64, wait bool) {
state := (*processState)(s)
res := C.envoyGoFilterHttpFlushBuffer(unsafe.Pointer(state.processState), C.uint64_t(bufferPtr), C.bool(wait))
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpSetBufferHelper(s unsafe.Pointer, bufferPtr uint64, value string, action api.BufferAction) {
state := (*processState)(s)
c.httpSetBufferHelper(state, bufferPtr, unsafe.Pointer(unsafe.StringData(value)), C.int(len(value)), action)
Expand Down
9 changes: 9 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,15 @@ func (b *httpBuffer) Reset() {
b.Drain(b.Len())
}

func (b *httpBuffer) Flush(wait bool) {
if b.length == 0 {
return
}

cAPI.HttpFlushBuffer(unsafe.Pointer(b.state), b.envoyBufferInstance, wait)
b.length = 0
}

func (b *httpBuffer) String() string {
if b.length == 0 {
return ""
Expand Down
35 changes: 35 additions & 0 deletions contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,41 @@ CAPIStatus Filter::drainBuffer(ProcessorState& state, Buffer::Instance* buffer,
return CAPIStatus::CAPIOK;
}

CAPIStatus Filter::flushBuffer(ProcessorState& state, Buffer::Instance* buffer, bool wait) {
// lock until this function return since it may running in a Go thread.
Thread::LockGuard lock(mutex_);
if (has_destroyed_) {
ENVOY_LOG(debug, "golang filter has been destroyed");
return CAPIStatus::CAPIFilterIsDestroy;
}
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang filter is not processing Go");
return CAPIStatus::CAPINotInGo;
}
if (!state.doDataList.checkExisting(buffer)) {
ENVOY_LOG(debug, "invoking cgo api at invalid state: {}", __func__);
return CAPIStatus::CAPIInvalidPhase;
}

auto data_to_write = std::make_shared<Buffer::OwnedImpl>();
data_to_write->add(*buffer);
buffer->drain(buffer->length());

auto weak_ptr = weak_from_this();
state.getDispatcher().post([this, &state, weak_ptr, data_to_write, wait] {
if (!weak_ptr.expired() && !hasDestroyed()) {
ENVOY_LOG(debug, "golang filter inject data to filter chain, length: {}, wait: {}",
data_to_write->length(), wait);
state.injectDataToFilterChain(*data_to_write.get(), false);
// TODO: handle wait
} else {
ENVOY_LOG(debug, "golang filter has gone or destroyed in flushBuffer event");
}
});

return CAPIStatus::CAPIOK;
}

CAPIStatus Filter::setBufferHelper(ProcessorState& state, Buffer::Instance* buffer,
absl::string_view& value, bufferAction action) {
// lock until this function return since it may running in a Go thread.
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ class Filter : public Http::StreamFilter,
CAPIStatus removeHeader(ProcessorState& state, absl::string_view key);
CAPIStatus copyBuffer(ProcessorState& state, Buffer::Instance* buffer, char* data);
CAPIStatus drainBuffer(ProcessorState& state, Buffer::Instance* buffer, uint64_t length);
CAPIStatus flushBuffer(ProcessorState& state, Buffer::Instance* buffer, bool wait);
CAPIStatus setBufferHelper(ProcessorState& state, Buffer::Instance* buffer,
absl::string_view& value, bufferAction action);
CAPIStatus copyTrailers(ProcessorState& state, GoString* go_strs, char* go_buf);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ envoy_cc_test(
"//contrib/golang/filters/http/test/test_data/add_data:filter.so",
"//contrib/golang/filters/http/test/test_data/basic:filter.so",
"//contrib/golang/filters/http/test/test_data/buffer:filter.so",
"//contrib/golang/filters/http/test/test_data/buffer_flush:filter.so",
"//contrib/golang/filters/http/test/test_data/echo:filter.so",
"//contrib/golang/filters/http/test/test_data/metric:filter.so",
"//contrib/golang/filters/http/test/test_data/passthrough:filter.so",
Expand Down
111 changes: 111 additions & 0 deletions contrib/golang/filters/http/test/golang_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ name: golang
const std::string METRIC{"metric"};
const std::string ACTION{"action"};
const std::string ADDDATA{"add_data"};
const std::string BUFFERFLUSH{"buffer_flush"};
};

INSTANTIATE_TEST_SUITE_P(IpVersions, GolangIntegrationTest,
Expand Down Expand Up @@ -1350,6 +1351,116 @@ TEST_P(GolangIntegrationTest, AddDataBufferAllDataAndAsync) {
cleanup();
}

TEST_P(GolangIntegrationTest, BufferFlush_InBufferedDownstreamRequest) {
initializeBasicFilter(BUFFERFLUSH, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?bufferingly_decode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "To ", false);
codec_client_->sendData(request_encoder, "be, ", true);

waitForNextUpstreamRequest();

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, upstream_request_->body().toString());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferFlush_InNonBufferedDownstreamRequest) {
initializeBasicFilter(BUFFERFLUSH, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?nonbufferingly_decode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "To be, ", false);
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
codec_client_->sendData(request_encoder, "that is ", true);

waitForNextUpstreamRequest();

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, upstream_request_->body().toString());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferFlush_InBufferedUpstreamResponse) {
initializeBasicFilter(BUFFERFLUSH, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?bufferingly_encode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();

Http::TestResponseHeaderMapImpl response_headers{
{":status", "200"},
};
upstream_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data("To ");
upstream_request_->encodeData(response_data, false);
Buffer::OwnedImpl response_data2("be, ");
upstream_request_->encodeData(response_data2, true);

ASSERT_TRUE(response->waitForEndStream());

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, response->body());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferFlush_InNonBufferedUpstreamResponse) {
initializeBasicFilter(BUFFERFLUSH, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?nonbufferingly_encode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();

Http::TestResponseHeaderMapImpl response_headers{
{":status", "200"},
};
upstream_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data("To be, ");
upstream_request_->encodeData(response_data, false);
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
Buffer::OwnedImpl response_data2("that is ");
upstream_request_->encodeData(response_data2, true);

ASSERT_TRUE(response->waitForEndStream());

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, response->body());

cleanup();
}

// Buffer exceed limit in decode header phase.
TEST_P(GolangIntegrationTest, BufferExceedLimit_DecodeHeader) {
testBufferExceedLimit("/test?databuffer=decode-header");
Expand Down
23 changes: 23 additions & 0 deletions contrib/golang/filters/http/test/test_data/buffer_flush/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_binary")

licenses(["notice"]) # Apache 2

go_binary(
name = "filter.so",
srcs = [
"config.go",
"filter.go",
],
out = "filter.so",
cgo = True,
importpath = "github.com/envoyproxy/envoy/contrib/golang/filters/http/test/test_data/buffer_flush",
linkmode = "c-shared",
visibility = ["//visibility:public"],
deps = [
"//contrib/golang/common/go/api",
"//contrib/golang/filters/http/source/go/pkg/http",
"@com_github_cncf_xds_go//xds/type/v3:type",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/structpb",
],
)
41 changes: 41 additions & 0 deletions contrib/golang/filters/http/test/test_data/buffer_flush/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package main

import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
)

const Name = "buffer_flush"

func init() {
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
}

type parser struct {
}

func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
return &config{}, nil
}

func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
return child
}

func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return &filter{
callbacks: callbacks,
config: conf,
}
}

func main() {}
Loading