Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

golang: allow injecting extra data #38362

Merged
merged 1 commit into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/golang/common/go/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ typedef enum { // NOLINT(modernize-use-using)
CAPIYield = -6,
CAPIInternalFailure = -7,
CAPISerializationFailure = -8,
CAPIInvalidScene = -9,
} CAPIStatus;

/* These APIs are related to the decode/encode phase, use the pointer of processState. */
Expand All @@ -79,6 +80,7 @@ CAPIStatus envoyGoFilterHttpSendLocalReply(void* s, int response_code, void* bod
int details_len);
CAPIStatus envoyGoFilterHttpSendPanicReply(void* s, void* details_data, int details_len);
CAPIStatus envoyGoFilterHttpAddData(void* s, void* data, int data_len, bool is_streaming);
CAPIStatus envoyGoFilterHttpInjectData(void* s, void* data, int data_len);

CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
int* value_len);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/common/go/api/capi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type HttpCAPI interface {
HttpContinue(s unsafe.Pointer, status uint64)
HttpSendLocalReply(s unsafe.Pointer, responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string)
HttpAddData(s unsafe.Pointer, data []byte, isStreaming bool)
HttpInjectData(s unsafe.Pointer, data []byte)

// Send a specialized reply that indicates that the filter has failed on the go side. Internally this is used for
// when unhandled panics are detected.
Expand Down
2 changes: 2 additions & 0 deletions contrib/golang/common/go/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ type FilterProcessCallbacks interface {
// For example, turn a headers only request into a request with a body, add more body when processing trailers, and so on.
// The second argument isStreaming supplies if this caller streams data or buffers the full body.
AddData(data []byte, isStreaming bool)
// InjectData inject the content of slice data via Envoy StreamXXFilterCallbacks's injectXXDataToFilterChaininjectData.
InjectData(data []byte)
}

type DecoderFilterCallbacks interface {
Expand Down
8 changes: 8 additions & 0 deletions contrib/golang/filters/http/source/cgo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ CAPIStatus envoyGoFilterHttpAddData(void* s, void* data, int data_len, bool is_s
});
}

CAPIStatus envoyGoFilterHttpInjectData(void* s, void* data, int data_length) {
return envoyGoFilterProcessStateHandlerWrapper(
s, [data, data_length](std::shared_ptr<Filter>& filter, ProcessorState& state) -> CAPIStatus {
auto value = stringViewFromGoPointer(data, data_length);
return filter->injectData(state, value);
});
}

// unsafe API, without copy memory from c to go.
CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
int* value_len) {
Expand Down
12 changes: 11 additions & 1 deletion contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func handleCApiStatus(status C.CAPIStatus) {
case C.CAPIFilterIsGone,
C.CAPIFilterIsDestroy,
C.CAPINotInGo,
C.CAPIInvalidPhase:
C.CAPIInvalidPhase,
C.CAPIInvalidScene:
panic(capiStatusToStr(status))
}
}
Expand All @@ -92,6 +93,8 @@ func capiStatusToStr(status C.CAPIStatus) string {
return errNotInGo
case C.CAPIInvalidPhase:
return errInvalidPhase
case C.CAPIInvalidScene:
return errInvalidScene
}

