Skip to content

Commit

Permalink
Merge branch 'master' into hang
Browse files Browse the repository at this point in the history
  • Loading branch information
mukundansundar authored Nov 29, 2021
2 parents bbc8260 + 91e1571 commit a8cf37c
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 39 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ cli
# CLI's auto-generated components directory
**/components

test_output.json
test_output.json

# Go Workspaces (introduced in Go 1.18+)
go.work
18 changes: 17 additions & 1 deletion pkg/standalone/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package standalone
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -54,7 +55,22 @@ func (s *Standalone) Publish(publishAppID, pubsubName, topic string, payload []b
url = fmt.Sprintf("http://localhost:%s/v%s/publish/%s/%s", fmt.Sprintf("%v", instance.HTTPPort), api.RuntimeAPIVersion, pubsubName, topic)
}

r, err := httpc.Post(url, "application/json", bytes.NewBuffer(payload))
contentType := "application/json"

// Detect publishing with CloudEvents envelope.
var cloudEvent map[string]interface{}
if json.Unmarshal(payload, &cloudEvent); err == nil {
_, hasID := cloudEvent["id"]
_, hasSource := cloudEvent["source"]
_, hasSpecVersion := cloudEvent["specversion"]
_, hasType := cloudEvent["type"]
_, hasData := cloudEvent["data"]
if hasID && hasSource && hasSpecVersion && hasType && hasData {
contentType = "application/cloudevents+json"
}
}

r, err := httpc.Post(url, contentType, bytes.NewBuffer(payload))
if err != nil {
return err
}
Expand Down
42 changes: 37 additions & 5 deletions pkg/standalone/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package standalone

