Skip to content

Commit

Permalink
Fix pubsub metadata conflict (dapr#7421)
Browse files Browse the repository at this point in the history
* Temporary override to have Redis to handle event metadata.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* E2E test change to reproduce HTTP header conflict in pubsub.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* E2E test for content-length conflict with gRPC subscriber.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Hotfix to avoid content-length header conflict in HTTP and gRPC.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

* Revert "Temporary override to have Redis to handle event metadata."

This reverts commit 20b60a0.

Signed-off-by: Artur Souza <asouza.pro@gmail.com>

---------

Signed-off-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
artursouza authored Jan 22, 2024
1 parent 53454d0 commit 3b7bc67
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 9 deletions.
15 changes: 12 additions & 3 deletions pkg/messaging/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ const (

// ContentTypeHeader is the header key of content-type.
ContentTypeHeader = "content-type"
// ContentLengthHeader is the header key of content-length.
ContentLengthHeader = "content-length"
// DaprHeaderPrefix is the prefix if metadata is defined by non user-defined http headers.
DaprHeaderPrefix = "dapr-"
// gRPCBinaryMetadata is the suffix of grpc metadata binary value.
gRPCBinaryMetadataSuffix = "-bin"
// ContentLengthHeader is the header key of content-length.
ContentLengthHeader = "content-length"

// W3C trace correlation headers.
traceparentHeader = "traceparent"
Expand Down Expand Up @@ -233,7 +233,9 @@ func InternalMetadataToHTTPHeader(ctx context.Context, internalMD DaprInternalMe
continue
}

if strings.HasSuffix(keyName, gRPCBinaryMetadataSuffix) || keyName == ContentTypeHeader {
if strings.HasSuffix(keyName, gRPCBinaryMetadataSuffix) ||
keyName == ContentTypeHeader ||
keyName == ContentLengthHeader {
continue
}

Expand Down Expand Up @@ -454,6 +456,13 @@ func ProtobufToJSON(message protoreflect.ProtoMessage) ([]byte, error) {
// WithCustomGRPCMetadata applies a metadata map to the outgoing context metadata.
func WithCustomGRPCMetadata(ctx context.Context, md map[string]string) context.Context {
for k, v := range md {
if strings.EqualFold(k, ContentTypeHeader) ||
strings.EqualFold(k, ContentLengthHeader) {
// There is no use of the original payload's content-length because
// the entire data is already in the cloud event.
continue
}

// Uppercase keys will be converted to lowercase.
ctx = metadata.AppendToOutgoingContext(ctx, k, v)
}
Expand Down
11 changes: 8 additions & 3 deletions tests/e2e/pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,12 @@ func testPublish(t *testing.T, publisherExternalURL string, protocol string) rec
require.NoError(t, err)
offset += numberOfMessagesToPublish + 1

sentTopicAMessages, err := sendToPublisher(t, publisherExternalURL, "pubsub-a-topic", protocol, nil, "")
// Test bug where content-length metadata conflict makes message undeliverable in grpc subscriber.
// We set an arbitrarily large number that it is unlikely to match the size of the payload daprd delivers.
metadataContentLengthConflict := map[string]string{
"content-length": "9999999",
}
sentTopicAMessages, err := sendToPublisher(t, publisherExternalURL, "pubsub-a-topic", protocol, metadataContentLengthConflict, "")
require.NoError(t, err)
offset += numberOfMessagesToPublish + 1

Expand All @@ -290,10 +295,10 @@ func testPublish(t *testing.T, publisherExternalURL string, protocol string) rec
require.NoError(t, err)
offset += numberOfMessagesToPublish + 1

metadata := map[string]string{
metadataRawPayload := map[string]string{
"rawPayload": "true",
}
sentTopicRawMessages, err := sendToPublisher(t, publisherExternalURL, "pubsub-raw-topic", protocol, metadata, "")
sentTopicRawMessages, err := sendToPublisher(t, publisherExternalURL, "pubsub-raw-topic", protocol, metadataRawPayload, "")
require.NoError(t, err)
offset += numberOfMessagesToPublish + 1

Expand Down
9 changes: 6 additions & 3 deletions tests/e2e/pubsub_grpc/pubsub_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,10 @@ func testPublishBulk(t *testing.T, publisherExternalURL string, protocol string)
}

func testPublish(t *testing.T, publisherExternalURL string, protocol string) receivedMessagesResponse {
sentTopicAMessages, err := sendToPublisher(t, publisherExternalURL, "pubsub-a-topic", protocol, nil, "")
metadataContentLengthConflict := map[string]string{
"content-length": "9999999",
}
sentTopicAMessages, err := sendToPublisher(t, publisherExternalURL, "pubsub-a-topic", protocol, metadataContentLengthConflict, "")
require.NoError(t, err)
offset += numberOfMessagesToPublish + 1

Expand All @@ -295,10 +298,10 @@ func testPublish(t *testing.T, publisherExternalURL string, protocol string) rec
require.NoError(t, err)
offset += numberOfMessagesToPublish + 1

metadata := map[string]string{
metadataRawPayload := map[string]string{
"rawPayload": "true",
}
sentTopicRawMessages, err := sendToPublisher(t, publisherExternalURL, "pubsub-raw-topic", protocol, metadata, "")
sentTopicRawMessages, err := sendToPublisher(t, publisherExternalURL, "pubsub-raw-topic", protocol, metadataRawPayload, "")
require.NoError(t, err)
offset += numberOfMessagesToPublish + 1

Expand Down

0 comments on commit 3b7bc67

Please sign in to comment.