Skip to content

Commit

Permalink
currentState implmentation for http
Browse files Browse the repository at this point in the history
add rest api for currentstate (works for http only)

Signed-off-by: Aneesh Puttur <aneeshputtur@gmail.com>
  • Loading branch information
aneeshkp committed Sep 30, 2022
1 parent 477bbac commit 310a61d
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 30 deletions.
100 changes: 92 additions & 8 deletions routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package restapi
import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/redhat-cne/sdk-go/pkg/types"
Expand All @@ -41,6 +42,11 @@ import (
"net/http"
)

const (
AMQ = "AMQ"
HTTP = "HTTP"
)

// createSubscription create subscription and send it to a channel that is shared by middleware to process
// Creates a new subscription .
// If subscription exists with same resource then existing subscription is returned .
Expand Down Expand Up @@ -306,7 +312,80 @@ func (s *Server) publishEvent(w http.ResponseWriter, r *http.Request) {
}
}

// pingForSubscribedEventStatus sends ping to the a listening address in the producer to fire all status as events
// getCurrentState get current status of the events that are subscribed to
func (s *Server) getCurrentState(w http.ResponseWriter, r *http.Request) {
queries := mux.Vars(r)
resourceAddress, ok := queries["resourceAddress"]
if !ok {
respondWithError(w, "resourceAddress parameter not found")
return
}

//identify publisher or subscriber is asking for status
var sub *pubsub.PubSub
if len(s.pubSubAPI.GetSubscriptions()) > 0 {
for _, subscriptions := range s.pubSubAPI.GetSubscriptions() {
if strings.Contains(subscriptions.GetResource(), resourceAddress) {
sub = subscriptions
break
}
}
} else if len(s.pubSubAPI.GetPublishers()) > 0 {
for _, publishers := range s.pubSubAPI.GetPublishers() {
if strings.Contains(publishers.GetResource(), resourceAddress) {
sub = publishers
break
}
}
} else {
respondWithError(w, "subscription not found")
return
}

if sub == nil {
respondWithError(w, "subscription not found")
return
}
cneEvent := event.CloudNativeEvent()
cneEvent.SetID(sub.ID)
cneEvent.Type = channel.STATUS.String()
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(cloudevents.ApplicationJSON)
cneEvent.SetData(cne.Data{
Version: "v1",
})
ceEvent, err := cneEvent.NewCloudEvent(sub)

if err != nil {
respondWithError(w, err.Error())
} else {
// for http you send to the protocol address
statusChannel := make(chan *channel.StatusChan, 1)
if s.transportType == HTTP {
s.dataOut <- &channel.DataChan{
Type: channel.STATUS,
Data: ceEvent,
Address: sub.GetResource(),
StatusChan: statusChannel,
}
select {
case d := <-statusChannel:
if d.Data == nil {
respondWithError(w, "event not found")
} else {
respondWithJSON(w, http.StatusOK, *d.Data)
}
case <-time.After(5 * time.Second):
close(statusChannel)
respondWithError(w, "time Out waiting for status")
}
} else {
respondWithError(w, "Non HTTP protocol not supported")
}
}
}

// pingForSubscribedEventStatus sends ping to the listening address in the producer to fire all status as events
func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Request) {
queries := mux.Vars(r)
subscriptionID, ok := queries["subscriptionid"]
Expand All @@ -333,9 +412,10 @@ func (s *Server) pingForSubscribedEventStatus(w http.ResponseWriter, r *http.Req
respondWithError(w, err.Error())
} else {
s.dataOut <- &channel.DataChan{
Type: channel.STATUS,
Data: ceEvent,
Address: fmt.Sprintf("%s/%s", sub.GetResource(), "status"),
Type: channel.STATUS,
StatusChan: nil,
Data: ceEvent,
Address: fmt.Sprintf("%s/%s", sub.GetResource(), "status"),
}
respondWithMessage(w, http.StatusAccepted, "ping sent")
}
Expand Down Expand Up @@ -367,11 +447,15 @@ func respondWithError(w http.ResponseWriter, message string) {
}

func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
response, _ := json.Marshal(payload)
w.Header().Set("Content-Type", cloudevents.ApplicationJSON)
w.WriteHeader(code)
w.Write(response) //nolint:errcheck
if response, err := json.Marshal(payload); err == nil {
w.Header().Set("Content-Type", cloudevents.ApplicationJSON)
w.WriteHeader(code)
w.Write(response) //nolint:errcheck
} else {
respondWithMessage(w, http.StatusBadRequest, "error parsing event data")
}
}