import (
"bytes"
"net/http"
"os"
"runtime"
"testing"
Expand All @@ -24,9 +26,8 @@ func TestPublish(t *testing.T) {
topic string
lo ListOutput
listErr error
expectedPath string
postResponse string
resp string
handler http.HandlerFunc
errorExpected bool
errString string
}{
Expand All @@ -37,6 +38,7 @@ func TestPublish(t *testing.T) {
pubsubName: "test",
errString: "publishAppID is missing",
errorExpected: true,
handler: handlerTestPathResp("", ""),
},
{
name: "test empty topic",
Expand All @@ -45,6 +47,7 @@ func TestPublish(t *testing.T) {
pubsubName: "test",
errString: "topic is missing",
errorExpected: true,
handler: handlerTestPathResp("", ""),
},
{
name: "test empty pubsubName",
Expand All @@ -53,6 +56,7 @@ func TestPublish(t *testing.T) {
topic: "test",
errString: "pubsubName is missing",
errorExpected: true,
handler: handlerTestPathResp("", ""),
},
{
name: "test list error",
Expand All @@ -63,6 +67,7 @@ func TestPublish(t *testing.T) {
listErr: assert.AnError,
errString: assert.AnError.Error(),
errorExpected: true,
handler: handlerTestPathResp("", ""),
},
{
name: "test empty appID in list output",
Expand All @@ -76,6 +81,7 @@ func TestPublish(t *testing.T) {
},
errString: "couldn't find a running Dapr instance",
errorExpected: true,
handler: handlerTestPathResp("", ""),
},
{
name: "successful call not found",
Expand All @@ -88,18 +94,44 @@ func TestPublish(t *testing.T) {
},
errString: "couldn't find a running Dapr instance",
errorExpected: true,
handler: handlerTestPathResp("", ""),
},
{
name: "successful call",
publishAppID: "myAppID",
pubsubName: "testPubsubName",
topic: "testTopic",
payload: []byte("test payload"),
expectedPath: "/v1.0/publish/testPubsubName/testTopic",
postResponse: "test payload",
lo: ListOutput{
AppID: "myAppID",
},
handler: handlerTestPathResp("/v1.0/publish/testPubsubName/testTopic", ""),
},
{
name: "successful cloudevent envelope",
publishAppID: "myAppID",
pubsubName: "testPubsubName",
topic: "testTopic",
payload: []byte(`{"id": "1234", "source": "test", "specversion": "1.0", "type": "product.v1", "datacontenttype": "application/json", "data": {"id": "test", "description": "Testing 12345"}}`),
postResponse: "test payload",
lo: ListOutput{
AppID: "myAppID",
},
handler: func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Content-Type") != "application/cloudevents+json" {
w.WriteHeader(http.StatusInternalServerError)

return
}
if r.Method == http.MethodGet {
w.Write([]byte(""))
} else {
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
w.Write(buf.Bytes())
}
},
},
}
for _, socket := range []string{"", "/tmp"} {
Expand All @@ -110,7 +142,7 @@ func TestPublish(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if socket != "" {
ts, l := getTestSocketServer(tc.expectedPath, tc.resp, tc.publishAppID, socket)
ts, l := getTestSocketServerFunc(tc.handler, tc.publishAppID, socket)
go ts.Serve(l)
defer func() {
l.Close()
Expand All @@ -119,7 +151,7 @@ func TestPublish(t *testing.T) {
}
}()
} else {
ts, port := getTestServer(tc.expectedPath, tc.resp)
ts, port := getTestServerFunc(tc.handler)
ts.Start()
defer ts.Close()
tc.lo.HTTPPort = port
Expand Down
67 changes: 38 additions & 29 deletions pkg/standalone/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,34 @@ func (m *mockDaprProcess) List() ([]ListOutput, error) {
return m.Lo, m.Err
}

func getTestServer(expectedPath, resp string) (*httptest.Server, int) {
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(
w http.ResponseWriter, r *http.Request) {
if expectedPath != "" && r.RequestURI != expectedPath {
w.WriteHeader(http.StatusInternalServerError)
func getTestServerFunc(handler http.Handler) (*httptest.Server, int) {
ts := httptest.NewUnstartedServer(handler)

return
}
if r.Method == http.MethodGet {
w.Write([]byte(resp))
} else {
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
w.Write(buf.Bytes())
}
}))
return ts, ts.Listener.Addr().(*net.TCPAddr).Port
}

func getTestServer(expectedPath, resp string) (*httptest.Server, int) {
ts := httptest.NewUnstartedServer(handlerTestPathResp(expectedPath, resp))

return ts, ts.Listener.Addr().(*net.TCPAddr).Port
}

func getTestSocketServerFunc(handler http.Handler, appID, path string) (*http.Server, net.Listener) {
s := &http.Server{
Handler: handler,
}

socket := utils.GetSocket(path, appID, "http")
l, err := net.Listen("unix", socket)
if err != nil {
panic(fmt.Sprintf("httptest: failed to listen on %v: %v", socket, err))
}
return s, l
}

func getTestSocketServer(expectedPath, resp, appID, path string) (*http.Server, net.Listener) {
s := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if expectedPath != "" && r.RequestURI != expectedPath {
w.WriteHeader(http.StatusInternalServerError)

return
}
if r.Method == http.MethodGet {
w.Write([]byte(resp))
} else {
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
w.Write(buf.Bytes())
}
}),
Handler: handlerTestPathResp(expectedPath, resp),
}

socket := utils.GetSocket(path, appID, "http")
Expand All @@ -71,3 +63,20 @@ func getTestSocketServer(expectedPath, resp, appID, path string) (*http.Server,
}
return s, l
}

func handlerTestPathResp(expectedPath, resp string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if expectedPath != "" && r.RequestURI != expectedPath {
w.WriteHeader(http.StatusInternalServerError)

return
}
if r.Method == http.MethodGet {
w.Write([]byte(resp))
} else {
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
w.Write(buf.Bytes())
}
}
}
2 changes: 1 addition & 1 deletion pkg/version/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestGetVersionsGithub(t *testing.T) {
t.Run("error on bad addr", func(t *testing.T) {
version, err := GetLatestReleaseGithub("http://a.super.non.existant.domain/")
assert.Equal(t, "", version)
assert.EqualError(t, err, "Get \"http://a.super.non.existant.domain/\": dial tcp: lookup a.super.non.existant.domain: no such host")
assert.Error(t, err)
})

s.Shutdown(context.Background())
Expand Down
23 changes: 21 additions & 2 deletions tests/e2e/standalone/standalone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func testPublish(t *testing.T) {
daprPath := getDaprPath()
for _, path := range socketCases {
executeAgainstRunningDapr(t, func() {
t.Run(fmt.Sprintf("publish from file with socket %s", path), func(t *testing.T) {
t.Run(fmt.Sprintf("publish message from file with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/message.json")
t.Log(output)
assert.NoError(t, err, "unable to publish from --data-file")
Expand All @@ -496,6 +496,26 @@ func testPublish(t *testing.T) {
assert.Equal(t, map[string]interface{}{"dapr": "is_great"}, event.Data)
})

t.Run(fmt.Sprintf("publish cloudevent from file with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data-file", "../testdata/cloudevent.json")
t.Log(output)
assert.NoError(t, err, "unable to publish from --data-file")
assert.Contains(t, output, "Event published successfully")

event := <-events
assert.Equal(t, &common.TopicEvent{
ID: "3cc97064-edd1-49f4-b911-c959a7370e68",
Source: "e2e_test",
SpecVersion: "1.0",
Type: "test.v1",
DataContentType: "application/json",
Subject: "e2e_subject",
PubsubName: "pubsub",
Topic: "sample",
Data: map[string]interface{}{"dapr": "is_great"},
}, event)
})

t.Run(fmt.Sprintf("publish from string with socket %s", path), func(t *testing.T) {
output, err := spawn.Command(daprPath, "publish", "--publish-app-id", "pub_e2e", "--unix-domain-socket", path, "--pubsub", "pubsub", "--topic", "sample", "--data", "{\"cli\": \"is_working\"}")
t.Log(output)
Expand All @@ -518,7 +538,6 @@ func testPublish(t *testing.T) {
t.Log(output)
assert.Error(t, err, "--data and --data-file should not be allowed together")
assert.Contains(t, output, "Only one of --data and --data-file allowed in the same publish command")

})

output, err := spawn.Command(getDaprPath(), "stop", "--app-id", "pub_e2e")
Expand Down
9 changes: 9 additions & 0 deletions tests/e2e/testdata/cloudevent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"id": "3cc97064-edd1-49f4-b911-c959a7370e68",
"source": "e2e_test",
"specversion": "1.0",
"type": "test.v1",
"subject": "e2e_subject",
"datacontenttype": "application/json",
"data": {"dapr": "is_great"}
}

0 comments on commit a8cf37c

Please sign in to comment.