Skip to content

Commit

Permalink
golang: allow injecting extra data
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander committed Feb 10, 2025
1 parent 6e6898f commit ca98608
Show file tree
Hide file tree
Showing 14 changed files with 433 additions and 1 deletion.
2 changes: 2 additions & 0 deletions contrib/golang/common/go/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ typedef enum { // NOLINT(modernize-use-using)
CAPIYield = -6,
CAPIInternalFailure = -7,
CAPISerializationFailure = -8,
CAPIInvalidScene = -9,
} CAPIStatus;

/* These APIs are related to the decode/encode phase, use the pointer of processState. */
Expand All @@ -79,6 +80,7 @@ CAPIStatus envoyGoFilterHttpSendLocalReply(void* s, int response_code, void* bod
int details_len);
CAPIStatus envoyGoFilterHttpSendPanicReply(void* s, void* details_data, int details_len);
CAPIStatus envoyGoFilterHttpAddData(void* s, void* data, int data_len, bool is_streaming);
CAPIStatus envoyGoFilterHttpInjectData(void* s, void* data, int data_len);

CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
int* value_len);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/common/go/api/capi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type HttpCAPI interface {
HttpContinue(s unsafe.Pointer, status uint64)
HttpSendLocalReply(s unsafe.Pointer, responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string)
HttpAddData(s unsafe.Pointer, data []byte, isStreaming bool)
HttpInjectData(s unsafe.Pointer, data []byte)

// Send a specialized reply that indicates that the filter has failed on the go side. Internally this is used for
// when unhandled panics are detected.
Expand Down
2 changes: 2 additions & 0 deletions contrib/golang/common/go/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ type FilterProcessCallbacks interface {
// For example, turn a headers only request into a request with a body, add more body when processing trailers, and so on.
// The second argument isStreaming supplies if this caller streams data or buffers the full body.
AddData(data []byte, isStreaming bool)
// InjectData inject the content of slice data via Envoy StreamXXFilterCallbacks's injectXXDataToFilterChaininjectData.
InjectData(data []byte)
}

type DecoderFilterCallbacks interface {
Expand Down
8 changes: 8 additions & 0 deletions contrib/golang/filters/http/source/cgo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ CAPIStatus envoyGoFilterHttpAddData(void* s, void* data, int data_len, bool is_s
});
}

CAPIStatus envoyGoFilterHttpInjectData(void* s, void* data, int data_length) {
return envoyGoFilterProcessStateHandlerWrapper(
s, [data, data_length](std::shared_ptr<Filter>& filter, ProcessorState& state) -> CAPIStatus {
auto value = stringViewFromGoPointer(data, data_length);
return filter->injectData(state, value);
});
}

// unsafe API, without copy memory from c to go.
CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
int* value_len) {
Expand Down
12 changes: 11 additions & 1 deletion contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func handleCApiStatus(status C.CAPIStatus) {
case C.CAPIFilterIsGone,
C.CAPIFilterIsDestroy,
C.CAPINotInGo,
C.CAPIInvalidPhase:
C.CAPIInvalidPhase,
C.CAPIInvalidScene:
panic(capiStatusToStr(status))
}
}
Expand All @@ -92,6 +93,8 @@ func capiStatusToStr(status C.CAPIStatus) string {
return errNotInGo
case C.CAPIInvalidPhase:
return errInvalidPhase
case C.CAPIInvalidScene:
return errInvalidScene
}