return "unknown status"
Expand Down Expand Up @@ -154,6 +157,13 @@ func (c *httpCApiImpl) HttpAddData(s unsafe.Pointer, data []byte, isStreaming bo
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpInjectData(s unsafe.Pointer, data []byte) {
state := (*processState)(s)
res := C.envoyGoFilterHttpInjectData(unsafe.Pointer(state.processState),
unsafe.Pointer(unsafe.SliceData(data)), C.int(len(data)))
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpGetHeader(s unsafe.Pointer, key string) string {
state := (*processState)(s)
var valueData C.uint64_t
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (s *processState) AddData(data []byte, isStreaming bool) {
cAPI.HttpAddData(unsafe.Pointer(s), data, isStreaming)
}

func (s *processState) InjectData(data []byte) {
cAPI.HttpInjectData(unsafe.Pointer(s), data)
}

func (r *httpRequest) StreamInfo() api.StreamInfo {
return &r.streamInfo
}
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/go/pkg/http/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
errFilterDestroyed = "golang filter has been destroyed"
errNotInGo = "not proccessing Go"
errInvalidPhase = "invalid phase, maybe headers/buffer already continued"
errInvalidScene = "invalid scene for this API"
)

// api.HeaderMap
Expand Down
37 changes: 37 additions & 0 deletions contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,43 @@ CAPIStatus Filter::addData(ProcessorState& state, absl::string_view data, bool i
return CAPIStatus::CAPIYield;
}

CAPIStatus Filter::injectData(ProcessorState& state, absl::string_view data) {
// lock until this function return since it may running in a Go thread.
Thread::LockGuard lock(mutex_);
if (has_destroyed_) {
ENVOY_LOG(debug, "golang filter has been destroyed");
return CAPIStatus::CAPIFilterIsDestroy;
}
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang filter is not processing Go");
return CAPIStatus::CAPINotInGo;
}
if (state.filterState() != FilterState::ProcessingData) {
ENVOY_LOG(error, "injectData is not supported when calling without processing data, use "
"`addData` instead.");
return CAPIStatus::CAPIInvalidPhase;
}

if (state.isThreadSafe()) {
ENVOY_LOG(error, "injectData is not supported when calling inside the callback context");
return CAPIStatus::CAPIInvalidScene;
}

auto data_to_write = std::make_shared<Buffer::OwnedImpl>(data);
auto weak_ptr = weak_from_this();
state.getDispatcher().post([this, &state, weak_ptr, data_to_write] {
if (!weak_ptr.expired() && !hasDestroyed()) {
ENVOY_LOG(debug, "golang filter inject data to filter chain, length: {}",
data_to_write->length());
state.injectDataToFilterChain(*data_to_write.get(), false);
} else {
ENVOY_LOG(debug, "golang filter has gone or destroyed in injectData event");
}
});

return CAPIStatus::CAPIOK;
}

CAPIStatus Filter::getHeader(ProcessorState& state, absl::string_view key, uint64_t* value_data,
int* value_len) {
Thread::LockGuard lock(mutex_);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class Filter : public Http::StreamFilter,
CAPIStatus sendPanicReply(ProcessorState& state, absl::string_view details);

CAPIStatus addData(ProcessorState& state, absl::string_view data, bool is_streaming);
CAPIStatus injectData(ProcessorState& state, absl::string_view data);

CAPIStatus getHeader(ProcessorState& state, absl::string_view key, uint64_t* value_data,
int* value_len);
Expand Down
153 changes: 153 additions & 0 deletions contrib/golang/filters/http/test/golang_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ name: golang
const std::string METRIC{"metric"};
const std::string ACTION{"action"};
const std::string ADDDATA{"add_data"};
const std::string BUFFERINJECTDATA{"bufferinjectdata"};
};

INSTANTIATE_TEST_SUITE_P(IpVersions, GolangIntegrationTest,
Expand Down Expand Up @@ -1350,6 +1351,158 @@ TEST_P(GolangIntegrationTest, AddDataBufferAllDataAndAsync) {
cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InBufferedDownstreamRequest) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?bufferingly_decode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "To ", false);
codec_client_->sendData(request_encoder, "be, ", true);

waitForNextUpstreamRequest();

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, upstream_request_->body().toString());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InNonBufferedDownstreamRequest) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?nonbufferingly_decode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "To be, ", false);
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
codec_client_->sendData(request_encoder, "that is ", true);

waitForNextUpstreamRequest();

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, upstream_request_->body().toString());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InBufferedUpstreamResponse) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?bufferingly_encode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();

Http::TestResponseHeaderMapImpl response_headers{
{":status", "200"},
};
upstream_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data("To ");
upstream_request_->encodeData(response_data, false);
Buffer::OwnedImpl response_data2("be, ");
upstream_request_->encodeData(response_data2, true);

ASSERT_TRUE(response->waitForEndStream());

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, response->body());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InNonBufferedUpstreamResponse) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?nonbufferingly_encode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();

Http::TestResponseHeaderMapImpl response_headers{
{":status", "200"},
};
upstream_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data("To be, ");
upstream_request_->encodeData(response_data, false);
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
Buffer::OwnedImpl response_data2("that is ");
upstream_request_->encodeData(response_data2, true);

ASSERT_TRUE(response->waitForEndStream());

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, response->body());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_WithoutProcessingData) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{
{":method", "POST"},
{":path", "/test?inject_data_when_processing_header"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(response->waitForEndStream());

EXPECT_EQ("400", response->headers().getStatusValue());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_ProcessingDataSynchronously) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{
{":method", "POST"},
{":path", "/test?inject_data_when_processing_data_synchronously"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "blahblah", true);
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(response->waitForEndStream());

EXPECT_EQ("400", response->headers().getStatusValue());

cleanup();
}

// Buffer exceed limit in decode header phase.
TEST_P(GolangIntegrationTest, BufferExceedLimit_DecodeHeader) {
testBufferExceedLimit("/test?databuffer=decode-header");
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/test/test_data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_binary(
"//contrib/golang/filters/http/test/test_data/add_data",
"//contrib/golang/filters/http/test/test_data/basic",
"//contrib/golang/filters/http/test/test_data/buffer",
"//contrib/golang/filters/http/test/test_data/bufferinjectdata",
"//contrib/golang/filters/http/test/test_data/echo",
"//contrib/golang/filters/http/test/test_data/metric",
"//contrib/golang/filters/http/test/test_data/passthrough",
Expand Down
21 changes: 21 additions & 0 deletions contrib/golang/filters/http/test/test_data/bufferinjectdata/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

licenses(["notice"]) # Apache 2

go_library(
name = "bufferinjectdata",
srcs = [
"config.go",
"filter.go",
],
cgo = True,
importpath = "example.com/test-data/bufferinjectdata",
visibility = ["//visibility:public"],
deps = [
"//contrib/golang/common/go/api",
"//contrib/golang/filters/http/source/go/pkg/http",
"@com_github_cncf_xds_go//xds/type/v3:type",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/structpb",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package bufferinjectdata

import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
)

const Name = "bufferinjectdata"

func init() {
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
}

type parser struct {
}

func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
return &config{}, nil
}

func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
return child
}

func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return &filter{
callbacks: callbacks,
config: conf,
}
}
Loading