func respondWithMessage(w http.ResponseWriter, code int, message string) {
w.Header().Set("Content-Type", cloudevents.ApplicationJSON)
respondWithJSON(w, code, map[string]string{"status": message})
Expand Down
31 changes: 19 additions & 12 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@ type Server struct {
port int
apiPath string
//data out is amqp in channel
dataOut chan<- *channel.DataChan
closeCh <-chan struct{}
HTTPClient *http.Client
httpServer *http.Server
pubSubAPI *pubsubv1.API
status serverStatus
dataOut chan<- *channel.DataChan
closeCh <-chan struct{}
HTTPClient *http.Client
httpServer *http.Server
pubSubAPI *pubsubv1.API
status serverStatus
transportType string // AMQ or HTTP
transportAddress string // for AMQ this is base address to return events
}

// publisher/subscription data model
Expand Down Expand Up @@ -116,14 +118,16 @@ type swaggReqAccepted struct { //nolint:deadcode,unused
}

// InitServer is used to supply configurations for rest routes server
func InitServer(port int, apiPath, storePath string, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) *Server {
func InitServer(port int, apiPath, storePath string, transportType string, transportAddress string, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}) *Server {
once.Do(func() {
ServerInstance = &Server{
port: port,
apiPath: apiPath,
dataOut: dataOut,
closeCh: closeCh,
status: notReady,
port: port,
apiPath: apiPath,
dataOut: dataOut,
closeCh: closeCh,
status: notReady,
transportType: transportType,
transportAddress: transportAddress,
HTTPClient: &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: 20,
Expand Down Expand Up @@ -193,6 +197,7 @@ func (s *Server) Start() {
}
s.status = starting
r := mux.NewRouter()

api := r.PathPrefix(s.apiPath).Subrouter()

// createSubscription create subscription and send it to a channel that is shared by middleware to process
Expand Down Expand Up @@ -263,6 +268,8 @@ func (s *Server) Start() {
// "$ref": "#/responses/badReq"
api.HandleFunc("/subscriptions/status/{subscriptionid}", s.pingForSubscribedEventStatus).Methods(http.MethodPut)

api.HandleFunc("/{resourceAddress:.*}/CurrentState", s.getCurrentState).Methods(http.MethodGet)

api.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "OK") //nolint:errcheck
}).Methods(http.MethodGet)
Expand Down
86 changes: 76 additions & 10 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ import (
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
types2 "github.com/cloudevents/sdk-go/v2/types"
"github.com/google/uuid"

restapi "github.com/redhat-cne/rest-api"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/event"
Expand All @@ -42,15 +46,17 @@ import (
var (
server *restapi.Server

eventOutCh chan *channel.DataChan
closeCh chan struct{}
wg sync.WaitGroup
port int = 8989
apPath string = "/routes/cne/v1/"
resource string = "test/test"
storePath string = "."
ObjSub pubsub.PubSub
ObjPub pubsub.PubSub
eventOutCh chan *channel.DataChan
closeCh chan struct{}
wg sync.WaitGroup
port int = 8989
apPath string = "/routes/cne/v1/"
resource string = "test/test"
storePath string = "."
ObjSub pubsub.PubSub
ObjPub pubsub.PubSub
transportType = "HTTP"
transportAddress = "localhost"
)

func init() {
Expand All @@ -59,13 +65,57 @@ func init() {
}

func TestMain(m *testing.M) {
server = restapi.InitServer(port, apPath, storePath, eventOutCh, closeCh)
server = restapi.InitServer(port, apPath, storePath, transportType, transportAddress, eventOutCh, closeCh)
//start http server
server.Start()

wg.Add(1)
go func() {
for d := range eventOutCh {
if d.Type == channel.STATUS && d.StatusChan != nil {
log.Info("WHY HERE")
clientID, _ := uuid.Parse("123e4567-e89b-12d3-a456-426614174001")
cneEvent := v1event.CloudNativeEvent()
cneEvent.SetID(ObjPub.ID)
cneEvent.Type = string(ptp.PtpStateChange)
cneEvent.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
cneEvent.SetDataContentType(event.ApplicationJSON)
data := event.Data{
Version: "event",
Values: []event.DataValue{{
Resource: "test",
DataType: event.NOTIFICATION,
ValueType: event.ENUMERATION,
Value: ptp.ACQUIRING_SYNC,
},
},
}
data.SetVersion("v1") //nolint:errcheck
cneEvent.SetData(data)
e := cloudevents.Event{
Context: cloudevents.EventContextV1{
Type: string(ptp.PtpStateChange),
Source: cloudevents.URIRef{URL: url.URL{Scheme: "http", Host: "example.com", Path: "/source"}},
ID: "status event",
Time: &cloudevents.Timestamp{Time: time.Date(2020, 03, 21, 12, 34, 56, 780000000, time.UTC)},
DataSchema: &types2.URI{URL: url.URL{Scheme: "http", Host: "example.com", Path: "/schema"}},
Subject: func(s string) *string { return &s }("topic"),
}.AsV1(),
}
_ = e.SetData(cloudevents.ApplicationJSON, cneEvent)
func() {
defer func() {
if err := recover(); err != nil {
log.Errorf("error on clsoe channel")
}
}()
d.StatusChan <- &channel.StatusChan{
ID: "123",
ClientID: clientID,
Data: &e,
}
}()
}
log.Infof("incoming data %#v", d)
}
}()
Expand Down Expand Up @@ -231,6 +281,22 @@ func TestServer_ListPublishers(t *testing.T) {
assert.Greater(t, len(pubList), 0)
}

func TestServer_GetCurrentState(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
time.Sleep(2 * time.Second)
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://localhost:%d%s%s/%s", port, apPath, resource, "CurrentState"), nil)
assert.Nil(t, err)
req.Header.Set("Content-Type", "application/json")
resp, err := server.HTTPClient.Do(req)
assert.Nil(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusOK, resp.StatusCode)
_, err = ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
}

func TestServer_TestPingStatusStatusCode(t *testing.T) {
req, err := http.NewRequest("PUT", fmt.Sprintf("http://localhost:%d%s%s%s", port, apPath, "subscriptions/status/", ObjSub.ID), nil)
assert.Nil(t, err)
Expand Down

0 comments on commit 310a61d

Please sign in to comment.