return "unknown status"
Expand Down Expand Up @@ -154,6 +157,13 @@ func (c *httpCApiImpl) HttpAddData(s unsafe.Pointer, data []byte, isStreaming bo
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpInjectData(s unsafe.Pointer, data []byte) {
state := (*processState)(s)
res := C.envoyGoFilterHttpInjectData(unsafe.Pointer(state.processState),
unsafe.Pointer(unsafe.SliceData(data)), C.int(len(data)))
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpGetHeader(s unsafe.Pointer, key string) string {
state := (*processState)(s)
var valueData C.uint64_t
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (s *processState) AddData(data []byte, isStreaming bool) {
cAPI.HttpAddData(unsafe.Pointer(s), data, isStreaming)
}

func (s *processState) InjectData(data []byte) {
cAPI.HttpInjectData(unsafe.Pointer(s), data)
}

func (r *httpRequest) StreamInfo() api.StreamInfo {
return &r.streamInfo
}
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/go/pkg/http/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
errFilterDestroyed = "golang filter has been destroyed"
errNotInGo = "not proccessing Go"
errInvalidPhase = "invalid phase, maybe headers/buffer already continued"
errInvalidScene = "invalid scene for this API"
)

// api.HeaderMap
Expand Down
37 changes: 37 additions & 0 deletions contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,43 @@ CAPIStatus Filter::addData(ProcessorState& state, absl::string_view data, bool i
return CAPIStatus::CAPIYield;
}

CAPIStatus Filter::injectData(ProcessorState& state, absl::string_view data) {
// lock until this function return since it may running in a Go thread.
Thread::LockGuard lock(mutex_);
if (has_destroyed_) {
ENVOY_LOG(debug, "golang filter has been destroyed");
return CAPIStatus::CAPIFilterIsDestroy;
}
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang filter is not processing Go");
return CAPIStatus::CAPINotInGo;
}
if (state.filterState() != FilterState::ProcessingData) {
ENVOY_LOG(error, "injectData is not supported when calling without processing data, use "
"`addData` instead.");
return CAPIStatus::CAPIInvalidPhase;
}

if (state.isThreadSafe()) {
ENVOY_LOG(error, "injectData is not supported when calling inside the callback context");
return CAPIStatus::CAPIInvalidScene;
}

auto data_to_write = std::make_shared<Buffer::OwnedImpl>(data);
auto weak_ptr = weak_from_this();
state.getDispatcher().post([this, &state, weak_ptr, data_to_write] {
if (!weak_ptr.expired() && !hasDestroyed()) {
ENVOY_LOG(debug, "golang filter inject data to filter chain, length: {}",
data_to_write->length());
state.injectDataToFilterChain(*data_to_write.get(), false);
} else {
ENVOY_LOG(debug, "golang filter has gone or destroyed in injectData event");
}
});

return CAPIStatus::CAPIOK;
}

CAPIStatus Filter::getHeader(ProcessorState& state, absl::string_view key, uint64_t* value_data,
int* value_len) {
Thread::LockGuard lock(mutex_);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class Filter : public Http::StreamFilter,
CAPIStatus sendPanicReply(ProcessorState& state, absl::string_view details);

CAPIStatus addData(ProcessorState& state, absl::string_view data, bool is_streaming);
CAPIStatus injectData(ProcessorState& state, absl::string_view data);

CAPIStatus getHeader(ProcessorState& state, absl::string_view key, uint64_t* value_data,
int* value_len);
Expand Down
153 changes: 153 additions & 0 deletions contrib/golang/filters/http/test/golang_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ name: golang
const std::string METRIC{"metric"};
const std::string ACTION{"action"};
const std::string ADDDATA{"add_data"};
const std::string BUFFERINJECTDATA{"bufferinjectdata"};
};

INSTANTIATE_TEST_SUITE_P(IpVersions, GolangIntegrationTest,
Expand Down Expand Up @@ -1350,6 +1351,158 @@ TEST_P(GolangIntegrationTest, AddDataBufferAllDataAndAsync) {
cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InBufferedDownstreamRequest) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?bufferingly_decode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "To ", false);
codec_client_->sendData(request_encoder, "be, ", true);

waitForNextUpstreamRequest();

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, upstream_request_->body().toString());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InNonBufferedDownstreamRequest) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?nonbufferingly_decode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "To be, ", false);
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
codec_client_->sendData(request_encoder, "that is ", true);

waitForNextUpstreamRequest();

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, upstream_request_->body().toString());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InBufferedUpstreamResponse) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?bufferingly_encode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();

Http::TestResponseHeaderMapImpl response_headers{
{":status", "200"},
};
upstream_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data("To ");
upstream_request_->encodeData(response_data, false);
Buffer::OwnedImpl response_data2("be, ");
upstream_request_->encodeData(response_data2, true);

ASSERT_TRUE(response->waitForEndStream());

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, response->body());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InNonBufferedUpstreamResponse) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?nonbufferingly_encode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();

Http::TestResponseHeaderMapImpl response_headers{
{":status", "200"},
};
upstream_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data("To be, ");
upstream_request_->encodeData(response_data, false);
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
Buffer::OwnedImpl response_data2("that is ");
upstream_request_->encodeData(response_data2, true);

ASSERT_TRUE(response->waitForEndStream());

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, response->body());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_WithoutProcessingData) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{
{":method", "POST"},
{":path", "/test?inject_data_when_processing_header"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(response->waitForEndStream());

EXPECT_EQ("400", response->headers().getStatusValue());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_ProcessingDataSynchronously) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{
{":method", "POST"},
{":path", "/test?inject_data_when_processing_data_synchronously"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "blahblah", true);
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(response->waitForEndStream());

EXPECT_EQ("400", response->headers().getStatusValue());

cleanup();
}

// Buffer exceed limit in decode header phase.
TEST_P(GolangIntegrationTest, BufferExceedLimit_DecodeHeader) {
testBufferExceedLimit("/test?databuffer=decode-header");
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/test/test_data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_binary(
"//contrib/golang/filters/http/test/test_data/add_data",
"//contrib/golang/filters/http/test/test_data/basic",
"//contrib/golang/filters/http/test/test_data/buffer",
"//contrib/golang/filters/http/test/test_data/bufferinjectdata",
"//contrib/golang/filters/http/test/test_data/echo",
"//contrib/golang/filters/http/test/test_data/metric",
"//contrib/golang/filters/http/test/test_data/passthrough",
Expand Down
21 changes: 21 additions & 0 deletions contrib/golang/filters/http/test/test_data/bufferinjectdata/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

licenses(["notice"]) # Apache 2

go_library(
name = "bufferinjectdata",
srcs = [
"config.go",
"filter.go",
],
cgo = True,
importpath = "example.com/test-data/bufferinjectdata",
visibility = ["//visibility:public"],
deps = [
"//contrib/golang/common/go/api",
"//contrib/golang/filters/http/source/go/pkg/http",
"@com_github_cncf_xds_go//xds/type/v3:type",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/structpb",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package bufferinjectdata

import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
)

const Name = "buffer_inject_data"

func init() {
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
}

type parser struct {
}

func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
return &config{}, nil
}

func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
return child
}

func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return &filter{
callbacks: callbacks,
config: conf,
}
}
Loading

0 comments on commit ca98608

Please sign in to comment.