From 1b9d6c75edf6b464616ba453f115818b76f19ce6 Mon Sep 17 00:00:00 2001 From: Phil Kedy Date: Mon, 29 Nov 2021 01:45:33 -0500 Subject: [PATCH 1/3] Publish events with CloudEvents envelopes (#839) * Publish events with CloudEvent envelope with content type application/cloudevents+json * fix linter issue * Adding e2e test * Tweak --- pkg/standalone/publish.go | 18 ++++++- pkg/standalone/publish_test.go | 42 ++++++++++++++-- pkg/standalone/testutils.go | 67 ++++++++++++++----------- tests/e2e/standalone/standalone_test.go | 23 ++++++++- tests/e2e/testdata/cloudevent.json | 9 ++++ 5 files changed, 122 insertions(+), 37 deletions(-) create mode 100644 tests/e2e/testdata/cloudevent.json diff --git a/pkg/standalone/publish.go b/pkg/standalone/publish.go index eb27b0ade..0143d3872 100644 --- a/pkg/standalone/publish.go +++ b/pkg/standalone/publish.go @@ -8,6 +8,7 @@ package standalone import ( "bytes" "context" + "encoding/json" "errors" "fmt" "net" @@ -54,7 +55,22 @@ func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []b url = fmt.Sprintf("http://localhost:%s/v%s/publish/%s/%s", fmt.Sprintf("%v", instance.HTTPPort), api.RuntimeAPIVersion, pubsubName, topic) } - r, err := httpc.Post(url, "application/json", bytes.NewBuffer(payload)) + contentType := "application/json" + + // Detect publishing with CloudEvents envelope. + var cloudEvent map[string]interface{} + if json.Unmarshal(payload, &cloudEvent); err == nil { + _, hasID := cloudEvent["id"] + _, hasSource := cloudEvent["source"] + _, hasSpecVersion := cloudEvent["specversion"] + _, hasType := cloudEvent["type"] + _, hasData := cloudEvent["data"] + if hasID && hasSource && hasSpecVersion && hasType && hasData { + contentType = "application/cloudevents+json" + } + } + + r, err := httpc.Post(url, contentType, bytes.NewBuffer(payload)) if err != nil { return err } diff --git a/pkg/standalone/publish_test.go b/pkg/standalone/publish_test.go index ca43079b2..357cceec0 100644 --- a/pkg/standalone/publish_test.go +++ b/pkg/standalone/publish_test.go @@ -6,6 +6,8 @@ package standalone import ( + "bytes" + "net/http" "os" "runtime" "testing" @@ -24,9 +26,8 @@ func TestPublish(t *testing.T) { topic string lo ListOutput listErr error - expectedPath string postResponse string - resp string + handler http.HandlerFunc errorExpected bool errString string }{ @@ -37,6 +38,7 @@ func TestPublish(t *testing.T) { pubsubName: "test", errString: "publishAppID is missing", errorExpected: true, + handler: handlerTestPathResp("", ""), }, { name: "test empty topic", @@ -45,6 +47,7 @@ func TestPublish(t *testing.T) { pubsubName: "test", errString: "topic is missing", errorExpected: true, + handler: handlerTestPathResp("", ""), }, { name: "test empty pubsubName", @@ -53,6 +56,7 @@ func TestPublish(t *testing.T) { topic: "test", errString: "pubsubName is missing", errorExpected: true, + handler: handlerTestPathResp("", ""), }, { name: "test list error", @@ -63,6 +67,7 @@ func TestPublish(t *testing.T) { listErr: assert.AnError, errString: assert.AnError.Error(), errorExpected: true, + handler: handlerTestPathResp("", ""), }, { name: "test empty appID in list output", @@ -76,6 +81,7 @@ func TestPublish(t *testing.T) { }, errString: "couldn't find a running Dapr instance", errorExpected: true, + handler: handlerTestPathResp("", ""), }, { name: "successful call not found", @@ -88,6 +94,7 @@ func TestPublish(t *testing.T) { }, errString: "couldn't find a running Dapr instance", errorExpected: true, + handler: handlerTestPathResp("", ""), }, { name: "successful call", @@ -95,11 +102,36 @@ func TestPublish(t *testing.T) { pubsubName: "testPubsubName", topic: "testTopic", payload: []byte("test payload"), - expectedPath: "/v1.0/publish/testPubsubName/testTopic", postResponse: "test payload", lo: ListOutput{ AppID: "myAppID", }, + handler: handlerTestPathResp("/v1.0/publish/testPubsubName/testTopic", ""), + }, + { + name: "successful cloudevent envelope", + publishAppID: "myAppID", + pubsubName: "testPubsubName", + topic: "testTopic", + payload: []byte(`{"id": "1234", "source": "test", "specversion": "1.0", "type": "product.v1", "datacontenttype": "application/json", "data": {"id": "test", "description": "Testing 12345"}}`), + postResponse: "test payload", + lo: ListOutput{ + AppID: "myAppID", + }, + handler: func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Content-Type") != "application/cloudevents+json" { + w.WriteHeader(http.StatusInternalServerError) + + return + } + if r.Method == http.MethodGet { + w.Write([]byte("")) + } else { + buf := new(bytes.Buffer) + buf.ReadFrom(r.Body) + w.Write(buf.Bytes()) + } + }, }, } for _, socket := range []string{"", "/tmp"} { @@ -110,7 +142,7 @@ func TestPublish(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { if socket != "" { - ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.publishAppID, socket) + ts, l := getTestSocketServerFunc(tc.handler, tc.publishAppID, socket) go ts.Serve(l) defer func() { l.Close() @@ -119,7 +151,7 @@ func TestPublish(t *testing.T) { } }() } else { - ts, port := getTestServer(tc.expectedPath, tc.resp) + ts, port := getTestServerFunc(tc.handler) ts.Start() defer ts.Close() tc.lo.HTTPPort = port diff --git a/pkg/standalone/testutils.go b/pkg/standalone/testutils.go index b375c1a47..cc70187bd 100644 --- a/pkg/standalone/testutils.go +++ b/pkg/standalone/testutils.go @@ -26,42 +26,34 @@ func (m *mockDaprProcess) List() ([]ListOutput, error) { return m.Lo, m.Err } -func getTestServer(expectedPath, resp string) (*httptest.Server, int) { - ts := httptest.NewUnstartedServer(http.HandlerFunc(func( - w http.ResponseWriter, r *http.Request) { - if expectedPath != "" && r.RequestURI != expectedPath { - w.WriteHeader(http.StatusInternalServerError) +func getTestServerFunc(handler http.Handler) (*httptest.Server, int) { + ts := httptest.NewUnstartedServer(handler) - return - } - if r.Method == http.MethodGet { - w.Write([]byte(resp)) - } else { - buf := new(bytes.Buffer) - buf.ReadFrom(r.Body) - w.Write(buf.Bytes()) - } - })) + return ts, ts.Listener.Addr().(*net.TCPAddr).Port +} + +func getTestServer(expectedPath, resp string) (*httptest.Server, int) { + ts := httptest.NewUnstartedServer(handlerTestPathResp(expectedPath, resp)) return ts, ts.Listener.Addr().(*net.TCPAddr).Port } +func getTestSocketServerFunc(handler http.Handler, appID, path string) (*http.Server, net.Listener) { + s := &http.Server{ + Handler: handler, + } + + socket := utils.GetSocket(path, appID, "http") + l, err := net.Listen("unix", socket) + if err != nil { + panic(fmt.Sprintf("httptest: failed to listen on %v: %v", socket, err)) + } + return s, l +} + func getTestSocketServer(expectedPath, resp, appID, path string) (*http.Server, net.Listener) { s := &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if expectedPath != "" && r.RequestURI != expectedPath { - w.WriteHeader(http.StatusInternalServerError) - - return - } - if r.Method == http.MethodGet { - w.Write([]byte(resp)) - } else { - buf := new(bytes.Buffer) - buf.ReadFrom(r.Body) - w.Write(buf.Bytes()) - } - }), + Handler: handlerTestPathResp(expectedPath, resp), } socket := utils.GetSocket(path, appID, "http") @@ -71,3 +63,20 @@ func getTestSocketServer(expectedPath, resp, appID, path string) (*http.Server, } return s, l } + +func handlerTestPathResp(expectedPath, resp string) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if expectedPath != "" && r.RequestURI != expectedPath { + w.WriteHeader(http.StatusInternalServerError) + + return + } + if r.Method == http.MethodGet { + w.Write([]byte(resp)) + } else { + buf := new(bytes.Buffer) + buf.ReadFrom(r.Body) + w.Write(buf.Bytes()) + } + } +} diff --git a/tests/e2e/standalone/standalone_test.go b/tests/e2e/standalone/standalone_test.go index a5f112979..38329822f 100644 --- a/tests/e2e/standalone/standalone_test.go +++ b/tests/e2e/standalone/standalone_test.go @@ -489,7 +489,7 @@ func testPublish(t *testing.T) { daprPath := getDaprPath() for _, path := range socketCases { executeAgainstRunningDapr(t, func() { - t.Run(fmt.Sprintf("publish from file with socket %s", path), func(t *testing.T) { + t.Run(fmt.Sprintf("publish message from file with socket %s", path), func(t *testing.T) { output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json") t.Log(output) assert.NoError(t, err, "unable to publish from --data-file") @@ -499,6 +499,26 @@ func testPublish(t *testing.T) { assert.Equal(t, map[string]interface{}{"dapr": "is_great"}, event.Data) }) + t.Run(fmt.Sprintf("publish cloudevent from file with socket %s", path), func(t *testing.T) { + output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/cloudevent.json") + t.Log(output) + assert.NoError(t, err, "unable to publish from --data-file") + assert.Contains(t, output, "Event published successfully") + + event := <-events + assert.Equal(t, &common.TopicEvent{ + ID: "3cc97064-edd1-49f4-b911-c959a7370e68", + Source: "e2e_test", + SpecVersion: "1.0", + Type: "test.v1", + DataContentType: "application/json", + Subject: "e2e_subject", + PubsubName: "pubsub", + Topic: "sample", + Data: map[string]interface{}{"dapr": "is_great"}, + }, event) + }) + t.Run(fmt.Sprintf("publish from string with socket %s", path), func(t *testing.T) { output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data", "{\"cli\": \"is_working\"}") t.Log(output) @@ -521,7 +541,6 @@ func testPublish(t *testing.T) { t.Log(output) assert.Error(t, err, "--data and --data-file should not be allowed together") assert.Contains(t, output, "Only one of --data and --data-file allowed in the same publish command") - }) output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "pub_e2e") diff --git a/tests/e2e/testdata/cloudevent.json b/tests/e2e/testdata/cloudevent.json new file mode 100644 index 000000000..16654ab2b --- /dev/null +++ b/tests/e2e/testdata/cloudevent.json @@ -0,0 +1,9 @@ +{ + "id": "3cc97064-edd1-49f4-b911-c959a7370e68", + "source": "e2e_test", + "specversion": "1.0", + "type": "test.v1", + "subject": "e2e_subject", + "datacontenttype": "application/json", + "data": {"dapr": "is_great"} +} From 278428cb314e08781260771ec01d8cd5436f3fc1 Mon Sep 17 00:00:00 2001 From: "Anh Le (Andy)" Date: Mon, 29 Nov 2021 14:23:10 +0700 Subject: [PATCH 2/3] version: fix failing test (#824) * version: fix failing test * version: fix linting errors in test * version: simplify tests on bad addr Co-authored-by: Mukundan Sundararajan --- pkg/version/version_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/version/version_test.go b/pkg/version/version_test.go index 48e5cc878..4a0c6fa7b 100644 --- a/pkg/version/version_test.go +++ b/pkg/version/version_test.go @@ -114,7 +114,7 @@ func TestGetVersionsGithub(t *testing.T) { t.Run("error on bad addr", func(t *testing.T) { version, err := GetLatestReleaseGithub("http://a.super.non.existant.domain/") assert.Equal(t, "", version) - assert.EqualError(t, err, "Get \"http://a.super.non.existant.domain/\": dial tcp: lookup a.super.non.existant.domain: no such host") + assert.Error(t, err) }) s.Shutdown(context.Background()) From 91e1571fe1ff62aa68edcdcfbdb935fbe6266c97 Mon Sep 17 00:00:00 2001 From: Bernd Verst <4535280+berndverst@users.noreply.github.com> Date: Sun, 28 Nov 2021 23:45:44 -0800 Subject: [PATCH 3/3] Ignore go.work for Go 1.18 Workspaces (#843) Co-authored-by: Mukundan Sundararajan --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 1ae02b42b..7097dd727 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,7 @@ cli # CLI's auto-generated components directory **/components -test_output.json \ No newline at end of file +test_output.json + +# Go Workspaces (introduced in Go 1.18+) +go.work