Skip to content

Commit

Permalink
golang: allow injecting extra data (#38362)
Browse files Browse the repository at this point in the history
<!--
!!!ATTENTION!!!

If you are fixing *any* crash or *any* potential security issue, *do
not*
open a pull request in this repo. Please report the issue via emailing
envoy-security@googlegroups.com where the issue will be triaged
appropriately.
Thank you in advance for helping to keep Envoy secure.

!!!ATTENTION!!!

For an explanation of how to fill out the fields, please see the
relevant section
in
[PULL_REQUESTS.md](https://github.com/envoyproxy/envoy/blob/main/PULL_REQUESTS.md)
-->

Commit Message: golang: allow injecting extra data
Additional Description: This PR adds a feature that allows users to
flush the data immediately when processing the data asynchronously.
Risk Level: Low
Testing: Integration test
Docs Changes:
Release Notes:
Platform Specific Features:
[Optional Runtime guard:]
[Optional Fixes #Issue]
[Optional Fixes commit #PR or SHA]
[Optional Deprecated:]
[Optional [API
Considerations](https://github.com/envoyproxy/envoy/blob/main/api/review_checklist.md):]

Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander authored Feb 24, 2025
1 parent 0b83af5 commit 6c11eac
Show file tree
Hide file tree
Showing 15 changed files with 434 additions and 1 deletion.
2 changes: 2 additions & 0 deletions contrib/golang/common/go/api/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ typedef enum { // NOLINT(modernize-use-using)
CAPIYield = -6,
CAPIInternalFailure = -7,
CAPISerializationFailure = -8,
CAPIInvalidScene = -9,
} CAPIStatus;

/* These APIs are related to the decode/encode phase, use the pointer of processState. */
Expand All @@ -79,6 +80,7 @@ CAPIStatus envoyGoFilterHttpSendLocalReply(void* s, int response_code, void* bod
int details_len);
CAPIStatus envoyGoFilterHttpSendPanicReply(void* s, void* details_data, int details_len);
CAPIStatus envoyGoFilterHttpAddData(void* s, void* data, int data_len, bool is_streaming);
CAPIStatus envoyGoFilterHttpInjectData(void* s, void* data, int data_len);

CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
int* value_len);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/common/go/api/capi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type HttpCAPI interface {
HttpContinue(s unsafe.Pointer, status uint64)
HttpSendLocalReply(s unsafe.Pointer, responseCode int, bodyText string, headers map[string][]string, grpcStatus int64, details string)
HttpAddData(s unsafe.Pointer, data []byte, isStreaming bool)
HttpInjectData(s unsafe.Pointer, data []byte)

// Send a specialized reply that indicates that the filter has failed on the go side. Internally this is used for
// when unhandled panics are detected.
Expand Down
2 changes: 2 additions & 0 deletions contrib/golang/common/go/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ type FilterProcessCallbacks interface {
// For example, turn a headers only request into a request with a body, add more body when processing trailers, and so on.
// The second argument isStreaming supplies if this caller streams data or buffers the full body.
AddData(data []byte, isStreaming bool)
// InjectData inject the content of slice data via Envoy StreamXXFilterCallbacks's injectXXDataToFilterChaininjectData.
InjectData(data []byte)
}

type DecoderFilterCallbacks interface {
Expand Down
8 changes: 8 additions & 0 deletions contrib/golang/filters/http/source/cgo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ CAPIStatus envoyGoFilterHttpAddData(void* s, void* data, int data_len, bool is_s
});
}

CAPIStatus envoyGoFilterHttpInjectData(void* s, void* data, int data_length) {
return envoyGoFilterProcessStateHandlerWrapper(
s, [data, data_length](std::shared_ptr<Filter>& filter, ProcessorState& state) -> CAPIStatus {
auto value = stringViewFromGoPointer(data, data_length);
return filter->injectData(state, value);
});
}

// unsafe API, without copy memory from c to go.
CAPIStatus envoyGoFilterHttpGetHeader(void* s, void* key_data, int key_len, uint64_t* value_data,
int* value_len) {
Expand Down
12 changes: 11 additions & 1 deletion contrib/golang/filters/http/source/go/pkg/http/capi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func handleCApiStatus(status C.CAPIStatus) {
case C.CAPIFilterIsGone,
C.CAPIFilterIsDestroy,
C.CAPINotInGo,
C.CAPIInvalidPhase:
C.CAPIInvalidPhase,
C.CAPIInvalidScene:
panic(capiStatusToStr(status))
}
}
Expand All @@ -92,6 +93,8 @@ func capiStatusToStr(status C.CAPIStatus) string {
return errNotInGo
case C.CAPIInvalidPhase:
return errInvalidPhase
case C.CAPIInvalidScene:
return errInvalidScene
}

return "unknown status"
Expand Down Expand Up @@ -154,6 +157,13 @@ func (c *httpCApiImpl) HttpAddData(s unsafe.Pointer, data []byte, isStreaming bo
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpInjectData(s unsafe.Pointer, data []byte) {
state := (*processState)(s)
res := C.envoyGoFilterHttpInjectData(unsafe.Pointer(state.processState),
unsafe.Pointer(unsafe.SliceData(data)), C.int(len(data)))
handleCApiStatus(res)
}

func (c *httpCApiImpl) HttpGetHeader(s unsafe.Pointer, key string) string {
state := (*processState)(s)
var valueData C.uint64_t
Expand Down
4 changes: 4 additions & 0 deletions contrib/golang/filters/http/source/go/pkg/http/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ func (s *processState) AddData(data []byte, isStreaming bool) {
cAPI.HttpAddData(unsafe.Pointer(s), data, isStreaming)
}

func (s *processState) InjectData(data []byte) {
cAPI.HttpInjectData(unsafe.Pointer(s), data)
}

func (r *httpRequest) StreamInfo() api.StreamInfo {
return &r.streamInfo
}
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/go/pkg/http/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
errFilterDestroyed = "golang filter has been destroyed"
errNotInGo = "not proccessing Go"
errInvalidPhase = "invalid phase, maybe headers/buffer already continued"
errInvalidScene = "invalid scene for this API"
)

// api.HeaderMap
Expand Down
37 changes: 37 additions & 0 deletions contrib/golang/filters/http/source/golang_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,43 @@ CAPIStatus Filter::addData(ProcessorState& state, absl::string_view data, bool i
return CAPIStatus::CAPIYield;
}

CAPIStatus Filter::injectData(ProcessorState& state, absl::string_view data) {
// lock until this function return since it may running in a Go thread.
Thread::LockGuard lock(mutex_);
if (has_destroyed_) {
ENVOY_LOG(debug, "golang filter has been destroyed");
return CAPIStatus::CAPIFilterIsDestroy;
}
if (!state.isProcessingInGo()) {
ENVOY_LOG(debug, "golang filter is not processing Go");
return CAPIStatus::CAPINotInGo;
}
if (state.filterState() != FilterState::ProcessingData) {
ENVOY_LOG(error, "injectData is not supported when calling without processing data, use "
"`addData` instead.");
return CAPIStatus::CAPIInvalidPhase;
}

if (state.isThreadSafe()) {
ENVOY_LOG(error, "injectData is not supported when calling inside the callback context");
return CAPIStatus::CAPIInvalidScene;
}

auto data_to_write = std::make_shared<Buffer::OwnedImpl>(data);
auto weak_ptr = weak_from_this();
state.getDispatcher().post([this, &state, weak_ptr, data_to_write] {
if (!weak_ptr.expired() && !hasDestroyed()) {
ENVOY_LOG(debug, "golang filter inject data to filter chain, length: {}",
data_to_write->length());
state.injectDataToFilterChain(*data_to_write.get(), false);
} else {
ENVOY_LOG(debug, "golang filter has gone or destroyed in injectData event");
}
});

return CAPIStatus::CAPIOK;
}

CAPIStatus Filter::getHeader(ProcessorState& state, absl::string_view key, uint64_t* value_data,
int* value_len) {
Thread::LockGuard lock(mutex_);
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/source/golang_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class Filter : public Http::StreamFilter,
CAPIStatus sendPanicReply(ProcessorState& state, absl::string_view details);

CAPIStatus addData(ProcessorState& state, absl::string_view data, bool is_streaming);
CAPIStatus injectData(ProcessorState& state, absl::string_view data);

CAPIStatus getHeader(ProcessorState& state, absl::string_view key, uint64_t* value_data,
int* value_len);
Expand Down
153 changes: 153 additions & 0 deletions contrib/golang/filters/http/test/golang_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ name: golang
const std::string METRIC{"metric"};
const std::string ACTION{"action"};
const std::string ADDDATA{"add_data"};
const std::string BUFFERINJECTDATA{"bufferinjectdata"};
};

INSTANTIATE_TEST_SUITE_P(IpVersions, GolangIntegrationTest,
Expand Down Expand Up @@ -1350,6 +1351,158 @@ TEST_P(GolangIntegrationTest, AddDataBufferAllDataAndAsync) {
cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InBufferedDownstreamRequest) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?bufferingly_decode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "To ", false);
codec_client_->sendData(request_encoder, "be, ", true);

waitForNextUpstreamRequest();

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, upstream_request_->body().toString());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InNonBufferedDownstreamRequest) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?nonbufferingly_decode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "To be, ", false);
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
codec_client_->sendData(request_encoder, "that is ", true);

waitForNextUpstreamRequest();

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, upstream_request_->body().toString());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InBufferedUpstreamResponse) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?bufferingly_encode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();

Http::TestResponseHeaderMapImpl response_headers{
{":status", "200"},
};
upstream_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data("To ");
upstream_request_->encodeData(response_data, false);
Buffer::OwnedImpl response_data2("be, ");
upstream_request_->encodeData(response_data2, true);

ASSERT_TRUE(response->waitForEndStream());

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, response->body());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_InNonBufferedUpstreamResponse) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{{":method", "POST"},
{":path", "/test?nonbufferingly_encode"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

waitForNextUpstreamRequest();

Http::TestResponseHeaderMapImpl response_headers{
{":status", "200"},
};
upstream_request_->encodeHeaders(response_headers, false);
Buffer::OwnedImpl response_data("To be, ");
upstream_request_->encodeData(response_data, false);
timeSystem().advanceTimeAndRun(std::chrono::milliseconds(10), *dispatcher_,
Event::Dispatcher::RunType::NonBlock);
Buffer::OwnedImpl response_data2("that is ");
upstream_request_->encodeData(response_data2, true);

ASSERT_TRUE(response->waitForEndStream());

auto body = "To be, or not to be, that is the question";
EXPECT_EQ(body, response->body());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_WithoutProcessingData) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{
{":method", "POST"},
{":path", "/test?inject_data_when_processing_header"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, true);
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(response->waitForEndStream());

EXPECT_EQ("400", response->headers().getStatusValue());

cleanup();
}

TEST_P(GolangIntegrationTest, BufferInjectData_ProcessingDataSynchronously) {
initializeBasicFilter(BUFFERINJECTDATA, "test.com");

codec_client_ = makeHttpConnection(makeClientConnection(lookupPort("http")));
Http::TestRequestHeaderMapImpl request_headers{
{":method", "POST"},
{":path", "/test?inject_data_when_processing_data_synchronously"},
{":scheme", "http"},
{":authority", "test.com"}};

auto encoder_decoder = codec_client_->startRequest(request_headers, false);
Http::RequestEncoder& request_encoder = encoder_decoder.first;
codec_client_->sendData(request_encoder, "blahblah", true);
auto response = std::move(encoder_decoder.second);

ASSERT_TRUE(response->waitForEndStream());

EXPECT_EQ("400", response->headers().getStatusValue());

cleanup();
}

// Buffer exceed limit in decode header phase.
TEST_P(GolangIntegrationTest, BufferExceedLimit_DecodeHeader) {
testBufferExceedLimit("/test?databuffer=decode-header");
Expand Down
1 change: 1 addition & 0 deletions contrib/golang/filters/http/test/test_data/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_binary(
"//contrib/golang/filters/http/test/test_data/add_data",
"//contrib/golang/filters/http/test/test_data/basic",
"//contrib/golang/filters/http/test/test_data/buffer",
"//contrib/golang/filters/http/test/test_data/bufferinjectdata",
"//contrib/golang/filters/http/test/test_data/echo",
"//contrib/golang/filters/http/test/test_data/metric",
"//contrib/golang/filters/http/test/test_data/passthrough",
Expand Down
21 changes: 21 additions & 0 deletions contrib/golang/filters/http/test/test_data/bufferinjectdata/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

licenses(["notice"]) # Apache 2

go_library(
name = "bufferinjectdata",
srcs = [
"config.go",
"filter.go",
],
cgo = True,
importpath = "example.com/test-data/bufferinjectdata",
visibility = ["//visibility:public"],
deps = [
"//contrib/golang/common/go/api",
"//contrib/golang/filters/http/source/go/pkg/http",
"@com_github_cncf_xds_go//xds/type/v3:type",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/structpb",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package bufferinjectdata

import (
"google.golang.org/protobuf/types/known/anypb"

"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
"github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
)

const Name = "bufferinjectdata"

func init() {
http.RegisterHttpFilterFactoryAndConfigParser(Name, filterFactory, &parser{})
}

type config struct {
}

type parser struct {
}

func (p *parser) Parse(any *anypb.Any, callbacks api.ConfigCallbackHandler) (interface{}, error) {
return &config{}, nil
}

func (p *parser) Merge(parent interface{}, child interface{}) interface{} {
return child
}

func filterFactory(c interface{}, callbacks api.FilterCallbackHandler) api.StreamFilter {
conf, ok := c.(*config)
if !ok {
panic("unexpected config type")
}
return &filter{
callbacks: callbacks,
config: conf,
}
}
Loading

0 comments on commit 6c11eac

Please sign in to comment.